1. Overview
Flume is a highly available, highly reliable, distributed system from Cloudera for collecting, aggregating, and transporting massive volumes of log data.
At its core, Flume collects data from a data source (source) and delivers it to a specified destination (sink). To guarantee that delivery succeeds, the data is buffered (channel) before it is sent to the sink; only after the data has actually reached the sink does Flume delete its buffered copy.
Flume supports pluggable data senders for collecting all kinds of data, as well as pluggable data receivers for the final storage of that data. Ordinary collection requirements can be met with simple Flume configuration, and Flume also offers good extensibility for special scenarios, so it fits most day-to-day data collection use cases.
Flume currently has two major lines: the 0.9.x releases, collectively called Flume OG (original generation), and the 1.x releases, collectively called Flume NG (next generation). Flume NG restructured the core components, core configuration, and code architecture, so it differs substantially from Flume OG; take care to distinguish the two. Another reason for the change was that Flume moved under the Apache umbrella, and Cloudera Flume was renamed Apache Flume.
2. How Flume Works
The core role in a Flume system is the agent: a Java process that usually runs on a log collection node.

Each agent acts as a data courier with three internal components:
Source: the collection source; it interfaces with the data source and ingests data.
Sink: the delivery target; it passes collected data either to the next agent or to the final storage system.
Channel: the data transfer channel inside the agent, carrying data from the source to the sink.
What flows through the whole transfer process is the event, the most basic unit of data transfer inside Flume. An event wraps the data being transferred; for a text file this is usually one line, and the event is also the basic unit of a transaction. An event flows from source to channel to sink; its body is a byte array, and it can carry headers. An event is the smallest complete unit of data, arriving from an external data source and heading to an external destination.
A complete event consists of event headers and an event body, where the body is the log record that Flume collected.
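To make the event structure concrete, here is a minimal sketch (my own illustration, not from the original text) that uses Flume's EventBuilder API to wrap one hypothetical log line as an event with a header; the header key and sample body are assumptions chosen only for illustration.

import org.apache.flume.Event;
import org.apache.flume.event.EventBuilder;

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

public class EventDemo {
    public static void main(String[] args) {
        // Hypothetical log line; an event body is just a byte array.
        String line = "1596382800000|{\"action\":\"click\"}";

        // Headers are a String-to-String map that travels with the event.
        Map<String, String> headers = new HashMap<>();
        headers.put("topic", "topic_event");

        Event event = EventBuilder.withBody(line.getBytes(StandardCharsets.UTF_8), headers);

        System.out.println(event.getHeaders()); // {topic=topic_event}
        System.out.println(new String(event.getBody(), StandardCharsets.UTF_8)); // the original line
    }
}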
3. Flume Collection System Architecture
3.1 Simple Structure
Data is collected by a single agent.

3.2 Complex Structure
Multiple agents chained together in series.

4. Quick Start: Flume for Log Collection
4.1 Installation and Deployment
- Upload apache-flume-1.7.0-bin.tar.gz to the /opt/software directory on the Linux host.
- Extract apache-flume-1.7.0-bin.tar.gz into /opt/module/:
[lili@hadoop102 software]$ tar -zxf apache-flume-1.7.0-bin.tar.gz -C /opt/module/
- Rename apache-flume-1.7.0-bin to flume:
[lili@hadoop102 module]$ mv apache-flume-1.7.0-bin flume
- Rename flume/conf/flume-env.sh.template to flume-env.sh and configure it:
[lili@hadoop102 conf]$ mv flume-env.sh.template flume-env.sh
[lili@hadoop102 conf]$ vim flume-env.sh
export JAVA_HOME=/opt/module/jdk1.8.0_144
4.2 Flume Configuration
- Create the file-flume-kafka.conf file under /opt/module/flume/conf:
[lili@hadoop102 conf]$ vim file-flume-kafka.conf
- Configure the file for the File -> Flume -> Kafka flow (see the Flume official site for reference):
# Declare the agent's source and its two channels (no sink is needed:
# the Kafka channels write straight to Kafka).
a1.sources = r1
a1.channels = c1 c2

# TAILDIR source tails the app log files and records its read position.
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /opt/module/flume/test/log_position.json
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /tmp/logs/app.+
a1.sources.r1.fileHeader = true
a1.sources.r1.channels = c1 c2

# Interceptor chain: ETL filtering, then log-type tagging (see section 4.3).
a1.sources.r1.interceptors = i1 i2
a1.sources.r1.interceptors.i1.type = com.atguigu.flume.interceptor.LogETLInterceptor$Builder
a1.sources.r1.interceptors.i2.type = com.atguigu.flume.interceptor.LogTypeInterceptor$Builder

# Multiplexing selector routes each event by its "topic" header.
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = topic
a1.sources.r1.selector.mapping.topic_start = c1
a1.sources.r1.selector.mapping.topic_event = c2

# Kafka channel c1 carries startup logs to the topic_start topic.
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.channels.c1.kafka.topic = topic_start
a1.channels.c1.parseAsFlumeEvent = false
a1.channels.c1.kafka.consumer.group.id = flume-consumer

# Kafka channel c2 carries event logs to the topic_event topic.
a1.channels.c2.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c2.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.channels.c2.kafka.topic = topic_event
a1.channels.c2.parseAsFlumeEvent = false
a1.channels.c2.kafka.consumer.group.id = flume-consumer
4.3 Flume ETL and Log-Type Interceptors
ETL interceptor: filters out log records whose timestamp is invalid or whose JSON data is incomplete.
Log-type interceptor: separates startup logs from event logs so that they can be sent to different Kafka topics.
Preparation:
- Create a Maven project named flume-interceptor.
- Create the package com.atguigu.flume.interceptor.
4.3.1 Configure pom.xml
<dependencies>
    <dependency>
        <groupId>org.apache.flume</groupId>
        <artifactId>flume-ng-core</artifactId>
        <version>1.7.0</version>
    </dependency>
</dependencies>

<build>
    <plugins>
        <plugin>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>2.3.2</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
        <plugin>
            <artifactId>maven-assembly-plugin</artifactId>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
4.3.2 ETL Interceptor
package com.atguigu.flume.interceptor;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;

public class LogETLInterceptor implements Interceptor {

    @Override
    public void initialize() {
    }

    @Override
    public Event intercept(Event event) {
        // Decode the event body and validate it; malformed records are dropped.
        byte[] body = event.getBody();
        String log = new String(body, Charset.forName("UTF-8"));
        if (log.contains("start")) {
            if (LogUtils.validateStart(log)) {
                return event;
            }
        } else {
            if (LogUtils.validateEvent(log)) {
                return event;
            }
        }
        return null;
    }

    @Override
    public List<Event> intercept(List<Event> list) {
        // Keep only the events that pass single-event validation.
        ArrayList<Event> interceptors = new ArrayList<>();
        for (Event event : list) {
            Event intercept1 = intercept(event);
            if (intercept1 != null) {
                interceptors.add(intercept1);
            }
        }
        return interceptors;
    }

    @Override
    public void close() {
    }

    public static class Builder implements Interceptor.Builder {
        @Override
        public Interceptor build() {
            return new LogETLInterceptor();
        }

        @Override
        public void configure(Context context) {
        }
    }
}
4.3.3 Log Filtering Utility Class
package com.atguigu.flume.interceptor;

import org.apache.commons.lang.math.NumberUtils;

public class LogUtils {

    // An event log must look like "<13-digit millisecond timestamp>|<JSON>".
    public static boolean validateEvent(String log) {
        String[] logContents = log.split("\\|");
        if (logContents.length != 2) {
            return false;
        }
        // The timestamp must be a 13-digit number.
        if (logContents[0].length() != 13 || !NumberUtils.isDigits(logContents[0])) {
            return false;
        }
        // The payload must at least look like a complete JSON object.
        if (!logContents[1].trim().startsWith("{") || !logContents[1].trim().endsWith("}")) {
            return false;
        }
        return true;
    }

    // A start log is a single complete JSON object.
    public static boolean validateStart(String log) {
        if (log == null) {
            return false;
        }
        if (!log.trim().startsWith("{") || !log.trim().endsWith("}")) {
            return false;
        }
        return true;
    }
}
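To make the expected log formats concrete, the following sketch (my own illustration, not part of the original project) calls LogUtils on a few hypothetical sample lines; an event log is assumed to be a 13-digit millisecond timestamp, a "|" separator, and a JSON payload, while a start log is a single JSON object.

package com.atguigu.flume.interceptor;

public class LogUtilsDemo {
    public static void main(String[] args) {
        // Valid event log: 13-digit timestamp + "|" + complete JSON -> true
        System.out.println(LogUtils.validateEvent("1596382800000|{\"action\":\"click\"}"));
        // Timestamp is not 13 digits -> false
        System.out.println(LogUtils.validateEvent("12345|{\"action\":\"click\"}"));
        // JSON payload is truncated -> false
        System.out.println(LogUtils.validateEvent("1596382800000|{\"action\":\"click\""));
        // Valid start log: a complete JSON object -> true
        System.out.println(LogUtils.validateStart("{\"entry\":\"start\"}"));
    }
}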
4.3.4 Log-Type Interceptor
package com.atguigu.flume.interceptor;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class LogTypeInterceptor implements Interceptor {

    @Override
    public void initialize() {
    }

    @Override
    public Event intercept(Event event) {
        // Tag each event with a "topic" header so the multiplexing
        // channel selector can route it to the right Kafka channel.
        byte[] body = event.getBody();
        String log = new String(body, Charset.forName("UTF-8"));
        Map<String, String> headers = event.getHeaders();
        if (log.contains("start")) {
            headers.put("topic", "topic_start");
        } else {
            headers.put("topic", "topic_event");
        }
        return event;
    }

    @Override
    public List<Event> intercept(List<Event> list) {
        ArrayList<Event> interceptors = new ArrayList<>();
        for (Event event : list) {
            Event intercept1 = intercept(event);
            interceptors.add(intercept1);
        }
        return interceptors;
    }

    @Override
    public void close() {
    }

    public static class Builder implements Interceptor.Builder {
        @Override
        public Interceptor build() {
            return new LogTypeInterceptor();
        }

        @Override
        public void configure(Context context) {
        }
    }
}
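As a quick sanity check, here is a sketch (my own illustration, not from the original text) that builds two events with Flume's EventBuilder, runs them through the two interceptors in the same order as the file-flume-kafka.conf chain, and prints the topic header each surviving event ends up with; the sample log bodies are made up.

package com.atguigu.flume.interceptor;

import org.apache.flume.Event;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.StandardCharsets;

public class InterceptorDemo {
    public static void main(String[] args) {
        // Build the interceptors the same way Flume does, through their Builder classes.
        Interceptor etl = new LogETLInterceptor.Builder().build();
        Interceptor type = new LogTypeInterceptor.Builder().build();

        // A hypothetical start log (a JSON object containing "start") and an event log.
        Event startLog = EventBuilder.withBody(
                "{\"entry\":\"start\"}".getBytes(StandardCharsets.UTF_8));
        Event eventLog = EventBuilder.withBody(
                "1596382800000|{\"action\":\"click\"}".getBytes(StandardCharsets.UTF_8));

        for (Event e : new Event[]{startLog, eventLog}) {
            Event filtered = etl.intercept(e);           // drops malformed records
            if (filtered != null) {
                Event tagged = type.intercept(filtered); // adds the "topic" header
                System.out.println(tagged.getHeaders().get("topic"));
            }
        }
        // Expected output: topic_start, then topic_event.
    }
}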
4.3.5 Package, Upload, and Distribute
- Upload the jar built without dependencies to Flume's lib directory on hadoop102:
[lili@hadoop102 lib]$ ls | grep interceptor
flume-interceptor-1.0-SNAPSHOT.jar
- Distribute flume-interceptor-1.0-SNAPSHOT.jar to hadoop103 and hadoop104. Prerequisite: the xsync distribution script:
#!/bin/bash
# Abort if no argument was given.
pcount=$#
if ((pcount==0)); then
  echo no args;
  exit;
fi
# Resolve the file name and its absolute parent directory.
p1=$1
fname=`basename $p1`
echo fname=$fname
pdir=`cd -P $(dirname $p1); pwd`
echo pdir=$pdir
user=`whoami`
# rsync the file to the same path on every node.
for host in hadoop102 hadoop103 hadoop104
do
  echo ------------------- $host --------------
  rsync -av $pdir/$fname $user@$host:$pdir
done
Distribute flume-interceptor-1.0-SNAPSHOT.jar:
[lili@hadoop102 lib]$ xsync flume-interceptor-1.0-SNAPSHOT.jar
fname=flume-interceptor-1.0-SNAPSHOT.jar
pdir=/opt/module/flume/lib
------------------- hadoop102 --------------
sending incremental file list
sent 72 bytes received 12 bytes 168.00 bytes/sec
total size is 6667 speedup is 79.37
------------------- hadoop103 --------------
sending incremental file list
flume-interceptor-1.0-SNAPSHOT.jar
sent 6782 bytes received 31 bytes 4542.00 bytes/sec
total size is 6667 speedup is 0.98
------------------- hadoop104 --------------
sending incremental file list
flume-interceptor-1.0-SNAPSHOT.jar
sent 6782 bytes received 31 bytes 13626.00 bytes/sec
total size is 6667 speedup is 0.98
- Distribute file-flume-kafka.conf to hadoop102 and hadoop103:
[lili@hadoop102 conf]$ pwd
/opt/module/flume/conf
[lili@hadoop102 conf]$ xsync file-flume-kafka.conf
fname=file-flume-kafka.conf
pdir=/opt/module/flume/conf
------------------- hadoop102 --------------
sending incremental file list
sent 59 bytes received 12 bytes 47.33 bytes/sec
total size is 1307 speedup is 18.41
------------------- hadoop103 --------------
sending incremental file list
file-flume-kafka.conf
sent 1409 bytes received 31 bytes 2880.00 bytes/sec
total size is 1307 speedup is 0.91
------------------- hadoop104 --------------
sending incremental file list
file-flume-kafka.conf
sent 1409 bytes received 31 bytes 960.00 bytes/sec
total size is 1307 speedup is 0.91
5. Flume Startup Script for Log Collection
- Create the f1.sh script in the /home/lili/bin directory:
[lili@hadoop102 bin]$ vim f1.sh
#!/bin/bash
case $1 in
"start"){
  for i in hadoop102 hadoop103
  do
    echo " --------Starting Flume collection on $i-------"
    ssh $i "nohup /opt/module/flume/bin/flume-ng agent --conf-file /opt/module/flume/conf/file-flume-kafka.conf --name a1 -Dflume.root.logger=INFO,LOGFILE >/opt/module/flume/job/data-warehouse/flume.log 2>&1 &"
  done
};;
"stop"){
  for i in hadoop102 hadoop103
  do
    echo " --------Stopping Flume collection on $i-------"
    ssh $i "ps -ef | grep file-flume-kafka | grep -v grep |awk '{print \$2}' | xargs kill"
  done
};;
esac
- Make the script executable:
[lili@hadoop102 bin]$ chmod 777 f1.sh
- Start the Flume collectors:
[lili@hadoop102 flume]$ f1.sh start
--------Starting Flume collection on hadoop102-------
--------Starting Flume collection on hadoop103-------
[lili@hadoop102 bin]$ jps
17358 Jps
17139 Application
- Stop the Flume collectors:
[lili@hadoop102 flume]$ f1.sh stop
--------Stopping Flume collection on hadoop102-------
--------Stopping Flume collection on hadoop103-------
[lili@hadoop102 bin]$ jps
17378 Jps
6. Supplementary Scripts
6.1 Hadoop Startup Script
[lili@hadoop102 bin]$ vim hdp
#!/bin/bash
pcount=$#
if((pcount==0)); then
  echo no args;
  exit;
fi
case $1 in
"start" ){
  echo "================ Starting the cluster ================"
  echo "================ Starting HDFS ================"
  ssh hadoop102 '/opt/module/hadoop-2.7.2/sbin/start-dfs.sh'
  echo "================ Starting YARN ================"
  ssh hadoop103 '/opt/module/hadoop-2.7.2/sbin/start-yarn.sh'
  echo "================ Starting JobHistoryServer ================"
  ssh hadoop102 '/opt/module/hadoop-2.7.2/sbin/mr-jobhistory-daemon.sh start historyserver'
};;
"stop" ){
  echo "================ Stopping the cluster ================"
  echo "================ Stopping JobHistoryServer ================"
  ssh hadoop102 '/opt/module/hadoop-2.7.2/sbin/mr-jobhistory-daemon.sh stop historyserver'
  echo "================ Stopping YARN ================"
  ssh hadoop103 '/opt/module/hadoop-2.7.2/sbin/stop-yarn.sh'
  echo "================ Stopping HDFS ================"
  ssh hadoop102 '/opt/module/hadoop-2.7.2/sbin/stop-dfs.sh'
};;
esac
6.2 Log Generation Script
[lili@hadoop102 bin]$ vim lg.sh
#!/bin/bash
# Run the log-generator jar on each collection node; the two script arguments
# are passed straight through to the jar.
for i in hadoop102 hadoop103
do
  echo ----------Generating logs----------
  ssh $i "java -jar /opt/module/log-collector-1.0-SNAPSHOT-jar-with-dependencies.jar $1 $2 >/dev/null 2>&1 &"
done
6.3 ZooKeeper Startup Script
[lili@hadoop102 bin]$ vim zk.sh
#!/bin/bash
case $1 in
"start"){
  for i in hadoop102 hadoop103 hadoop104
  do
    ssh $i "/opt/module/zookeeper-3.4.10/bin/zkServer.sh start"
  done
};;
"stop"){
  for i in hadoop102 hadoop103 hadoop104
  do
    ssh $i "/opt/module/zookeeper-3.4.10/bin/zkServer.sh stop"
  done
};;
"status"){
  for i in hadoop102 hadoop103 hadoop104
  do
    ssh $i "/opt/module/zookeeper-3.4.10/bin/zkServer.sh status"
  done
};;
esac
6.4 Cluster Command Script
Note: issues a command to every node in the cluster and returns the results; mostly used to check running processes.
[lili@hadoop102 bin]$ vim xcall.sh
#!/bin/bash
# Run the given command on every node, labelling the output with the host name.
for i in hadoop102 hadoop103 hadoop104
do
  echo ----------$i----------
  ssh $i "$*"
done
For example:
[lili@hadoop102 bin]$ xcall.sh jps
----------hadoop102----------
17469 Jps
----------hadoop103----------
10961 Jps
----------hadoop104----------
11583 Jps
6.5 Cluster Time Synchronization Script
[lili@hadoop102 bin]$ vim dt.sh
#!/bin/bash
# Set the same wall-clock time on every node; ssh -t allocates a terminal so
# that sudo can prompt for the password.
for i in hadoop102 hadoop103 hadoop104
do
  echo ----------Synchronizing time on $i----------
  ssh -t $i "sudo date -s $1"
done
For example:
[lili@hadoop102 bin]$ dt.sh 16:00:00
----------Synchronizing time on hadoop102----------
[sudo] password for lili:
Fri Jul 30 16:00:00 CST 2021
Connection to hadoop102 closed.
----------Synchronizing time on hadoop103----------
[sudo] password for lili:
Fri Jul 30 16:00:00 CST 2021
Connection to hadoop103 closed.
----------Synchronizing time on hadoop104----------
[sudo] password for lili:
Fri Jul 30 16:00:00 CST 2021
Connection to hadoop104 closed.