| |
|
开发:
C++知识库
Java知识库
JavaScript
Python
PHP知识库
人工智能
区块链
大数据
移动开发
嵌入式
开发工具
数据结构与算法
开发测试
游戏开发
网络协议
系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程 数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁 |
-> 大数据 -> Flink随笔 -> 正文阅读 |
|
[大数据]Flink随笔 |
一、Flink简介1.1 初识Flink1.1.1 序言
1.1.2 计算引擎的发展史
1.2 Flink的重要特点1.2.1 事件驱动型(Event-driven)
1.2.2 流与批的世界观
1.2.3 分层API
1.3 Spark or Flink
1.3.1 两者的区别总结:
1.3.2 常见问题:
二、Flink快速上手
2.1 创建maven项目
2.2 批处理WordCount准备数据:
代码示例:
2.3 流处理WordCount
2.3.1 有界流第一种方式:原生方式
第二种方式:Lambda表达式
2.3.2 无界流
三、Flink部署3.1 开发模式
3.2 local-cluster模式
3.2.1 local-cluster模式配置local-cluster模式基本属于零配置。
3.2.2 在local-cluster模式下运行无界的WordCount
3.3 Standalone模式
3.3.1 Standalone模式配置
3.3.2 Standalone模式运行无界流WorkCount
3.3.3 Standalone高可用(HA)
3.4 Yarn模式
3.4.1 Yarn模式配置
3.4.2 Yarn运行无界流WordCount
3.4.3 Flink on Yarn的3种部署模式
3.4.3.1 Session-Cluster
3.4.3.2 Per-Job-Cluster
3.4.3.3 Application ModeApplication Mode会在Yarn上启动集群, 应用jar包的main函数(用户类的main函数)将会在JobManager上执行. 只要应用程序执行结束, Flink集群会马上被关闭. 也可以手动停止集群. 3.4.4 Per-Job-Cluster模式执行无界流WordCount
提交任务到Yarn的其他队列
3.4.5 Session-Cluster模式执行无界流WordCount
3.4.6 Application Mode模式执行无界流WordCount? 3.4.7 Yarn模式高可用
**注意: **配置完不要忘记分发, 和重启yarn
注意: yarn-site.xml中是它活的次数的上限, flink-conf.xml中的次数应该小于这个值。 3.5 Scala REPLscala 交互环境。
3.6 K8S & Mesos模式
3.7 Windows模式
Windows系统搭建Flink集群具体如下:
四、Flink运行架构4.1 运行架构https://ci.apache.org/projects/flink/flink-docs-release-1.11/fig/processes.svg(图片来源地址) 4.1.1
|
框架 | Spark | Flink |
---|---|---|
区 | Driver | JobManager |
别 | Executor | TaskManager |
点 | core | slots |
这个进程包含3个不同的组件 : Dispatcher、ResourceManager、JobManager
负责接收用户提供的作业,并且负责为这个新提交的作业启动一个新的JobManager 组件
- 负责资源的管理,在整个 Flink 集群中只有一个 ResourceManager。(管理多个JobManager)
- 注意这个ResourceManager不是Yarn中的ResourceManager, 是Flink中内置的, 只是赶巧重名了而已。
主要负责管理任务管理器(TaskManager)的插槽(slot),TaskManger插槽是Flink中定义的处理资源单元。
当JobManager申请插槽资源时,ResourceManager会将有空闲插槽的TaskManager分配给JobManager。如果ResourceManager没有足够的插槽来满足JobManager的请求,它还可以向资源提供平台发起会话,以提供启动TaskManager进程的容器。另外,ResourceManager还负责终止空闲的TaskManager,释放计算资源。
负责管理作业的执行,在一个Flink集群中可能有多个作业同时执行,每个作业都有自己的JobManager组件。
控制一个应用程序执行的主进程,也就是说,每个应用程序都会被一个不同的JobManager所控制执行。
JobManager会先接收到要执行的应用程序,这个应用程序会包括:作业图(JobGraph)、逻辑数据流图(logical dataflow graph)和打包了所有的类、库和其它资源的JAR包。
JobManager会把JobGraph转换成一个物理层面的数据流图,这个图被叫做“执行图”(ExecutionGraph),包含了所有可以并发执行的任务。JobManager会向资源管理器(ResourceManager)请求执行任务必要的资源,也就是任务管理器(TaskManager)上的插槽(slot)。一旦它获取到了足够的资源,就会将执行图分发到真正运行它们的TaskManager上。
而在运行过程中,JobManager会负责所有需要中央协调的操作,比如说检查点(checkpoints)的协调。
JobMaster负责管理单个JobGraph的执行。多个Job可以同时运行在一个Flink集群中, 每个Job都有一个自己的JobMaster,是JobManager的物理单元。
Flink中的工作进程(JobManager)。通常在Flink中会有多个TaskManager运行,每一个TaskManager都包含了一定数量的插槽(slots)。插槽的数量限制了TaskManager能够执行的任务数量。
启动之后,TaskManager会向资源管理器注册它的插槽;收到资源管理器的指令后,TaskManager就会将一个或者多个插槽提供给JobManager调用。JobManager就可以向插槽分配任务(tasks)来执行了。
在执行过程中,一个TaskManager可以跟其它运行同一应用程序的TaskManager交换数据。
集群管理器,比如Standalone、YARN、K8s等,就是前面我们学习的不同环境。
提交Job的客户端,可以是运行在任何机器上(与 JobManager 环境连通即可)。提交Job后,Client可以结束进程(Streaming的任务),也可以不结束并等待结果返回。
Flink中每一个worker(TaskManager)都是一个JVM进程,它可能会在独立的线程上执行一个Task。为了控制一个worker能接收多少个task,worker通过Task Slot来进行控制(一个worker至少有一个Task Slot)。
这里的Slot如何来理解呢?很多的文章中经常会和Spark框架进行类比,将Slot类比为Core,其实简单这么类比是可以的,可实际上,可以考虑下,当Spark申请资源后,这个Core执行任务时有可能是空闲的(空转),但是这个时候Spark并不能将这个空闲下来的Core共享给其他Job使用,所以这里的Core是Job内部共享使用的。接下来我们再回想一下,之前在Yarn Session-Cluster模式时,其实是可以并行执行多个Job的,那如果申请两个Slot,而执行Job时,只用到了一个,剩下的一个怎么办?那我们自认而然就会想到Flink可以将这个空闲的Slot给并行的其他Job,对吗?所以Flink中的Slot和Spark中的Core还是有很大区别的。
每个task slot表示TaskManager拥有资源的一个固定大小的子集。假如一个TaskManager有三个slot,那么它会将其管理的内存分成三份给各个slot。资源slot化意味着一个task将不需要跟来自其他job的task竞争被管理的内存,取而代之的是它将拥有一定数量的内存储备。需要注意的是,这里不会涉及到CPU的隔离,slot目前仅仅用来隔离task的受管理的内存。
并行度设置的优先级:
算子(代码) > 全局(代码) > 提交参数 > 配置文件
Parallelism 和 Slots之间的关系 :
- slots表示此任务可以承担的最大的线程数量,是静态的。
- prallelism表示任务运行时实际开启的线程数量,是动态的。(Parallelism <= slots)
- 并行度的计算推理原理:
- Flink的并行度可以对算子进行设置,那么算子的子任务的数量,就是算子的并行度。
- Job的并行度 => 并行度最大的算子的并行度。
- 一个Job需要多少并行度 ? 取决于 Job 的并行度。
在学习Spark RDD时,无论是读取内存中的数据,或读取文件数据,都会接触一个叫并行度的概念,并且在RDD的算子中也可以动态改变并行度,通过学习,咱们应该知道Spark中的并行度最终体现为分区,而分区又意味着Task。所以Spark 计算中Task的数量是可以通过并行度推算出来的。这个大家没有的问题的话,那就好办了,为什么?因为Flink的并行度的作用和Spark中并行度的作用的一样的。最后都可以表现为任务的并行执行。
虽然Spark中的并行度和Flink的并行度的原理,作用基本一致,但是由于模型选择的问题,所以使用上依然有些细微的区别:
? Spark的并行度设置后需要调用特殊的算子(repartition)或特殊的操作(shuffle)才能进行改变,比如调用 flatMap算子后再调用repartition改变分区。
? Flink的并行度设置可以在任何算子后使用,并且为了方便,也可以设置全局并行度
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(2);
但是需要注意,某些数据源数据的采集是无法改变并行度,如Socket
? 如果Flink的一个算子的并行度为2,那么这个算子在执行时,这个算子对应的task就会拆分成2个subtask,发到不同的Slot中执行。
一个特定算子的子任务(subtask)的个数被称之为这个算子的并行度(parallelism),一般情况下,一个流程序的并行度,可以认为就是其所有算子中最大的并行度。一个程序中,不同的算子可能具有不同的并行度。并行度是单指一个任务中的,而不是整个程序的。
Stream在算子之间传输数据的形式可以是one-to-one(forwarding)的模式也可以是redistributing的模式,具体是哪一种形式,取决于算子的种类。
类似于spark中的窄依赖
stream(比如在source和flatMap operator之间)维护着分区以及元素的顺序。那意味着flatmap 算子的子任务看到的元素的个数以及顺序跟source 算子的子任务生产的元素的个数、顺序相同,map、fliter、flatMap等算子都是one-to-one的对应关系。
类似于spark中的宽依赖
stream(map()跟keyBy/window之间或者keyBy/window跟sink之间)的分区会发生改变。每一个算子的子任务依据所选择的transformation发送数据到不同的目标任务。
例如,keyBy()基于**hashCode重分区(类似于Spark中Shuffle)、broadcast(广播)和rebalance(重分配)**会随机重新分区,这些算子都会引起redistribute过程,而redistribute过程就类似于Spark中的shuffle过程。
算子的一个并行子任务,叫做subtask。
**task :**是由①不同算子的subtask(子任务)②根据一定的规则③合并在一起形成。
不同算子组成一个 task 的条件:详情请查看下一章节 [4.3.5 Operator Chains(任务链)](# 4.3.5 Operator Chains(任务链))
①:并行度一致
②:两个算子之间是one-to-one的关系
Flink执行时,由于并行度的设置,可以将同一个Job不同算子的subtask(子任务)放在同一块内存中进行处理,那么这样在执行时就可以合并成一个完整的task进行处理,而不是独立的子任务,这样就减少了子任务(SubTask)之间调度和数据传递的性能损耗,避免了跨节点进行数据传输。
把相同并行度one to one操作,Flink将这样相连的算子链接在一起形成一个 task ,原来的算子成为里面的一部分。 每个task被一个线程执行.
将算子链接成task是非常有效的优化:它能减少线程之间的切换和基于缓存区的数据交换,在减少时延的同时提升吞吐量。链接的行为可以在编程API中进行指定。
阻止任务链的常用方法:
使用下面的方法后,哪怕两个算子之间是one to one的关系,并且并行度一致,也不会形成任务链。
执行环境对象:
disableOperatorChaining()方法
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.disableOperatorChaining();
算子:
startNewChain()方法
disableChaining()方法
算子.startNewChain(); 算子.disableChaining();
由Flink程序直接映射成的数据流图是StreamGraph,也被称为逻辑流图,因为它们表示的是计算逻辑的高级视图。为了执行一个流处理程序,Flink需要将逻辑流图转换为物理数据流图(也叫执行图),详细说明程序的执行方式。
Flink 中的执行图可以分成四层:StreamGraph -> JobGraph -> ExecutionGraph -> Physical Graph。
2个并发度(Source为1个并发度)的 SocketTextStreamWordCount 四层执行图的演变过程 env.socketTextStream().flatMap(…).keyBy(0).sum(1).print();
是根据用户通过 Stream API 编写的代码生成的最初的图。用来表示程序的拓扑结构。
StreamGraph经过优化后生成了 JobGraph,是提交给 JobManager 的数据结构。主要的优化为: 将多个符合条件的节点 chain 在一起作为一个节点,这样可以减少数据在节点之间流动所需要的序列化/反序列化/传输消耗。
JobManager 根据 JobGraph 生成ExecutionGraph。ExecutionGraph是JobGraph的并行化版本,是调度层最核心的数据结构。
JobManager 根据 ExecutionGraph 对 Job 进行调度后,在各个TaskManager 上部署 Task 后形成的“图”,并不是一个具体的数据结构。
下面的两张图比较笼统,而且yarn-cluster模式不准确,完整准确的提交流程请采用[4.1.2
中文图解:
](# 4.1.2中文图解:
)结合下方执行流程说明作详细系统学习。
我们来看看当一个应用提交执行时,Flink的各个组件是如何交互协作的:
执行流程说明:
和其他所有的计算框架一样,Flink也有一些基础的开发步骤以及基础,核心的API,从开发步骤的角度来讲,主要分为四大部分
Flink Job在提交执行计算时,需要首先建立和Flink框架之间的联系,也就指的是当前的flink运行环境,只有获取了环境信息,才能将task调度到不同的taskManager执行。而这个环境对象的获取方式相对比较简单
以下的两种API都可以自动区分部署环境
// 批处理环境
ExecutionEnvironment benv = ExecutionEnvironment.getExecutionEnvironment();
// 流式数据处理环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Flink框架可以从不同的来源获取数据,将数据提交给框架进行处理, 我们将获取数据的来源称之为数据源**(Source)**。
<!-- https://mvnrepository.com/artifact/org.projectlombok/lombok -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.16</version>
<scope>provided</scope>
</dependency>
/**
* 水位传感器:用于接收水位数据
*
* id:传感器编号
* ts:时间戳
* vc:水位
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
public class WaterSensor {
private String id;
private Long ts;
private Integer vc;
}
一般情况下,可以将数据临时存储到内存中,形成特殊的数据结构后,作为数据源使用。这里的数据结构采用集合类型是比较普遍的。
public class Flink05_Source_Collection {
public static void main(String[] args) throws Exception {
List<WaterSensor> waterSensors = Arrays.asList(
new WaterSensor("ws_001", 1577844001L, 45),
new WaterSensor("ws_002", 1577844015L, 43),
new WaterSensor("ws_003", 1577844020L, 42));
// 1. 创建执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env
.fromCollection(waterSensors)
.print();
env.execute();
}
}
public class Flink05_Source_File {
public static void main(String[] args) throws Exception {
// 1. 创建执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env
.readTextFile("input")
.print();
env.execute();
}
}
说明:
参数可以是目录也可以是文件
路径可以是相对路径也可以是绝对路径
相对路径是从系统属性user.dir获取路径: idea下是project的根目录, standalone模式下是集群节点根目录
也可以从hdfs目录下读取, 使用路径:hdfs://…, 由于Flink没有提供hadoop相关依赖, 需要pom中添加相关依赖:
<dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>3.1.3</version> <scope>provided</scope> </dependency>
[参考第二章无界流读取](# 2.3.2 无界流)
添加相应的依赖
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.12</artifactId>
<version>1.13.0</version>
</dependency>
参考代码
public class Flink05_Source_Kafka {
public static void main(String[] args) throws Exception {
// 0.Kafka相关配置
Properties conf = new Properties();
// conf.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092");
conf.setProperty("bootstrap.servers","hadoop102:9092,hadoop103:9092,hadoop104:9092");
conf.setProperty("group.id", "Flink01_Source_Kafka");
conf.setProperty("auto.offset.reset", "latest");
// 1. 创建执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 2. 添加Kafka的数据源Source
/*
参数1:kafka中的主题
参数2:序列化规则
参数3:kafka的相关配置信息
*/
DataStreamSource<String> kafkaSource = env.addSource(new FlinkKafkaConsumer<> ("sensor", new SimpleStringSchema(), conf));
// 打印
kafkaSource.print("kafka source");
env.execute();
}
}
大多数情况下,前面的数据源已经能够满足需要,但是难免会存在特殊情况的场合,所以flink也提供了能自定义数据源的方式。
示例1:
public class Flink05_Source_Custom {
public static void main(String[] args) throws Exception {
// 1. 创建执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env
.addSource(new MySource("hadoop102", 9999))
.print();
env.execute();
}
// 自定义数据源,实现SourceFunction接口,泛型即是数据的类型
/*
重写接口中的两个方法 :
①:run()方法就是程序启动的方法
②:cancel()方法就是程序结束的方法
*/
public static class MySource implements SourceFunction<WaterSensor> {
private String host;
private int port;
private volatile boolean isRunning = true;
private Socket socket;
public MySource(String host, int port) {
this.host = host;
this.port = port;
}
@Override
public void run(SourceContext<WaterSensor> ctx) throws Exception {
// 实现一个从socket读取数据的source
socket = new Socket(host, port);
BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8));
String line = null;
while (isRunning && (line = reader.readLine()) != null) {
String[] split = line.split(",");
ctx.collect(new WaterSensor(split[0], Long.valueOf(split[1]), Integer.valueOf(split[2])));
}
}
/**
* 大多数的source在run方法内部都会有一个while循环,
* 当调用这个方法的时候, 应该可以让run方法中的while循环结束
*/
@Override
public void cancel() {
isRunning = false;
try {
socket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
/*
sensor_1,1607527992000,20
sensor_1,1607527993000,40
sensor_1,1607527994000,50
*/
示例2:
public class flink05_source_consum {
public static void main(String[] args) throws Exception {
// 1. 获取执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// 2. 获取数据源
DataStreamSource<WaterSensor> ConsumSource = env.addSource(new MySource());
// 3. 算子操作
// 4. 输出、保存...
ConsumSource.print();
// 5. 启动执行
env.execute();
}
private static class MySource implements SourceFunction<WaterSensor> {
private boolean flag = true;
Random random = new Random();
// 启动数据源
@Override
public void run(SourceContext<WaterSensor> sourceContext) throws Exception {
while(flag){
// 准备数据源
WaterSensor waterSensor = new WaterSensor(
"sensor_" + random.nextInt(10),
System.currentTimeMillis()+ 492,
random.nextInt(100));
Thread.sleep(1000);
// 向下游发送数据
sourceContext.collect(waterSensor);
}
}
// 释放数据源
@Override
public void cancel() {
this.flag = false;
}
}
}
转换算子可以把一个或多个DataStream转成一个新的DataStream。程序可以把多个复杂的转换组合成复杂的数据流拓扑。
类似于Spark中的转换算子。
如果算子前后是一对一关系,则算子中没有收集器,比如map算子
如果算子前后是一对多关系,则算子中有收集器,比如flatMap算子
**无状态算子 : ** 数据的输入和数据的输出是一致的,对数据没有聚合作用。
map
flatMap
filter
数据传输算子:
**多流转换算子:**涉及到对多条流的合并、拆分等操作。
作用
将数据流中的数据进行转换, 形成新的数据流,消费一个元素并产出一个元素
适用场景:把符合规范的数据转换为javaBean实体类
参数
lambda表达式或MapFunction实现类
返回
DataStream → DataStream(可以转换数据的类型)
示例
得到一个新的数据流: 新的流的元素是原来流的元素的平方
package com.heather.flink.java.chapter_5.transform;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class Flink01_TransForm_Map_Anonymous {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env
.fromElements(1, 2, 3, 4, 5)
// MapFunction()方法的参数:
// 参数1:数据的输入的类型
// 参数2:数据的输出的类型
.map(new MapFunction<Integer, Integer>() {
@Override
public Integer map(Integer value) throws Exception {
return value * value;
}
})
.print();
env.execute();
}
}
env
.fromElements(1, 2, 3, 4, 5)
.map(ele -> ele * ele)
.print();
package com.heather.flink.java.chapter_5.transform;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class Flink01_TransForm_Map_StaticClass {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env
.fromElements(1, 2, 3, 4, 5)
.map(new MyMapFunction())
.print();
env.execute();
}
public static class MyMapFunction implements MapFunction<Integer, Integer> {
@Override
public Integer map(Integer value) throws Exception {
return value * value;
}
}
}
Rich…Function类
所有Flink函数类都有其Rich版本。它与常规函数的不同在于提供了更多的,更丰富的功能:
- 可以获取运行环境的上下文(主要用于获取数据的状态)
- 提供了获取连接的生命周期方法(open()和close(),各执行一次)
- open()方法:执行的次数和并行度成**“正相关”**。
- close()方法:执行的次数和并行度成**“正相关”,有界流会是并行度 * 2**。
例如:
RichMapFunction
package com.heather.flink.java.chapter_5.transform;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class Flink01_TransForm_Map_RichMapFunction {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(5);
env
.fromElements(1, 2, 3, 4, 5)
.map(new MyRichMapFunction()).setParallelism(2)
.print();
env.execute();
}
public static class MyRichMapFunction extends RichMapFunction<Integer, Integer> {
// 默认生命周期方法, 初始化方法, 在每个并行度上只会被调用一次
@Override
public void open(Configuration parameters) throws Exception {
System.out.println("open ... 执行一次");
}
// 默认生命周期方法, 最后一个方法, 做一些清理工作, 在每个并行度上只调用一次
@Override
public void close() throws Exception {
System.out.println("close ... 执行一次");
}
@Override
public Integer map(Integer value) throws Exception {
System.out.println("map ... 一个元素执行一次");
return value * value;
}
}
}
说明:
- 默认生命周期方法, 初始化方法, 在每个并行度上只会被调用一次, 而且先被调用。
- 默认生命周期方法, 最后一个方法, 做一些清理工作, 在每个并行度上只调用一次, 而且是最后被调用。
- getRuntimeContext()方法提供了函数的RuntimeContext的一些信息,例如函数执行的并行度,任务的名字,以及state状态. 开发人员在需要的时候自行调用获取运行时上下文对象。
作用
消费一个元素并产生零个或多个元素
参数
参数1:Integer value : 输入的数据参数2:Collector out : 输出的数据
返回
DataStream → DataStream(可以转换数据的类型)
示例 : 如下 ↓
// 新的流存储每个元素的平方和3次方
env
.fromElements(1, 2, 3, 4, 5)
// FlatMapFunction()方法的参数:
// 参数1:数据的输入的类型
// 参数2:数据的输出的类型
.flatMap(new FlatMapFunction<Integer, Integer>() {
@Override
public void flatMap(Integer value, Collector<Integer> out) throws Exception {
out.collect(value * value);
out.collect(value * value * value);
}
})
.print();
env
.fromElements(1, 2, 3, 4, 5)
.flatMap((Integer value, Collector<Integer> out) -> {
out.collect(value * value);
out.collect(value * value * value);
}).returns(Types.INT)
.print();
说明: 在使用Lambda表达式表达式的时候, 由于泛型擦除的存在, 在运行的时候无法获取泛型的具体类型, 全部当做Object来处理, 及其低效, 所以Flink要求当参数中有泛型的时候, 必须明确指定泛型的类型.
作用
根据指定的规则将满足条件(true)的数据保留,不满足条件(false)的数据丢弃
参数
FlatMapFunction实现类
返回
DataStream → DataStream(不可以转换数据的类型,仅仅是过滤的作用)
示例 : 如下 ↓
// 保留偶数, 舍弃奇数
env
.fromElements(10, 3, 5, 9, 20, 8)
.filter(new FilterFunction<Integer>() {
@Override
public boolean filter(Integer value) throws Exception {
return value % 2 == 0;
}
})
.print();
env
.fromElements(10, 3, 5, 9, 20, 8)
.filter(value -> value % 2 == 0)
.print();
作用
(键控流)把流中的数据分到不同的分区中,具有相同key的元素会分到同一个分区中【任务槽】(数据倾斜的根源),不同的key也有可能在同一个分区【任务槽】中,但是逻辑上是隔离开的,即 给每个数据打上标签(属于哪个分组),并不是对并行度进行改变。
通过keyBy之后得到keyedStream流,keyedStream流就可以进行各种各样的聚合算子进行操作。
[5.3.10 简单滚动聚合算子](# 5.3.10 简单滚动聚合算子)
分区规则:在内部是使用的hash分区来实现的,根据key的hash值对key进行求模操作。
难点:
【重点】 【重点】 【重点】
KeyedStream的参数说明:
Type parameters:
– The type of the elements in the Keyed Stream. (返回值得类型)
– The type of the key in the Keyed Stream.(分组的key的类型)KeyedStream<Tuple2<String, Integer>, Tuple> wordToGroupDS = wordAndOneDS.keyBy(0); KeyedStream<WaterSensor, Tuple> sensorKSByFieldName = sensorDS.keyBy("id");
通过上述的两个代码可知,无论是通过 位置索引 还是 字段名称 进行分组,返回的key的类型都是Tuple(因为无法确定),这就导致后续使用key的时候还需要进行类型的转换等。
**所以:**通过 明确的指定 key 的方式, 获取到的 key就是具体的类型 => 实现 KeySelector 或 lambda
参数
Key选择器函数: interface KeySelector<IN, KEY>? 注意: 什么值不可以作为KeySelector的Key:
- 没有覆写hashCode方法的POJO, 而是依赖Object的hashCode. 因为这样分组没有任何的意义: 每个元素都会得到一个独立无二的组. 实际情况是:可以运行, 但是分的组没有意义.
- 任何类型的数组
返回
DataStream → KeyedStream
示例:
明确的指定 key 的方式【对象.get属性()】
: 如下 ↓
public class Flink02_Transform_KeyBy {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// 1. 获取数据源
DataStreamSource<String> fileDS = env.readTextFile("input/sensor_data.log");
// 2. 操作算子
// 2.1 map()算子把数据转换为对象
SingleOutputStreamOperator<WaterSensor> mapDS = fileDS.map(new MapFunction<String, WaterSensor>() {
@Override
public WaterSensor map(String value) throws Exception {
String[] datas = value.split(",");
return new WaterSensor(datas[0], Long.valueOf(datas[1]), Integer.valueOf(datas[2]));
}
});
// 2.2 分组(下面的三种方式请观察返回值类型中的泛型)
// 2.2.1 通过指定下标索引的方式【不推荐使用】
// KeyedStream<WaterSensor, Tuple> keyByDS = mapDS.keyBy(0);
// 2.2.2 通过指定 属性 的方式【不推荐使用】
// KeyedStream<WaterSensor, Tuple> keyByDS = mapDS.keyBy("id");
// 2.2.3 明确的指定 key 的 方式 【推荐使用】
KeyedStream<WaterSensor, String> keybyDS = mapDS.keyBy(new MyKeyBySeletor());
// 3. 输出、保存 ...
keybyDS.print();
env.execute();
}
private static class MyKeyBySeletor implements KeySelector<WaterSensor,String> {
@Override
public String getKey(WaterSensor value) throws Exception {
return value.getId();
}
}
}
public class Flink02_Transform_KeyBy {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// 1. 获取数据源
DataStreamSource<String> fileDS = env.readTextFile("input/sensor_data.log");
// 2. 操作算子
// 2.1 map()算子把数据转换为对象
SingleOutputStreamOperator<WaterSensor> mapDS = fileDS.map(new MapFunction<String, WaterSensor>() {
@Override
public WaterSensor map(String value) throws Exception {
String[] datas = value.split(",");
return new WaterSensor(datas[0], Long.valueOf(datas[1]), Integer.valueOf(datas[2]));
}
});
// 2.2 分组【lambda表达式方法】
KeyedStream<WaterSensor, String> keybyDS = mapDS.keyBy( r -> r.getId() );
// 3. 输出、保存 ...
keybyDS.print();
env.execute();
}
}
作用
把流中的元素随机打乱,然后随机(底层是分区器)发送到下游,对同一个组数据, 每次只需得到的结果都不同。
参数
无
返回
DataStream → DataStream
示例 : 如下 ↓
env
.fromElements(10, 3, 5, 9, 20, 8)
.shuffle()
.print();
env.execute();
已经过时, 在1.12中已经被移除
作用
某些情况下,我们需要将数据流根据某些特征拆分成两个或者多个数据流,给不同数据流增加标记以便于从流中取出。
split用于给流中的每个元素添加标记. select用于根据标记取出对应的元素, 组成新的流。在新的版本中已经被新的API替代:Please use side output instead
~
参数
split参数: interface OutputSelector
select参数: 字符串
返回
split: SingleOutputStreamOperator -> SplitStream~slect: SplitStream -> DataStream
示例 : 如下 ↓
public class Flink03_Transform_select {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource<String> fileDS = env.readTextFile("input/sensor_data.log");
// 把数据转换为javaBean类
SingleOutputStreamOperator<WaterSensor> mapDS = fileDS.map(new MapFunction<String, WaterSensor>() {
@Override
public WaterSensor map(String value) throws Exception {
String[] datas = value.split(",");
return new WaterSensor(datas[0], Long.valueOf(datas[1]), Integer.valueOf(datas[2]));
}
});
// 把一个流中的元素按照指定规则进行切分(其实就是给符合规则的元素贴上标签)
SplitStream<WaterSensor> splitDS = mapDS.split(new MyOutputSeletor());
// splitDS.select("normal").print("正常");
// splitDS.select("warning").print("报警");
// splitDS.select("alarm").print("警告");
splitDS.select("balance").print("正常");
splitDS.select("balance").print("报警");
env.execute();
}
private static class MyOutputSeletor implements OutputSelector<WaterSensor> {
/*
定义规则:
正常: 水位小于95
报警: 水位大于95,小于98
警告: 水位大于98
*/
@Override
public Iterable<String> select(WaterSensor value) {
// 返回值类型是String类型的迭代器,说明,每一个元素都可以贴上多个标签进行说明
if(value.getVc() <= 95){
return Arrays.asList("normal","balance");
}else if(value.getVc() < 98){
return Arrays.asList("warning","balance");
}else if(value.getVc() >= 98){
return Arrays.asList("alarm");
}else {
return Arrays.asList("。。。。");
}
}
}
}
// 奇数一个流, 偶数一个流
SplitStream<Integer> splitStream = env
.fromElements(10, 3, 5, 9, 20, 8)
.split(value -> value % 2 == 0
? Collections.singletonList("偶数")
: Collections.singletonList("奇数"));
splitStream
.select("偶数")
.print("偶数");
splitStream
.select("奇数")
.print("奇数");
env.execute();
作用
在某些情况下,我们需要将两个不同来源的数据流进行连接,实现数据匹配,比如订单支付和第三方交易信息,这两个信息的数据就来自于不同数据源,连接后,将订单支付和第三方交易信息进行对账,此时,才能算真正的支付完成。
Flink中的connect算子可以连接两个保持他们类型的数据流,两个数据流被connect之后,只是被放在了一个同一个流中,内部依然保持各自的数据和形式不发生任何变化,两个流相互独立。扩展:
- 以FIFO队列的方式进入到合流中。
- connect算子只能合并两条数据流,但是这两条数据流的数据类型可以不一致。
参数
另外一个流
返回
DataStream[A], DataStream[B] -> ConnectedStreams[A,B]
示例 : 如下 ↓
DataStreamSource<Integer> intStream = env.fromElements(1, 2, 3, 4, 5);
DataStreamSource<String> stringStream = env.fromElements("a", "b", "c");
// 把两个流连接在一起: 貌合神离
ConnectedStreams<Integer, String> cs = intStream.connect(stringStream);
cs.getFirstInput().print("first");
cs.getSecondInput().print("second");
env.execute();
注意:
- 两个流中存储的数据类型可以不同
- 只是机械的合并在一起, 内部仍然是分离的2个流
- 只能2个流进行connect, 不能有第3个参与
作用
对两个或者两个以上的DataStream流进行**union(合并)**操作,产生的结果是一个包含所有DataStream流的元素的新DataStream流。扩展:
- 以FIFO队列的方式进入到合流中
- 不去重
- union算子可以合并多条流 ,但是保证每一条流的元素的类型必须一致。
示例 : 如下 ↓
DataStreamSource<Integer> stream1 = env.fromElements(1, 2, 3, 4, 5);
DataStreamSource<Integer> stream2 = env.fromElements(10, 20, 30, 40, 50);
DataStreamSource<Integer> stream3 = env.fromElements(100, 200, 300, 400, 500);
// 把多个流union在一起成为一个流, 这些流中存储的数据类型必须一样: 水乳交融
stream1
.union(stream2)
.union(stream3)
.print();
- union之前两个流的类型必须是一样,connect可以不一样。
- connect只能操作两个流,union可以操作多个。
- connect连接后的两个流其实也是两条流,并没有真正的合并为一条流(因为流的类型不一致)具体查看源码,而union是真的把多条流合并在了一起,组成了一条流。
1、简言之:数据来一条,聚合一条,输出一次!!!
2、在适用Operarot算子之前,必须先适用keyBy算子对数据进行分组
常见的滚动聚合算子
- sum
- min
- ma
- minBy
- maxBy
作用
KeyedStream的每一个支流做聚合。执行完成后,会将聚合的结果合成一个流返回,所以结果都是DataStream
参数
如果流中存储的是POJO或者scala的样例类, 参数使用字段名
如果流中存储的是元组, 参数就是位置(基于0…)
返回
KeyedStream -> SingleOutputStreamOperator
示例 : 如下 ↓
DataStreamSource<Integer> stream = env.fromElements(1, 2, 3, 4, 5);
KeyedStream<Integer, String> kbStream = stream.keyBy(ele -> ele % 2 == 0 ? "奇数" : "偶数");
kbStream.sum(0).print("sum");
kbStream.max(0).print("max");
kbStream.min(0).print("min");
ArrayList<WaterSensor> waterSensors = new ArrayList<>();
waterSensors.add(new WaterSensor("sensor_1", 1607527992000L, 20));
waterSensors.add(new WaterSensor("sensor_1", 1607527994000L, 50));
waterSensors.add(new WaterSensor("sensor_1", 1607527996000L, 30));
waterSensors.add(new WaterSensor("sensor_2", 1607527993000L, 10));
waterSensors.add(new WaterSensor("sensor_2", 1607527995000L, 30));
KeyedStream<WaterSensor, String> kbStream = env
.fromCollection(waterSensors)
.keyBy(WaterSensor::getId);
kbStream
.sum("vc")
.print("maxBy...");
注意: **
分组聚合后, 理论上只能取分组字段和聚合结果**, 但是Flink允许其他的字段也可以取出来, 其他字段默认情况是取的是这个组内第一个元素的字段值
ArrayList<WaterSensor> waterSensors = new ArrayList<>();
waterSensors.add(new WaterSensor("sensor_1", 1607527992000L, 20));
waterSensors.add(new WaterSensor("sensor_1", 1607527994000L, 50));
waterSensors.add(new WaterSensor("sensor_1", 1607527996000L, 50));
waterSensors.add(new WaterSensor("sensor_2", 1607527993000L, 10));
waterSensors.add(new WaterSensor("sensor_2", 1607527995000L, 30));
KeyedStream<WaterSensor, String> kbStream = env
.fromCollection(waterSensors)
.keyBy(WaterSensor::getId);
kbStream
.maxBy("vc", false)
.print("max...");
env.execute();
注意: **
maxBy和minBy可以指定当出现相同值的时候,其他字段是否取第一个**. true表示取第一个, false表示取最后一个.
作用
process算子在Flink算是一个比较底层的算子,很多类型的流上都可以调用,可以从流中获取更多的信息(不仅仅数据本身)
env
.fromCollection(waterSensors)
.process(new ProcessFunction<WaterSensor, Tuple2<String, Integer>>() {
@Override
public void processElement(WaterSensor value,
Context ctx,
Collector<Tuple2<String, Integer>> out) throws Exception {
out.collect(new Tuple2<>(value.getId(), value.getVc()));
}
})
.print();
env
.fromCollection(waterSensors)
.keyBy(WaterSensor::getId)
.process(new KeyedProcessFunction<String, WaterSensor, Tuple2<String, Integer>>() {
@Override
public void processElement(WaterSensor value, Context ctx, Collector<Tuple2<String, Integer>> out) throws Exception {
out.collect(new Tuple2<>("key是:" + ctx.getCurrentKey(), value.getVc()));
}
})
.print();
public class Flink06_Transform_process {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
//DataStreamSource<String> fileDS = env.socketTextStream("localhost", 9999);
DataStreamSource<String> fileDS = env.readTextFile("input/sensor_data.log");
// 转换为对象
SingleOutputStreamOperator<WaterSensor> beanDS = fileDS.map((MapFunction<String, WaterSensor>) r -> {
String[] datas = r.split(",");
return new WaterSensor(datas[0], Long.valueOf(datas[1]), Integer.valueOf(datas[2]));
});
// 获取对象中的各个属性,针对于属性进行分组
SingleOutputStreamOperator<Tuple3<String, Long, Integer>> mapDS = beanDS.map(new MapFunction<WaterSensor, Tuple3<String, Long, Integer>>() {
@Override
public Tuple3<String, Long, Integer> map(WaterSensor value) throws Exception {
return new Tuple3<>(value.getId(), value.getTs(), value.getVc());
}
});
// 分组
// 适用lambda方式
KeyedStream<Tuple3<String, Long, Integer>, String> keyByDS = mapDS.keyBy(r -> r.f0);
keyByDS.process(new KeyedProcessFunction<String, Tuple3<String, Long, Integer>, String>() {
/**
* processElement 处理数据的方法:来一条,处理一条
* @param value 一条数据
* @param ctx 上下文对象
* @param out 采集器
* @throws Exception
*/
@Override
public void processElement(Tuple3<String, Long, Integer> value, Context ctx, Collector<String> out) throws Exception {
out.collect("当前key : " + ctx.getCurrentKey() + ",当前时间 :" + ctx.timestamp() + ",数据 :" + value);
}
}).print("process方法——>");
env.execute();
}
}
KeyedProcessFunction类中的Context上下文对象的来源(源码追踪):
- KeyedProcessFunction继承自AbstractRichFunction类
public abstract class KeyedProcessFunction<K, I, O> extends AbstractRichFunction {
- AbstractRichFunction类中定义了上下文对象
public abstract class AbstractRichFunction implements RichFunction, Serializable { private transient RuntimeContext runtimeContext;
所以KeyedProcessFunction类中的Context上下文对象来源于AbstractRichFunction类
作用
(简化规约 — 聚合)一个分组数据流的聚合操作,合并当前的元素和上次聚合的结果,产生一个新的值,返回的流中包含每一次聚合的结果,而不是只返回最后一次聚合的最终结果。
为什么还要把中间值也保存下来? 考虑流式数据的特点: 没有终点, 也就没有最终的概念了. 任何一个中间的聚合结果都是值!(也就是flink中的状态)
参数
interface ReduceFunction
返回
KeyedStream -> SingleOutputStreamOperator
示例 : 如下 ↓
ArrayList<WaterSensor> waterSensors = new ArrayList<>();
waterSensors.add(new WaterSensor("sensor_1", 1607527992000L, 20));
waterSensors.add(new WaterSensor("sensor_1", 1607527994000L, 50));
waterSensors.add(new WaterSensor("sensor_1", 1607527996000L, 50));
waterSensors.add(new WaterSensor("sensor_2", 1607527993000L, 10));
waterSensors.add(new WaterSensor("sensor_2", 1607527995000L, 30));
KeyedStream<WaterSensor, String> kbStream = env
.fromCollection(waterSensors)
.keyBy(WaterSensor::getId);
kbStream
.reduce(new ReduceFunction<WaterSensor>() {
/*
参数说明:
- 参数1 - value1:累加器,原始数据
- 参数2 - value2:新添加的数据
*/
@Override
public WaterSensor reduce(WaterSensor value1, WaterSensor value2) throws Exception {
System.out.println("reducer function ...");
return new WaterSensor(value1.getId(), value1.getTs(), value1.getVc() + value2.getVc());
}
})
.print("reduce...");
env.execute();
kbStream
.reduce((value1, value2) -> {
System.out.println("reducer function ...");
return new WaterSensor(value1.getId(), value1.getTs(), value1.getVc() + value2.getVc());
})
.print("reduce...");
示例3:
public class Flink05_Transform_reduce {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource<String> fileDS = env.socketTextStream("localhost", 9999);
// 转换为对象
SingleOutputStreamOperator<WaterSensor> beanDS = fileDS.map((MapFunction<String, WaterSensor>) r -> {
String[] datas = r.split(",");
return new WaterSensor(datas[0], Long.valueOf(datas[1]), Integer.valueOf(datas[2]));
});
// 获取对象中的各个属性,针对于属性进行分组
SingleOutputStreamOperator<Tuple3<String, Long, Integer>> mapDS = beanDS.map(new MapFunction<WaterSensor, Tuple3<String, Long, Integer>>() {
@Override
public Tuple3<String, Long, Integer> map(WaterSensor value) throws Exception {
return new Tuple3<>(value.getId(), value.getTs(), value.getVc());
}
});
// 分组
// 适用lambda方式
KeyedStream<Tuple3<String, Long, Integer>, String> keyByDS = mapDS.keyBy(r -> r.f0);
// reduce聚合方法
/*
1. 输入的类型要一致,输出的类型也一致(构建的新值)。
2. 首次进入的数据,不会进入reduce(因为reduce必须满足两条数据)
3. 保存了中间的状态(值)
* */
SingleOutputStreamOperator<Tuple3<String, Long, Integer>> result = keyByDS.reduce(new ReduceFunction<Tuple3<String, Long, Integer>>() {
@Override
public Tuple3<String, Long, Integer> reduce(Tuple3<String, Long, Integer> value1, Tuple3<String, Long, Integer> value2) throws Exception {
// 求水位的聚合值
//return value1.f2 + value2.f2;
return Tuple3.of("result", 123456789L, value1.f2 + value2.f2);
}
});
result.print("result");
env.execute();
}
}
注意:
- 聚合后结果的类型, 必须和原来流中元素的类型保持一致!
先按照key分组, 按照key的双重hash来选择后面的分区
对流中的元素随机分区
对流中的元素平均分布到每个区.当处理倾斜数据的时候, 进行性能优化
同 rebalance一样, 也是平均循环的分布数据。但是要比rebalance更高效, 因为rescale不需要通过网络, 完全走的"管道"。
Sink有下沉的意思,在Flink中所谓的Sink其实可以表示为将数据存储起来的意思,也可以将范围扩大,表示将处理完的数据发送到指定的存储系统的输出操作。
类似于Spark中的行动算子。
public DataStreamSink<T> print(String sinkIdentifier) {
PrintSinkFunction<T> printFunction = new PrintSinkFunction<>(sinkIdentifier, false);
return addSink(printFunction).name("Print to Std. Out");
}
Flink内置了一些Sink, 除此之外的Sink需要用户自定义!
kafka常用目录技巧:
1. topic --zookeeper
2. producer --broker-list
3. consumer --bootstrap-server
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.11</artifactId>
<version>1.12.0</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.75</version>
</dependency>
启动Kafka集群
[heather@hadoop102 bin]$ kafka.sh start
Sink到Kafka的示例代码
public class Flink07_Sink_kafka {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource<String> inputDS = env.readTextFile("input/sensor_data.log");
inputDS.addSink(new FlinkKafkaProducer011<String>(
"hadoop102:9092",
"sensor110", new SimpleStringSchema())
);
// 启动执行程序
env.execute();
}
}
kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic sensor110
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-redis -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-redis_2.11</artifactId>
<version>1.1.5</version>
</dependency>
启动Redis服务器
Sink到Redis的示例代码
package com.heather.flink.java.chapter_5.sink;
import com.alibaba.fastjson.JSON;
import com.atguigu.flink.java.chapter_5.WaterSensor;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
import java.util.ArrayList;
public class Flink02_Sink_Redis {
public static void main(String[] args) throws Exception {
ArrayList<WaterSensor> waterSensors = new ArrayList<>();
waterSensors.add(new WaterSensor("sensor_1", 1607527992000L, 20));
waterSensors.add(new WaterSensor("sensor_1", 1607527994000L, 50));
waterSensors.add(new WaterSensor("sensor_1", 1607527996000L, 50));
waterSensors.add(new WaterSensor("sensor_2", 1607527993000L, 10));
waterSensors.add(new WaterSensor("sensor_2", 1607527995000L, 30));
// 连接到Redis的配置
FlinkJedisPoolConfig redisConfig = new FlinkJedisPoolConfig.Builder()
.setHost("hadoop102")
.setPort(6379)
.setMaxTotal(100)
.setTimeout(1000 * 10)
.build();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
env
.fromCollection(waterSensors)
.addSink(new RedisSink<>(redisConfig, new RedisMapper<WaterSensor>() {
/*
key value(hash)
"sensor" field value
sensor_1 {"id":"sensor_1","ts":1607527992000,"vc":20}
... ...
*/
@Override
public RedisCommandDescription getCommandDescription() {
// 返回存在Redis中的数据类型 存储的是Hash, 第二个参数是外面的key
return new RedisCommandDescription(RedisCommand.HSET, "sensor");
}
@Override
public String getKeyFromData(WaterSensor data) {
// 从数据中获取Key: Hash的Key
return data.getId();
}
@Override
public String getValueFromData(WaterSensor data) {
// 从数据中获取Value: Hash的value
return JSON.toJSONString(data);
}
}));
env.execute();
}
}
Redis查看是否收到数据
redis-cli --raw
![](https://img-blog.csdnimg.cn/img_convert/b9db9d87df1dd9d8136b02e5b76b2b1e.png#align=left&display=inline&height=352&margin=[object Object]&originHeight=352&originWidth=1184&status=done&style=none&width=1184)
**注意: **
发送了5条数据, redis中只有2条数据. 原因是hash的field的重复了, 后面的会把前面的覆盖掉·
<!-- [https://mvnrepository.com/artifact/org.apache.flink/flink-connector-elasticsearch6](https://mvnrepository.com/artifact/org.apache.flink/flink-connector-elasticsearch6) -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-elasticsearch6_2.11</artifactId>
<version>1.12.0</version>
</dependency>
package com.heather.flink.java.chapter_5.sink;
import com.alibaba.fastjson.JSON;
import com.atguigu.flink.java.chapter_5.WaterSensor;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;
import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;
import org.elasticsearch.common.xcontent.XContentType;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
public class Flink03_Sink_ES {
public static void main(String[] args) throws Exception {
ArrayList<WaterSensor> waterSensors = new ArrayList<>();
waterSensors.add(new WaterSensor("sensor_1", 1607527992000L, 20));
waterSensors.add(new WaterSensor("sensor_1", 1607527994000L, 50));
waterSensors.add(new WaterSensor("sensor_1", 1607527996000L, 50));
waterSensors.add(new WaterSensor("sensor_2", 1607527993000L, 10));
waterSensors.add(new WaterSensor("sensor_2", 1607527995000L, 30));
List<HttpHost> esHosts = Arrays.asList(
new HttpHost("hadoop102", 9200),
new HttpHost("hadoop103", 9200),
new HttpHost("hadoop104", 9200));
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
env
.fromCollection(waterSensors)
.addSink(new ElasticsearchSink.Builder<WaterSensor>(esHosts, new ElasticsearchSinkFunction<WaterSensor>() {
@Override
public void process(WaterSensor element, RuntimeContext ctx, RequestIndexer indexer) {
// 1. 创建es写入请求
IndexRequest request = Requests
.indexRequest("sensor")
.type("_doc")
.id(element.getId())
.source(JSON.toJSONString(element), XContentType.JSON);
// 2. 写入到es
indexer.add(request);
}
}).build());
env.execute();
}
}
Elasticsearch查看是否收到数据
注意:
如果出现如下错误:
添加log4j2的依赖:
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-to-slf4j</artifactId>
<version>2.14.0</version>
</dependency>
如果是无界流, 需要配置bulk的缓存
esSinkBuilder.setBulkFlushMaxActions(1);
如果Flink没有提供给我们可以直接使用的连接器,那我们如果想将数据存储到我们自己的存储设备中,怎么办?
我们自定义一个到Mysql的Sink
create database test;
use test;
CREATE TABLE `sensor` (
`id` varchar(20) NOT NULL,
`ts` bigint(20) NOT NULL,
`vc` int(11) NOT NULL,
PRIMARY KEY (`id`,`ts`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.49</version>
</dependency>
package com.heather.flink.java.chapter_5.sink;
import com.atguigu.flink.java.chapter_5.WaterSensor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.util.ArrayList;
public class Flink04_Sink_Custom {
public static void main(String[] args) throws Exception {
ArrayList<WaterSensor> waterSensors = new ArrayList<>();
waterSensors.add(new WaterSensor("sensor_1", 1607527992000L, 20));
waterSensors.add(new WaterSensor("sensor_1", 1607527994000L, 50));
waterSensors.add(new WaterSensor("sensor_1", 1607527996000L, 50));
waterSensors.add(new WaterSensor("sensor_2", 1607527993000L, 10));
waterSensors.add(new WaterSensor("sensor_2", 1607527995000L, 30));
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
env.fromCollection(waterSensors)
.addSink(new RichSinkFunction<WaterSensor>() {
private PreparedStatement ps;
private Connection conn;
@Override
public void open(Configuration parameters) throws Exception {
conn = DriverManager.getConnection("jdbc:mysql://hadoop102:3306/test?useSSL=false", "root", "000000");
ps = conn.prepareStatement("insert into sensor values(?, ?, ?)");
}
@Override
public void close() throws Exception {
ps.close();
conn.close();
}
@Override
public void invoke(WaterSensor value, Context context) throws Exception {
ps.setString(1, value.getId());
ps.setLong(2, value.getTs());
ps.setInt(3, value.getVc());
ps.execute();
}
});
env.execute();
}
}
? Flink在1.12.0上对流式API新增一项特性:可以根据你的使用情况和Job的特点, 可以选择不同的运行时执行模式(runtime execution modes)。
? 流式API的传统执行模式我们称之为STREAMING执行模式, 这种模式一般用于无界流, 需要持续的在线处理。
? 1.12.0新增了一个BATCH执行模式, 这种执行模式在执行方式上类似于MapReduce框架. 这种执行模式一般用于有界数据。
? 默认是使用的STREAMING执行模式。
BATCH执行模式仅仅用于有界数据, 而STREAMING 执行模式可以用在有界数据和无界数据.
一个公用的规则就是: 当你处理的数据是有界的就应该使用BATCH执行模式, 因为它更加高效. 当你的数据是无界的, 则必须使用STREAMING 执行模式, 因为只有这种模式才能处理持续的数据流.
执行模式有3个选择可配:
- STREAMING(默认)
- BATCH
- AUTOMATIC
bin/flink run -Dexecution.runtime-mode=BATCH ...
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
建议:
不要在运行时配置(代码中配置), 而是使用命令行配置, 引用这样会灵活: 同一个应用即可以用于无界数据也可以用于有界数据
STREAMING模式下, 数据是来一条输出一次结果。
BATCH模式下, 数据处理完之后, 一次性输出结果。
下面展示WordCount的程序读取文件内容在不同执行模式下的执行结果对比 :
// 默认流式模式, 可以不用配置
env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
网站总浏览量(PV)的统计
衡量网站流量一个最简单的指标,就是网站的页面浏览量(Page View,PV)。用户每次打开一个页面便记录1次PV,多次打开同一页面则浏览量累计。一般来说,PV与来访者的数量成正比,但是PV并不直接决定页面的真实来访者数量,如同一个来访者通过不断的刷新页面,也可以制造出非常高的PV。接下来我们就用咱们之前学习的Flink算子来实现在PV的统计
在咱们当前的案例中,给大家准备了某电商网站的用户行为日志数据UserBehavior.csv,本日志数据文件中包含了某电商网站一天近五十万随机用户的所有行为(包括点击、购买、收藏、喜欢)。数据集的每一行表示一条用户行为,由用户ID、商品ID、商品类目ID、行为类型和时间戳组成,并以逗号分隔。将数据文件放置在指定目录中,便于读取到Flink中使用
@Data
@NoArgsConstructor
@AllArgsConstructor
public class UserBehavior {
// 用户ID
private Long userId;
// 商品ID
private Long itemId;
//品类ID
private Integer categoryId;
//用户的行为类型:pv(浏览) 、 buy(购买) 、 fav(收藏) 、cart(喜欢)
private String behavior;
//时间戳
private Long timestamp;
}
核心代码 - WordCount思想
public class Flink10_Case_PV {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// 1. 获取数据源
DataStreamSource<String> inputDS = env.readTextFile("input/UserBehavior.csv");
// 2. 数据处理
// 2.1 把数据转换为javaBean对象
SingleOutputStreamOperator<UserBehavior> userBehaviorDS = inputDS.map(new MapFunction<String, UserBehavior>() {
@Override
public UserBehavior map(String value) throws Exception {
String[] datas = value.split(",");
// 通过数组中的各个元素构建对象
return new UserBehavior(
Long.valueOf(datas[0]),
Long.valueOf(datas[1]),
Integer.valueOf(datas[2]),
datas[3],
Long.valueOf(datas[4])
);
}
});
// 2.2 数据清洗阶段(过滤出所有数据中用户行为是浏览的【pv】)
SingleOutputStreamOperator<UserBehavior> pvDS = userBehaviorDS.filter(new FilterFunction<UserBehavior>() {
@Override
public boolean filter(UserBehavior value) throws Exception {
return "pv".equals(value.getBehavior());
}
});
// 2.3 TODO WordCount求和操作即可
// 把过滤后的数据转换为二元组(pv,1)
SingleOutputStreamOperator<Tuple2<String, Integer>> pvTuple2DS = pvDS.map(new MapFunction<UserBehavior, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> map(UserBehavior value) throws Exception {
return new Tuple2<>("pv", 1);
}
});
// 分组
KeyedStream<Tuple2<String, Integer>, Tuple> kedByDS = pvTuple2DS.keyBy(0);
// 聚合
SingleOutputStreamOperator<Tuple2<String, Integer>> pvSum = kedByDS.sum(1);
// 3. 输出、保存...
pvSum.print("PV总量");
// 启动执行
env.execute();
}
}
核心代码 - 统计的维度 思想 进行分组【重点 ! ! !】
public class Flink10_Case_PV_Process_DIM {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// 1. 获取数据源
DataStreamSource<String> inputDS = env.readTextFile("input/UserBehavior.csv");
// 2. 数据处理
// 2.1 把数据转换为javaBean对象
SingleOutputStreamOperator<UserBehavior> userBehaviorDS = inputDS.map(new MapFunction<String, UserBehavior>() {
@Override
public UserBehavior map(String value) throws Exception {
String[] datas = value.split(",");
// 通过数组中的各个元素构建对象
return new UserBehavior(
Long.valueOf(datas[0]),
Long.valueOf(datas[1]),
Integer.valueOf(datas[2]),
datas[3],
Long.valueOf(datas[4])
);
}
});
// 2.2 数据清洗阶段(过滤出所有数据中用户行为是浏览的【pv】)
SingleOutputStreamOperator<UserBehavior> pvDS = userBehaviorDS.filter(new FilterFunction<UserBehavior>() {
@Override
public boolean filter(UserBehavior value) throws Exception {
return "pv".equals(value.getBehavior());
}
});
// 2.3 TODO 按照 统计的维度 分组 :pv行为
// Lambda表达式
//KeyedStream<UserBehavior, String> userBehaviorPvDS = pvDS.keyBy(data -> data.getBehavior());
// 匿名内部对象方式
KeyedStream<UserBehavior, String> userBehaviorPvDS = pvDS.keyBy(new KeySelector<UserBehavior, String>() {
@Override
public String getKey(UserBehavior value) throws Exception {
return value.getBehavior();
}
});
// 求和 => 实现 计数 的功能,没有count这种聚合算子
// 般找不到现成的算子,那就调用底层的 process
SingleOutputStreamOperator<Long> pvSum = userBehaviorPvDS.process(new KeyedProcessFunction<String, UserBehavior, Long>() {
// 定义变量,用来统计数据
private Long pvSum = 0L;
/**
* process 算子的特点:来一条数据处理一条数据,就执行一次算子。【也就是循环】
* @param value 数据
* @param ctx 上下文对象
* @param out 采集器
* @throws Exception
*/
@Override
public void processElement(UserBehavior value, Context ctx, Collector<Long> out) throws Exception {
pvSum++;
out.collect(pvSum);
}
});
// 3. 输出、保存...
pvSum.print("PV总量");
// 启动执行
env.execute();
}
}
核心代码 - flatMap算子(一进多出或0出)【重点 ! ! !】
public class Flink10_Case_PV_FlatMap {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// 1. 获取数据源
env.readTextFile("input/UserBehavior.csv")
.flatMap(new FlatMapFunction<String, Tuple2<String,Long>>() {
@Override
public void flatMap(String value, Collector<Tuple2<String, Long>> out) throws Exception {
String[] datas = value.split(",");
if("pv".equals(datas[3])){
out.collect(Tuple2.of("pv",1L));
}
}
})
.keyBy(0)
.sum(1)
.print("PV总量");
// 启动执行
env.execute();
}
}
网站独立访客数(UV)的统计
以上案例中,我们统计的是所有用户对页面的所有浏览行为,也就是同一用户的浏览行为会被重复统计。而在实际应用中,我们往往还会关注,到底有多少不同的用户访问了网站,所以另外一个统计流量的重要指标是网站的独立访客数(Unique Visitor,UV)
数据准备
对于UserBehavior数据源来说,我们直接可以根据userId来区分不同的用户
核心代码
public class Flink10_Case_UV {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// 1. 获取数据源
DataStreamSource<String> inputDS = env.readTextFile("input/UserBehavior.csv");
// 2. 数据处理
// 2.1 把数据转换为javaBean对象
SingleOutputStreamOperator<UserBehavior> userBehaviorDS = inputDS.map(new MapFunction<String, UserBehavior>() {
@Override
public UserBehavior map(String value) throws Exception {
String[] datas = value.split(",");
// 通过数组中的各个元素构建对象
return new UserBehavior(
Long.valueOf(datas[0]),
Long.valueOf(datas[1]),
Integer.valueOf(datas[2]),
datas[3],
Long.valueOf(datas[4])
);
}
});
// 2.2 TODO UV就是PV去重后的结果
// 实现:对userId进行去重操作。(把userId存储到set集合中,最后获取set集合的长度即可)
// 1. 数据清洗阶段(过滤出所有数据中用户行为是浏览的【pv】)
SingleOutputStreamOperator<UserBehavior> pvDS = userBehaviorDS.filter((FilterFunction<UserBehavior>) value -> "pv".equals(value.getBehavior()));
// 2. 把数据转换为二元组(uv,userId)
// 二元组的组成元素:
// 第一个位置:uv:是为了分组,方便后续的sum求和和process操作
// 第二个位置:userId,是为了把获取到的数据存储到set集合中进行去重
SingleOutputStreamOperator<Tuple2<String, Long>> uvTuple2 = pvDS.map(new MapFunction<UserBehavior, Tuple2<String, Long>>() {
@Override
public Tuple2<String, Long> map(UserBehavior value) throws Exception {
return Tuple2.of("uv", value.getUserId());
}
});
// 3. 分组
KeyedStream<Tuple2<String, Long>, String> keyedUvDS = uvTuple2.keyBy(datas -> datas.f0);
// 4. 此时到了需要求和的步骤了,但是flink明显没有提供求和操作,所以可以使用process操作
SingleOutputStreamOperator<Integer> uvSum = keyedUvDS.process(new KeyedProcessFunction<String, Tuple2<String, Long>, Integer>() {
Set<Long> userIdSet = new HashSet<>();
@Override
public void processElement(Tuple2<String, Long> value, Context ctx, Collector<Integer> out) throws Exception {
userIdSet.add(value.f1);
out.collect(userIdSet.size());
}
});
// 3. 输出、保存...
uvSum.print("UV总量");
// 启动执行
env.execute();
}
}
Flink之所以能这么流行,离不开它最重要的四个基石:Checkpoint、State、Time、Window。
窗口 :
window
时间语义 & 水位线 :
time & watermark
状态管理 :
state
容错机制 :
checkPoint & savePoint
在Flink的流处理应用中,数据是源源不断的,但是有时候我们需要对数据最一些聚合类的操作,例如:①统计在过去的1分钟内有多少用户点击了网页?②在过去的5分钟内有多少用户进行了下单等等。。
此时,我们就需要定义一个窗口(window)用来收集最近1分钟、5分钟的数据,并且对此窗口内的所有数据进行计算。
time-window
: 根据时间划分窗口,如:每隔 1分钟统计最近1分钟的数据。count-window : 根据数量划分窗口,如:每隔100个数据统计最近100个数据。
tumbling-window : 滚动窗口,size=slide,如:每隔5s统计最近5s的数据
sliding-window : 滑动窗口 : size>slide,如:每隔5s统计最近10s的数据
注意:
- 当size<slide时,如:每隔15s统计最近10s的数据,这种需求会丢失5s的数据,所以,开发中不用。
- 滚动窗口其实就是一种特殊的滑动窗口。
滑动窗口细节:
如果窗口长度为10s,而滑动步长为2s,那么一个数据会计算(10 / 2)5次。
如果 窗口长度为10s,而滑动步长为3s,那么一个数据会计算(10 / 3) 3次或者4次。
滑动窗口的计算是由后向前进行计算,具体详情请查看图。
滑动窗口在计算时,先计算时间戳较大的,也就是window4 -> window3 -> window2 -> window1的顺序
。
Flink还支持一个特殊的窗口:Session会话窗口,需要设置一个会话超时时间,如30s,则表示30s内没有数据到来,则触发上个窗口的计算
窗口分类的总结:
基于时间的滚动窗口tumbling-time-window–用的较多
基于时间的滑动窗口sliding-time-window–用的较多
基于数量的滚动窗口tumbling-count-window–用的较少
基于数量的滑动窗口sliding-count-window–用的较少
window 和 windowAll
使用keyby的流,应该使用window方法
未使用keyby的流,应该调用windowAll方法
keyed windows(按照key分组后对窗口内的数据进行聚合)
stream
.keyBy(...)
.window(...)
.reduce/aggregate/fold/apply()
no keyed window(未分组)
stream
.windowAll(...)
.reduce/aggregate/fold/apply()
window 和 windowAll 这两个方法都需要一个windowAssigner作为参数进行使用。
windowAssigner(窗口分配器) : 负责将每条输入的数据分发到正确的window窗口中进行计算。
**哈哈 :**Flink为我们提供了很多的现成的windowAssigner 。
如果需要自己定制数据分发策略,则可以实现一个 class,继承自 WindowAssigner。
需求:
nc -lk 9999
有如下数据表示: 信号灯编号和通过该信号灯的车的数量
9,3
9,2
9,7
4,9
2,6
1,5
2,3
5,7
5,4
需求1:每5秒钟统计一次,最近5秒钟内,各个路口通过红绿灯汽车的数量–基于时间的滚动窗口
需求2:每5秒钟统计一次,最近10秒钟内,各个路口通过红绿灯汽车的数量–基于时间的滑动窗口
各个路口:分组
public class WindowDemo01_TimeWindow {
public static void main(String[] args) throws Exception {
// 基本环境准备
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// 获取socket数据源
DataStreamSource<String> socketDS = env.socketTextStream("localhost", 9999);
// 数据的格式转换
SingleOutputStreamOperator<CartInfo> mapDS = socketDS.map(new MapFunction<String, CartInfo>() {
@Override
public CartInfo map(String value) throws Exception {
String[] datas = value.split(",");
return new CartInfo(datas[0], Integer.parseInt(datas[1]));
}
});
// 把数据进行分组
KeyedStream<CartInfo, String> keyByDS = mapDS.keyBy(CartInfo::getSensorId);
// 把数据分发到不同的window窗口中,使用windwoAssigner,此处因为已经分组,所以使用window()方法
// WindowedStream<CartInfo, String, TimeWindow> windowDS = keyByDS.window(TumblingEventTimeWindows.of(Time.seconds(5)));
// 需求1:每5秒钟统计一次,最近5秒钟内,各个路口通过红绿灯汽车的数量--基于时间的滚动窗口
WindowedStream<CartInfo, String, TimeWindow> tumbliWindowDS = keyByDS
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)));
// 聚合结果
SingleOutputStreamOperator<CartInfo> tumblCount = tumbliWindowDS.sum("count");
// 需求2:每5秒钟统计一次,最近10秒钟内,各个路口通过红绿灯汽车的数量--基于时间的滑动窗口
WindowedStream<CartInfo, String, TimeWindow> slideWindowDS = keyByDS
.window(SlidingProcessingTimeWindows.of(Time.seconds(5), Time.seconds(10)));
// 聚合结果
SingleOutputStreamOperator<CartInfo> slidCount = slideWindowDS.sum("count");
// 输出结果
tumblCount.print("滚动");
slidCount.print("滑动");
env.execute();
}
@Data
@AllArgsConstructor
@NoArgsConstructor
public static class CartInfo {
private String sensorId;//信号灯id
private Integer count;//通过该信号灯的车的数量
}
}
public class WindowDemo02_CountWindow {
public static void main(String[] args) throws Exception {
// 基本环境准备
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// 获取socket数据源
DataStreamSource<String> socketDS = env.socketTextStream("localhost", 9999);
// 数据的格式转换
SingleOutputStreamOperator<CartInfo> mapDS = socketDS.map(new MapFunction<String, CartInfo>() {
@Override
public CartInfo map(String value) throws Exception {
String[] datas = value.split(",");
return new CartInfo(datas[0], Integer.parseInt(datas[1]));
}
});
// 把数据进行分组
KeyedStream<CartInfo, String> keyByDS = mapDS.keyBy(CartInfo::getSensorId);
// 按照数据数量滚动窗口计算
SingleOutputStreamOperator<CartInfo> tumdDS = keyByDS.countWindow(5L).sum("count");
// 按照数据数量滑动窗口计算
SingleOutputStreamOperator<CartInfo> slidDS = keyByDS.countWindow(6L, 2L).sum("count");
// 输出结果
// tumdDS.print("滚动");
slidDS.print("滑动");
env.execute();
}
@Data
@AllArgsConstructor
@NoArgsConstructor
public static class CartInfo {
private String sensorId;//信号灯id
private Integer count;//通过该信号灯的车的数量
}
}
和web领域的session会话一个意思,收集会话中的数据并进行分析。
API : 设置会话的时间间隔:
ProcessingTimeSessionWindows.withGap
需求:
设置会话超时时间为10s,10s内没有数据到来,则触发上个窗口的计算
public class WindowDemo03_SessionWindow {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource<String> socketDS = env.socketTextStream("localhost", 9999);
SingleOutputStreamOperator<CartInfo> mapDS = socketDS.map(new MapFunction<String, CartInfo>() {
@Override
public CartInfo map(String value) throws Exception {
String[] datas = value.split(",");
return new CartInfo(datas[0], Integer.valueOf(datas[1]));
}
});
// 分组
KeyedStream<CartInfo, String> keyedDS = mapDS.keyBy(CartInfo::getSensorId);
// 会话窗口
WindowedStream<CartInfo, String, TimeWindow> sessionDS = keyedDS.window(ProcessingTimeSessionWindows.withGap(Time.seconds(10)));
SingleOutputStreamOperator<CartInfo> result = sessionDS.sum("count");
result.print("会话窗口");
env.execute();
}
@Data
@AllArgsConstructor
@NoArgsConstructor
public static class CartInfo {
private String sensorId;//信号灯id
private Integer count;//通过该信号灯的车的数量
}
}
在Flink的流式处理中,会涉及到时间的不同概念,如下所示:
- EventTime : 事件时间
- IngestTime : 摄入时间(注入Flink时间)
- ProcessingTime :处理时间(机器时间)
使用时必须指定watermark的生成方式
。使用时不需要指定watermark的生成方式(自动生成)
。总结:
在实际开发中,我们希望基于事件时间来处理数据,但是因为网络延迟等原因,出现了乱序或者延迟到达等问题,此种情况下处理的结果就不是我们想要的,甚至于出现数据丢失的情况,所以就需要一种机制来解决一定问题上的数据丢失或延迟到达的问题!因此,watermark
应运而生(水印机制 / 水位线)!!!
Watermaker就是给数据再额外的加的一个时间列。
也就是Watermaker是个时间戳 !
在上一个章节中,我们已经学习了Flink的基础编程API的使用,接下来,我们来学习Flink编程的高阶部分。所谓的高阶部分内容,其实就是Flink与其他计算框架不相同且占优势的地方,比如Window和Exactly-Once,接下来我们就对这些内容进行详细的学习。
? 在流处理应用中,数据是连续不断的,因此我们不可能等到所有数据都到了才开始处理。当然我们可以每来一个消息就处理一次,但是有时我们需要做一些聚合类的处理,例如:在过去的1分钟内有多少用户点击了我们的网页。在这种情况下,我们必须定义一个窗口,用来收集最近一分钟内的数据,并对这个窗口内的数据进行计算。
? 流式计算是一种被设计用于处理无限数据集的数据处理引擎,而无限数据集是指一种不断增长的本质上无限的数据集,而Window窗口是一种切割无限数据为有限块进行处理的手段。
? 在Flink中, 窗口(window)是处理无界流的核心。 窗口把流切割成有限大小的多个"存储桶"(***bucket)***, 我们在这些桶上进行计算。
窗口分为2类:
时间窗口包含一个开始时间戳(包括)和结束时间戳(不包括), 这两个时间戳一起限制了窗口的尺寸。
在代码中, Flink使用TimeWindow这个类来表示基于时间的窗口, 这个类提供了key查询开始时间戳和结束时间戳的方法, 还提供了针对给定的窗口获取它允许的最大时间差的方法(maxTimestamp())。
时间窗口又分4种 :
滚动窗口有固定的大小, 窗口与窗口之间不会重叠也没有缝隙,比如,如果指定一个长度为5分钟的滚动窗口, 当前窗口开始计算, 每5分钟启动一个新的窗口。
滚动窗口能将数据流切分成不重叠的窗口,每一个事件只能属于一个窗口。
API说明:
DataStream可以直接调用开窗的方法,但是都带“all”,这种情况下所有的数据不分组,都在窗口中。
所以在使用API时,应该尽可能的先对数据进行分组,分组后再调用开窗的API 。
**示例代码 一: **
public class Flink01_Window_TimeWindow_roll {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// 1. 获取数据源
DataStreamSource<String> socketDS = env.socketTextStream("localhost", 9999);
KeyedStream<Tuple2<String, Integer>, String> keyByDS = socketDS
.map(new MapFunction<String, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> map(String value) throws Exception {
return Tuple2.of(value, 1);
}
})
.keyBy(r -> r.f0);
WindowedStream<Tuple2<String, Integer>, String, TimeWindow> dataDS = keyByDS
//滚动窗口 =》 一个参数 :窗口长度
.timeWindow(Time.seconds(5));
// 下面的方法是底层Process的API的实现
//.window(TumblingProcessingTimeWindows.of(Time.seconds(5)));
dataDS.sum(1).print("滚动窗口");
env.execute();
}
}
代码示例 二:
public class Flink01_Window_TimeTumbling {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource<String> socketDS = env.socketTextStream("localhost", 9999);
SingleOutputStreamOperator<Tuple2<String, Integer>> mapDS = socketDS.map(new MapFunction<String, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> map(String value) throws Exception {
String[] datas = value.split(" ");
return Tuple2.of(datas[0], 1);
}
});
// 分组
KeyedStream<Tuple2<String, Integer>, String> keyByDS = mapDS.keyBy(data -> data.f0);
// 开窗
WindowedStream<Tuple2<String, Integer>, String, TimeWindow> windowDS = keyByDS
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)));
// 聚合计算 sum算子
SingleOutputStreamOperator<Tuple2<String, Integer>> result = windowDS.sum(1);
result.print();
// 启动执行
env.execute();
}
}
说明:
与滚动窗口一样, 滑动窗口也是有固定的长度. 另外一个参数我们叫滑动步长, 用来控制滑动窗口启动的频率.
所以, 如果滑动步长小于窗口长度, 滑动窗口会重叠. 这种情况下, 一个元素可能会被分配到多个窗口中
例如, 滑动窗口长度10分钟, 滑动步长5分钟, 则, 每5分钟会得到一个包含最近10分钟的数据。特点:
一个数据最多可以属于多少个窗口? ==>
(窗口长度 / 滑动步长)
**示例代码: **
用法和滚动窗口类似,只不过是滚动窗口需要一个参数,而滑动窗口需要两个参数
WindowedStream<Tuple2<String, Integer>, String, TimeWindow> dataDS = keyByDS
//滑动窗口 => 两个参数 :第一个是 窗口长度 ; 第二个是 滑动步长
.timeWindow(Time.seconds(5), Time.seconds(2));
// 下面的方法是底层Process的API的实现
//.window(SlidingProcessingTimeWindows.of(Time.seconds(5), Time.seconds(2)));
会话窗口分配器会根据活动的元素进行分组. 会话窗口不会有重叠, 与滚动窗口和滑动窗口相比, 会话窗口也没有固定的开启和关闭时间.
如果会话窗口有一段时间没有收到数据, 会话窗口会自动关闭, 这段没有收到数据的时间就是会话窗口的gap(间隔)
我们可以配置静态的gap, 也可以通过一个gap extractor 函数来定义gap的长度. 当时间超过了这个gap, 当前的会话窗口就会关闭, 后序的元素会被分配到一个新的会话窗口。
**示例代码: **
WindowedStream<Tuple2<String, Integer>, String, TimeWindow> dataDS = keyByDS
.window(ProcessingTimeSessionWindows.withGap(Time.seconds(3)));
.window(ProcessingTimeSessionWindows.withDynamicGap(new SessionWindowTimeGapExtractor<Tuple2<String, Long>>() {
@Override
public long extract(Tuple2<String, Long> element) {// 返回 gap值, 单位毫秒
return element.f0.length() 1000;
}
}))
创建原理:
因为会话窗口没有固定的开启和关闭时间, 所以会话窗口的创建和关闭与滚动,滑动窗口不同. 在Flink内部, 每到达一个新的元素都会创建一个新的会话窗口, 如果这些窗口彼此相距比较定义的gap小, 则会对他们进行合并. 为了能够合并, 会话窗口算子需要合并触发器和合并窗口函数: ReduceFunction, AggregateFunction, or ProcessWindowFunction
全局窗口分配器会分配相同key的所有元素进入同一个 Global window. 这种窗口机制只有指定自定义的触发器时才有用. 否则, 不会做任务计算, 因为这种窗口没有能够处理聚集在一起元素的结束点。
**示例代码: **
.window(GlobalWindows.create());
按照指定的数据条数生成一个Window,与时间无关。
计算窗口分2类 :
默认的CountWindow是一个滚动窗口,只需要指定窗口大小即可,当元素数量达到窗口大小时,就会触发窗口的执行。
实例代码
.countWindow(3)
说明:那个窗口先达到3个元素, 哪个窗口就关闭. 不影响其他的窗口.
滑动窗口和滚动窗口的函数名是完全一致的,只是在传参数时需要传入两个参数,一个是window_size,一个是sliding_size。下面代码中的sliding_size设置为了2,也就是说,每收到两个相同key的数据就计算一次,每一次计算的window范围最多是3个元素。
实例代码
.countWindow(3, 2)
分为两类:
前面指定了窗口的分配器, 接着我们需要来指定如何计算, 这事由window function来负责. 一旦窗口关闭, window function 去计算处理窗口中的每个元素。
window function 可以是ReduceFunction,AggregateFunction或者 ProcessWindowFunction中的任意一种。
- ReduceFunction,AggregateFunction更加高效, 原因就是Flink可以对到来的元素进行增量聚合
- ProcessWindowFunction可以得到一个包含这个窗口中所有元素的迭代器及这些元素所属窗口的一些元数据信息。
- ProcessWindowFunction不能被高效执行是因为Flink在执行这个函数之前, 需在内部缓存这个窗口上所有的元素。
ReduceFunction和AggregateFunction区别:
- ReduceFunction不能够改变数据的类型。
- AggregateFunction可以改变数据的类型(更灵活)。
public class Flink01_Window_ReduceFunction {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource<String> socketDS = env.socketTextStream("localhost", 9999);
// 分组操作
KeyedStream<Tuple2<String, Integer>, String> keyByDS = socketDS
.map(new MapFunction<String, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> map(String value) throws Exception {
return Tuple2.of(value, 1);
}
})
.keyBy(data -> data.f0);
// 开窗
WindowedStream<Tuple2<String, Integer>, String, TimeWindow> dataDS = keyByDS.window(TumblingProcessingTimeWindows.of(Time.seconds(5)));
// WindowedStream<Tuple2<String, Integer>, String, TimeWindow> dataDS = keyByDS.timeWindow(Time.seconds(5));
// 聚合操作 reduce
dataDS.reduce(new ReduceFunction<Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {
System.out.println(value1 + "<---->" + value2);
return Tuple2.of(value1.f0, value1.f1 + value2.f1);
}
}).print();
env.execute();
}
}
public class Flink01_Window_AggregateFunction {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource<String> socketDS = env.socketTextStream("localhost", 9999);
// 分组操作
KeyedStream<Tuple2<String, Integer>, String> keyByDS = socketDS
.map(new MapFunction<String, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> map(String value) throws Exception {
return Tuple2.of(value, 1);
}
})
.keyBy(data -> data.f0);
// 开窗
WindowedStream<Tuple2<String, Integer>, String, TimeWindow> dataDS = keyByDS.window(TumblingProcessingTimeWindows.of(Time.seconds(5)));
// WindowedStream<Tuple2<String, Integer>, String, TimeWindow> dataDS = keyByDS.timeWindow(Time.seconds(5));
// 聚合操作 aggregate
SingleOutputStreamOperator<Integer> agregateResult = dataDS.aggregate(new AggregateFunction<Tuple2<String, Integer>, Integer, Integer>() {
// 创建累加器 —> 初始化 —>初始值为0
@Override
public Integer createAccumulator() {
return 0;
}
// 累加操作 —> 当进来数据后,如何进行累加,定义累加规则
@Override
public Integer add(Tuple2<String, Integer> value, Integer acc) {
System.out.println("add ... ");
return acc + 1;
}
// 获取最终结果
@Override
public Integer getResult(Integer acc) {
System.out.println("getResult ... ");
return acc;
}
// 会话窗口 才会调用:合并累加器的结果
@Override
public Integer merge(Integer acc1, Integer acc2) {
return acc1 + acc2;
}
});
agregateResult.print();
env.execute();
}
}
先把窗口所有数据收集起来,等到计算的时候会遍历所有数据。
ProcessWindowFunction就是一个对整个窗口中数据处理的函数。
容易引起
OOM异常
。
/**
* 全窗口函数:整个窗口的本组数据,存起来,关窗的时候一次性一起计算
*/
.process(new ProcessWindowFunction<Tuple2<String, Long>, Tuple2<String, Long>, String, TimeWindow> () {
// 参数1: key
// 参数2: 上下文对象
// 参数3: 这个窗口内所有的元素
// 参数4: 收集器, 用于向下游传递数据
@Override
public void process(
String key,
Context context,
Iterable<Tuple2<String, Long>> elements,
Collector<Tuple2<String, Long>> out) throws Exception{
System.out.println(context.window().getStart()) ;
long sum = 0L;
for(Tuple2<String, Long> t : elements) {
sum += t. f1 ;
}
out.collect(Tuple2.of(key, sum));
}
})
其实, 在用window前首先需要确认应该是在keyBy后的流上用, 还是在没有keyBy的流上使用.
在keyed streams上使用窗口, 窗口计算被并行的运用在多个task上, 可以认为每个task都有自己单独窗口. 正如前面的代码所示。
在非non-keyed stream上使用窗口, 流的并行度只能是1, 所有的窗口逻辑只能在一个单独的task上执行。
.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(10)))
需要注意的是: 非key分区的流, 即使把并行度设置为大于1 的数, 窗口也只能在某个分区上使用
时间语义
与WaterMark
在Flink的流式操作中, 会涉及不同的时间概念,在Flink中使用
时间语义
用来处理乱序数据和迟到数据。
需要结合watermark水位线一起使用才有价值。
事件时间是指的这个事件发生的时间。例如采集的日志数据中,每一条日志都会记录自己的生成时间,Flink通过时间戳分配器访问事件时间戳。
1.12 版本后默认的事件版本就是
事件时间
。
在event进入Flink之前, 通常被嵌入到了event中, 一般作为这个event的时间戳存在.
在事件时间体系中, 时间的进度依赖于数据本身, 和任何设备的时间无关. 事件时间程序必须制定如何产生Event Time Watermarks(水印) 。在事件时间体系中, 水印是表示时间进度的标志(作用就相当于现实时间的时钟)。
在理想情况下,不管事件时间何时到达或者他们的到达的顺序如何, 事件时间处理将产生完全一致且确定的结果。事件时间处理会在等待无序事件(迟到事件)时产生一定的延迟。由于只能等待有限的时间,因此这限制了确定性事件时间应用程序的可使用性。
假设所有数据都已到达,事件时间操作将按预期方式运行,即使在处理无序或迟到的事件或重新处理历史数据时,也会产生正确且一致的结果。例如,每小时事件时间窗口将包含带有事件时间戳的所有记录,该记录落入该小时,无论它们到达的顺序或处理时间。
在使用窗口的时候, 如果使用事件时间, 就指定时间分配器为事件时间分配器
注意:
在1.12之前默认的时间语义是处理时间, 从1.12开始, Flink内部已经把默认的语义改成了事件时间
处理时间是指的执行算子操作的各个设备的本地时间,与机器相关,1.12 前的版本默认的时间属性就是process time。
对于运行在处理时间上的流程序, 所有的基于时间的操作(比如时间窗口)都是使用的设备时钟.比如, 一个长度为1个小时的窗口将会包含设备时钟表示的1个小时内所有的数据. 假设应用程序在 9:15am分启动, 第1个小时窗口将会包含9:15am到10:00am所有的数据, 然后下个窗口是10:00am-11:00am, 等等
处理时间是最简单时间语义, 数据流和设备之间不需要做任何的协调. 他提供了最好的性能和最低的延迟. 但是, 在分布式和异步的环境下, 处理时间没有办法保证确定性, 容易受到数据传递速度的影响: 事件的延迟和乱序
在使用窗口的时候, 如果使用处理时间, 就指定时间分配器为处理时间分配器
是数据进入Flink的时间。
![](https://img-blog.csdnimg.cn/img_convert/67dab82ff3b33858ef8974509731197f.png#align=left&display=inline&height=563&margin=[object Object]&originHeight=563&originWidth=1269&status=done&style=none&width=1269)
![](https://img-blog.csdnimg.cn/img_convert/f15a2992872f7a44fd30502669194a1e.png#align=left&display=inline&height=630&margin=[object Object]&originHeight=630&originWidth=1005&status=done&style=none&width=1005)
处理迟到、乱序数据的方法:
- 设置水位线
- 设置窗口允许迟到数据
- 侧输出流
概念:
WaterMark 就是给数据额外的增加的一个时间列,其实就是一个时间戳,一个特殊的时间戳。
获取:
WaterMark = 数据的事件时间 - 最大的延迟时间 或 乱序时间
注意:
通过源码可知,WaterMark = 当前窗口的最大事件时间 - 最大的延迟时间 或 乱序时间这样可以保证WaterMark水位线会一直上升(变大),不会下降。
作用:
之前的窗口都是按照系统时间来触发计算的,如:[10:00:00 ~ 10:00:10)的窗口,一旦系统时间到了10:00:10就会触发计算,那么就有可能会导致延迟到达的数据丢失。 现在可以借助于waterMark水位线进行解决,窗口就可以按照waterMark来触发计算,也就是说waterMark就是用来触发计算的。
触发条件:
- 窗口中有数据
- waterMark >= 窗口的结束时间
因为前面说到
? Watermaker = 当前窗口的最大的事件时间 - 最大允许的延迟时间 或 乱序时间
? 也就是说只要不断有数据来,就可以保证Watermaker水位线是会一直上升/变大的,不会下降/减小的,所以最终一定是会触发窗口计算的。
上面的触发公式进行如下变形:
Watermaker >= 窗口的结束时间
Watermaker = 当前窗口的最大的事件时间 - 最大允许的延迟时间或乱序时间
当前窗口的最大的事件时间 - 最大允许的延迟时间或乱序时间 >= 窗口的结束时间
当前窗口的最大的事件时间 >= 窗口的结束时间 + 最大允许的延迟时间或乱序时间
WaterMark工作图解:
怎么知道是乱序?怎么知道是迟到的数据?
基于*事件时间*,可以得知数据是否是乱序以及数据是否是迟到时间。
已经知道了数据有乱序,做一个窗口的操作,用EventTime来触发窗口的关闭和计算,合不合适?
不合适,因为数据有乱序,不使用EventTime来触发窗口的关闭和计算 ——> 所以需要另外的技术来衡量*时间的进展*。
WaterMark
=> 表示 时间的进度
=> 用来触发窗口的 关闭、计算
=> 用来解决 数据乱序 的问题(等一会)
=> 特点:单调递增 的。
=> 源码:
/* Watermark tells operators that no elements with a timestamp older or equal to the watermark timestamp should arrive at the operator*/
public final class Watermark extends StreamElement {
private final long timestamp;
public Watermark(long timestamp) {
this.timestamp = timestamp;
}
通过源码发现:watermark其实就是指的一个特殊的时间戳,通过注释知道,在这个时间戳之前的数据都已经到齐了。
怎么知道当前来的数据,属于哪个窗口?(也就是窗口是如何换分的)
窗口划分 :TumblingEventTimeWindows 类的 assignWindows()方法
=> 窗口开始时间:timestamp - (timespamp + windowSize) % windowSize
=> 窗口结束时间:new TimeWindow(start,start + size) —> start + size
=> 窗口左闭右开:属于窗口额最大时间戳为:maxTimestamp = end -1
窗口触发条件:window.maxTimestamp() <= ctx.getCurrentWatermark()
=> 由watermark触发窗口的计算,当 watermark >= 窗口数据的最大时间
乱序的场景下,怎么进行处理?
计算由 watermark 触发。
既然是乱序,就等待乱序后的数据都来后再进行计算。
watermark设定了等待的时间,如果超过了等待的时间,还有数据没到齐,怎么处理?[7.4 窗口允许迟到的数据](# 7.4 窗口允许迟到的数据)
窗口设置 —> 运行迟到 => allowedLateness
如果窗口设置了延迟时间,但是到了真正关窗的时间,后面还要属于这个窗口的数据来,怎么处理?
放到 侧输出流 中存起来。
在多并行度的时候,怎么确定watermark的取值?
以最小的为准,参考木桶原理。
AscendingTimestampExtractor
在下面的这个图中, 事件是有序的(按照他们自己的时间戳来看), watermark是流中一个简单的周期性的标记。
![](https://img-blog.csdnimg.cn/img_convert/a4705b06d0102ac7fb4f3a097f688fbb.png#align=left&display=inline&height=375&margin=[object Object]&originHeight=375&originWidth=1269&status=done&style=none&width=1269)
升序的简单入门案例:
public class Flink02_EventTime {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// TODO 1. 设置时间语义
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
SingleOutputStreamOperator<WaterSensor> dataDS = env
.socketTextStream("localhost", 9999)
.map(new MapFunction<String, WaterSensor>() {
@Override
public WaterSensor map(String value) throws Exception {
String[] datas = value.split(",");
return new WaterSensor(datas[0], Long.valueOf(datas[1]), Integer.valueOf(datas[2]));
}
})
// TODO 2. 设置数据中的哪个字段(数据)可以当做事件时间【指定时间戳提取器】
// AscendingTimestampExtractor : 数据的 事件时间 是 升序的。(升序的时间戳提取器)
/*
升序的场景下,watermark就是当前的时间戳 - 1
public final Watermark getCurrentWatermark() {
return new Watermark(currentTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : currentTimestamp - 1);
}
* */
.assignTimestampsAndWatermarks(
new AscendingTimestampExtractor<WaterSensor>() {
@Override
public long extractAscendingTimestamp(WaterSensor element) {
return element.getTs() * 1000L;
}
});
// 分组、开窗、聚合
dataDS
.keyBy(data -> data.getId())
.window(TumblingEventTimeWindows.of(Time.seconds(5)))// 事件时间
//.window(TumblingProcessingTimeWindows.of(Time.seconds(5))) // 系统时间
/*
泛型说明:
1 : 输入的数据的类型
2 : 输出的数据的类型
3 : 分组的key的类型
4 : 时间窗口对象
*/
.process(new ProcessWindowFunction<WaterSensor, Long, String, TimeWindow>() {
/*
参数说明:
参数1 : 分组的key的类型
参数2 : 上下文对象
参数3 : 窗口内的所有元素
参数4 : 数据收集器 》用于向下游发送数据
*/
@Override
public void process(String s, Context context, Iterable<WaterSensor> elements, Collector<Long> out) throws Exception {
out.collect(elements.spliterator().estimateSize());
}
}).print();
env.execute();
}
}
BoundedOutOfOrdernessTimestampExtractor
在下图中, 按照他们时间戳来看, 这些事件是乱序的, 则watermark对于这些乱序的流来说至关重要.
通常情况下, 水印是一种标记, 是流中的一个点, 所有在这个时间戳(水印中的时间戳)前的数据应该已经全部到达. 一旦水印到达了算子, 则这个算子会提高他内部的时钟的值为这个水印的值.
![](https://img-blog.csdnimg.cn/img_convert/5a09751dd764240933d862590166865f.png#align=left&display=inline&height=371&margin=[object Object]&originHeight=371&originWidth=1269&status=done&style=none&width=1269)
乱序的简单入门案例:
public class Flink03_OutofOrderness {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// TODO 1. 设置时间语义
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
SingleOutputStreamOperator<WaterSensor> dataDS = env
.socketTextStream("localhost", 9999)
.map(new MapFunction<String, WaterSensor>() {
@Override
public WaterSensor map(String value) throws Exception {
String[] datas = value.split(",");
return new WaterSensor(datas[0], Long.valueOf(datas[1]), Integer.valueOf(datas[2]));
}
})
/*
官方提供了BoundedOutOfOrdernessTimestampExtractor抽象类用来 在乱序情况下提取 事件时间 和生成 watermark水位线
使用规则:
- 参数:最大乱序程度,是等待时间(5秒以内)
- 需要重写extractTimestamp()方法,如何从数据中抽取出事件时间
何为乱序:时间戳大的先到了Flink中。
假设数据是 1,2,3,4,5,6 秒生成的,开3s的滚动窗口 [0,3),[3,6),[6,9)
来的数据是 1,6,3,2,4,5 =》 最大乱序程度是 4s
等4s再关闭窗口 => [0,3),本应该再eventtime >= 3s时关窗,等待之后就是 7s后关窗 。
watermark表示时间进展、触发窗口的计算、关窗 —> 也就是watermark = 3s时,[0,3)关闭并计算
【watermark = EventTime = awaitTime = 7 - 4 = 3s。】
为了实现单调递增,上面公式的 EventTime ,应该是当前为止,最大的时间戳。
*/
.assignTimestampsAndWatermarks(
new BoundedOutOfOrdernessTimestampExtractor<WaterSensor>(Time.seconds(3)) {
@Override
public long extractTimestamp(WaterSensor element) {
return element.getTs();
}
}
);
// 分组、开窗、聚合
dataDS
.keyBy(data -> data.getId())
.window(TumblingEventTimeWindows.of(Time.seconds(5)))// 事件时间
//.window(TumblingProcessingTimeWindows.of(Time.seconds(5))) // 系统时间
/*
泛型说明:
1 : 输入的数据的类型
2 : 输出的数据的类型
3 : 分组的key的类型
4 : 时间窗口对象
*/
.process(new ProcessWindowFunction<WaterSensor, Long, String, TimeWindow>() {
/*
参数说明:
参数1 : 分组的key的类型
参数2 : 上下文对象
参数3 : 窗口内的所有元素
参数4 : 数据收集器 》用于向下游发送数据
*/
@Override
public void process(String s, Context context, Iterable<WaterSensor> elements, Collector<Long> out) throws Exception {
out.collect(elements.spliterator().estimateSize());
}
}).print();
env.execute();
}
}
在 Flink 中, 水印(watermark水位线)由应用程序开发人员生成, 这通常需要对相应的领域有 一定的了解。完美的水印永远不会错:时间戳小于水印标记时间的事件不会再出现。在特殊情况下(例如非乱序事件流),最近一次事件的时间戳就可能是完美的水印。
启发式水印则相反,它只估计时间,因此有可能出错, 即迟到的事件 (其时间戳小于水印标记时间)晚于水印出现。针对启发式水印, Flink 提供了处理迟到元素的机制。
设定水印通常需要用到领域知识。举例来说,如果知道事件的迟到时间不会超过 5 秒, 就可以将水印标记时间设为收到的最大时间戳减去 5 秒。 另 一种做法是,采用一个 Flink 作业监控事件流,学习事件的迟到规律,并以此构建水印生成模型。
Flink内置了两个WaterMark生成器:
Monotonously Increasing Timestamps(时间戳单调增长:其实就是允许的延迟为0)
WatermarkStrategy.forMonotonousTimestamps();
Fixed Amount of Lateness(允许固定时间的延迟)
WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(10));
package com.atheather.guigu.flink.java.chapter_7;
import com.atguigu.flink.java.chapter_5.WaterSensor;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.time.Duration;
public class Flink10_Chapter07_OrderedWaterMark {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
SingleOutputStreamOperator<WaterSensor> stream = env
.socketTextStream("hadoop102", 9999) // 在socket终端只输入毫秒级别的时间戳
.map(new MapFunction<String, WaterSensor>() {
@Override
public WaterSensor map(String value) throws Exception {
String[] datas = value.split(",");
return new WaterSensor(datas[0], Long.valueOf(datas[1]), Integer.valueOf(datas[2]));
}
});
// 创建水印生产策略
WatermarkStrategy<WaterSensor> wms = WatermarkStrategy
.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3)) // // 最大容忍的延迟时间
.withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() { // 指定时间戳
@Override
public long extractTimestamp(WaterSensor element, long recordTimestamp) {
return element.getTs() * 1000;
}
});
stream
.assignTimestampsAndWatermarks(wms) // 指定水印和时间戳
.keyBy(WaterSensor: :getId)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {
@Override
public void process(String key, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {
String msg = "当前key: " + key
+ "窗口: [" + context.window().getStart() / 1000 + "," + context.window().getEnd()/1000 + ") 一共有 "
+ elements.spliterator().estimateSize() + "条数据 ";
out.collect(msg);
}
})
.print();
env.execute();
}
}
有2种风格的WaterMark生产方式: periodic(周期性) and punctuated(间歇性).都需要继承接口: WatermarkGenerator
package com.heather.flink.java.chapter_7;
import com.atguigu.flink.java.chapter_5.WaterSensor;
import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
public class Flink11_Chapter07_Period {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
SingleOutputStreamOperator<WaterSensor> stream = env
.socketTextStream("localhost", 9999) // 在socket终端只输入毫秒级别的时间戳
.map(new MapFunction<String, WaterSensor>() {
@Override
public WaterSensor map(String value) throws Exception {
String[] datas = value.split(",");
return new WaterSensor(datas[0], Long.valueOf(datas[1]), Integer.valueOf(datas[2]));
}
});
// 创建水印生产策略
WatermarkStrategy<WaterSensor> myWms = new WatermarkStrategy<WaterSensor>() {
@Override
public WatermarkGenerator<WaterSensor> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
System.out.println("createWatermarkGenerator ....");
return new MyPeriod(3);
}
}.withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() {
@Override
public long extractTimestamp(WaterSensor element, long recordTimestamp) {
System.out.println("recordTimestamp " + recordTimestamp);
return element.getTs() * 1000;
}
});
stream
.assignTimestampsAndWatermarks(myWms)
.keyBy(WaterSensor::getId)
.window(SlidingEventTimeWindows.of(Time.seconds(5), Time.seconds(5)))
.process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {
@Override
public void process(String key, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {
String msg = "当前key: " + key
+ "窗口: [" + context.window().getStart() / 1000 + "," + context.window().getEnd() / 1000 + ") 一共有 "
+ elements.spliterator().estimateSize() + "条数据 ";
out.collect(context.window().toString());
out.collect(msg);
}
})
.print();
env.execute();
}
public static class MyPeriod implements WatermarkGenerator<WaterSensor> {
private long maxTs = Long.MIN_VALUE;
// 允许的最大延迟时间 ms
private final long maxDelay;
public MyPeriod(long maxDelay) {
this.maxDelay = maxDelay * 1000;
this.maxTs = Long.MIN_VALUE + this.maxDelay + 1;
}
// 每收到一个元素, 执行一次. 用来生产WaterMark中的时间戳
@Override
public void onEvent(WaterSensor event, long eventTimestamp, WatermarkOutput output) {
System.out.println("onEvent..." + eventTimestamp);
//有了新的元素找到最大的时间戳
maxTs = Math.max(maxTs, eventTimestamp);
System.out.println(maxTs);
}
// 周期性的把WaterMark发射出去, 默认周期是200ms
@Override
public void onPeriodicEmit(WatermarkOutput output) {
// System.out.println("onPeriodicEmit...");
// 周期性的发射水印: 相当于Flink把自己的时钟调慢了一个最大延迟
output.emitWatermark(new Watermark(maxTs - maxDelay - 1));
}
}
}
public class Flink12_Chapter07_punctuated {
public static void main(String[] args) throws Exception {
// 省略....
public static class MyPunctuated implements WatermarkGenerator<WaterSensor> {
private long maxTs;
// 允许的最大延迟时间 ms
private final long maxDelay;
public MyPunctuated(long maxDelay) {
this.maxDelay = maxDelay * 1000;
this.maxTs = Long.MIN_VALUE + this.maxDelay + 1;
}
// 每收到一个元素, 执行一次. 用来生产WaterMark中的时间戳
@Override
public void onEvent(WaterSensor event, long eventTimestamp, WatermarkOutput output) {
System.out.println("onEvent..." + eventTimestamp);
//有了新的元素找到最大的时间戳
maxTs = Math.max(maxTs, eventTimestamp);
output.emitWatermark(new Watermark(maxTs - maxDelay - 1));
}
@Override
public void onPeriodicEmit(WatermarkOutput output) {
// 不需要实现
}
}
}
总结: 多并行度的条件下, 向下游传递WaterMark的时候, 总是以最小的那个WaterMark为准! 木桶原理!
已经添加了wartemark之后, 仍有数据会迟到怎么办?Flink的窗口, 也允许迟到数据。
- 当waterMark >= 窗口的结束时间的时候,会正常的触发计算,但是不会关闭窗口。
- 当waterMark >= 窗口的结束时间 + 窗口的等待时间,会真正的关闭窗口。
- 当 窗口的结束时间 <= waterMark <= 窗口结束时间 + 窗口等待时间,每来一条迟到数据,就会计算一次。
注意:
允许迟到 只能运用在event time上
图解
public class Flink05_Watermark_AllowedLateness {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
DataStreamSource<String> socketDS = env.socketTextStream("localhost", 9999);
SingleOutputStreamOperator<WaterSensor> waterSensorDS = socketDS
.map(new MapFunction<String, WaterSensor>() {
@Override
public WaterSensor map(String value) throws Exception {
String[] datas = value.split(",");
return new WaterSensor(datas[0], Long.valueOf(datas[1]), Integer.valueOf(datas[2]));
}
})
// 设置数据中的哪个字段(数据)可以当做事件时间【指定时间戳提取器】
.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<WaterSensor>(Time.seconds(3)) {
@Override
public long extractTimestamp(WaterSensor element) {
return element.getTs() * 1000L;
}
});
// 分组、开窗、聚合
waterSensorDS
.keyBy(data -> data.getId())
.timeWindow(Time.seconds(5))
.allowedLateness(Time.seconds(2))
// 全窗口函数:整个窗口的本组数据,存起来,关窗的时候一次性一起计算
.process(new ProcessWindowFunction<WaterSensor, Long, String, TimeWindow>() {
@Override
public void process(String s, Context context, Iterable<WaterSensor> elements, Collector<Long> out) throws Exception {
out.collect(elements.spliterator().estimateSize());
}
})
.print();
env.execute();
}
}
允许迟到数据,,窗口也会真正的关闭, 如果还有迟到的数据怎么办? Flink提供了一种叫做侧输出流的来处理关窗之后到达的数据。
作用:
- 处理乱序数据、迟到数据
- 对不同的数据进行
分流
。
public class Flink07_ProcessFunction_SideOutput {
public static void main(String[] args) throws Exception {
// 0 执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// TODO 1.env指定时间语义
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// 1.
SingleOutputStreamOperator<WaterSensor> sensorDS = env
.socketTextStream("localhost", 9999)
.map(new MapFunction<String, WaterSensor>() {
@Override
public WaterSensor map(String value) throws Exception {
String[] datas = value.split(",");
return new WaterSensor(datas[0], Long.valueOf(datas[1]), Integer.valueOf(datas[2]));
}
})
.assignTimestampsAndWatermarks(
new AssignerWithPunctuatedWatermarks<WaterSensor>() {
private Long maxTs = Long.MIN_VALUE;
@Nullable
@Override
public Watermark checkAndGetNextWatermark(WaterSensor lastElement, long extractedTimestamp) {
maxTs = Math.max(maxTs, extractedTimestamp);
return new Watermark(maxTs);
}
@Override
public long extractTimestamp(WaterSensor element, long previousElementTimestamp) {
return element.getTs() * 1000L;
}
}
);
//TODO 使用侧输出流
// 1.定义一个OutputTag,给定一个 名称
// 2.使用 ctx.output(outputTag对象,放入侧输出流的数据)
// 3.获取侧输出流 => DataStream.getSideOutput(outputTag对象)
OutputTag<String> outputTag = new OutputTag<String>("vc alarm") {
};
SingleOutputStreamOperator<WaterSensor> processDS = sensorDS
.keyBy(data -> data.getId())
.process(
new KeyedProcessFunction<String, WaterSensor, WaterSensor>() {
/**
* 来一条数据,处理一条
* @param value
* @param ctx
* @param out
* @throws Exception
*/
@Override
public void processElement(WaterSensor value, Context ctx, Collector<WaterSensor> out) throws Exception {
if (value.getVc() > 5) {
/*
TODO 侧输出流 水位高于阈值,用侧输出流告警
*/
ctx.output(outputTag, "水位高于阈值5!!!");
}
out.collect(value);
}
}
);
processDS.print();
/*
TODO 侧输出流只保存数据,不对数据进行计算,在此处,我们调用侧输出流中的数据 ===> 进行计算
*/
processDS.getSideOutput(outputTag).print("alarm");
env.execute();
}
}
split算子可以把一个流分成两个流, 从1.12开始已经被移除了. 官方建议我们用侧输出流来替换split算子的功能。
**需求: **
? 采集监控传感器水位值,将水位值高于5cm的值输出到side output
SingleOutputStreamOperator<WaterSensor> result =
env
.socketTextStream("hadoop102", 9999) // 在socket终端只输入毫秒级别的时间戳
.map(new MapFunction<String, WaterSensor>() {
@Override
public WaterSensor map(String value) throws Exception {
String[] datas = value.split(",");
return new WaterSensor(datas[0], Long.valueOf(datas[1]), Integer.valueOf(datas[2]));
}
})
.keyBy(ws -> ws.getTs())
.process(new KeyedProcessFunction<Long, WaterSensor, WaterSensor>() {
@Override
public void processElement(WaterSensor value, Context ctx, Collector<WaterSensor> out) throws Exception {
out.collect(value);
if (value.getVc() > 5) { //水位大于5的写入到侧输出流
ctx.output(new OutputTag<WaterSensor>("警告") {}, value);
}
}
});
result.print("主流");
result.getSideOutput(new OutputTag<WaterSensor>("警告"){}).print("警告");
我们之前学习的转换算子是无法访问事件的时间戳信息和水位线信息的。而这在一些应用场景下,极为重要。例如MapFunction这样的map转换算子就无法访问时间戳或者当前事件的事件时间。
基于此,DataStream API提供了一系列的Low-Level转换算子。可以访问时间戳、watermark以及注册定时事件。还可以输出特定的一些事件,例如超时事件等。Process Function用来构建事件驱动的应用以及实现自定义的业务逻辑(使用之前的window函数和转换算子无法实现)。例如,Flink SQL就是使用Process Function实现的。
Flink提供了8个Process Function:
? ProcessFunction
? KeyedProcessFunction :keyBy分组后使用
? CoProcessFunction :connect 连接后使用
? ProcessJoinFunction
? BroadcastProcessFunction
? KeyedBroadcastProcessFunction
? ProcessWindowFunction :window开窗后使用
? ProcessAllWindowFunction
env
.socketTextStream("hadoop102", 9999)
.map(line -> {
String[] datas = line.split(",");
return new WaterSensor(datas[0], Long.valueOf(datas[1]), Integer.valueOf(datas[2]));
})
.process(new ProcessFunction<WaterSensor, String>() {
@Override
public void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {
out.collect(value.toString());
}
})
.print();
env
.socketTextStream("hadoop102", 9999)
.map(line -> {
String[] datas = line.split(",");
return new WaterSensor(datas[0], Long.valueOf(datas[1]), Integer.valueOf(datas[2]));
})
.keyBy(ws -> ws.getId())
.process(new KeyedProcessFunction<String, WaterSensor, String>() { // 泛型1:key的类型 泛型2:输入类型 泛型3:输出类型
@Override
public void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {
System.out.println(ctx.getCurrentKey());
out.collect(value.toString());
}
})
.print();
DataStreamSource<Integer> intStream = env.fromElements(1, 2, 3, 4, 5);
DataStreamSource<String> stringStream = env.fromElements("a", "b", "c");
ConnectedStreams<Integer, String> cs = intStream.connect(stringStream);
cs
.process(new CoProcessFunction<Integer, String, String>() {
@Override
public void processElement1(Integer value, Context ctx, Collector<String> out) throws Exception {
out.collect(value.toString());
}
@Override
public void processElement2(String value, Context ctx, Collector<String> out) throws Exception {
out.collect(value);
}
})
.print();
SingleOutputStreamOperator<WaterSensor> s1 = env
.socketTextStream("hadoop102", 8888) // 在socket终端只输入毫秒级别的时间戳
.map(value -> {
String[] datas = value.split(",");
return new WaterSensor(datas[0], Long.valueOf(datas[1]), Integer.valueOf(datas[2]));
});
SingleOutputStreamOperator<WaterSensor> s2 = env
.socketTextStream("hadoop102", 9999) // 在socket终端只输入毫秒级别的时间戳
.map(value -> {
String[] datas = value.split(",");
return new WaterSensor(datas[0], Long.valueOf(datas[1]), Integer.valueOf(datas[2]));
});
s1.join(s2)
.where(WaterSensor::getId)
.equalTo(WaterSensor::getId)
.window(TumblingProcessingTimeWindows.of(Time.seconds(5))) // 必须使用窗口
.apply(new JoinFunction<WaterSensor, WaterSensor, String>() {
@Override
public String join(WaterSensor first, WaterSensor second) throws Exception {
return "first: " + first + ", second: " + second;
}
})
.print();
后面专门讲解
keyBy之后使用
添加窗口之后使用
全窗口函数之后使用
基于处理时间或者时间时间处理过一个元素之后, 注册一个定时器, 然后指定的时间执行。
Context和OnTimerContext所持有的TimerService对象拥有以下方法:
- currentProcessingTime(): Long 返回当前处理时间
- currentWatermark(): Long 返回当前watermark的时间戳
- registerProcessingTimeTimer(timestamp: Long): Unit 会注册当前key的processing time的定时器。当processing time到达定时时间时,触发timer。
- registerEventTimeTimer(timestamp: Long): Unit 会注册当前key的event time 定时器。当水位线大于等于定时器注册的时间时,触发定时器执行回调函数。
- deleteProcessingTimeTimer(timestamp: Long): Unit 删除之前注册处理时间定时器。如果没有这个时间戳的定时器,则不执行。
- deleteEventTimeTimer(timestamp: Long): Unit 删除之前注册的事件时间定时器,如果没有此时间戳的定时器,则不执行。
public class Flink04_ProcessFunc_KeyBy {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
// 指定时间语义
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
DataStreamSource<String> socketDS = env.socketTextStream("localhost", 9999);
SingleOutputStreamOperator<WaterSensor> waterSensorDS = socketDS
.map(new MapFunction<String, WaterSensor>() {
@Override
public WaterSensor map(String value) throws Exception {
String[] datas = value.split(",");
return new WaterSensor(datas[0], Long.valueOf(datas[1]), Integer.valueOf(datas[2]));
}
})
.assignTimestampsAndWatermarks(
// new AscendingTimestampExtractor<WaterSensor>() {
// @Override
// public long extractAscendingTimestamp(WaterSensor element) {
// return element.getTs() * 1000L;
// }
// }
// 自定义周期型水位线
new AssignerWithPunctuatedWatermarks<WaterSensor>() {
private Long maxTs = Long.MIN_VALUE;
@Nullable
@Override
public Watermark checkAndGetNextWatermark(WaterSensor lastElement, long extractedTimestamp) {
maxTs = Math.max(maxTs, extractedTimestamp);
return new Watermark(maxTs);
}
@Override
public long extractTimestamp(WaterSensor element, long previousElementTimestamp) {
return element.getTs() * 1000L;
}
}
);
SingleOutputStreamOperator<Long> result = waterSensorDS
.keyBy(r -> r.getId())
.process(new KeyedProcessFunction<String, WaterSensor, Long>() {
private Long triggerTS = 0L;
@Override
public void processElement(WaterSensor value, Context ctx, Collector<Long> out) throws Exception {
// 获取当前数据的分组
ctx.getCurrentKey();
// 获取当前数据的时间戳
ctx.timestamp();
// 将数据放入侧输出流
//ctx.output();
// 定时器 : 注册 、删除
//ctx.timerService().registerEventTimeTimer();
//ctx.timerService().registerProcessingTimeTimer(
// 为了避免重复注册、重复创建对象,注册定时器的时候,判断一下是否已经注册过了
if(triggerTS == 0L){
ctx.timerService().registerEventTimeTimer(
// ctx.timerService().currentProcessingTime() + 5000L //处理时间
value.getTs() * 1000L + 5000L // 事件时间
);
triggerTS = value.getTs() * 1000L + 5000L;
}
}
/**
* 定时器到时候,触发行为
* @param timestamp 注册的定时器的时间
* @param ctx 上下文
* @param out 采集器
* @throws Exception
*/
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<Long> out) throws Exception {
// System.out.println(new Timestamp(timestamp) + "定时器触发。。。"); // 处理时间
System.out.println(timestamp + "定时器触发"); // 事件时间
}
});
result.print();
env.execute();
}
}
需求 :
监控水位传感器的水位值,如果水位值在五秒之内(processing time)连续上升,则报警。
public class Flink06_ProcessFunction_TimerPractice {
public static void main(String[] args) throws Exception {
// 0 执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// TODO 1.env指定时间语义
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// 1.
SingleOutputStreamOperator<WaterSensor> sensorDS = env
.socketTextStream("localhost", 9999)
.map(new MapFunction<String, WaterSensor>() {
@Override
public WaterSensor map(String value) throws Exception {
String[] datas = value.split(",");
return new WaterSensor(datas[0], Long.valueOf(datas[1]), Integer.valueOf(datas[2]));
}
})
.assignTimestampsAndWatermarks(
new AssignerWithPunctuatedWatermarks<WaterSensor>() {
private Long maxTs = Long.MIN_VALUE;
@Nullable
@Override
public Watermark checkAndGetNextWatermark(WaterSensor lastElement, long extractedTimestamp) {
maxTs = Math.max(maxTs, extractedTimestamp);
return new Watermark(maxTs);
}
@Override
public long extractTimestamp(WaterSensor element, long previousElementTimestamp) {
return element.getTs() * 1000L;
}
}
);
SingleOutputStreamOperator<String> processDS = sensorDS
.keyBy(data -> data.getId())
.process(
new KeyedProcessFunction<String, WaterSensor, String>() {
// 定义一个变量,保存上一次的水位值
private Integer lastVC = 0;
private Long triggerTs = 0L;
/**
* 来一条数据,处理一条
* @param value
* @param ctx
* @param out
* @throws Exception
*/
@Override
public void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {
// 判断是上升还是下降
if (value.getVc() > lastVC) {
// 1.水位上升
if (triggerTs == 0) {
// 第一条数据来的时候,注册定时器
ctx.timerService().registerEventTimeTimer(value.getTs() * 1000L + 5000L);
triggerTs = value.getTs() * 1000L + 5000L;
}
} else {
// 2.水位下降
// 2.1 删除注册的定时器
ctx.timerService().deleteEventTimeTimer(triggerTs);
// 2.2 重新注册定时器(或 把保存的时间清空)
triggerTs = 0L;
}
// 不管上升还是下降,都要保存水位值,供下条数据使用,进行比较
lastVC = value.getVc();
}
/**
* 定时器触发
* @param timestamp 注册的定时器的时间
* @param ctx 上下文
* @param out 采集器
* @throws Exception
*/
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
// 定时器触发,说明已经满足 连续5s 水位上升
out.collect(ctx.getCurrentKey() + "在" + timestamp + "监测到水位连续5s上升");
// 将保存的注册时间清空
triggerTs = 0L;
}
}
);
processDS.print();
env.execute();
}
}
有状态的计算是流处理框架要实现的重要功能,因为稍复杂的流处理场景都需要记录状态,然后在新流入数据的基础上不断更新状态。
SparkStreaming在状态管理这块做的不好, 很多时候需要借助于外部存储(例如Redis)来手动管理状态, 增加了编程的难度。
Flink的状态管理是它的优势之一。状态就是保存的上一次的老数据
各个类型的状态的使用范围:
算子状态:
Source 端和 Sink端。
键控状态:
keyBy之后使用。
在流式计算中有些操作一次处理一个独立的事件(比如解析一个事件), 有些操作却需要记住多个事件的信息(比如窗口操作)。
那些需要记住多个事件信息的操作就是有状态的。
在Flink中,状态始终是与特定的算子相关联的。
为了使运行时的Flink了解算子的状态,算子需要预先注册其状态。
总的来说,在Flink中,状态分为两种:
算子状态(Operator State)
算子状态的作用范围限定为算子任务。
键控状态
根据输入数据流中定义的键(key)来维护和访问。
流式计算分为 无状态计算 和 有状态计算 两种情况。
下面的几个场景都需要使用流处理的状态功能 :
去重
数据流中的数据有重复,我们想对重复数据去重,需要记录哪些数据已经流入过应用,当新数据流入时,根据已流入过的数据来判断去重。
检测
检查输入流是否符合某个特定的模式,需要将之前流入的元素以状态的形式缓存下来。比如,判断一个温度传感器数据流中的温度是否在持续上升。
聚合
对一个时间窗口内的数据进行聚合分析,分析一个小时内水位的情况。
更新机器学习模型
在线机器学习场景下,需要根据新流入数据不断更新机器学习的模型参数。
Flink包括两种基本类型的状态Managed State和Raw State :
Managed State | Raw State | |
---|---|---|
状态管理方式 | Flink Runtime托管, 自动存储, 自动恢复, 自动伸缩 | 用户自己管理 |
状态数据结构 | Flink提供多种常用数据结构, 例如:ListState, MapState等 | 字节数组: byte[] |
使用场景 | 绝大数Flink算子 | 所有算子 |
注意:
对Managed State继续细分,它又有两种类型
- Operator State(算子状态)。
- Keyed State(键控状态)。
Operator State | Keyed State | |
---|---|---|
适用用算子类型 | 可用于所有算子: 常用于source, 例如 FlinkKafkaConsumer | 只适用于KeyedStream上的算子 |
状态分配 | 一个算子的子任务对应一个状态 | 一个Key对应一个State: 一个算子会处理多个Key, 则访问相应的多个State |
创建和访问方式 | 实现CheckpointedFunction或ListCheckpointed(已经过时)接口 | 重写RichFunction, 通过里面的RuntimeContext访问 |
横向扩展 | 并发改变时有多重重写分配方式可选: 均匀分配和合并后每个得到全量 | 并发改变, State随着Key在实例间迁移 |
支持的数据结构 | ListState和BroadCastState | ValueState, ListState,MapState ReduceState, AggregatingState |
Operator State可以用在所有算子上,每个算子子任务或者说每个算子实例共享一个状态,流入这个算子子任务的数据可以访问和更新这个状态。
注意: 算子子任务之间的状态不能互相访问
Operator State的实际应用场景不如Keyed State多,它经常被用在Source或Sink等算子上,用来保存流入数据的偏移量或对输出数据做缓存,以保证Flink应用的Exactly-Once语义。
Flink为算子状态提供三种基本数据结构:
数据结构 | 特点 |
---|---|
列表状态(List state) | 将状态表示为一组数据的列表 |
联合列表状态(Union list state) | 也将状态表示为数据的列表。它与常规列表状态的区别在于,在发生故障时,或者从保存点(savepoint)启动应用程序时如何恢复。 一种是均匀分配(List state),另外一种是将所有 State 合并为全量 State 再分发给每个实例(Union list state)。 |
广播状态(Broadcast state) | 是一种特殊的算子状态. 如果一个算子有多项任务,而它的每项任务状态又都相同,那么这种特殊情况最适合应用广播状态。 |
在map算子中计算数据的个数
public class Flink01_OperatorState {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
DataStreamSource<String> socketDS = env.socketTextStream("localhost", 9999);
SingleOutputStreamOperator<WaterSensor> mapDS = socketDS.map(lines -> {
String[] datas = lines.split(",");
return new WaterSensor(datas[0], Long.valueOf(datas[1]), Integer.valueOf(datas[2]));
});
// 定义一个有状态的map操作,统计当前分区数据的个数
SingleOutputStreamOperator<Integer> countDS = mapDS.map(new MyCountMapper());
countDS.print();
env.execute();
}
// 自定义MapFunction
// 我们需要把状态保存在检查点中,所以需要实现ListCheckpointed接口
private static class MyCountMapper implements MapFunction<WaterSensor,Integer>, ListCheckpointed<Integer> {
// 定义一个本地变量,作为算子状态
private Integer count = 0;
@Override
public Integer map(WaterSensor value) throws Exception {
count ++;
return count;
}
// 把状态保存到快照中【存储到集合中】
@Override
public List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception {
return Collections.singletonList(count);
}
// 故障恢复方法
@Override
public void restoreState(List<Integer> state) throws Exception {
for (Integer num : state) {
count +=num;
}
}
}
}
从版本1.5.0开始,Apache Flink具有一种新的状态,称为广播状态。
广播状态被引入以支持这样的用例:来自一个流的一些数据需要广播到所有下游任务,在那里它被本地存储,并用于处理另一个流上的所有传入元素。作为广播状态自然适合出现的一个例子,我们可以想象一个低吞吐量流,其中包含一组规则,我们希望根据来自另一个流的所有元素对这些规则进行评估。考虑到上述类型的用例,广播状态与其他操作符状态的区别在于:
键控状态是根据输入数据流中定义的键(key)来维护和访问的。
Flink为每个键值维护一个状态实例,并将具有相同键的所有数据,都分区到同一个算子任务中,这个任务会维护和处理这个key对应的状态。当任务处理一条数据时,它会自动将状态的访问范围限定为当前数据的key。因此,具有相同key的所有数据都会访问相同的状态。
Keyed State很类似于一个分布式的key-value map数据结构,只能用于KeyedStream(keyBy算子处理之后)。
数据类型 | 特点 |
---|---|
值状态(Value State) | 将状态表示为单个的值 |
列表状态(List State) | 将状态表示为一组数据的列表 |
映射状态(Map State<UK, UV>) | 将状态表示为一组key-value对 |
聚合状态(ReducingState & AggregatingState) | 将状态表示为一个用于聚合操作的列表 |
具体描述如下:
保存单个值. 每个有key有一个状态值. 设置使用 update(T), 获取使用 T value()
保存元素列表.
添加元素: add(T) addAll(List)
获取元素: Iterable get()
覆盖所有元素: update(List)
存储单个值, 表示把所有元素的聚合结果添加到状态中. 与ListState类似, 但是当使用add(T)的时候ReducingState会使用指定的ReduceFunction进行聚合.
存储单个值. 与ReducingState类似, 都是进行聚合. 不同的是, AggregatingState的聚合的结果和元素类型可以不一样.
存储键值对列表.
添加键值对: put(UK, UV) or putAll(Map<UK, UV>)
根据key获取值: get(UK)
获取所有: entries(), keys() and values()
检测是否为空: isEmpty()
注意:
public class Flink02_KeyedState {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
DataStreamSource<String> socketDS = env.socketTextStream("localhost", 9999);
SingleOutputStreamOperator<WaterSensor> mapDS = socketDS.map(lines -> {
String[] datas = lines.split(",");
return new WaterSensor(datas[0], Long.valueOf(datas[1]), Integer.valueOf(datas[2]));
});
// 按照ID分组
KeyedStream<WaterSensor, String> keyedDS = mapDS.keyBy(WaterSensor::getId);
// 演示状态的使用
SingleOutputStreamOperator<WaterSensor> process = keyedDS.process(new MyKeyedProcessFunction());
// 打印
process.print();
// 启动执行
env.execute();
}
private static class MyKeyedProcessFunction extends KeyedProcessFunction<String, WaterSensor, WaterSensor> {
// 1. 定义状态
private ValueState<Long> valueState;
private ListState<Long> listState;
private MapState<String,Long> mapState;
private ReducingState<WaterSensor> reducerState;
private AggregatingState<WaterSensor,WaterSensor> aggregatingState;
// 2. 初始化
@Override
public void open(Configuration parameters) throws Exception {
valueState = getRuntimeContext().getState(new ValueStateDescriptor<Long>("value-state", Long.class));
listState = getRuntimeContext().getListState(new ListStateDescriptor<Long>("list-state", Long.class));
mapState = getRuntimeContext().getMapState(new MapStateDescriptor<String, Long>("map-state", String.class, Long.class));
// reducerState = getRuntimeContext().getReducingState(new ReducingStateDescriptor<WaterSensor>("reduce-state", , WaterSensor.class));
}
/*
3. 状态的操作
- 增
- 删
- 改
- 查
*/
@Override
public void processElement(WaterSensor value, Context ctx, Collector<WaterSensor> out) throws Exception {
// 3.1 value状态
Long value1 = valueState.value();// 获取状态中的值
valueState.update(122L); // 更新状态的值
valueState.clear(); // 清空状态值到原始状态
// 3.2 listState状态
Iterable<Long> longs = listState.get(); // 获取状态中的值
listState.add(122L); // 添加状态值
listState.update(new ArrayList<>()); // 替换整个的状态的值
listState.clear(); // 清空状态值到原始状态
// 3.3 mapState状态
Iterator<Map.Entry<String, Long>> it = mapState.iterator(); // 获取所有的状态【然后迭代器进行迭代】
Long aLong = mapState.get(""); // 获取指定的某个key的状态
// ....
// 3.4 reduceState状态
WaterSensor waterSensor = reducerState.get(); // 获取状态中的值
reducerState.add(new WaterSensor()); // 添加状态值
reducerState.clear(); // 清空状态值到原始状态
// 3.5 aggregateState状态
WaterSensor waterSensor1 = aggregatingState.get();// 获取状态中的值
aggregatingState.add(new WaterSensor()); // 添加状态值
aggregatingState.clear(); // 清空状态值到原始状态
}
}
}
检测传感器的水位线值,如果连续的两个水位线差值超过10,就输出报警。
public class Flink03_State_ValueState {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
DataStreamSource<String> socketDS = env.socketTextStream("localhost", 9999);
SingleOutputStreamOperator<WaterSensor> mapDS = socketDS.map(lines -> {
String[] datas = lines.split(",");
return new WaterSensor(datas[0], Long.valueOf(datas[1]), Integer.valueOf(datas[2]));
});
// 按照ID分组
KeyedStream<WaterSensor, String> keyedDS = mapDS.keyBy(r -> r.getId());
// 使用RichFunction实现水位线跳变报警需求
SingleOutputStreamOperator<String> stateDS = keyedDS.flatMap(new RichFlatMapFunction<WaterSensor, String>() {
// 定义状态
private ValueState<Integer> vcState;
@Override
public void open(Configuration parameters) throws Exception {
// 初始化
vcState = getRuntimeContext().getState(new ValueStateDescriptor<Integer>("vc-state", Integer.class));
}
@Override
public void flatMap(WaterSensor value, Collector<String> out) throws Exception {
// 取出状态中的值
Integer lastVc = vcState.value();
// 更新状态值
vcState.update(value.getVc());
// 当上一次的水位线不为NULL,并且出现水位跳变的时候进行报警
if (lastVc != null && Math.abs(lastVc - value.getVc()) > 10) {
out.collect("报警!!!");
}
}
});
stateDS.print();
// 打印
// 启动执行
env.execute();
}
}
针对每个传感器输出最高的3个水位值
public class Flink04_State_ListState {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
DataStreamSource<String> socketDS = env.socketTextStream("localhost", 9999);
SingleOutputStreamOperator<WaterSensor> mapDS = socketDS.map(lines -> {
String[] datas = lines.split(",");
return new WaterSensor(datas[0], Long.valueOf(datas[1]), Integer.valueOf(datas[2]));
});
// 按照ID分组
KeyedStream<WaterSensor, String> keyedDS = mapDS.keyBy(r -> r.getId());
keyedDS.map(new RichMapFunction<WaterSensor, List<WaterSensor>>() {
// 定义状态
private ListState<WaterSensor> top3State;
// 初始化
@Override
public void open(Configuration parameters) throws Exception {
top3State = getRuntimeContext().getListState(new ListStateDescriptor<WaterSensor>("list-state", WaterSensor.class));
}
@Override
public List<WaterSensor> map(WaterSensor value) throws Exception {
// 将当前数据加入状态
top3State.add(value);
// 取出状态中的数据并且排序【倒序】
ArrayList<WaterSensor> waterSensors = Lists.newArrayList(top3State.get().iterator());
waterSensors.sort((o1,o2) -> o2.getVc() - o1.getVc());
// 判断档期数据是否超过3条,如果超过,则删除最后一条
if(waterSensors.size() > 3){
waterSensors.remove(3);
}
// 更新状态
top3State.update(waterSensors);
// 返回数据
return waterSensors;
}
}).print();
// 打印
// 启动执行
env.execute();
}
}
计算每个传感器的水位之和(分组)
public class Flink05_State_ReducingState {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
DataStreamSource<String> socketDS = env.socketTextStream("localhost", 9999);
SingleOutputStreamOperator<WaterSensor> mapDS = socketDS.map(lines -> {
String[] datas = lines.split(",");
return new WaterSensor(datas[0], Long.valueOf(datas[1]), Integer.valueOf(datas[2]));
});
// 按照ID分组
KeyedStream<WaterSensor, String> keyedDS = mapDS.keyBy(r -> r.getId());
// 逻辑处理
keyedDS.process(new KeyedProcessFunction<String,WaterSensor,WaterSensor>() {
// 定义状态
private ReducingState<WaterSensor> reducingState;
// 初始化
@Override
public void open(Configuration parameters) throws Exception {
reducingState = getRuntimeContext().getReducingState(new ReducingStateDescriptor<WaterSensor>(
"reducing-state", new ReduceFunction<WaterSensor>() {
@Override
public WaterSensor reduce(WaterSensor value1, WaterSensor value2) throws Exception {
return new WaterSensor(value1.getId(), value2.getTs(), value1.getVc() + value2.getVc());
}
}, WaterSensor.class));
}
//
@Override
public void processElement(WaterSensor value, Context ctx, Collector<WaterSensor> out) throws Exception {
// 将当前数据聚合到状态中
reducingState.add(value);
// 取出状态中的数据
WaterSensor waterSensor = reducingState.get();
// 输出数据
out.collect(waterSensor);
}
}).print();
// 打印
// 启动执行
env.execute();
}
}
计算每个传感器的平均水位
.process(new KeyedProcessFunction<String, WaterSensor, Double>() {
private AggregatingState<Integer, Double> avgState;
@Override
public void open(Configuration parameters) throws Exception {
AggregatingStateDescriptor<Integer, Tuple2<Integer, Integer>, Double> aggregatingStateDescriptor = new AggregatingStateDescriptor<>("avgState", new AggregateFunction<Integer, Tuple2<Integer, Integer>, Double>() {
@Override
public Tuple2<Integer, Integer> createAccumulator() {
return Tuple2.of(0, 0);
}
@Override
public Tuple2<Integer, Integer> add(Integer value, Tuple2<Integer, Integer> accumulator) {
return Tuple2.of(accumulator.f0 + value, accumulator.f1 + 1);
}
@Override
public Double getResult(Tuple2<Integer, Integer> accumulator) {
return accumulator.f0 * 1D / accumulator.f1;
}
@Override
public Tuple2<Integer, Integer> merge(Tuple2<Integer, Integer> a, Tuple2<Integer, Integer> b) {
return Tuple2.of(a.f0 + b.f0, a.f1 + b.f1);
}
}, Types.TUPLE(Types.INT, Types.INT));
avgState = getRuntimeContext().getAggregatingState(aggregatingStateDescriptor);
}
@Override
public void processElement(WaterSensor value, Context ctx, Collector<Double> out) throws Exception {
avgState.add(value.getVc());
out.collect(avgState.get());
}
})
去重: 去掉重复的水位值. 思路: 把水位值作为MapState的key来实现去重, value随意
.process(new KeyedProcessFunction<String, WaterSensor, WaterSensor>() {
private MapState<Integer, String> mapState;
@Override
public void open(Configuration parameters) throws Exception {
mapState = this
.getRuntimeContext()
.getMapState(new MapStateDescriptor<Integer, String>("mapState", Integer.class, String.class));
}
@Override
public void processElement(WaterSensor value, Context ctx, Collector<WaterSensor> out) throws Exception {
if (!mapState.contains(value.getVc())) {
out.collect(value);
mapState.put(value.getVc(), "随意");
}
}
})
每传入一条数据,有状态的算子任务都会读取和更新状态。由于有效的状态访问对于处理数据的低延迟至关重要,因此每个并行任务(子任务)都会在本地维护其状态,以确保快速的状态访问。
状态的存储、访问以及维护,由一个可插入的组件决定,这个组件就叫做状态后端(state backend)
状态后端主要负责两件事:
- 本地的状态管理
- 将检查点(checkpoint)状态写入远程存储
将状态保存到何处,配置状态保存的位置。结合checkPoint检查点一起使用。
状态后端作为一个可插入的组件, 没有固定的配置, 我们可以根据需要选择一个合适的状态后端。
Flink提供了3中状态后端:
- MemoryStateBackend ——>
- FsStateBackend
- RocksDBStateBackend
内存级别的状态后端,
存储方式:本地状态存储在JobManager的内存中, checkpoint 存储在JobManager的内存中。
特点:快速, 低延迟, 但不稳定
使用场景:
1. 本地测试
2. 几乎无状态的作业(ETL)
3. JobManager不容易挂, 或者挂了影响不大.
4. 不推荐在生产环境下使用
存储方式: 本地状态在JobManager内存, Checkpoint存储在文件系统中
特点: 拥有内存级别的本地访问速度, 和更好的容错保证
使用场景:
1. 常规使用状态的作业. 例如分钟级别窗口聚合, join等
2. 需要开启HA的作业
3. 可以应用在生产环境中
将所有的状态序列化之后, 存入本地的RocksDB数据库中.(一种NoSql数据库, KV形式存储)
存储方式:
1. 本地状态存储在TaskManager的RocksDB数据库中(实际是内存+磁盘)
2. Checkpoint在外部文件系统中.
使用场景:
1. 超大状态的作业, 例如天级的窗口聚合
2. 需要开启HA的作业
3. 对读写状态性能要求不高的作业
4. 可以使用在生产环境
在flink-conf.yaml文件中设置默认的全局后端
![](https://img-blog.csdnimg.cn/img_convert/ba3e7fed930899b43e1ceb09a60c4447.png#align=left&display=inline&height=189&margin=[object Object]&originHeight=189&originWidth=1269&status=done&style=none&width=1269)
可以在代码中单独为这个Job设置状态后端;
如何要使用RocksDBBackend, 需要先引入依赖:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
// 定义状态后端,保存状态的位置
env.setStateBackend(new MemoryStateBackend());
env.setStateBackend(new FsStateBackend("hdfs://hadoop102:8020/flink/checkpoints/fs"));
env.setStateBackend(new RocksDBStateBackend("hdfs://hadoop102:8020/flink/checkpoints/rocksdb"));
// 开启CheckPoint
env.getCheckpointConfig().enableUnalignedCheckpoints();
当在分布式系统中引入状态时,自然也引入了一致性问题。
一致性实际上是"正确性级别"的另一种说法,也就是说在成功处理故障并恢复之后得到的结果,与没有发生任何故障时得到的结果相比,前者到底有多正确?举例来说,假设要对最近一小时登录的用户计数。在系统经历故障之后,计数结果是多少?如果有偏差,是有漏掉的计数还是重复计数?
在流处理中,一致性可以分为3个级别:
**at-most-once(最多变一次): **
这其实是没有正确性保障的委婉说法——故障发生之后,计数结果可能丢失。
**at-least-once(至少一次): **
这表示计数结果可能大于正确值,但绝不会小于正确值。也就是说,计数程序在发生故障后可能多算,但是绝不会少算。
exactly-once(严格变一次):
这指的是系统保证在发生故障后得到的计数结果与正确值一致.既不多算也不少算。
曾经,at-least-once非常流行。第一代流处理器(如Storm和Samza)刚问世时只保证at-least-once,原因有二:
保证exactly-once的系统实现起来更复杂。这在基础架构层(决定什么代表正确,以及exactly-once的范围是什么)和实现层都很有挑战性
流处理系统的早期用户愿意接受框架的局限性,并在应用层想办法弥补(例如使应用程序具有幂等性,或者用批量计算层再做一遍计算)。
最先保证exactly-once的系统(Storm Trident和Spark Streaming)在性能和表现力这两个方面付出了很大的代价。为了保证exactly-once,这些系统无法单独地对每条记录运用应用逻辑,而是同时处理多条(一批)记录,保证对每一批的处理要么全部成功,要么全部失败。这就导致在得到结果前,必须等待一批记录处理结束。因此,用户经常不得不使用两个流处理框架(一个用来保证exactly-once,另一个用来对每个元素做低延迟处理),结果使基础设施更加复杂。曾经,用户不得不在保证exactly-once与获得低延迟和效率之间权衡利弊。Flink避免了这种权衡。
Flink的一个重大价值在于,它既保证了exactly-once,又具有低延迟和高吞吐的处理能力。
从根本上说,Flink通过使自身满足所有需求来避免权衡,它是业界的一次意义重大的技术飞跃。尽管这在外行看来很神奇,但是一旦了解,就会恍然大悟。
? 目前我们看到的一致性保证都是由流处理器实现的,也就是说都是在 Flink 流处理器内部保证的;而在真实应用中,流处理应用除了流处理器以外还包含了数据源(例如 Kafka)和输出到持久化系统。
? 端到端的一致性保证,意味着结果的正确性贯穿了整个流处理应用的始终;每一个组件都保证了它自己的一致性,整个端到端的一致性级别取决于所有组件中一致性最弱的组件。
具体划分如下:
需要外部源可重设数据的读取位置.
目前我们使用的Kafka Source具有这种特性: 读取数据的时候可以指定offset
依赖checkpoint机制
需要保证从故障恢复时,数据不会重复写入外部系统或丢失数据。
有2种实现形式:
所谓幂等操作,是说一个操作,可以重复执行很多次,但只导致一次结果更改,也就是说,后面再重复执行就不起作用了。
需要构建事务来写入外部系统,构建的事务对应着 checkpoint,等到 checkpoint 真正完成的时候,才把所有对应的结果写入 sink 系统中。对于事务性写入,具体又有两种实现方式:预写日志(WAL)和两阶段提交(2PC)
概念:
Flink具体如何保证exactly-once呢? 它使用一种被称为"检查点"(checkpoint)的特性,在出现故障时将系统重置回正确状态。下面通过简单的类比来解释检查点的作用。
理解:
相当于是状态的备份,用来存储状态的终端。假设你和两位朋友正在数项链上有多少颗珠子,如下图所示。你捏住珠子,边数边拨,每拨过一颗珠子就给总数加一。你的朋友也这样数他们手中的珠子。当你分神忘记数到哪里时,怎么办呢? 如果项链上有很多珠子,你显然不想从头再数一遍,尤其是当三人的速度不一样却又试图合作的时候,更是如此(比如想记录前一分钟三人一共数了多少颗珠子,回想一下一分钟滚动窗口)。
![](https://img-blog.csdnimg.cn/img_convert/6570c6aebd73a5e7bee4e76c3c465964.png#align=left&display=inline&height=517&margin=[object Object]&originHeight=517&originWidth=760&status=done&style=none&width=760)
于是,你想了一个更好的办法: 在项链上每隔一段就松松地系上一根有色皮筋,将珠子分隔开; 当珠子被拨动的时候,皮筋也可以被拨动; 然后,你安排一个助手,让他在你和朋友拨到皮筋时记录总数。用这种方法,当有人数错时,就不必从头开始数。相反,你向其他人发出错误警示,然后你们都从上一根皮筋处开始重数,助手则会告诉每个人重数时的起始数值,例如在粉色皮筋处的数值是多少。
Flink检查点的作用就类似于皮筋标记。数珠子这个类比的关键点是: 对于指定的皮筋而言,珠子的相对位置是确定的; 这让皮筋成为重新计数的参考点。总状态(珠子的总数)在每颗珠子被拨动之后更新一次,助手则会保存与每根皮筋对应的检查点状态,如当遇到粉色皮筋时一共数了多少珠子,当遇到橙色皮筋时又是多少。当问题出现时,这种方法使得重新计数变得简单。
checkpoint机制是Flink可靠性的基石,可以保证Flink集群在某个算子因为某些原因(如 异常退出)出现故障时,能够将整个应用流图的状态恢复到故障之前的某一状态,保证应用流图状态的一致性.
快照的实现算法:
简单算法–暂停应用, 然后开始做检查点, 再重新恢复应用
Flink的改进Checkpoint算法. Flink的checkpoint机制原理来自"Chandy-Lamport algorithm"算法(分布式快照算)的一种变体: 异步 barrier 快照(asynchronous barrier snapshotting)
每个需要checkpoint的应用在启动时,Flink的JobManager为其创建一个 CheckpointCoordinator,CheckpointCoordinator全权负责本应用的快照制作。
流的barrier是Flink的Checkpoint中的一个核心概念. 多个barrier被插入到数据流中, 然后作为数据流的一部分随着数据流动(有点类似于Watermark).这些barrier不会跨越流中的数据.
每个barrier会把数据流分成两部分: 一部分数据进入**当前的快照** , 另一部分数据进入**下一个快照 ****.** 每个barrier携带着快照的id. barrier 不会暂停数据的流动, 所以非常轻量级. 在流中, 同一时间可以有来源于多个不同快照的多个barrier, 这个意味着可以并发的出现不同的快照.
![](https://img-blog.csdnimg.cn/img_convert/88ec309466654b8b8ff8b9bad5dfadf6.png#align=left&display=inline&height=402&margin=[object Object]&originHeight=402&originWidth=1048&status=done&style=none&width=1048)
![](https://img-blog.csdnimg.cn/img_convert/87c4ad4bdaa70d8dab6cbfbcda54a42c.png#align=left&display=inline&height=567&margin=[object Object]&originHeight=567&originWidth=1761&status=done&style=none&width=1761)
![](https://img-blog.csdnimg.cn/img_convert/9d1e199592be3f5afbd7ed4f07263579.png#align=left&display=inline&height=635&margin=[object Object]&originHeight=635&originWidth=1574&status=done&style=none&width=1574)
![](https://img-blog.csdnimg.cn/img_convert/db5acd4f9aa5160a9c99843111ee58ab.png#align=left&display=inline&height=659&margin=[object Object]&originHeight=659&originWidth=1516&status=done&style=none&width=1516)
![](https://img-blog.csdnimg.cn/img_convert/6f4050ba2efbea2358d4e38ea1d7bace.png#align=left&display=inline&height=664&margin=[object Object]&originHeight=664&originWidth=1504&status=done&style=none&width=1504)
![](https://img-blog.csdnimg.cn/img_convert/def4c56ecf3794f1588454188eb9cb8e.png#align=left&display=inline&height=662&margin=[object Object]&originHeight=662&originWidth=1509&status=done&style=none&width=1509)
在多并行度下, 如果要实现严格一次, 则要执行**barrier对齐**。
当 job graph 中的每个 operator 接收到 barriers 时,它就会记录下其状态。拥有两个输入流的 Operators(例如 CoProcessFunction)会执行** barrier 对齐**(barrier alignment) 以便当前快照能够包含消费两个输入流 barrier 之前(但不超过)的所有 events 而产生的状态。
https://ci.apache.org/projects/flink/flink-docs-release-1.12/fig/stream_aligning.svg
![](https://img-blog.csdnimg.cn/img_convert/f408e2d97ceafabb980ba6b008cdeb2b.png#align=left&display=inline&height=282&margin=[object Object]&originHeight=282&originWidth=1269&status=done&style=none&width=1269)
前面介绍了barrier对齐, 如果barrier不对齐会怎么样? 会重复消费, 就是**至少一次**语义.
![](https://img-blog.csdnimg.cn/img_convert/9f169abfcd2b5d1db0349842601b2172.png#align=left&display=inline&height=282&margin=[object Object]&originHeight=282&originWidth=1269&status=done&style=none&width=1269)
假设不对齐, 在字母流的Checkpoint barrier n到达前, 已经处理了1 2 3. 等字母流Checkpoint barrier n到达之后, 会做Checkpoint n. 假设这个时候程序异常错误了, 则重新启动的时候会Checkpoint n之后的数据重新计算. 1 2 3 会被再次被计算, 所以123出现了重复计算.
Savepoint | Checkpoint |
---|---|
Savepoint是由命令触发, 由用户创建和删除 | Checkpoint被保存在用户指定的外部路径中 |
保存点存储在标准格式存储中,并且可以升级作业版本并可以更改其配置。 | 当作业失败或被取消时,将保留外部存储的检查点。 |
用户必须提供用于还原作业状态的保存点的路径。 | 用户必须提供用于还原作业状态的检查点的路径。 |
我们知道,端到端的状态一致性的实现,需要每一个组件都实现,对于Flink + Kafka的数据管道系统(Kafka进、Kafka出)而言,各组件怎样保证exactly-once语义呢?
内部 —— 利用checkpoint机制,把状态存盘,发生故障的时候可以恢复,保证部的状态一致性
source —— kafka consumer作为source,可以将偏移量保存下来,如果后续任务出现了故障,恢复的时候可以由连接器重置偏移量,重新消费数据,保证一致性
sink —— kafka producer作为sink,采用两阶段提交 sink,需要实现一个 TwoPhaseCommitSinkFunction
内部的checkpoint机制我们已经有了了解,那source和sink具体又是怎样运行的呢?接下来我们逐步做一个分析。
具体的两阶段提交步骤总结如下:
public class BaseDBApp {
public static void main(String[] args) {
// TODO 1. 基本环境准别,获取流执行环境、设置并行度
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(4);
// TODO 2. 设置检查点
// 2.1 开启检查点
env.enableCheckpointing(5000L, CheckpointingMode.EXACTLY_ONCE);
// 2.2 设置检查点失效时间
env.getCheckpointConfig().setCheckpointTimeout(60000L);
// 2.3 设置job任务取消时,保留检查点
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// 2.4 设置状态后端
env.setStateBackend(new FsStateBackend("hdfs://hadoop102:8020/gmall/ck"));
// 2.5 设置检查点的重启策略
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 3000L));
// 2.6 设置操作hdfs的用户
System.setProperty("HADOOP_USER_NAME", "atguigu");
}
}
package com.atguigu.flink.java.chapter_7.state;
import com.atguigu.flink.java.chapter_5.WaterSensor;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.util.Collector;
import java.util.Properties;
public class Flink04_State_Checkpoint {
public static void main(String[] args) throws Exception {
System.setProperty("HADOOP_USER_NAME", "atguigu");
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "hadoop102:9092,hadoop103:9092,hadoop104:9092");
properties.setProperty("group.id", "Flink01_Source_Kafka");
properties.setProperty("auto.offset.reset", "latest");
StreamExecutionEnvironment env = StreamExecutionEnvironment
.createLocalEnvironmentWithWebUI(new Configuration())
.setParallelism(3);
env.setStateBackend(new RocksDBStateBackend("hdfs://hadoop102:8020/flink/checkpoints/rocksdb"));
// 每 1000ms 开始一次 checkpoint
env.enableCheckpointing(1000);
// 高级选项:
// 设置模式为精确一次 (这是默认值)
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// 确认 checkpoints 之间的时间会进行 500 ms
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
// Checkpoint 必须在一分钟内完成,否则就会被抛弃
env.getCheckpointConfig().setCheckpointTimeout(60000);
// 同一时间只允许一个 checkpoint 进行
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
// 开启在 job 中止后仍然保留的 externalized checkpoints
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
env
.addSource(new FlinkKafkaConsumer<>("sensor", new SimpleStringSchema(), properties))
.map(value -> {
String[] datas = value.split(",");
return new WaterSensor(datas[0], Long.valueOf(datas[1]), Integer.valueOf(datas[2]));
})
.keyBy(WaterSensor::getId)
.process(new KeyedProcessFunction<String, WaterSensor, String>() {
private ValueState<Integer> state;
@Override
public void open(Configuration parameters) throws Exception {
state = getRuntimeContext().getState(new ValueStateDescriptor<Integer>("state", Integer.class));
}
@Override
public void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {
Integer lastVc = state.value() == null ? 0 : state.value();
if (Math.abs(value.getVc() - lastVc) >= 10) {
out.collect(value.getId() + " 红色警报!!!");
}
state.update(value.getVc());
}
})
.addSink(new FlinkKafkaProducer<String>("hadoop102:9092", "alert", new SimpleStringSchema()));
env.execute();
}
}
从SavePoint和CK恢复任务
//启动任务
bin/flink -c com.atguigu.WordCount xxx.jar
//保存点(只能手动)
bin/flink savepoint -m hadoop102:8081 JobId hdfs://hadoop102:8020/flink/save
//关闭任务并从保存点恢复任务
bin/flink -s hdfs://hadoop102:8020/flink/save/... -m hadoop102:8081 -c com.atguigu.WordCount xxx.jar
//从CK位置恢复数据
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
bin/flink run -s hdfs://hadoop102:8020/flink/ck/Jobid/chk-960 -m hadoop102:8081 -c com.atguigu.WordCount xxx.jar
|
|
上一篇文章 下一篇文章 查看所有文章 |
|
开发:
C++知识库
Java知识库
JavaScript
Python
PHP知识库
人工智能
区块链
大数据
移动开发
嵌入式
开发工具
数据结构与算法
开发测试
游戏开发
网络协议
系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程 数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁 |
360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年5日历 | -2024/5/21 3:43:04- |
|
网站联系: qq:121756557 email:121756557@qq.com IT数码 |