前提概要:之前我们已经实现了动态分流,即通过TableProcessFunction1类把维度数据和事实数据进行了分流处理,接下来就是把数据写入Hbase表和Kafka主题表中:
hbaseDS.addSink(new DimSink());
kafkaDS.addSink(kafkaSink);
此时的动态分流后的2种数据类型大致为:
在代码注释种我已经详尽地介绍了输出数据的情况和代码逻辑,接下来我会以代码编写逻辑思路为主要讲解路线,可以参考代码逻辑来看;
维度表和事实表动态写入
Hbase动态写入
怎么去编写这个DimSink类呢?首先我们需要通过Phoenix来与Hbase建立联系,之前我们已经通过循环遍历配置表来完成Hbase对应的建表操作了,所以此时我们已经可以通过Phoenix的Sql语法来完成数据插入操作了; 所以这里我们应该先获取到要插入的表名,然后整理成对应的Sql语句,具体句法类似upsert into 表空间.表名(列名…) values (值…); 但有一点要注意一下:
if(jsonObj.getString("type").equals("update")){
DimUtil.deleteCached(tableName,dataJsonObj.getString("id"));
}
其他代码内容主要就是Phoenix的链接、Sql语句的生成以及upsert语句编写;
Kafka动态写入
回到之前我们写过的MyKafkaUtil的工具类中,当时我们的Producer对象是这么生成的:
public static FlinkKafkaProducer<String> getKafkaSink(String topic) {
return new FlinkKafkaProducer<String>(KAFKA_SERVER, topic, new SimpleStringSchema());
}
这段代码的topic对象在方法的参数中就已经确定了,但是我们在这里的kafka主题表不止一个,如果要把所有的kafka主题数据都写入进去,我们只能根据配置表中对应的kafka主题,有多少个主题就生成多少个Producer对象,但即使是这样,如果业务中有了新的事实数据类型,需要一个新的kafka主题,那我们这里还必须停止集群,重新添加新的Producer对象生成代码,这明显是不合理的;所以,这里我们采用另外一种创建方式:
public static <T> FlinkKafkaProducer<T> getKafkaSinkBySchema(KafkaSerializationSchema<T> kafkaSerializationSchema) {
Properties props = new Properties();
props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,KAFKA_SERVER);
props.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG,15*60*1000+"");
return new FlinkKafkaProducer<T>(DEFAULT_TOPIC, kafkaSerializationSchema, props, FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
生成的FlinkKafkaProducer对象可以动态的选择kafka主题名,所以接下来我们拉看看这个Prodecer对象是如何实现动态写入kafka主题数据的:
FlinkKafkaProducer<JSONObject> kafkaSink = MyKafkaUtil.getKafkaSinkBySchema(
new KafkaSerializationSchema<JSONObject>() {
@Override
public void open(SerializationSchema.InitializationContext context) throws Exception {
System.out.println("kafka序列化");
}
@Override
public ProducerRecord<byte[], byte[]> serialize(JSONObject jsonObj, @Nullable Long timestamp) {
String sinkTopic = jsonObj.getString("sink_table");
JSONObject dataJsonObj = jsonObj.getJSONObject("data");
return new ProducerRecord<>(sinkTopic,dataJsonObj.toString().getBytes());
}
}
);
到此为止,分流数据保存到Hbase和Kafka中的操作就全部完成了;
总结:
1 接收 Kafka 数据,转换过后再过滤空值数据
对 Maxwell 抓取数据进行 ETL,有用的部分保留,没用的过滤掉
2 实现动态分流功能(重难点)
由于MaxWell是把全部数据统一写入一个Topic中, 这样显然不利于日后的数据处理。所以需要把各个表拆开处理。但是由于每个表有不同的特点,有些表是维度表,有些表是事实表,有的表既是事实表在某种情况下也是维度表。
在实时计算中一般把维度数据写入存储容器,一般是方便通过主键查询的数据库比如HBase,Redis,MySQL 等。一般把事实数据写入流中,进行进一步处理,最终形成宽表。但是这里用Redis的话会对内存要求非常高,所以这里采用HBase来存储维度数据;
但是作为Flink实时计算任务,如何得知哪些表是维度表,哪些是事实表呢?而这些表又应该采集哪些字段呢?
我们可以将上面的内容放到某一个地方,集中配置。这样的配置不适合写在配置文件中,因为业务端随着需求变化每增加一张表,就要修改配置重启计算程序。所以这里需要一种动态配置方案,把这种配置长期保存起来,一旦配置有变化,实时计算可以自动感知。这种可以有两个方案实现:
? 一种是用Zookeeper存储,通过Watch感知数据变化。
? 另一种是用mysql数据库存储,周期性的同步。
这里选择第二种方案,主要是mysql对于配置数据初始化和维护管理,用sql都比较方便,虽然周期性操作时效性差一点,但是配置变化并不频繁。
--维度数据 发送到HBase
--事实数据 发送到kafka的dwd层
3.配置phoenix和hbase的相关内容,创建对应的命名空间;(具体见PDF文件)
4 把分好的流保存到对应表、主题中
业务数据保存到 Kafka 的主题中
维度数据保存到 Hbase 的表中
开启zk.sh、kk.sh、maxwell.sh、当前应用程序;再去开启业务数据jar包:
/opt/module/rt_dblog $java -jar gmall2020-mock-db-2020-11-27.jar
流程图总结如下所示:(要注意,这里具体维度数据和事实数据怎么用还没有实现,只是做了一个简单的表测试) 
|