Flume is a highly available, highly reliable, distributed system for collecting, aggregating, and transporting massive volumes of log data. Flume supports customizing data senders of various kinds within a logging system to collect data; it can also perform simple processing on the data and write it to a variety of data receivers.
Users collect logs with the Flume system and report them through the KAFKA protocol endpoint provided by LTS. The following are examples of common data collection scenarios:
- Using Flume to collect text logs and report them to LTS
- Using Flume to collect database table data and report it to LTS
- Using Flume to collect logs transported over the syslog protocol and report them to LTS
- Using Flume to collect logs transported over TCP/UDP and report them to LTS
- Using Flume to collect device management data reported over SNMP and send it to LTS
- Processing logs with the default interceptors
- Processing logs with a custom interceptor
- Enriching log content with an external data source and reporting it to LTS
Prerequisites
The JDK has been installed on the user's machine.
Flume has been installed, and the JDK path has been configured in the Flume configuration file.
Using Flume to collect text logs and report them to LTS
Flume can collect text log content and report it to LTS. Add a conf file for collecting text logs as shown in the following example. For the parameters used in the examples below, see Reporting Logs over the KAFKA Protocol.
#Named
a1.sources = r1
a1.channels = c1
a1.sinks = k1

#Source
a1.sources.r1.type = TAILDIR
a1.sources.r1.channels = c1
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /tmp/test.txt
a1.sources.r1.fileHeader = true
a1.sources.r1.maxBatchCount = 1000

#Channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100

#Sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = ${logGroupId}_${logStreamId}
a1.sinks.k1.kafka.bootstrap.servers = ${ip}:${port}
a1.sinks.k1.kafka.producer.acks = 0
a1.sinks.k1.kafka.producer.security.protocol = SASL_PLAINTEXT
a1.sinks.k1.kafka.producer.sasl.mechanism = PLAIN
a1.sinks.k1.kafka.producer.compression.type = gzip
a1.sinks.k1.kafka.producer.sasl.jaas.config = org.apache.kafka.common.security.plain.PlainLoginModule required username="${projectId}" password="${accessKey}#${accessSecret}";

#Bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
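The `${...}` placeholders in the sink section must be filled in before starting the agent. As a minimal sketch (all IDs and keys below are hypothetical placeholders, not real credentials), the topic name and the SASL JAAS line are assembled like this:

```java
// Assembles the Kafka topic name and SASL JAAS line used in the sink
// configuration above. All IDs below are illustrative placeholders.
public class LtsKafkaParams {
    // The topic is the log group ID and log stream ID joined by an underscore.
    static String topic(String logGroupId, String logStreamId) {
        return logGroupId + "_" + logStreamId;
    }

    // JAAS config: username is the project ID; the password is
    // "<accessKey>#<accessSecret>".
    static String jaasConfig(String projectId, String accessKey, String accessSecret) {
        return "org.apache.kafka.common.security.plain.PlainLoginModule required "
                + "username=\"" + projectId + "\" "
                + "password=\"" + accessKey + "#" + accessSecret + "\";";
    }
}
```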
Using Flume to collect database table data and report it to LTS
Flume can collect database table data and report it to LTS, enabling monitoring of changes to the table data. For the parameters used in the example below, see Reporting Logs over the KAFKA Protocol.
- Download the flume-ng-sql-source plugin from https://github.com/keedio/flume-ng-sql-source and build it into a jar named flume-ng-sql-source.jar. Before packaging, make sure the flume-ng-core version in the pom file matches the installed Flume version. Then place the jar in the lib directory under the Flume installation path, for example FLUME_HOME/lib (FLUME_HOME here stands for the Flume installation path and is for reference only; use your actual installation path).
- Download the MySQL driver.
wget https://dev.mysql.com/get/Downloads/Connector-J/mysql-connector-java-5.1.35.tar.gz
- Extract the driver package to obtain the jar.
tar xzf mysql-connector-java-5.1.35.tar.gz
- Place the jar in the FLUME_HOME/lib/ directory.
cp mysql-connector-java-5.1.35-bin.jar FLUME_HOME/lib/
- Add a conf file for collecting from MySQL.
# a1 is the agent name
# source is a1's input
# channels is the buffer
# sinks is a1's output destination; this example uses kafka as the sink
a1.channels = c1
a1.sources = r1
a1.sinks = k1

#source
a1.sources.r1.type = org.keedio.flume.source.SQLSource
# MySQL connection settings: replace {mysql_host} with your VM's IP address
# (check it with ifconfig or ip addr) and {database_name} with the database name
# The url must include ?useUnicode=true&characterEncoding=utf-8&useSSL=false,
# otherwise the connection may fail
a1.sources.r1.hibernate.connection.url = jdbc:mysql://{mysql_host}:3306/{database_name}?useUnicode=true&characterEncoding=utf-8&useSSL=false
# Hibernate Database connection properties
# MySQL account, usually root
a1.sources.r1.hibernate.connection.user = root
# Your MySQL password
a1.sources.r1.hibernate.connection.password = xxxxxxxx
a1.sources.r1.hibernate.connection.autocommit = true
# MySQL driver
a1.sources.r1.hibernate.dialect = org.hibernate.dialect.MySQL5Dialect
a1.sources.r1.hibernate.connection.driver_class = com.mysql.jdbc.Driver
# Where the status file is stored
a1.sources.r1.status.file.path = FLUME_HOME/bin
a1.sources.r1.status.file.name = sqlSource.status
# Custom query
# Replace {table_name} with the table to collect from
a1.sources.r1.custom.query = select * from {table_name}

#Sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = ${logGroupId}_${logStreamId}
a1.sinks.k1.kafka.bootstrap.servers = ${ip}:${port}
a1.sinks.k1.kafka.producer.acks = 0
a1.sinks.k1.kafka.producer.security.protocol = SASL_PLAINTEXT
a1.sinks.k1.kafka.producer.sasl.mechanism = PLAIN
a1.sinks.k1.kafka.producer.compression.type = gzip
a1.sinks.k1.kafka.producer.sasl.jaas.config = org.apache.kafka.common.security.plain.PlainLoginModule required username="${projectId}" password="${accessKey}#${accessSecret}";

a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 10000
a1.channels.c1.byteCapacityBufferPercentage = 20
a1.channels.c1.byteCapacity = 800000
- After starting Flume, table data from the database is collected and sent to LTS.
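The JDBC URL in the source configuration must carry the encoding and SSL query parameters noted in the comments. A minimal sketch of how the URL is assembled (host and database name are placeholders):

```java
// Builds the JDBC URL used by flume-ng-sql-source above. The query
// parameters guard against character-encoding and SSL handshake failures.
public class MysqlUrl {
    static String jdbcUrl(String host, String database) {
        return "jdbc:mysql://" + host + ":3306/" + database
                + "?useUnicode=true&characterEncoding=utf-8&useSSL=false";
    }
}
```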
Using Flume to collect logs transported over the syslog protocol and report them to LTS
Syslog is a protocol for transporting log messages over IP networks. Flume can collect logs transported over syslog and report them to LTS. For the parameters used in the examples below, see Reporting Logs over the KAFKA Protocol.
- To receive UDP logs, add a conf file for collecting over the syslog protocol as shown below.
a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.type = syslogudp
# host_port is the syslog server's port
a1.sources.r1.port = {host_port}
# host_ip is the syslog server's IP address
a1.sources.r1.host = {host_ip}
a1.sources.r1.channels = c1

a1.channels.c1.type = memory

#Sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = ${logGroupId}_${logStreamId}
a1.sinks.k1.kafka.bootstrap.servers = ${ip}:${port}
a1.sinks.k1.kafka.producer.acks = 0
a1.sinks.k1.kafka.producer.security.protocol = SASL_PLAINTEXT
a1.sinks.k1.kafka.producer.sasl.mechanism = PLAIN
a1.sinks.k1.kafka.producer.compression.type = gzip
a1.sinks.k1.kafka.producer.sasl.jaas.config = org.apache.kafka.common.security.plain.PlainLoginModule required username="${projectId}" password="${accessKey}#${accessSecret}";
a1.sinks.k1.channel = c1
- To receive TCP logs, add a conf file for collecting over the syslog protocol as shown below.
a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.type = syslogtcp
# host_port is the syslog server's port
a1.sources.r1.port = {host_port}
# host_ip is the syslog server's IP address
a1.sources.r1.host = {host_ip}
a1.sources.r1.channels = c1

a1.channels.c1.type = memory

#Sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = ${logGroupId}_${logStreamId}
a1.sinks.k1.kafka.bootstrap.servers = ${ip}:${port}
a1.sinks.k1.kafka.producer.acks = 0
a1.sinks.k1.kafka.producer.security.protocol = SASL_PLAINTEXT
a1.sinks.k1.kafka.producer.sasl.mechanism = PLAIN
a1.sinks.k1.kafka.producer.compression.type = gzip
a1.sinks.k1.kafka.producer.sasl.jaas.config = org.apache.kafka.common.security.plain.PlainLoginModule required username="${projectId}" password="${accessKey}#${accessSecret}";
a1.sinks.k1.channel = c1
Using Flume to collect logs transported over TCP/UDP and report them to LTS
Flume can collect logs transported over TCP/UDP and report them to LTS. For the parameters used in the examples below, see Reporting Logs over the KAFKA Protocol.
- To collect logs from a TCP port, add a port-collection conf file as shown below.
a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.type = netcat
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = {host_port}

a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = ${logGroupId}_${logStreamId}
a1.sinks.k1.kafka.bootstrap.servers = ${ip}:${port}
a1.sinks.k1.kafka.producer.acks = 0
a1.sinks.k1.kafka.producer.security.protocol = SASL_PLAINTEXT
a1.sinks.k1.kafka.producer.sasl.mechanism = PLAIN
a1.sinks.k1.kafka.producer.compression.type = gzip
a1.sinks.k1.kafka.producer.sasl.jaas.config = org.apache.kafka.common.security.plain.PlainLoginModule required username="${projectId}" password="${accessKey}#${accessSecret}";

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
- To collect logs from a UDP port, add a port-collection conf file as shown below.
a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.type = netcatudp
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = {host_port}

a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = ${logGroupId}_${logStreamId}
a1.sinks.k1.kafka.bootstrap.servers = ${ip}:${port}
a1.sinks.k1.kafka.producer.acks = 0
a1.sinks.k1.kafka.producer.security.protocol = SASL_PLAINTEXT
a1.sinks.k1.kafka.producer.sasl.mechanism = PLAIN
a1.sinks.k1.kafka.producer.compression.type = gzip
a1.sinks.k1.kafka.producer.sasl.jaas.config = org.apache.kafka.common.security.plain.PlainLoginModule required username="${projectId}" password="${accessKey}#${accessSecret}";

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Using Flume to collect device management data reported over SNMP and send it to LTS
Flume can collect device management data reported over the SNMP protocol and send it to LTS. For the parameters used in the examples below, see Reporting Logs over the KAFKA Protocol.
- Listen on port 161, the SNMP communication port. Add a conf file for receiving SNMP logs as shown below.
a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.type = netcatudp
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 161

a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = ${logGroupId}_${logStreamId}
a1.sinks.k1.kafka.bootstrap.servers = ${ip}:${port}
a1.sinks.k1.kafka.producer.acks = 0
a1.sinks.k1.kafka.producer.security.protocol = SASL_PLAINTEXT
a1.sinks.k1.kafka.producer.sasl.mechanism = PLAIN
a1.sinks.k1.kafka.producer.compression.type = gzip
a1.sinks.k1.kafka.producer.sasl.jaas.config = org.apache.kafka.common.security.plain.PlainLoginModule required username="${projectId}" password="${accessKey}#${accessSecret}";

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
- Listen on port 162, the SNMP trap communication port. Add a conf file for receiving SNMP logs as shown below.
a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.type = netcatudp
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 162

a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = ${logGroupId}_${logStreamId}
a1.sinks.k1.kafka.bootstrap.servers = ${ip}:${port}
a1.sinks.k1.kafka.producer.acks = 0
a1.sinks.k1.kafka.producer.security.protocol = SASL_PLAINTEXT
a1.sinks.k1.kafka.producer.sasl.mechanism = PLAIN
a1.sinks.k1.kafka.producer.compression.type = gzip
a1.sinks.k1.kafka.producer.sasl.jaas.config = org.apache.kafka.common.security.plain.PlainLoginModule required username="${projectId}" password="${accessKey}#${accessSecret}";

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
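To check that the netcatudp source is actually listening, you can fire a test datagram at the configured port. A minimal sender sketch (the host and port are assumptions; any UDP client such as netcat works just as well):

```java
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.nio.charset.StandardCharsets;

// Sends one UDP datagram to the given host/port so the netcatudp source has
// something to pick up. UDP is connectionless, so the send succeeds whether
// or not a listener is present; returns the number of bytes sent, or -1 on error.
public class UdpProbe {
    static int send(String host, int port, String message) {
        byte[] payload = message.getBytes(StandardCharsets.UTF_8);
        try (DatagramSocket socket = new DatagramSocket()) {
            DatagramPacket packet = new DatagramPacket(
                    payload, payload.length, InetAddress.getByName(host), port);
            socket.send(packet);
            return payload.length;
        } catch (Exception e) {
            return -1;
        }
    }
}
```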
Processing logs with the default interceptors
When using Flume as a collector, interceptors are simple plug-in components that sit between the Source and the Channel. Before events received by the Source are written to the Channel, an interceptor can transform or drop them. Each interceptor only processes events received by the same Source.
- Timestamp interceptor
This interceptor inserts a timestamp into the Flume event headers. Without any interceptor, Flume receives only the message. Timestamp interceptor parameters:
type: the type name, timestamp; the fully qualified class name can also be used.
preserveExisting: defaults to false. If set to true and a timestamp header already exists in the event, its value is not replaced.
Configuration connecting a source to the timestamp interceptor:
a1.sources.r1.interceptors = timestamp
a1.sources.r1.interceptors.timestamp.type = timestamp
a1.sources.r1.interceptors.timestamp.preserveExisting = false
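The effect on an event's header map can be sketched as follows (a simplified model of the behavior, not Flume's actual implementation):

```java
import java.util.Map;

// Sketch of what the timestamp interceptor does to an event's header map:
// it stores the current epoch millis under the "timestamp" key, leaving an
// existing value alone only when preserveExisting is true.
public class TimestampSketch {
    static Map<String, String> apply(Map<String, String> headers,
                                     boolean preserveExisting, long nowMillis) {
        if (!preserveExisting || !headers.containsKey("timestamp")) {
            headers.put("timestamp", Long.toString(nowMillis));
        }
        return headers;
    }
}
```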
- Regex filtering interceptor
During log collection, some data may not be needed. Adding a filtering interceptor can drop unneeded logs, or collect only logs matching a regular expression. Parameters:
type: the type name, REGEX_FILTER.
excludeEvents: defaults to false, in which case matching events are collected. If true, matching events are dropped and non-matching events are collected.
Configuration connecting a source to the regex filtering interceptor:
a1.sources.r1.interceptors = regex
a1.sources.r1.interceptors.regex.type = REGEX_FILTER
a1.sources.r1.interceptors.regex.regex = (today)|(Monday)
a1.sources.r1.interceptors.regex.excludeEvents = false
An interceptor configured this way only accepts log messages containing today or Monday.
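The keep/drop decision just described can be sketched in Java (a simplified model under the configuration above, not Flume's actual implementation):

```java
import java.util.regex.Pattern;

// Sketch of the regex filter's decision: an event is kept when the pattern
// matches and excludeEvents is false, or when it does not match and
// excludeEvents is true.
public class RegexFilterSketch {
    static final Pattern PATTERN = Pattern.compile("(today)|(Monday)");

    static boolean keep(String body, boolean excludeEvents) {
        boolean matched = PATTERN.matcher(body).find();
        return matched != excludeEvents;
    }
}
```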
- Search and replace interceptor
This interceptor provides simple string-based search and replace based on Java regular expressions. Configuration:
# Interceptor alias
a1.sources.r1.interceptors = search-replace
# Interceptor type; must be search_replace
a1.sources.r1.interceptors.search-replace.type = search_replace
# Pattern to search for in the event body, matched as a regex
a1.sources.r1.interceptors.search-replace.searchPattern = today
# Replacement for the matched event content
a1.sources.r1.interceptors.search-replace.replaceString = yesterday
# Character set; defaults to utf8
a1.sources.r1.interceptors.search-replace.charset = utf8
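With the configuration above, the effect on each event body is equivalent to a regex replaceAll; a minimal sketch:

```java
// The search_replace interceptor behaves like String.replaceAll on the
// event body: every substring matching searchPattern is replaced with
// replaceString. This mirrors the searchPattern/replaceString pair above.
public class SearchReplaceSketch {
    static String apply(String body) {
        return body.replaceAll("today", "yesterday");
    }
}
```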
Processing logs with a custom interceptor
The main steps for writing a custom interceptor in Flume are as follows (using Java as an example). FLUME_HOME in the examples below stands for the Flume installation path, e.g. /tools/flume (for reference only); use the actual installation path when configuring.
- Create a Maven project and add the Flume dependency.
Add the Flume dependency matching the Flume version in your cluster, as shown below:
<dependencies>
    <dependency>
        <groupId>org.apache.flume</groupId>
        <artifactId>flume-ng-core</artifactId>
        <version>1.10.1</version>
        <scope>provided</scope>
    </dependency>
</dependencies>
This dependency does not need to be packaged into the final JAR, so its scope is set to provided.
- Create a class implementing the Interceptor interface and its methods.
- initialize(): initializes the interceptor; read configuration, establish connections, and so on here.
- intercept(Event event): intercepts and processes a single event. Takes an event object as input and returns the modified event object.
- intercept(List<Event> list): batch processing; intercepts a list of events and processes them.
- close(): shuts down the interceptor; release resources and close connections here.
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class TestInterceptor implements Interceptor {
    @Override
    public void initialize() {
    }

    @Override
    public Event intercept(Event event) {
        // Get the event data
        String eventData = new String(event.getBody(), StandardCharsets.UTF_8);
        // Check whether the event data contains the specified string
        if (eventData.contains("hello")) {
            // If it does, filter out the event by returning null
            return null;
        }
        return event;
    }

    @Override
    public List<Event> intercept(List<Event> events) {
        // Create a new list to hold the processed events
        List<Event> interceptedEvents = new ArrayList<>();
        for (Event event : events) {
            Event interceptedEvent = intercept(event);
            if (interceptedEvent != null) {
                interceptedEvents.add(interceptedEvent);
            }
        }
        return interceptedEvents;
    }

    @Override
    public void close() {
    }
}
- Build the interceptor. Interceptor creation and configuration are usually done through the Builder pattern. The complete code is as follows:
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class TestInterceptor implements Interceptor {
    @Override
    public void initialize() {
    }

    @Override
    public Event intercept(Event event) {
        // Get the event data
        String eventData = new String(event.getBody(), StandardCharsets.UTF_8);
        // Check whether the event data contains the specified string
        if (eventData.contains("hello")) {
            // If it does, filter out the event by returning null
            return null;
        }
        return event;
    }

    @Override
    public List<Event> intercept(List<Event> events) {
        List<Event> interceptedEvents = new ArrayList<>();
        for (Event event : events) {
            Event interceptedEvent = intercept(event);
            if (interceptedEvent != null) {
                interceptedEvents.add(interceptedEvent);
            }
        }
        return interceptedEvents;
    }

    @Override
    public void close() {
    }

    // Interceptor builder
    public static class Builder implements Interceptor.Builder {
        @Override
        public void configure(Context context) {
        }

        @Override
        public Interceptor build() {
            return new TestInterceptor();
        }
    }
}
- Package the class into a jar and upload it to the lib folder under the Flume installation path (use the actual installation path of your Flume deployment).
- Write the configuration file, including the custom interceptor in it.
Note the format when configuring the interceptor's type: the fully qualified class name of the interceptor + $Builder.
# Interceptor configuration
# Interceptor definition
a1.sources.r1.interceptors = i1
# Fully qualified class name of the interceptor
a1.sources.r1.interceptors.i1.type = TestInterceptor$Builder
- Run Flume.
For example, the following interceptor splits the event body on spaces and rewrites it as field key-value pairs:
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TestInterceptor implements Interceptor {
    @Override
    public void initialize() {
    }

    @Override
    public Event intercept(Event event) {
        // Get the event data
        String eventData = new String(event.getBody(), StandardCharsets.UTF_8);
        Map<String, Object> splitMap = new HashMap<>();
        String[] splitList = eventData.split(" ");
        for (int i = 0; i < splitList.length; i++) {
            splitMap.put("field" + i, splitList[i].trim());
        }
        event.setBody(splitMap.toString().getBytes(StandardCharsets.UTF_8));
        return event;
    }

    @Override
    public List<Event> intercept(List<Event> events) {
        List<Event> interceptedEvents = new ArrayList<>();
        for (Event event : events) {
            Event interceptedEvent = intercept(event);
            if (interceptedEvent != null) {
                interceptedEvents.add(interceptedEvent);
            }
        }
        return interceptedEvents;
    }

    @Override
    public void close() {
    }
}
Enriching log content with an external data source and reporting it to LTS
The basic unit of Flume data transfer is the Event, which carries data from the source to the destination. An Event consists of a Header and a Body: the Header stores the event's attributes as key-value pairs, and the Body stores the data itself as a byte array.
When there is an external data source and you need to enrich the log content, for example by modifying the content, adding fields, or deleting content, the modified content must be written into the Event body for Flume to report it to LTS. For example, use Java to extend the log content as shown below. For the parameters used in the example, see Reporting Logs over the KAFKA Protocol.
import com.alibaba.fastjson.JSONObject;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class TestInterceptor implements Interceptor {
    @Override
    public void initialize() {
    }

    @Override
    public Event intercept(Event event) {
        // Get the event data, wrap it in a JSON string, and add an extra field
        String eventData = new String(event.getBody(), StandardCharsets.UTF_8);
        JSONObject object = new JSONObject();
        object.put("content", eventData);
        object.put("workLoadType", "ReplicaSet");
        eventData = object.toJSONString();
        event.setBody(eventData.getBytes(StandardCharsets.UTF_8));
        return event;
    }

    @Override
    public List<Event> intercept(List<Event> events) {
        List<Event> interceptedEvents = new ArrayList<>();
        for (Event event : events) {
            Event interceptedEvent = intercept(event);
            if (interceptedEvent != null) {
                interceptedEvents.add(interceptedEvent);
            }
        }
        return interceptedEvents;
    }

    @Override
    public void close() {
    }
}
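The resulting event body is roughly a JSON object with the original line under content plus the added field. A dependency-free sketch of that output (the JSON is hand-built here to avoid fastjson, and the workLoadType value is purely illustrative; fastjson's key ordering may differ):

```java
// Approximates the body produced by the enrichment interceptor above,
// hand-building the JSON string. Only double quotes are escaped; a real
// JSON library should be used for arbitrary input.
public class EnrichSketch {
    static String enrich(String original) {
        return "{\"content\":\"" + original.replace("\"", "\\\"")
                + "\",\"workLoadType\":\"ReplicaSet\"}";
    }
}
```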