将hbase安装目录下的jar包导入flume安装目录下的lib文件夹中
编写脚本文件:
# Agent a1: one SQL source replicated into two file channels,
# drained by an HBase sink (c1 -> sk1) and a Kafka sink (c2 -> sk2).
# NOTE: Java properties files require one key=value pair per line;
# the original had all three declarations fused onto a single line.
a1.sources = s1
a1.channels = c1 c2
a1.sinks = sk1 sk2
# Source s1: keedio flume-ng-sql-source polling MySQL table case_data1.
# Each property must be on its own line (properties-file syntax).
a1.sources.s1.type = org.keedio.flume.source.SQLSource
a1.sources.s1.hibernate.connection.url = jdbc:mysql://master:3306/flumedemo
a1.sources.s1.hibernate.connection.user = root
a1.sources.s1.hibernate.connection.password = 1234
a1.sources.s1.hibernate.connection.driver_class = com.mysql.cj.jdbc.Driver
a1.sources.s1.hibernate.connection.autocommit = true
a1.sources.s1.table = case_data1
# Only these columns are exported, in this order.
a1.sources.s1.columns.to.select = id,dt,ip
# File tracking the last row already delivered (incremental export state).
a1.sources.s1.status.file.name = status_data
# Start from row index 0 on first run.
a1.sources.s1.start.from = 0
# Replicate every event to ALL channels (c1 and c2).
# FIX: the selector must be scoped to the source name; the original
# "a1.sources.selector.type" omitted ".s1" and was silently ignored
# (replicating happens to be the default, but the intent should be explicit).
a1.sources.s1.selector.type = replicating
# Channel c1: durable file channel feeding the HBase sink.
# FIX: inline "#..." text is NOT a comment mid-line in properties files —
# it would have been appended to the preceding value. Comments moved to
# their own lines and each pair split onto its own line.
a1.channels.c1.type = file
# checkpointDir: where the channel's checkpoint state is kept.
a1.channels.c1.checkpointDir = /opt/flume_Method/flumeDemo/checkpoint
# dataDirs: on-disk storage path for the channel's events.
a1.channels.c1.dataDirs = /opt/flume_Method/flumeDemo/dataDir
# Channel c2: durable file channel feeding the Kafka sink.
# Must use checkpoint/data directories distinct from c1 — two file
# channels cannot share the same paths.
a1.channels.c2.type = file
# checkpointDir: where the channel's checkpoint state is kept.
a1.channels.c2.checkpointDir = /opt/flume_Method/flumeDemo/checkpoint1
# dataDirs: on-disk storage path for the channel's events.
a1.channels.c2.dataDirs = /opt/flume_Method/flumeDemo/dataDir1
# Sink sk1: writes events into HBase table "flume", column family "demo"
# (the table/family must exist before the agent starts — see step below).
a1.sinks.sk1.type = hbase2
a1.sinks.sk1.table = flume
a1.sinks.sk1.columnFamily = demo
# Regex serializer: each capture group maps positionally to a colName.
a1.sinks.sk1.serializer = org.apache.flume.sink.hbase2.RegexHBase2EventSerializer
a1.sinks.sk1.serializer.colNames = id,dt,ip
# Matches a 3-field quoted-CSV row: "id","dt","ip".
# (In a properties file \" loads as a plain double quote.)
a1.sinks.sk1.serializer.regex = ^\"(.*?)\",\"(.*?)\",\"(.*?)\"$
# Sink sk2: publishes events to Kafka topic "logs".
a1.sinks.sk2.type = org.apache.flume.sink.kafka.KafkaSink
# FIX: kafka.bootstrap.servers is a COMMA-separated host:port list;
# the original used spaces, which Kafka's client rejects.
a1.sinks.sk2.kafka.bootstrap.servers = master:9092,slave1:9092,slave2:9092
a1.sinks.sk2.kafka.topic = logs
# Wire the topology: s1 fans out to both channels; each sink drains
# exactly one channel (a sink can bind to a single channel only).
a1.sources.s1.channels = c1 c2
a1.sinks.sk1.channel = c1
a1.sinks.sk2.channel = c2
创建HBase表flume,列簇demo
运行脚本文件
扫描全表:scan 'flume'