Today's post is here. Let's keep learning and improving together!
Summary: by the end you should be able to handle most flink-cdc scenarios.
Full tutorial contents:
Prerequisites for using flink-cdc: MySQL's binlog
A look at part of the flink-cdc source code
Hands-on: using flink-cdc with the DataStream API
Hands-on: using flink-cdc with Flink SQL
Comparing the DataStream and Flink SQL approaches: differences and suitable scenarios
Writing a custom deserializer so the stream data is more intuitive and easier to use
flink-cdc errors I ran into while learning
Keep at it, study hard, and improve every day!
1 Fault-tolerance guarantees of flink-cdc

You can see the job has been submitted.

You can see that the data should be printed on hadoop102.
After clicking the hadoop102 row, you can see the output:

After I insert a row into MySQL,
you can see the change is picked up successfully:

We want a restarted job to resume from where it last consumed. To do that, we trigger a savepoint and later restore from it to recover the last consumed position:
[myself@hadoop102 ~]$ cd /opt/module/flink-standalone/
[myself@hadoop102 flink-standalone]$ bin/flink savepoint 713d0721e83be877c97f0a846cf6af85 hdfs://hadoop102:8020/flink1109/savepoint   # savepoint <jobId> <HDFS target directory>
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/module/flink-standalone/lib/log4j-slf4j-impl-2.12.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/module/hadoop-3.1.3/share/hadoop/common/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
Triggering savepoint for job 713d0721e83be877c97f0a846cf6af85.
Waiting for response...
Savepoint completed. Path: hdfs://hadoop102:8020/flink1109/savepoint/savepoint-713d07-9fd0e5ddd3f3
You can resume your program from this savepoint with the run command.
To get a feel for savepoints, I deliberately cancel this Flink job.

Then I insert new rows into MySQL:

Get the savepoint directory path from HDFS:

Run the job again, restoring from the savepoint:
[myself@hadoop102 flink-standalone]$ bin/flink run -s hdfs://hadoop102:8020/flink1109/savepoint/savepoint-713d07-9fd0e5ddd3f3 -c com.atguigu.Flink01_DataStream flink-cdc-1.0-SNAPSHOT-jar-with-dependencies.jar
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/module/flink-standalone/lib/log4j-slf4j-impl-2.12.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/module/hadoop-3.1.3/share/hadoop/common/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
Job has been submitted with JobID f2efda4055ccd36731b2f64aef7e3c9c
Without restoring from the savepoint, the job would start consuming from the beginning again.

It successfully resumes from where it last consumed:

So this gives the DataStream approach resumable consumption: the consume position is simply stored as Flink state and restored from state, unlike Flume and Canal, which persist the consume position to a file.
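For reference, the consume position only ends up in the savepoint if checkpointing is enabled in the job. A minimal sketch of that configuration, assuming the same HDFS layout as above (the interval and path are illustrative):
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Checkpoint every 5 s; the MySQL CDC source stores its binlog offset in this state,
// which is exactly what the savepoint above captures and what the -s restart restores from.
env.enableCheckpointing(5000L);
env.setStateBackend(new FsStateBackend("hdfs://hadoop102:8020/flink1109/ck")); // example path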
2 Using the Flink SQL approach
2.2.0 First, look at the example on the official site:
https://github.com/ververica/flink-cdc-connectors/wiki/MySQL-CDC-Connector
-- creates a mysql cdc table source
CREATE TABLE mysql_binlog (
id INT NOT NULL,
name STRING,
description STRING,
weight DECIMAL(10,3)
) WITH (   -- the connection options go in the WITH clause
'connector' = 'mysql-cdc',
'hostname' = 'localhost',
'port' = '3306',
'username' = 'flinkuser',
'password' = 'flinkpw',
'database-name' = 'inventory',
'table-name' = 'products'
);
-- read snapshot and binlog data from mysql, and do some transformation, and show on the client
SELECT id, UPPER(name), description, weight FROM mysql_binlog;
2.2.1 Add the dependency
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-blink_2.12</artifactId>
    <version>1.12.0</version>
</dependency>
2.2.2 Code
package com.alibaba;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

/**
 * @author zhouyanjun
 * @create 2021-06-22 12:31
 */
public class Flink02_SQL {
    public static void main(String[] args) throws Exception {
        //1. Get the execution environment
        //2. Read the MySQL change data with SQL
        //3. Convert to a stream and print, so we can see the data format
        //4. Start the job

        //1. Get the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        //2. Read the MySQL change data with SQL
        tableEnv.executeSql("create table trademark(id string,tm_name string,logo_url string) " +
                "with(" +
                " 'connector' = 'mysql-cdc', " +
                " 'hostname' = 'hadoop102', " +
                " 'port' = '3306', " +
                " 'username' = 'root', " +
                " 'password' = '123456', " +
                " 'database-name' = 'gmall0820flink', " +
                " 'table-name' = 'base_trademark'" +
                ")"
        );

        //3. Convert to a stream and print, so we can see the data format
        Table table = tableEnv.sqlQuery("select * from trademark"); // gives us a Table object
        tableEnv.toRetractStream(table, Row.class).print(); // convert the dynamic table to a stream; which kind of stream fits best here?
        // start with a retract stream

        //4. Start the job
        env.execute();
    }
}
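A note on the stream type: the CDC table receives updates and deletes, so it cannot be converted with toAppendStream(); toRetractStream() wraps each change in a Tuple2<Boolean, Row> whose flag is true for additions and false for retractions. A small sketch (illustrative, not part of the original code) that makes the flag easy to read in the console:
// Prefix each change so insert and retract records are easy to tell apart.
tableEnv.toRetractStream(table, Row.class)
        .map(change -> (change.f0 ? "ADD " : "RETRACT ") + change.f1)
        .print();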
Connector Options
scan.startup.mode: optional, default "initial", type String. Startup mode for the MySQL CDC consumer; valid values are "initial" and "latest-offset". See Startup Reading Position for more details.
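For example, to skip the initial snapshot and read only new binlog changes, this option can be added to the WITH clause of the table defined above. A sketch; only the last option differs from the original DDL:
tableEnv.executeSql("create table trademark(id string,tm_name string,logo_url string) " +
        "with(" +
        " 'connector' = 'mysql-cdc', " +
        " 'hostname' = 'hadoop102', " +
        " 'port' = '3306', " +
        " 'username' = 'root', " +
        " 'password' = '123456', " +
        " 'database-name' = 'gmall0820flink', " +
        " 'table-name' = 'base_trademark', " +
        // start from the latest binlog position instead of taking an initial snapshot
        " 'scan.startup.mode' = 'latest-offset'" +
        ")");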

Output:
(java launch command and full classpath omitted) com.atguigu.Flink02_SQL
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
SLF4J: Failed to load class "org.slf4j.impl.StaticMDCBinder".
SLF4J: Defaulting to no-operation MDCAdapter implementation.
SLF4J: See http://www.slf4j.org/codes.html#no_static_mdc_binder for further details.
(true,1,Redmi,null)
(true,2,苹果,/static/default.jpg)
(true,3,华为,/static/default.jpg)
(true,4,TCL,/static/default.jpg)
(true,5,小米,/static/default.jpg)
(true,6,长粒香,/static/default.jpg)
(true,7,金沙河,/static/default.jpg)
(true,8,索芙特,/static/default.jpg)
(true,9,CAREMiLLE,/static/default.jpg)
(true,10,欧莱雅,/static/default.jpg)
(true,11,香奈儿,/static/default.jpg)
(true,12,周大帅哥,null)
(true,13,天下第一帅,null)
六月 22, 2021 1:00:45 下午 com.github.shyiko.mysql.binlog.BinaryLogClient connect
信息: Connected to hadoop102:3306 at mysql-bin.000007/154 (sid:5811, cid:11)
As you can see, this is the default startup (initial) mode:
it first reads the existing table data as a snapshot, then connects to the latest binlog position.

The output after we delete one row:
(true,1,Redmi,null)
(true,2,苹果,/static/default.jpg)
(true,3,华为,/static/default.jpg)
(true,4,TCL,/static/default.jpg)
(true,5,小米,/static/default.jpg)
(true,6,长粒香,/static/default.jpg)
(true,7,金沙河,/static/default.jpg)
(true,8,索芙特,/static/default.jpg)
(true,9,CAREMiLLE,/static/default.jpg)
(true,10,欧莱雅,/static/default.jpg)
(true,11,香奈儿,/static/default.jpg)
(true,12,周大帅哥,null)
(true,13,天下第一帅,null)
六月 22, 2021 1:00:45 下午 com.github.shyiko.mysql.binlog.BinaryLogClient connect
信息: Connected to hadoop102:3306 at mysql-bin.000007/154 (sid:5811, cid:11)
(false,13,天下第一帅,null)   <-- the false flag marks the retraction
Now update a row:

(true,1,Redmi,null)
(true,2,苹果,/static/default.jpg)
(true,3,华为,/static/default.jpg)
(true,4,TCL,/static/default.jpg)
(true,5,小米,/static/default.jpg)
(true,6,长粒香,/static/default.jpg)
(true,7,金沙河,/static/default.jpg)
(true,8,索芙特,/static/default.jpg)
(true,9,CAREMiLLE,/static/default.jpg)
(true,10,欧莱雅,/static/default.jpg)
(true,11,香奈儿,/static/default.jpg)
(true,12,周大帅哥,null)
(true,13,天下第一帅,null)
六月 22, 2021 1:00:45 下午 com.github.shyiko.mysql.binlog.BinaryLogClient connect
信息: Connected to hadoop102:3306 at mysql-bin.000007/154 (sid:5811, cid:11)
(false,13,天下第一帅,null)
(false,12,周大帅哥,null)   <-- an update produces two records: retract the old row, then add the new one
(true,12,周大帅哥,更新一条信息)
3 Differences between the two approaches:
The DataStream approach and the Flink SQL approach differ as follows:
The DataStream API can monitor multiple databases and multiple tables.
Flink SQL can only monitor a single table in a single database, because of how the CREATE TABLE statement is written. Its advantage: the output format is friendlier.
You can see this by comparing the code.
DataStream: a list of databases and a list of tables
.databaseList("gmall0820flink")
.tableList("gmall0820flink.base_trademark")
Flink SQL: a single database name and table name
" 'database-name' = 'gmall0820flink', " +
" 'table-name' = 'base_trademark'" +

Monitoring two databases and three tables at the same time also works (the databases must first be enabled in MySQL's binlog configuration), as sketched below.
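A sketch of what that looks like with the DataStream builder; the extra database and table names below are made-up placeholders, used only to show the shape of the call:
DebeziumSourceFunction<String> sourceFunction = MySQLSource
        .<String>builder()
        .hostname("hadoop102")
        .port(3306)
        .username("root")
        .password("123456")
        // two databases (both must be enabled in MySQL's binlog configuration)
        .databaseList("gmall0820flink", "gmall0820flink_other")
        // three db.table entries across those databases (names here are placeholders)
        .tableList("gmall0820flink.base_trademark",
                   "gmall0820flink.base_category1",
                   "gmall0820flink_other.order_info")
        .startupOptions(StartupOptions.initial())
        .deserializer(new StringDebeziumDeserializationSchema())
        .build();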
4 Custom deserializer
Replace the deserializer so the stream data we get is more intuitive and easier to use:
.deserializer(new StringDebeziumDeserializationSchema())
4.1 Code
public class Flink03_DataStreamWithMySchema {
    public static void main(String[] args) throws Exception {
        System.setProperty("HADOOP_USER_NAME", "atguigu");

        //1. Create the streaming execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // Enable checkpointing
        env.enableCheckpointing(5000L);
        env.getCheckpointConfig().setCheckpointTimeout(5000L); // checkpoints that exceed the timeout are discarded
        env.setStateBackend(new FsStateBackend("hdfs://hadoop102:8020/flink1109/ck"));

        //2. Read the MySQL change data via CDC
        DebeziumSourceFunction<String> sourceFunction = MySQLSource
                .<String>builder()
                .hostname("hadoop102")
                .port(3306)
                .username("root")
                .password("123456")
                .databaseList("gmall0820flink")
                // Optional; if omitted, all tables of the listed databases are read.
                // Prefix table names with the database name, since flink-cdc can monitor multiple databases.
                .tableList("gmall0820flink.base_trademark")
                .startupOptions(StartupOptions.initial()) // startup option/mode; an enum with 5 values
                // .startupOptions(StartupOptions.earliest())
                // .startupOptions(StartupOptions.latest())
                // .startupOptions(StartupOptions.specificOffset())
                // .startupOptions(StartupOptions.timestamp())
                .deserializer(new MyDebeziumDeserializationSchema()) // this is what we change: a custom deserializer that turns the serialized records into something usable
                .build();
        DataStreamSource<String> StreamSource = env.addSource(sourceFunction); // we now have a stream

        //3. Print
        StreamSource.print();

        //4. Start the job
        env.execute();
    }
}
Looking at the source:
.deserializer(new StringDebeziumDeserializationSchema())
Step into it:
/**
 * The deserializer used to convert from consumed {@link org.apache.kafka.connect.source.SourceRecord}.
 */
public Builder<T> deserializer(DebeziumDeserializationSchema<T> deserializer) {
    this.deserializer = deserializer;
    return this;
}
Step in again and you find it is an interface, so in our own code we have to implement it:
@PublicEvolving
public interface DebeziumDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {
    void deserialize(SourceRecord var1, Collector<T> var2) throws Exception;
}
The <T> in DebeziumDeserializationSchema<T> is the type handed to Collector<T>, and Collector<T> is what emits data downstream, so T is the output data type.
When you are not sure how to write something, look at how the source code does it: step into .deserializer(new StringDebeziumDeserializationSchema()) and you see the following:
public class StringDebeziumDeserializationSchema implements DebeziumDeserializationSchema<String> {
    private static final long serialVersionUID = -3168848963265670603L;

    public StringDebeziumDeserializationSchema() {
    }

    public void deserialize(SourceRecord record, Collector<String> out) throws Exception {
        out.collect(record.toString()); // just toString(), which is why the default output is so hard to use directly
    }

    public TypeInformation<String> getProducedType() {
        return BasicTypeInfo.STRING_TYPE_INFO; // we can reuse this same approach in our own implementation
    }
}
What the default deserializer produces is an object's toString(), not JSON (if it were already JSON, converting JSON to JSON would be pointless):
SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={file=mysql-bin.000004, pos=154, row=1, snapshot=true}}
ConnectRecord{topic='mysql_binlog_source.gmall0820flink.base_trademark', kafkaPartition=null, key=Struct{id=1}, keySchema=Schema{mysql_binlog_source.gmall0820flink.base_trademark.Key:STRUCT}, value=Struct{after=Struct{id=1,tm_name=Redmi},source=Struct{version=1.4.1.Final,connector=mysql,name=mysql_binlog_source,ts_ms=0,snapshot=true,db=gmall0820flink,table=base_trademark,server_id=0,file=mysql-bin.000004,pos=154,row=0},op=c,ts_ms=1623943673564}, valueSchema=Schema{mysql_binlog_source.gmall0820flink.base_trademark.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
In our own deserialize(SourceRecord sourceRecord, Collector<String> collector), if we step in and follow the call down to the final toString(), we find:
public String toString() {
    return "SourceRecord{sourcePartition=" + this.sourcePartition + ", sourceOffset=" + this.sourceOffset + "} " + super.toString();
}
The output format is fixed by this toString().

We write the code against the records printed by the default deserializer. To get the database and table name, the first attempt was .keySchema():
Schema schema = sourceRecord.keySchema();
but after checking what schema exposes, it turns out it is not a convenient way to get at the data.



Instead we use the Kafka Connect Struct; the instructor settled on this Struct only after trying the alternatives.
We want all the field names at once, so each field's value can then be fetched by name.
So we iterate once over the field metadata returned by schema().
Getting the operation type: the instructor got stuck here as well, not knowing how to write it, and only found the approach after searching online; there is a dedicated helper class (Envelope) for parsing it.
At work there will always be something new like this.
The final code:
package com.alibaba;

import com.alibaba.fastjson.JSONObject;
import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema;
import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction;
import com.alibaba.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

/**
 * @author zhouyanjun
 * @create 2021-06-22 17:03
 */
public class Flink03_DataStreamWithMySchema {
    public static void main(String[] args) throws Exception {
        // System.setProperty("HADOOP_USER_NAME", "alibaba");

        //1. Create the streaming execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // Checkpointing is left out here; run it locally first

        //2. Read the MySQL change data via CDC
        DebeziumSourceFunction<String> sourceFunction = MySQLSource
                .<String>builder()
                .hostname("hadoop102")
                .port(3306)
                .username("root")
                .password("123456")
                .databaseList("gmall0820flink")
                // Optional; if omitted, all tables of the listed databases are read.
                // Prefix table names with the database name, since flink-cdc can monitor multiple databases.
                .tableList("gmall0820flink.base_trademark")
                .startupOptions(StartupOptions.initial()) // startup option/mode; an enum with 5 values
                // .startupOptions(StartupOptions.earliest())
                // .startupOptions(StartupOptions.latest())
                // .startupOptions(StartupOptions.specificOffset())
                // .startupOptions(StartupOptions.timestamp())
                .deserializer(new MyDebeziumDeserializationSchema()) // the custom deserializer
                .build();
        DataStreamSource<String> StreamSource = env.addSource(sourceFunction); // we now have a stream

        //3. Print
        StreamSource.print();

        //4. Start the job
        env.execute();
    }

    // Wrap each change as a JSON string and pass it downstream, so <T> is String
    public static class MyDebeziumDeserializationSchema implements DebeziumDeserializationSchema<String> {
        /**
         * The JSON format we want to produce; with at least these fields the data is easy
         * to process later:
         * {
         *   "data": {"id": 11, "tm_name": "sasa"},   the row itself; it has several columns, so it is a nested JSON object
         *   "db": "",          database name, because we want to capture data from multiple databases and tables
         *   "tableName": "",   table name; later the stream is split by database.table, since different tables have different columns and a mixed stream could only be typed as Object
         *   "op": "c u d",     operation type: create, update, delete
         *   "ts": ""           a timestamp field
         * }
         */
        @Override
        public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception { // input record, output collector
            // Parse the SourceRecord into the JSON defined above: decide which fields we need,
            // then extract them so the data can be used downstream.
            // Schema schema = sourceRecord.keySchema();

            // Get the topic and extract the database and table name
            String topic = sourceRecord.topic();
            String[] fields = topic.split("\\."); // '.' is a regex metacharacter, so it must be escaped
            String db = fields[1];
            String tableName = fields[2];

            // Get the value and extract the row data
            // Object value = sourceRecord.value();  declared as Object, but the records we saw are actually Structs
            Struct value = (Struct) sourceRecord.value();
            Struct after = value.getStruct("after"); // the Struct behind the "after" field

            // Build a JSON object (fastjson) before iterating
            JSONObject jsonObject = new JSONObject();
            // We don't know the columns in advance, so iterate over the schema metadata
            for (Field field : after.schema().fields()) {
                Object o = after.get(field);
                jsonObject.put(field.name(), o);
            }

            // Get the operation type (it lives in the record's envelope, next to "source")
            Envelope.Operation operation = Envelope.operationFor(sourceRecord);

            // Wrap everything into one result JSON and send it out
            JSONObject result = new JSONObject();
            result.put("database", db);
            result.put("tableName", tableName);
            result.put("data", jsonObject);
            result.put("op", operation);

            // Emit the data
            collector.collect(result.toJSONString());
        }

        @Override
        public TypeInformation<String> getProducedType() {
            return BasicTypeInfo.STRING_TYPE_INFO;
        }
    }
}
Output:
(java launch command and full classpath omitted) com.alibaba.Flink03_DataStreamWithMySchema
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
SLF4J: Failed to load class "org.slf4j.impl.StaticMDCBinder".
SLF4J: Defaulting to no-operation MDCAdapter implementation.
SLF4J: See http://www.slf4j.org/codes.html#no_static_mdc_binder for further details.
{"op":"CREATE","database":"gmall0820flink","data":{"tm_name":"Redmi","id":1},"tableName":"base_trademark"}
{"op":"CREATE","database":"gmall0820flink","data":{"tm_name":"苹果","logo_url":"/static/default.jpg","id":2},"tableName":"base_trademark"}
{"op":"CREATE","database":"gmall0820flink","data":{"tm_name":"华为","logo_url":"/static/default.jpg","id":3},"tableName":"base_trademark"}
{"op":"CREATE","database":"gmall0820flink","data":{"tm_name":"TCL","logo_url":"/static/default.jpg","id":4},"tableName":"base_trademark"}
{"op":"CREATE","database":"gmall0820flink","data":{"tm_name":"小米","logo_url":"/static/default.jpg","id":5},"tableName":"base_trademark"}
{"op":"CREATE","database":"gmall0820flink","data":{"tm_name":"长粒香","logo_url":"/static/default.jpg","id":6},"tableName":"base_trademark"}
{"op":"CREATE","database":"gmall0820flink","data":{"tm_name":"金沙河","logo_url":"/static/default.jpg","id":7},"tableName":"base_trademark"}
{"op":"CREATE","database":"gmall0820flink","data":{"tm_name":"索芙特","logo_url":"/static/default.jpg","id":8},"tableName":"base_trademark"}
{"op":"CREATE","database":"gmall0820flink","data":{"tm_name":"CAREMiLLE","logo_url":"/static/default.jpg","id":9},"tableName":"base_trademark"}
{"op":"CREATE","database":"gmall0820flink","data":{"tm_name":"欧莱雅","logo_url":"/static/default.jpg","id":10},"tableName":"base_trademark"}
{"op":"CREATE","database":"gmall0820flink","data":{"tm_name":"香奈儿","logo_url":"/static/default.jpg","id":11},"tableName":"base_trademark"}
{"op":"CREATE","database":"gmall0820flink","data":{"tm_name":"周大帅哥","logo_url":"更新一条信息","id":12},"tableName":"base_trademark"}
六月 22, 2021 9:55:39 下午 com.github.shyiko.mysql.binlog.BinaryLogClient connect
信息: Connected to hadoop102:3306 at mysql-bin.000009/154 (sid:5499, cid:13)

Paste one of these lines into a JSON viewer and it parses cleanly, which makes later processing much easier: the output is now a proper JSON string.
With a custom deserializer we can both monitor multiple databases and tables and emit data in a format that is convenient for downstream processing.
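As a small downstream sketch (not from the original post), the JSON strings emitted by MyDebeziumDeserializationSchema can be parsed with fastjson and filtered by tableName, which is the first step of the per-table splitting mentioned in the comments above:
// Keep only changes from base_trademark; JSON.parseObject comes from fastjson, and
// StreamSource is the DataStreamSource<String> produced by env.addSource(...) above.
StreamSource
        .filter(line -> "base_trademark"
                .equals(com.alibaba.fastjson.JSON.parseObject(line).getString("tableName")))
        .print();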
5. Another error I ran into while running
The error message:
Caused by: java.lang.ArrayIndexOutOfBoundsException: 3
at org.apache.kafka.connect.data.Struct.get(Struct.java:86)
at com.alibaba.Flink03_DataStreamWithMySchema$MyDebeziumDeserializationSchema.deserialize(Flink03_DataStreamWithMySchema.java:93)
at com.alibaba.ververica.cdc.debezium.internal.DebeziumChangeConsumer.handleBatch(DebeziumChangeConsumer.java:114)
at io.debezium.embedded.ConvertingEngineBuilder.lambda$notifying$2(ConvertingEngineBuilder.java:82)
at io.debezium.embedded.EmbeddedEngine.run(EmbeddedEngine.java:812)
at io.debezium.embedded.ConvertingEngineBuilder$2.run(ConvertingEngineBuilder.java:171)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
From this I located the code:
for (Field field : value.schema().fields()) { // iterating the schema metadata because we don't know the columns in advance
    Object o = after.get(field);  // the stack trace points here, but the real mistake is the line above
    jsonObject.put(field.name(), o);
}
The correct code is:
for (Field field : after.schema().fields()) { // iterate over after's schema, not value's
    Object o = after.get(field);
    jsonObject.put(field.name(), o);
}
So a simple debugging habit: when you get a stack trace, also check the lines just above and below the one it points to for logic mistakes.
After packaging and submitting to the cluster, I always restored from a savepoint. Why not restore from a checkpoint?
Actually you can, because a savepoint and a checkpoint contain the same things.
By default, checkpoints are not retained when the job is cancelled normally (if the job fails, the checkpoint directory is still kept):
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION); // the default: checkpoints are removed once the job is cancelled
If we cancel the job manually, say to upgrade the code, the checkpoint directory is then gone.
To keep checkpoints when the job is cancelled normally (again, they are kept anyway if the job fails):
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); // retain
Stepping into RETAIN_ON_CANCELLATION shows:
RETAIN_ON_CANCELLATION(false);
What happens without that setting?
After cancelling the job,

compare the two screenshots:

after the cancel the checkpoint directory is gone, so data can no longer be restored from it.
So the final code:
package com.alibaba;

import com.alibaba.fastjson.JSONObject;
import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema;
import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction;
import com.alibaba.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

/**
 * @author zhouyanjun
 * @create 2021-06-22 17:03
 */
public class Flink03_DataStreamWithMySchema {
    public static void main(String[] args) throws Exception {
        System.setProperty("HADOOP_USER_NAME", "alibaba");

        //1. Create the streaming execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // Enable checkpointing
        env.enableCheckpointing(5000L);
        env.getCheckpointConfig().setCheckpointTimeout(5000L);
        env.setStateBackend(new FsStateBackend("hdfs://hadoop102:8020/flink1109/checkpoint"));
        // Do NOT retain checkpoints when the job is cancelled normally
        //env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION);
        // Retain checkpoints when the job is cancelled normally
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        //2. Read the MySQL change data via CDC
        DebeziumSourceFunction<String> sourceFunction = MySQLSource
                .<String>builder()
                .hostname("hadoop102")
                .port(3306)
                .username("root")
                .password("123456")
                .databaseList("gmall0820flink")
                // Optional; if omitted, all tables of the listed databases are read.
                // Prefix table names with the database name, since flink-cdc can monitor multiple databases.
                .tableList("gmall0820flink.base_trademark")
                .startupOptions(StartupOptions.initial()) // startup option/mode; an enum with 5 values
                // .startupOptions(StartupOptions.earliest())
                // .startupOptions(StartupOptions.latest())
                // .startupOptions(StartupOptions.specificOffset())
                // .startupOptions(StartupOptions.timestamp())
                .deserializer(new MyDebeziumDeserializationSchema()) // the custom deserializer that turns the serialized records into usable JSON
                .build();
        DataStreamSource<String> StreamSource = env.addSource(sourceFunction); // we now have a stream

        //3. Print
        StreamSource.print();

        //4. Start the job
        env.execute();
    }

    // Wrap each change as a JSON string and pass it downstream, so <T> is String
    public static class MyDebeziumDeserializationSchema implements DebeziumDeserializationSchema<String> {
        /**
         * The JSON format we want to produce; with at least these fields the data is easy
         * to process later:
         * {
         *   "data": {"id": 11, "tm_name": "sasa"},   the row itself; it has several columns, so it is a nested JSON object
         *   "db": "",          database name, because we want to capture data from multiple databases and tables
         *   "tableName": "",   table name; later the stream is split by database.table, since different tables have different columns and a mixed stream could only be typed as Object
         *   "op": "c u d",     operation type: create, update, delete
         *   "ts": ""           a timestamp field
         * }
         */
        @Override
        public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception { // input record, output collector
            // Parse the SourceRecord into the JSON defined above: decide which fields we need,
            // then extract them so the data can be used downstream.
            // Schema schema = sourceRecord.keySchema();

            //1 Get the topic and extract the database and table name
            String topic = sourceRecord.topic();
            String[] fields = topic.split("\\."); // '.' is a regex metacharacter, so it must be escaped
            String db = fields[1];
            String tableName = fields[2];

            //2 Get the value and extract the row data
            // Object value = sourceRecord.value();  declared as Object, but the records we saw are actually Structs
            Struct value = (Struct) sourceRecord.value();
            Struct after = value.getStruct("after"); // the Struct behind the "after" field

            // Build a JSON object (fastjson) before iterating
            JSONObject jsonObject = new JSONObject();
            // We don't know the columns in advance, so iterate over the schema metadata
            for (Field field : after.schema().fields()) {
                Object o = after.get(field);
                jsonObject.put(field.name(), o);
            }

            //3 Get the operation type (it lives in the record's envelope, next to "source")
            Envelope.Operation operation = Envelope.operationFor(sourceRecord);

            // Wrap everything into one result JSON and send it out
            JSONObject result = new JSONObject();
            result.put("database", db);
            result.put("tableName", tableName);
            result.put("data", jsonObject);
            result.put("op", operation);

            // Emit the data
            collector.collect(result.toJSONString());
        }

        @Override
        public TypeInformation<String> getProducedType() {
            return BasicTypeInfo.STRING_TYPE_INFO;
        }
    }
}
That wraps up the flink-cdc series.
- END -
This is an original article.

Author: Eugene
A data newbie at a listed company, hoping not to stay a newbie for long, haha.
Follow along to learn about technology, careers, life, fundamentals, fitness, photography, and more!
Feel free to follow, and let's keep learning and improving together.