IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Flink随笔 -> 正文阅读

[大数据]Flink随笔

一、Flink简介

1.1 初识Flink

1.1.1 序言

? Flink起源于Stratosphere项目,Stratosphere是在2010~2014年由3所地处柏林的大学和欧洲的一些其他的大学共同进行的研究项目,2014年4月Stratosphere的代码被复制并捐赠给了Apache软件基金会,参加这个孵化项目的初始成员是Stratosphere系统的核心开发人员,2014年12月,Flink一跃成为Apache软件基金会的顶级项目。
? 在德语中,Flink一词表示快速和灵巧,项目采用一只松鼠的彩色图案作为logo,这不仅是因为松鼠具有快速和灵巧的特点,还因为柏林的松鼠有一种迷人的红棕色,而Flink的松鼠logo拥有可爱的尾巴,尾巴的颜色与Apache软件基金会的logo颜色相呼应,也就是说,这是一只Apache风格的松鼠。

Flink项目的理念是:“Apache Flink是为分布式、高性能、随时可用以及准确的流处理应用程序打造的开源流处理框架”。
Apache Flink是一个框架和分布式处理引擎,用于对无界和有界数据流进行有状态计算。Flink被设计在所有常见的集群环境中运行,以内存执行速度和任意规模来执行计算。

重点去理解:

① :有界和无界(当前的计算是流处理还是批处理)

② :有状态(当前的设计是否被操作,状态保存到累机器中)

1.1.2 计算引擎的发展史

计算引擎迄今为止,一共经历了4代 :

image-20210609130841126

1.2 Flink的重要特点

1.2.1 事件驱动型(Event-driven)

事件驱动型应用是一类具有状态的应用,它从一个或多个事件流提取数据,并根据到来的事件触发计算、状态更新或其他外部动作。比较典型的就是以kafka为代表的消息队列几乎都是事件驱动型应用。(Flink的计算也是事件驱动型)

  • 与之不同的就是SparkStreaming微批次,如图:

  • 事件驱动型

1.2.2 流与批的世界观

批处理的特点是有界、持久、大量,非常适合需要访问全套记录才能完成的计算工作,一般用于离线统计。
流处理的特点是无界、实时, 无需针对整个数据集执行操作,而是对通过系统传输的每个数据项执行操作,一般用于实时统计。
在spark的世界观中,一切都是由批次组成的,离线数据是一个大批次,而实时数据是由一个一个无限的小批次组成的。
而在flink的世界观中,一切都是由流组成的,离线数据是有界限的流,实时数据是一个没有界限的流,这就是所谓的有界流和无界流。

  1. 无界数据流:
    无界数据流有一个开始但是没有结束,它们不会在生成时终止并提供数据,必须连续处理无界流,也就是说必须在获取后立即处理event。对于无界数据流我们无法等待所有数据都到达,因为输入是无界的,并且在任何时间点都不会完成。处理无界数据通常要求以特定顺序(例如事件发生的顺序)获取event,以便能够推断结果完整性。
  2. 有界数据流:
    有界数据流有明确定义的开始和结束,可以在执行任何计算之前通过获取所有数据来处理有界流,处理有界流不需要有序获取,因为可以始终对有界数据集进行排序,有界流的处理也称为批处理

1.2.3 分层API

20210607114422

? 1 : 最底层级的抽象仅仅提供了有状态流,它将通过过程函数(Process Function)被嵌入到DataStream API中。底层过程函数(Process Function) 与 DataStream API 相集成,使其可以对某些特定的操作进行底层的抽象,它允许用户可以自由地处理来自一个或多个数据流的事件,并使用一致的容错的状态。除此之外,用户可以注册事件时间并处理时间回调,从而使程序可以处理复杂的计算。
? 2 : 实际上,大多数应用并不需要上述的底层抽象,而是针对核心API(Core APIs) 进行编程,比如DataStream API(有界或无界流数据)以及DataSet API(有界数据集)。这些API为数据处理提供了通用的构建模块,比如由用户定义的多种形式的转换(transformations),连接(joins),聚合(aggregations),窗口操作(windows)等等。DataSet API 为有界数据集提供了额外的支持,例如循环与迭代。这些API处理的数据类型以类(classes)的形式由各自的编程语言所表示。
? 3 : Table API 是以表为中心的声明式编程,其中表可能会动态变化(在表达流数据时)。Table API遵循(扩展的)关系模型:表有二维数据结构(schema)(类似于关系数据库中的表),同时API提供可比较的操作,例如select、project、join、group-by、aggregate等。Table API程序声明式地定义了什么逻辑操作应该执行,而不是准确地确定这些操作代码的看上去如何。
? 4 : 尽管Table API可以通过多种类型的用户自定义函数(UDF)进行扩展,其仍不如核心API更具表达能力,但是使用起来却更加简洁(代码量更少)。除此之外,Table API程序在执行之前会经过内置优化器进行优化。
你可以在表与 DataStream/DataSet 之间无缝切换,以允许程序将 Table API与DataStream 以及 DataSet 混合使用。
? 5 : Flink提供的最高层级的抽象是 SQL 。这一层抽象在语法与表达能力上与 Table API 类似,但是是以SQL查询表达式的形式表现程序。SQL抽象与Table API交互密切,同时SQL查询可以直接在Table API定义的表上执行。
? 目前Flink作为批处理还不是主流,不如Spark成熟,所以DataSet使用的并不是很多。Flink Table API和Flink SQL也并不完善,大多都由各大厂商自己定制。所以我们主要学习DataStream API的使用。实际上Flink作为最接近Google DataFlow模型的实现,是流批统一的观点,所以基本上使用DataStream就可以了。
? 2020年12月8日发布的最新版本1.12.0, 已经完成实现了真正的**流批一体. **写好的一套代码, 即可以处理流失数据, 也可以处理离线数据. 这个与前面版本的处理有界流的方式是不一样的, Flink专门对批处理数据做了优化处理。

1.3 Spark or Flink

Flink和Spark的技术选择上的主要出发点就是:计算模型的选择!!!!!!!!!!!!!!!!!

**1 :**Spark 和 Flink 一开始都拥有着同一个梦想,他们都希望能够用同一个技术把流处理和批处理统一起来,但他们走了完全不一样的两条路前者是以批处理的技术为根本,并尝试在批处理之上支持流计算;后者则认为流计算技术是最基本的,在流计算的基础之上支持批处理。正因为这种架构上的不同,今后二者在能做的事情上会有一些细微的区别。比如在低延迟场景,Spark 基于微批处理的方式需要同步会有额外开销,因此无法在延迟上做到极致。在大数据处理的低延迟场景,Flink 已经有非常大的优势。

Spark无法实现的方面体现在以下方面:

  • 保证数据精准一次性处理(Exactly-once)【需要开发者自己开发程序保证】
  • 乱序数据,迟到数据
  • 低延迟、高吞吐、准确性
  • 容错性

2 :Spark和Flink的主要差别就在于计算模型不同。Spark采用了微批处理模型,而Flink采用了基于操作符的连续流模型。因此,对Apache Spark和Apache Flink的选择实际上变成了计算模型的选择,而这种选择需要在延迟、吞吐量和可靠性等多个方面进行权衡。

**3 :如果企业中非要技术选型从Spark和Flink这两个主流框架中选择一个来进行流数据处理,我们推荐使用Flink,**主(显而)要(易见)的原因为:

  • Flink灵活的窗口

  • Exactly Once

    语义保证这两个原因可以大大的解放程序员, 加快编程效率, 把本来需要程序员花大力气手动完成的工作交给框架完成,棒棒哒,点赞!!!

1.3.1 两者的区别总结:

  1. 处理方式方面:

    • Flink : 持续流方式
    • Spark : 批处理(微批次方式)
  2. 延迟效果:

    • Flink : 低延迟(实时)
    • Spark : 高延迟(离线)

1.3.2 常见问题:

  1. Spark的计算速度为什么比MR快?

    ①:基于内存进行计算操作

    ②:提供DAG(有向无环图)

  2. Flink的计算速度为什么比Spark快?

    ?

二、Flink快速上手

WordCount的基本流程:

  1. 获取执行环境

  2. 获取数据源

  3. 处理数据 —> 转换算子

  • 扁平化操作:切分,转换为二元组flatMap(),实现FlatMapFunction类。

  • 进行分组(参数:下标(word,1)【下标为“0”也就是根据word进行分组】)

    (word,1)|(word,1)|(word,1)|(word,1) ===> 为一组

    • 在批处理中分组的 API 是groupBy()方法
    • 在流处理中分组的 API 是keyBy()方法
  • 按照分组进行聚合操作 (参数:下标(word,1)【下标为“1”也就是对1进行聚合累加操作】)===> (word,4)

  1. 输出、保存…

  2. 启动:执行处理程序(批处理不需要执行这一步)execute()方法

2.1 创建maven项目

  • POM文件中添加需要的依赖:
<!--properties标签的作用:统一管理jar包的版本号-->
<properties>
    <flink.version>1.12.0</flink.version>
    <java.version>1.8</java.version>
    <scala.binary.version>2.11</scala.binary.version>
    <slf4j.version>1.7.30</slf4j.version>
</properties>
<!-- 引入jar包 -->
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>${slf4j.version}</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>${slf4j.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-to-slf4j</artifactId>
        <version>2.14.0</version>
    </dependency>
</dependencies>
<!-- 引入打包插件 -->
<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-assembly-plugin</artifactId>
            <version>3.3.0</version>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
  • src/main/resources添加文件:log4j.properties
log4j.rootLogger=error, stdoutlog4j.appender.stdout=org.apache.log4j.ConsoleAppenderlog4j.appender.stdout.layout=org.apache.log4j.PatternLayoutlog4j.appender.stdout.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n

2.2 批处理WordCount

准备数据:

hello java
hello scala
hello spark
hello flink

代码示例:

package com.heather.chapter02;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.operators.UnsortedGrouping;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class Batch_WordCount {

    public static void main(String[] args) throws Exception {

        // 1. 获取执行环境
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // 2. 获取数据源
        DataSource<String> fileDS = env.readTextFile("input/hello.txt");
        // 3. 处理数据 - 操作算子
        // 3.1 扁平化操作:切分、转换为元组 ===> (word,1)
        FlatMapOperator<String, Tuple2<String, Integer>> wordAndOneDS = fileDS.flatMap(new MyFlatMapFunction());
        // 3.2 进行分组(参数:下标(word,1)【下标为“0”也就是根据word进行分组】)
        // (word,1)|(word,1)|(word,1)|(word,1) ===> 为一组
        // 在流处理中分组的 API groupBy()方法
        UnsortedGrouping<Tuple2<String, Integer>> wordToGroupDS = wordAndOneDS.groupBy(0);
        // 3.3 按照分组进行聚合操作 (参数:下标(word,1)【下标为“1”也就是对1进行聚合累加操作】)
        // ===> (word,4)
        AggregateOperator<Tuple2<String, Integer>> result = wordToGroupDS.sum(1);
        // 4. 输出、保存 ....
        result.print();
        // 5. 启动:执行处理程序(批处理不需要执行这一步)
    }

    /*
        自定义一个类实现FlatMapFunction接口,接口中有两个参数:
            参数1:输入的参数的类型:—>hello     ==> String类型
            参数2:输出的参数的类型:—>(hello,1) ==> 二元祖的类型
     */
    public static class MyFlatMapFunction implements FlatMapFunction<String, Tuple2<String,Integer>> {
        /*
            flatMap()方法:
                1. 切分数据
                2. 把数据封装为元组
                3. 使用采集器向下游发送数据
         */
        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
            // 1. 切分数据
            String[] wordArr = value.split(" ");
            for (String word : wordArr) {
                // 2. 把数据封装为元组
                /*
                    创建二元组有两种方式:
                        1. Tuple2<T,E> tuple = new Tuple2<>(value1,value2);
                        2. Tuple2<T,E> tuple = Tuple2.of(value1,value2);
                 */
                Tuple2<String, Integer> tuple = new Tuple2<>(word, 1);
                // 3. 使用采集器向下游发送数据
                out.collect(tuple);
            }
        }
    }
}

2.3 流处理WordCount

有界流和无界流的区别:

  • 有界流在读取完数据后,程序自行停止;

  • 无界流的程序一单启动后是不会停止的,除非数据源停止输入数据。

2.3.1 有界流

第一种方式:原生方式

public class Bound_WordCount {

    public static void main(String[] args) throws Exception {

        // 1. 获取执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 2. 获取数据源
        DataStreamSource<String> fileDS = env.readTextFile("input/hello.txt");
        // 3. 处理数据 —> 转换算子
        // 3.1 扁平化操作:切分,转换为二元组
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOneDS = fileDS.flatMap(new MyFlatMapFunction());
        // 3.2 进行分组(参数:下标(word,1)【下标为“0”也就是根据word进行分组】)
        // (word,1)|(word,1)|(word,1)|(word,1) ===> 为一组
        // 在流处理中分组的 API 是keyBy()方法
        KeyedStream<Tuple2<String, Integer>, Tuple> wordToGroupDS = wordAndOneDS.keyBy(0);
        // 3.3 按照分组进行聚合操作 (参数:下标(word,1)【下标为“1”也就是对1进行聚合累加操作】)
        // ===> (word,4)
        SingleOutputStreamOperator<Tuple2<String, Integer>> result = wordToGroupDS.sum(1);
        // 4. 输出、保存...
        result.print();
        // 5. 启动:执行处理程序
        env.execute();

    }

    public static class MyFlatMapFunction implements FlatMapFunction<String, Tuple2<String,Integer>>{
        /*
            flatMap()方法的作用:
                1. 切分数据
                2. 把数据转换为二元组
                3. 使用采集器向下游发送数据

        * */
        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
            //1. 切分数据
            String[] wordArr = value.split(" ");
            for (String word : wordArr) {
                //2. 把数据转换为二元组
                /*
                    创建二元组有两种方式:
                        1. Tuple2<T,E> tuple = new Tuple2<>(value1,value2);
                        2. Tuple2<T,E> tuple = Tuple2.of(value1,value2);
                 */
                Tuple2<String, Integer> tuple = Tuple2.of(word, 1);
                //3. 使用采集器向下游发送数据
                out.collect(tuple);
            }
        }
    }
}

第二种方式:Lambda表达式

当Lambda表达式使用 java 泛型的时候, 由于泛型擦除的存在, 需要显示的声明类型信息

public class Lambda_Bound_WordCount {

    public static void main(String[] args) throws Exception {

        // 1. 获取执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 2. 获取数据源
        DataStreamSource<String> fileDS = env.readTextFile("input/hello.txt");
        // 3. 处理数据 —> 转换算子
        // 3.1 扁平化操作:切分,转换为二元组
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOneDS = fileDS.flatMap((FlatMapFunction<String, Tuple2<String, Integer>>) (value, out) -> {
            //1. 切分数据
            String[] wordArr = value.split(" ");
            for (String word : wordArr) {
                //2. 把数据转换为二元组
            /*
                创建二元组有两种方式:
                    1. Tuple2<T,E> tuple = new Tuple2<>(value1,value2);
                    2. Tuple2<T,E> tuple = Tuple2.of(value1,value2);
             */
                Tuple2<String, Integer> tuple = Tuple2.of(word, 1);
                //3. 使用采集器向下游发送数据
                out.collect(tuple);
            }
        })       
                //当Lambda表达式使用 java 泛型的时候, 由于泛型擦除的存在, 需要显示的声明类型信息,有以下两种解决方案
                //.returns(new TypeHint<Tuple2<String, Integer>>() {});
                .returns(Types.TUPLE(Types.STRING, Types.LONG));
        // 3.2 进行分组(参数:下标(word,1)【下标为“0”也就是根据word进行分组】)
        // (word,1)|(word,1)|(word,1)|(word,1) ===> 为一组
        // 在流处理中分组的 API 是keyBy()方法
        KeyedStream<Tuple2<String, Integer>, Tuple> wordToGroupDS = wordAndOneDS.keyBy(0);
        // 3.3 按照分组进行聚合操作 (参数:下标(word,1)【下标为“1”也就是对1进行聚合累加操作】)
        // ===> (word,4)
        SingleOutputStreamOperator<Tuple2<String, Integer>> result = wordToGroupDS.sum(1);
        // 4. 输出、保存...
        result.print();
        // 5. 启动:执行处理程序
        env.execute();
    }
}

2.3.2 无界流

public class UnBound_WordCount {

    public static void main(String[] args) throws Exception {

        // 1. 获取执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2. 获取数据源
        DataStreamSource<String> socketDS = env.socketTextStream("localhost", 9999);

        // 3. 处理数据
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOneDS = socketDS.flatMap((FlatMapFunction<String, Tuple2<String, Integer>>) (in, out) -> {
            String[] wordArr = in.split(" ");
            for (String word : wordArr) {
                Tuple2<String, Integer> tuple = Tuple2.of(word, 1);
                out.collect(tuple);
            }
        }).returns(new TypeHint<Tuple2<String, Integer>>() {
        });

        KeyedStream<Tuple2<String, Integer>, Tuple> wordToGroupDS = wordAndOneDS.keyBy(0);

        SingleOutputStreamOperator<Tuple2<String, Integer>> result = wordToGroupDS.sum(1);

        // 4. 输出、保存
        result.print();
        // 5. 启动程序
        env.execute();

    }
}

三、Flink部署

3.1 开发模式

Flink中的Local-cluster(本地集群)模式,主要用于测试, 学习。

3.2 local-cluster模式

Flink中的Local-cluster(本地集群)模式,主要用于测试, 学习。

3.2.1 local-cluster模式配置

local-cluster模式基本属于零配置

  1. 上传Flink的安装包flink-1.12.0-bin-scala_2.11.tgz到hadoop102

  2. 解压

    tar -zxvf flink-1.12.0-bin-scala_2.11.tgz -C /opt/module

  3. 进入目录/opt/module, 复制flink-local

    cd /opt/module
    cp -r flink-1.12.0 flink-local

3.2.2 在local-cluster模式下运行无界的WordCount

  1. 打包idea中的应用

  2. 把不带依赖的jar包上传到目录/opt/module/flink-local下

  3. 启动本地集群

    bin/start-cluster.sh

  4. 在hadoop102中启动netcat

    nc -lk 9999
    注意: 如果没有安装netcat需要先安装:
    sudo yum install -y nc

  5. 命令行提交Flink应用

    bin/flink run -m hadoop102:8081 -c com.atguigu.flink.java.chapter_2.Flink03_WC_UnBoundedStream ./flink-prepare-1.0-SNAPSHOT.jar

  6. 在浏览器中查看应用执行情况

    http://hadoop102:8081
    ![](https://img-blog.csdnimg.cn/img_convert/a8fdd71d570ee4dee6bc64a42e524fc7.png#align=left&display=inline&height=566&margin=[object Object]&originHeight=566&originWidth=1207&status=done&style=none&width=1207)

  7. 也可以在log日志查看执行结果

    cat flink-atguigu-taskexecutor-0-hadoop102.out

  8. 也可以在WEB UI提交应用


3.3 Standalone模式

Standalone模式又叫独立集群模式。

3.3.1 Standalone模式配置

  1. 复制flink-standalone

    cp -r flink-1.12.0 flink-standalone

  2. 修改配置文件:flink-conf.yaml

    jobmanager.rpc.address: hadoop102

  3. 修改配置文件:workers

    hadoop102 hadoop103 hadoop104

  4. 分发flink-standalone到其他节点

3.3.2 Standalone模式运行无界流WorkCount

  1. 启动standalone集群

    bin/start-cluster.sh

  2. 命令行提交Flink应用

    bin/flink run -m hadoop102:8081 -c com.atguigu.flink.java.chapter_2.Flink03_WC_UnBoundedStream ./flink-prepare-1.0-SNAPSHOT.jar

  3. 查看执行情况和本地集群一致.

  4. 支持Web UI界面提交Flink应用

3.3.3 Standalone高可用(HA)

任何时候都有一个 主 JobManager 和多个备用 JobManagers,以便在主节点失败时有备用 JobManagers 来接管集群。这保证了没有单点故障,一旦备 JobManager 接管集群,作业就可以正常运行。主备 JobManager 实例之间没有明显的区别。每个 JobManager 都可以充当主备节点。

  1. 修改配置文件: flink-conf.yaml
high-availability: zookeeper
high-availability.storageDir: hdfs://hadoop102:8020/flink/standalone/ha
high-availability.zookeeper.quorum: hadoop102:2181,hadoop103:2181,hadoop104:2181
high-availability.zookeeper.path.root: /flink-standalone
high-availability.cluster-id: /cluster_atguigu
  1. 修改配置文件: masters

    hadoop102:8081 hadoop103:8081

  2. 分发修改的后配置文件到其他节点

  3. 在/etc/profile.d/my.sh中配置环境变量

    export HADOOP_CLASSPATH=hadoop classpath
    **注意: **

  • 需要提前保证HAOOP_HOME环境变量配置成功
  • 分发到其他节点
  1. 首先启动dfs集群和zookeeper集群

  2. 启动standalone HA集群

    bin/start-cluster.sh

  3. 可以分别访问

    http://hadoop102:8081 http://hadoop103:8081

  4. 在zkCli.sh中查看谁是leader

    get /flink-standalone/cluster_atguigu/leader/rest_server_lock

    杀死hadoop102上的Jobmanager, 再看leader

    **注意: **不管是不是leader从WEB UI上看不到区别, 并且都可以与之提交应用.

3.4 Yarn模式

独立部署(Standalone)模式由Flink自身提供计算资源,无需其他框架提供资源,这种方式降低了和其他第三方资源框架的耦合性,独立性非常强。但是你也要记住,Flink主要是计算框架,而不是资源调度框架,所以本身提供的资源调度并不是它的强项,所以还是和其他专业的资源调度框架集成更靠谱,所以接下来我们来学习在强大的Yarn环境中Flink是如何使用的。(其实是因为在国内工作中,Yarn使用的非常多)
把Flink应用提交给Yarn的ResourceManager, Yarn的ResourceManager会申请容器从Yarn的NodeManager上面. Flink会创建JobManager和TaskManager在这些容器上.Flink会根据运行在JobManger上的job的需要的slot的数量动态的分配TaskManager资源

3.4.1 Yarn模式配置

  1. 复制flink-yarn

    cp -r flink-1.12.0 flink-yarn

  2. 配置环境变量HADOOP_CLASSPATH, 如果前面已经配置可以忽略。

    在/etc/profile.d/my.sh中配置
    export HADOOP_CLASSPATH=hadoop classpath

3.4.2 Yarn运行无界流WordCount

  1. 启动hadoop集群(hdfs, yarn)

  2. 运行无界流

    bin/flink run -t yarn-per-job -c com.atguigu.flink.java.chapter_2.Flink03_WC_UnBoundedStream ./flink-prepare-1.0-SNAPSHOT.jar

  3. 在yarn的ResourceManager界面查看执行情况


3.4.3 Flink on Yarn的3种部署模式

Flink提供了yarn上运行的3模式,分别为**Session-Cluster,Application Mode**和**Per-Job-Cluster**模式。

3.4.3.1 Session-Cluster


Session-Cluster模式需要先启动Flink集群,向Yarn申请资源, 资源申请到以后,永远保持不变。以后提交任务都向这里提交。这个Flink集群会常驻在yarn集群中,除非手工停止。
在向Flink集群提交Job的时候, 如果资源被用完了,则新的Job不能正常提交.
缺点: 如果提交的作业中有长时间执行的大作业, 占用了该Flink集群的所有资源, 则后续无法提交新的job.
所以, Session-Cluster适合那些需要频繁提交的多个小Job, 并且执行时间都不长的Job.

3.4.3.2 Per-Job-Cluster


一个Job会对应一个Flink集群,每提交一个作业会根据自身的情况,都会单独向yarn申请资源,直到作业执行完成,一个作业的失败与否并不会影响下一个作业的正常提交和运行。独享Dispatcher和ResourceManager,按需接受资源申请;适合规模大长时间运行的作业。
每次提交都会创建一个新的flink集群,任务之间互相独立,互不影响,方便管理。任务执行完成之后创建的集群也会消失。

3.4.3.3 Application Mode

Application Mode会在Yarn上启动集群, 应用jar包的main函数(用户类的main函数)将会在JobManager上执行. 只要应用程序执行结束, Flink集群会马上被关闭. 也可以手动停止集群.
与Per-Job-Cluster的区别: 就是Application Mode下, 用户的main函数式在集群中执行的
官方建议:
出于生产的需求, 我们建议使用Per-job or Application Mode,因为他们给应用提供了更好的隔离!

3.4.4 Per-Job-Cluster模式执行无界流WordCount

bin/flink run -d -t yarn-per-job -c com.atguigu.flink.java.chapter_2.Flink03_WC_UnBoundedStream ./flink-prepare-1.0-SNAPSHOT.jar

提交任务到Yarn的其他队列
bin/flink run -d -m yarn-application -yqu hive -c com.atguigu.flink.java.chapter_2.Flink03_WC_UnBoundedStream ./flink-prepare-1.0-SNAPSHOT.jar

bin/flink run -d -t yarn-per-job -Dyarn.application.queue hive -c com.atguigu.flink.java.chapter_2.Flink03_WC_UnBoundedStream ./flink-prepare-1.0-SNAPSHOT.jar

3.4.5 Session-Cluster模式执行无界流WordCount

  1. 启动一个Flink-Session

    bin/yarn-session.sh -d

  2. 在Session上运行Job

    bin/flink run -c com.atguigu.flink.java.chapter_2.Flink03_WC_UnBoundedStream ./flink-prepare-1.0-SNAPSHOT.jar
    会自动找到你的yarn-session启动的Flink集群.也可以手动指定你的yarn-session集群:
    bin/flink run -t yarn-session -Dyarn.application.id=application_XXXX_YY ./flink-prepare-1.0-SNAPSHOT.jar
    **注意: **application_XXXX_YY 指的是在yarn上启动的yarn应用

3.4.6 Application Mode模式执行无界流WordCount

? bin/flink run-application -t yarn-application -c com.atguigu.flink.java.chapter_2.Flink03_WC_UnBoundedStream ./flink-prepare-1.0-SNAPSHOT.jar
提交任务到Yarn的其他队列
? bin/flink run-application -t yarn-application -Dyarn.application.queue hive -c com.atguigu.flink.java.chapter_2.Flink03_WC_UnBoundedStream ./flink-prepare-1.0-SNAPSHOT.jar

3.4.7 Yarn模式高可用

Yarn模式的高可用和Standalone模式的高可用原理不一样。
Standalone模式中, 同时启动多个Jobmanager, 一个为leader其他为standby的, 当leader挂了, 其他的才会有一个成为leader。
yarn的高可用是同时只启动一个Jobmanager, 当这个Jobmanager挂了之后, yarn会再次启动一个, 其实是利用的yarn的重试次数来实现的高可用.
  1. 在yarn-site.xml中配置
<property>
  <name>yarn.resourcemanager.am.max-attempts</name>
  <value>4</value>
  <description>
    The maximum number of application master execution attempts.
  </description>
</property>

**注意: **配置完不要忘记分发, 和重启yarn

  1. 在flink-conf.yaml中配置
yarn.application-attempts: 3
high-availability: zookeeper
high-availability.storageDir: hdfs://hadoop102:8020/flink/yarn/ha
high-availability.zookeeper.quorum: hadoop102:2181,hadoop103:2181,hadoop104:2181
high-availability.zookeeper.path.root: /flink-yarn
  1. 启动yarn-session
  2. 杀死Jobmanager, 查看的他的复活情况

注意: yarn-site.xml中是它活的次数的上限, flink-conf.xml中的次数应该小于这个值。

3.5 Scala REPL

scala 交互环境。

  1. local模式启动 REPL

    /opt/module/flink-local ? bin/start-scala-shell.sh local

  2. yarn-session 模式启动

    先启动一个yarn-session, 然后就可以把shell跑在yarn-session上了
    bin/start-scala-shell.sh yarn

3.6 K8S & Mesos模式

Mesos是Apache下的开源分布式资源管理框架,它被称为是分布式系统的内核,在Twitter得到广泛使用,管理着Twitter超过30,0000台服务器上的应用部署,但是在国内,依然使用着传统的Hadoop大数据框架,所以国内使用mesos框架的并不多,这里我们就不做过多讲解了。
容器化部署时目前业界很流行的一项技术,基于Docker镜像运行能够让用户更加方便地对应用进行管理和运维。容器管理工具中最为流行的就是Kubernetes(k8s),而Flink也在最近的版本中支持了k8s部署模式。这里我们也不做过多的讲解。

3.7 Windows模式

在学习时,每次都需要启动虚拟机,启动集群,这是一个比较繁琐的过程,并且会占大量的系统资源,导致系统执行变慢,不仅仅影响学习效果,也影响学习进度,Flink提供了可以在windows系统下启动本地集群的方式,这样,在不使用虚拟机的情况下,也能学习flink的基本使用,所以后续中,为了能够给同学们更加流畅的教学效果和教学体验,我们一般情况下都会采用windows系统的集群来学习flink。

Windows系统搭建Flink集群具体如下:

  1. 将文件flink-1.10.0-bin-scala_2.11.tgz解压缩到无中文无空格的路径中

  2. 修改conf/flink-conf.yaml文件,添加配置

    注意:行头部不允许有空格,因为yaml文件是以空格区分上下级的!!!

    taskmanager.cpu.cores: 1.8
    taskmanager.memory.task.heap.size: 2048m
    taskmanager.memory.managed.size: 2048m
    taskmanager.memory.network.fraction: 0.1
    taskmanager.memory.network.min: 64mb
    taskmanager.memory.network.max: 64mb
    
  3. 执行脚本:bin/start-cluster.bat,启动两个进程

  4. 在浏览器端输入:localhost:8081即可访问。

四、Flink运行架构

4.1 运行架构

https://ci.apache.org/projects/flink/flink-docs-release-1.11/fig/processes.svg(图片来源地址)

4.1.1 官网图解:

4.1.2 中文图解:

img

Flink运行时包含2种进程:1个JobManager和至少1个TaskManager

4.2 核心组件

4.2.0 Flink VS Spark核心组件

框架SparkFlink
DriverJobManager
ExecutorTaskManager
coreslots

4.2.1 Application Master

这个进程包含3个不同的组件 : Dispatcher、ResourceManager、JobManager

4.2.1.1 Dispatcher

负责接收用户提供的作业,并且负责为这个新提交的作业启动一个新的JobManager 组件

4.2.1.2 ResourceManager

  1. 负责资源的管理,在整个 Flink 集群中只有一个 ResourceManager。(管理多个JobManager)
  2. 注意这个ResourceManager不是Yarn中的ResourceManager, 是Flink中内置的, 只是赶巧重名了而已。
主要负责管理任务管理器(TaskManager)的插槽(slot),TaskManger插槽是Flink中定义的处理资源单元。
当JobManager申请插槽资源时,ResourceManager会将有空闲插槽的TaskManager分配给JobManager。如果ResourceManager没有足够的插槽来满足JobManager的请求,它还可以向资源提供平台发起会话,以提供启动TaskManager进程的容器。另外,ResourceManager还负责终止空闲的TaskManager,释放计算资源。

4.2.1.2 JobManager

负责管理作业的执行,在一个Flink集群中可能有多个作业同时执行,每个作业都有自己的JobManager组件。
控制一个应用程序执行的主进程,也就是说,每个应用程序都会被一个不同的JobManager所控制执行。

JobManager会先接收到要执行的应用程序,这个应用程序会包括:作业图(JobGraph)、逻辑数据流图(logical dataflow graph)和打包了所有的类、库和其它资源的JAR包。
JobManager会把JobGraph转换成一个物理层面的数据流图,这个图被叫做“执行图”(ExecutionGraph),包含了所有可以并发执行的任务。JobManager会向资源管理器(ResourceManager)请求执行任务必要的资源,也就是任务管理器(TaskManager)上的插槽(slot)。一旦它获取到了足够的资源,就会将执行图分发到真正运行它们的TaskManager上。
而在运行过程中,JobManager会负责所有需要中央协调的操作,比如说检查点(checkpoints)的协调。

4.2.1.2 JobMaster

JobMaster负责管理单个JobGraph的执行。多个Job可以同时运行在一个Flink集群中, 每个Job都有一个自己的JobMaster,是JobManager的物理单元。

4.2.2 TaskManager

Flink中的工作进程(JobManager)。通常在Flink中会有多个TaskManager运行,每一个TaskManager都包含了一定数量的插槽(slots)。插槽的数量限制了TaskManager能够执行的任务数量。
启动之后,TaskManager会向资源管理器注册它的插槽;收到资源管理器的指令后,TaskManager就会将一个或者多个插槽提供给JobManager调用。JobManager就可以向插槽分配任务(tasks)来执行了。
在执行过程中,一个TaskManager可以跟其它运行同一应用程序的TaskManager交换数据。

4.2.3 Cluster Manager

集群管理器,比如Standalone、YARN、K8s等,就是前面我们学习的不同环境。

4.2.4 Client

提交Job的客户端,可以是运行在任何机器上(与 JobManager 环境连通即可)。提交Job后,Client可以结束进程(Streaming的任务),也可以不结束并等待结果返回。

4.3 核心概念

4.3.1 TaskManager与Slots

Flink中每一个worker(TaskManager)都是一个JVM进程,它可能会在独立的线程上执行一个Task。为了控制一个worker能接收多少个task,worker通过Task Slot来进行控制(一个worker至少有一个Task Slot)
这里的Slot如何来理解呢?很多的文章中经常会和Spark框架进行类比,将Slot类比为Core,其实简单这么类比是可以的,可实际上,可以考虑下,当Spark申请资源后,这个Core执行任务时有可能是空闲的(空转),但是这个时候Spark并不能将这个空闲下来的Core共享给其他Job使用,所以这里的Core是Job内部共享使用的。接下来我们再回想一下,之前在Yarn Session-Cluster模式时,其实是可以并行执行多个Job的,那如果申请两个Slot,而执行Job时,只用到了一个,剩下的一个怎么办?那我们自认而然就会想到Flink可以将这个空闲的Slot给并行的其他Job,对吗?所以Flink中的Slot和Spark中的Core还是有很大区别的。
每个task slot表示TaskManager拥有资源的一个固定大小的子集。假如一个TaskManager有三个slot,那么它会将其管理的内存分成三份给各个slot。资源slot化意味着一个task将不需要跟来自其他job的task竞争被管理的内存,取而代之的是它将拥有一定数量的内存储备。需要注意的是,这里不会涉及到CPU的隔离,slot目前仅仅用来隔离task的受管理的内存。

4.3.2 Parallelism(并行度)

并行度设置的优先级:

算子(代码) > 全局(代码) > 提交参数 > 配置文件

Parallelism 和 Slots之间的关系 :

  1. slots表示此任务可以承担的最大的线程数量,是静态的。
  2. prallelism表示任务运行时实际开启的线程数量,是动态的。(Parallelism <= slots)
  3. 并行度的计算推理原理:
  • Flink的并行度可以对算子进行设置,那么算子的子任务的数量,就是算子的并行度。
  • Job的并行度 => 并行度最大的算子的并行度。
  • 一个Job需要多少并行度 ? 取决于 Job 的并行度。
在学习Spark RDD时,无论是读取内存中的数据,或读取文件数据,都会接触一个叫并行度的概念,并且在RDD的算子中也可以动态改变并行度,通过学习,咱们应该知道Spark中的并行度最终体现为分区,而分区又意味着Task。所以Spark 计算中Task的数量是可以通过并行度推算出来的。这个大家没有的问题的话,那就好办了,为什么?因为Flink的并行度的作用和Spark中并行度的作用的一样的。最后都可以表现为任务的并行执行。

虽然Spark中的并行度和Flink的并行度的原理,作用基本一致,但是由于模型选择的问题,所以使用上依然有些细微的区别:
	? Spark的并行度设置后需要调用特殊的算子(repartition)或特殊的操作(shuffle)才能进行改变,比如调用			flatMap算子后再调用repartition改变分区。
	? Flink的并行度设置可以在任何算子后使用,并且为了方便,也可以设置全局并行度
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		env.setParallelism(2);
	
但是需要注意,某些数据源数据的采集是无法改变并行度,如Socket
	? 如果Flink的一个算子的并行度为2,那么这个算子在执行时,这个算子对应的task就会拆分成2个subtask,发到不同的Slot中执行。

一个特定算子的子任务(subtask)的个数被称之为这个算子的并行度(parallelism),一般情况下,一个流程序的并行度,可以认为就是其所有算子中最大的并行度。一个程序中,不同的算子可能具有不同的并行度。并行度是单指一个任务中的,而不是整个程序的。

4.3.3 算子与算子之间的关系

Stream在算子之间传输数据的形式可以是one-to-one(forwarding)的模式也可以是redistributing的模式,具体是哪一种形式,取决于算子的种类。

4.3.3.1 One-to-one

类似于spark中的窄依赖

stream(比如在source和flatMap operator之间)维护着分区以及元素的顺序。那意味着flatmap 算子的子任务看到的元素的个数以及顺序跟source 算子的子任务生产的元素的个数、顺序相同,map、fliter、flatMap等算子都是one-to-one的对应关系。

image-20210609234053618

4.3.3.2 Redistributing

类似于spark中的宽依赖

stream(map()跟keyBy/window之间或者keyBy/window跟sink之间)的分区会发生改变。每一个算子的子任务依据所选择的transformation发送数据到不同的目标任务。

例如,keyBy()基于**hashCode重分区(类似于Spark中Shuffle)、broadcast(广播)和rebalance(重分配)**会随机重新分区,这些算子都会引起redistribute过程,而redistribute过程就类似于Spark中的shuffle过程。

  1. hashCode重分区

image-20210609234723019

  1. broadcast广播

image-20210609235101600

  1. rebalance

image-20210609235605402

4.3.4 Task与SubTask

算子的一个并行子任务,叫做subtask。

**task :**是由①不同算子的subtask(子任务)②根据一定的规则③合并在一起形成。

不同算子组成一个 task 的条件:详情请查看下一章节 [4.3.5 Operator Chains(任务链)](# 4.3.5 Operator Chains(任务链))

①:并行度一致

②:两个算子之间是one-to-one的关系

Flink执行时,由于并行度的设置,可以将同一个Job不同算子的subtask(子任务)放在同一块内存中进行处理,那么这样在执行时就可以合并成一个完整的task进行处理,而不是独立的子任务,这样就减少了子任务(SubTask)之间调度和数据传递的性能损耗,避免了跨节点进行数据传输

3

4.3.5 Operator Chains(任务链)

把相同并行度one to one操作,Flink将这样相连的算子链接在一起形成一个 task ,原来的算子成为里面的一部分。 每个task被一个线程执行.
将算子链接成task是非常有效的优化:它能减少线程之间的切换和基于缓存区的数据交换,在减少时延的同时提升吞吐量。链接的行为可以在编程API中进行指定。

image-20210610003549241

阻止任务链的常用方法:

使用下面的方法后,哪怕两个算子之间是one to one的关系,并且并行度一致,也不会形成任务链。

执行环境对象:disableOperatorChaining()方法

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.disableOperatorChaining();

算子:startNewChain()方法 disableChaining()方法

算子.startNewChain();
算子.disableChaining();

4.3.6 ExecutionGraph(执行图)

由Flink程序直接映射成的数据流图是StreamGraph,也被称为逻辑流图,因为它们表示的是计算逻辑的高级视图。为了执行一个流处理程序,Flink需要将逻辑流图转换为物理数据流图(也叫执行图),详细说明程序的执行方式。
Flink 中的执行图可以分成四层:StreamGraph -> JobGraph -> ExecutionGraph -> Physical Graph。

2个并发度(Source为1个并发度)的 SocketTextStreamWordCount 四层执行图的演变过程 env.socketTextStream().flatMap(…).keyBy(0).sum(1).print();

  • StreamGraph:

是根据用户通过 Stream API 编写的代码生成的最初的图。用来表示程序的拓扑结构

  • JobGraph

StreamGraph经过优化后生成了 JobGraph,是提交给 JobManager 的数据结构。主要的优化为: 将多个符合条件的节点 chain 在一起作为一个节点,这样可以减少数据在节点之间流动所需要的序列化/反序列化/传输消耗。

  • ExecutionGraph

JobManager 根据 JobGraph 生成ExecutionGraph。ExecutionGraph是JobGraph的并行化版本,是调度层最核心的数据结构。

  • Physical Graph:

JobManager 根据 ExecutionGraph 对 Job 进行调度后,在各个TaskManager 上部署 Task 后形成的“图”,并不是一个具体的数据结构。

4.4 提交流程

下面的两张图比较笼统,而且yarn-cluster模式不准确,完整准确的提交流程请采用[4.1.2 中文图解:](# 4.1.2 中文图解:)结合下方执行流程说明作详细系统学习。

4.4.1 高级视角提交流程(通用提交流程)

我们来看看当一个应用提交执行时,Flink的各个组件是如何交互协作的:

4.4.2 yarn-cluster提交流程per-job

执行流程说明:

  1. Flink任务提交后,Client向HDFS上传Flink的Jar包和配置
  2. 向Yarn ResourceManager提交任务,ResourceManager分配Container资源
  3. 通知对应的NodeManager启动ApplicationMaster,ApplicationMaster启动后加载Flink的Jar包和配置构建环境,然后启动JobManager
  4. ApplicationMaster向ResourceManager申请资源启动TaskManager
  5. ResourceManager分配Container资源后,由ApplicationMaster通知资源所在节点的NodeManager启动TaskManager
  6. NodeManager加载Flink的Jar包和配置构建环境并启动TaskManager
  7. TaskManager启动后向JobManager发送心跳包,并等待JobManager向其分配任务。

五、Flink流处理核心编程

和其他所有的计算框架一样,Flink也有一些基础的开发步骤以及基础,核心的API,从开发步骤的角度来讲,主要分为四大部分

5.1 Environment

Flink Job在提交执行计算时,需要首先建立和Flink框架之间的联系,也就指的是当前的flink运行环境,只有获取了环境信息,才能将task调度到不同的taskManager执行。而这个环境对象的获取方式相对比较简单

以下的两种API都可以自动区分部署环境

// 批处理环境
ExecutionEnvironment benv = ExecutionEnvironment.getExecutionEnvironment();
// 流式数据处理环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

5.2 Source

Flink框架可以从不同的来源获取数据,将数据提交给框架进行处理, 我们将获取数据的来源称之为数据源**(Source)**。

5.2.1 准备工作

  1. 导入注解工具依赖, 方便生产POJO类
   <!-- https://mvnrepository.com/artifact/org.projectlombok/lombok -->
   <dependency>
       <groupId>org.projectlombok</groupId>
       <artifactId>lombok</artifactId>
       <version>1.18.16</version>
       <scope>provided</scope>
   </dependency>
  1. 准备一个WaterSensor类方便演示
   /**
    * 水位传感器:用于接收水位数据
    *
    * id:传感器编号
    * ts:时间戳
    * vc:水位
    */
   @Data
   @NoArgsConstructor
   @AllArgsConstructor
   public class WaterSensor {
       private String id;
       private Long ts;
       private Integer vc;
   }

5.2.2 从Java的集合中读取数据

一般情况下,可以将数据临时存储到内存中,形成特殊的数据结构后,作为数据源使用。这里的数据结构采用集合类型是比较普遍的。

public class Flink05_Source_Collection {
    public static void main(String[] args) throws Exception {
        List<WaterSensor> waterSensors = Arrays.asList(
          new WaterSensor("ws_001", 1577844001L, 45),
          new WaterSensor("ws_002", 1577844015L, 43),
          new WaterSensor("ws_003", 1577844020L, 42));

        // 1. 创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env
          .fromCollection(waterSensors)
         .print();
        env.execute();
    }
}

5.2.3 从文件读取数据

public class Flink05_Source_File {
    public static void main(String[] args) throws Exception {

        // 1. 创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env
          .readTextFile("input")
          .print();

        env.execute();
    }
}

说明:

  1. 参数可以是目录也可以是文件

  2. 路径可以是相对路径也可以是绝对路径

  3. 相对路径是从系统属性user.dir获取路径: idea下是project的根目录, standalone模式下是集群节点根目录

  4. 也可以从hdfs目录下读取, 使用路径:hdfs://…, 由于Flink没有提供hadoop相关依赖, 需要pom中添加相关依赖:

    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>3.1.3</version>
        <scope>provided</scope>
    </dependency>
    

5.2.4 从Socket读取数据

[参考第二章无界流读取](# 2.3.2 无界流)

5.2.5 从Kafka读取数据

添加相应的依赖

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.12</artifactId>
    <version>1.13.0</version>
</dependency>

参考代码

public class Flink05_Source_Kafka {
    public static void main(String[] args) throws Exception {

        // 0.Kafka相关配置
        Properties conf = new Properties(); 	
        // conf.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092");
        conf.setProperty("bootstrap.servers","hadoop102:9092,hadoop103:9092,hadoop104:9092");
        conf.setProperty("group.id", "Flink01_Source_Kafka");
        conf.setProperty("auto.offset.reset", "latest");

        // 1. 创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2. 添加Kafka的数据源Source
       		 /*
                参数1:kafka中的主题
                参数2:序列化规则
                参数3:kafka的相关配置信息
        	*/
          DataStreamSource<String> kafkaSource = env.addSource(new FlinkKafkaConsumer<>											("sensor", new SimpleStringSchema(), conf));
        
         // 打印     
         kafkaSource.print("kafka source");

         env.execute();
    }
}

5.2.6 自定义Source

大多数情况下,前面的数据源已经能够满足需要,但是难免会存在特殊情况的场合,所以flink也提供了能自定义数据源的方式。

示例1:

public class Flink05_Source_Custom {
    public static void main(String[] args) throws Exception {

        // 1. 创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env
          .addSource(new MySource("hadoop102", 9999))
          .print();

        env.execute();
    }

    // 自定义数据源,实现SourceFunction接口,泛型即是数据的类型
    /*
    	重写接口中的两个方法 :
		①:run()方法就是程序启动的方法
		②:cancel()方法就是程序结束的方法
	*/
    public static class MySource implements SourceFunction<WaterSensor> {
        private String host;
        private int port;
        private volatile boolean isRunning = true;
        private Socket socket;

        public MySource(String host, int port) {
            this.host = host;
            this.port = port;
        }

        @Override
        public void run(SourceContext<WaterSensor> ctx) throws Exception {
            // 实现一个从socket读取数据的source
            socket = new Socket(host, port);
            BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8));
            String line = null;
            while (isRunning && (line = reader.readLine()) != null) {
                String[] split = line.split(",");
                ctx.collect(new WaterSensor(split[0], Long.valueOf(split[1]), Integer.valueOf(split[2])));
            }
        }

        /**
         * 大多数的source在run方法内部都会有一个while循环,
         * 当调用这个方法的时候, 应该可以让run方法中的while循环结束
         */

        @Override
        public void cancel() {
            isRunning = false;
            try {
                socket.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}
/*
sensor_1,1607527992000,20
sensor_1,1607527993000,40
sensor_1,1607527994000,50
 */

示例2:

public class flink05_source_consum {

    public static void main(String[] args) throws Exception {

        // 1. 获取执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 2. 获取数据源
        DataStreamSource<WaterSensor> ConsumSource = env.addSource(new MySource());

        // 3. 算子操作

        // 4. 输出、保存...
        ConsumSource.print();
        // 5. 启动执行
        env.execute();

    }

    private static class MySource implements SourceFunction<WaterSensor> {

        private boolean flag = true;
        Random random = new Random();
        // 启动数据源
        @Override
        public void run(SourceContext<WaterSensor> sourceContext) throws Exception {
            while(flag){
                // 准备数据源
                WaterSensor waterSensor = new WaterSensor(
                        "sensor_" + random.nextInt(10),
                        System.currentTimeMillis()+ 492,
                        random.nextInt(100));
                Thread.sleep(1000);
                // 向下游发送数据
                sourceContext.collect(waterSensor);
            }
        }

        // 释放数据源
        @Override
        public void cancel() {
            this.flag = false;
        }
    }
}

5.3 Transform

转换算子可以把一个或多个DataStream转成一个新的DataStream。程序可以把多个复杂的转换组合成复杂的数据流拓扑。类似于Spark中的转换算子。

5.3.0 算子的分类

如果算子前后是一对一关系,则算子中没有收集器,比如map算子

如果算子前后是一对多关系,则算子中有收集器,比如flatMap算子

1. 算子分类

**无状态算子 : ** 数据的输入和数据的输出是一致的,对数据没有聚合作用。

  1. map

  2. flatMap

  3. filter

数据传输算子:

  1. keyBy

**多流转换算子:**涉及到对多条流的合并、拆分等操作。

  1. split
  2. select
  3. connect —> coMap
  4. union

5.3.1 map

作用
将数据流中的数据进行转换, 形成新的数据流,消费一个元素并产出一个元素

适用场景:把符合规范的数据转换为javaBean实体类

1

参数
lambda表达式或MapFunction实现类

返回
DataStream → DataStream(可以转换数据的类型)

示例
得到一个新的数据流: 新的流的元素是原来流的元素的平方

1. 匿名内部类对象

package com.heather.flink.java.chapter_5.transform;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class Flink01_TransForm_Map_Anonymous {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env
          .fromElements(1, 2, 3, 4, 5)
           // MapFunction()方法的参数:
           // 参数1:数据的输入的类型
           // 参数2:数据的输出的类型
          .map(new MapFunction<Integer, Integer>() {
              @Override
              public Integer map(Integer value) throws Exception {
                  return value * value;
              }
          })
          .print();

        env.execute();
    }
}

2. Lambda表达式表达式

env
  .fromElements(1, 2, 3, 4, 5)
  .map(ele -> ele * ele)
  .print();

3. 静态内部类

package com.heather.flink.java.chapter_5.transform;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class Flink01_TransForm_Map_StaticClass {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env
          .fromElements(1, 2, 3, 4, 5)
          .map(new MyMapFunction())
          .print();

        env.execute();
    }

    public static class MyMapFunction implements MapFunction<Integer, Integer> {

        @Override
        public Integer map(Integer value) throws Exception {
            return value * value;
        }
    }
    
}

Rich…Function类
所有Flink函数类都有其Rich版本。它与常规函数的不同在于提供了更多的,更丰富的功能:

  1. 可以获取运行环境的上下文(主要用于获取数据的状态)
  2. 提供了获取连接的生命周期方法(open()和close(),各执行一次)
    • open()方法:执行的次数和并行度成**“正相关”**。
    • close()方法:执行的次数和并行度成**“正相关”,有界流会是并行度 * 2**。

例如:

RichMapFunction

package com.heather.flink.java.chapter_5.transform;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class Flink01_TransForm_Map_RichMapFunction {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(5);

        env
          .fromElements(1, 2, 3, 4, 5)
          .map(new MyRichMapFunction()).setParallelism(2)
          .print();

        env.execute();
    }

    public static class MyRichMapFunction extends RichMapFunction<Integer, Integer> {
        // 默认生命周期方法, 初始化方法, 在每个并行度上只会被调用一次
        @Override
        public void open(Configuration parameters) throws Exception {
            System.out.println("open ... 执行一次");
        }

        // 默认生命周期方法, 最后一个方法, 做一些清理工作, 在每个并行度上只调用一次
        @Override
        public void close() throws Exception {
            System.out.println("close ... 执行一次");
        }

        @Override
        public Integer map(Integer value) throws Exception {
            System.out.println("map ... 一个元素执行一次");
            return value * value;
        }
    }
}

说明:

  1. 默认生命周期方法, 初始化方法, 在每个并行度上只会被调用一次, 而且先被调用。
  2. 默认生命周期方法, 最后一个方法, 做一些清理工作, 在每个并行度上只调用一次, 而且是最后被调用。
  3. getRuntimeContext()方法提供了函数的RuntimeContext的一些信息,例如函数执行的并行度,任务的名字,以及state状态. 开发人员在需要的时候自行调用获取运行时上下文对象。

5.3.2 flatMap

作用
消费一个元素并产生零个或多个元素

参数
参数1:Integer value : 输入的数据

参数2:Collector out : 输出的数据

返回
DataStream → DataStream(可以转换数据的类型)

示例 : 如下

1. 匿名内部类写法

// 新的流存储每个元素的平方和3次方
env
  .fromElements(1, 2, 3, 4, 5)
   // FlatMapFunction()方法的参数:
   // 参数1:数据的输入的类型
   // 参数2:数据的输出的类型
  .flatMap(new FlatMapFunction<Integer, Integer>() {
      @Override
      public void flatMap(Integer value, Collector<Integer> out) throws Exception {
          out.collect(value * value);
          out.collect(value * value * value);
      }
  })
  .print();

2. Lambda表达式写法

env
  .fromElements(1, 2, 3, 4, 5)
  .flatMap((Integer value, Collector<Integer> out) -> {
      out.collect(value * value);
      out.collect(value * value * value);
  }).returns(Types.INT)
  .print();

说明: 在使用Lambda表达式表达式的时候, 由于泛型擦除的存在, 在运行的时候无法获取泛型的具体类型, 全部当做Object来处理, 及其低效, 所以Flink要求当参数中有泛型的时候, 必须明确指定泛型的类型.

5.3.3 filter

作用
根据指定的规则将满足条件(true)的数据保留,不满足条件(false)的数据丢弃

参数
FlatMapFunction实现类

返回
DataStream → DataStream(可以转换数据的类型,仅仅是过滤的作用)

示例 : 如下

1. 匿名内部类写法

// 保留偶数, 舍弃奇数
env
  .fromElements(10, 3, 5, 9, 20, 8)
  .filter(new FilterFunction<Integer>() {
      @Override
      public boolean filter(Integer value) throws Exception {
          return value % 2 == 0;
      }
  })
  .print();

2. Lambda表达式写法

env
  .fromElements(10, 3, 5, 9, 20, 8)
  .filter(value -> value % 2 == 0)
  .print();

5.3.4 keyBy

作用

  1. (键控流)把流中的数据分到不同的分区中,具有相同key的元素会分到同一个分区中【任务槽】(数据倾斜的根源),不同的key也有可能在同一个分区【任务槽】中,但是逻辑上是隔离开的,即 给每个数据打上标签(属于哪个分组),并不是对并行度进行改变。

  2. 通过keyBy之后得到keyedStream流,keyedStream流就可以进行各种各样的聚合算子进行操作。

    [5.3.10 简单滚动聚合算子](# 5.3.10 简单滚动聚合算子)

分区规则:在内部是使用的hash分区来实现的,根据key的hash值对key进行求模操作。

难点:

【重点】 【重点】 【重点】

KeyedStream的参数说明:

Type parameters:
– The type of the elements in the Keyed Stream. (返回值得类型)
– The type of the key in the Keyed Stream.(分组的key的类型)

KeyedStream<Tuple2<String, Integer>, Tuple> wordToGroupDS = wordAndOneDS.keyBy(0);
KeyedStream<WaterSensor, Tuple> sensorKSByFieldName = sensorDS.keyBy("id");

通过上述的两个代码可知,无论是通过 位置索引 还是 字段名称 进行分组,返回的key的类型都是Tuple(因为无法确定),这就导致后续使用key的时候还需要进行类型的转换等。

**所以:**通过 明确的指定 key 的方式, 获取到的 key就是具体的类型 => 实现 KeySelector 或 lambda

参数
Key选择器函数: interface KeySelector<IN, KEY>

? 注意: 什么值不可以作为KeySelector的Key:

  • 没有覆写hashCode方法的POJO, 而是依赖Object的hashCode. 因为这样分组没有任何的意义: 每个元素都会得到一个独立无二的组. 实际情况是:可以运行, 但是分的组没有意义.
  • 任何类型的数组

返回
DataStream → KeyedStream

示例:明确的指定 key 的方式【对象.get属性()】 : 如下

1. 匿名内部类写法

public class Flink02_Transform_KeyBy {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 1. 获取数据源
        DataStreamSource<String> fileDS = env.readTextFile("input/sensor_data.log");
        // 2. 操作算子
        // 2.1 map()算子把数据转换为对象
        SingleOutputStreamOperator<WaterSensor> mapDS = fileDS.map(new MapFunction<String, WaterSensor>() {
            @Override
            public WaterSensor map(String value) throws Exception {
                String[] datas = value.split(",");
                return new WaterSensor(datas[0], Long.valueOf(datas[1]), Integer.valueOf(datas[2]));
            }
        });
        // 2.2 分组(下面的三种方式请观察返回值类型中的泛型)

        // 2.2.1 通过指定下标索引的方式【不推荐使用】
        // KeyedStream<WaterSensor, Tuple> keyByDS = mapDS.keyBy(0);
        // 2.2.2 通过指定 属性 的方式【不推荐使用】
        // KeyedStream<WaterSensor, Tuple> keyByDS = mapDS.keyBy("id");
        // 2.2.3 明确的指定 key 的 方式 【推荐使用】
        KeyedStream<WaterSensor, String> keybyDS = mapDS.keyBy(new MyKeyBySeletor());

        // 3. 输出、保存 ...
        keybyDS.print();
        env.execute();
    }

    private static class MyKeyBySeletor implements KeySelector<WaterSensor,String> {
        @Override
        public String getKey(WaterSensor value) throws Exception {
            return value.getId();
        }
    }
}

2. Lambda表达式写法

public class Flink02_Transform_KeyBy {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 1. 获取数据源
        DataStreamSource<String> fileDS = env.readTextFile("input/sensor_data.log");
        // 2. 操作算子
        // 2.1 map()算子把数据转换为对象
        SingleOutputStreamOperator<WaterSensor> mapDS = fileDS.map(new MapFunction<String, WaterSensor>() {
            @Override
            public WaterSensor map(String value) throws Exception {
                String[] datas = value.split(",");
                return new WaterSensor(datas[0], Long.valueOf(datas[1]), Integer.valueOf(datas[2]));
            }
        });
        // 2.2 分组【lambda表达式方法】
        KeyedStream<WaterSensor, String> keybyDS = mapDS.keyBy( r -> r.getId() );

        // 3. 输出、保存 ...
        keybyDS.print();
        env.execute();
    }
}

5.3.6 shuffle

作用
把流中的元素随机打乱,然后随机(底层是分区器)发送到下游,对同一个组数据, 每次只需得到的结果都不同。

参数

返回
DataStream → DataStream

示例 : 如下

env
  .fromElements(10, 3, 5, 9, 20, 8)
  .shuffle()
  .print();
env.execute();

5.3.7 split和select(过时)

已经过时, 在1.12中已经被移除

作用
某些情况下,我们需要将数据流根据某些特征拆分成两个或者多个数据流,给不同数据流增加标记以便于从流中取出。
split用于给流中的每个元素添加标记. select用于根据标记取出对应的元素, 组成新的流。在新的版本中已经被新的API替代:Please use side output instead

~

参数
split参数: interface OutputSelector
select参数: 字符串

返回
split: SingleOutputStreamOperator -> SplitStream~

slect: SplitStream -> DataStream

示例 : 如下

1. 匿名内部类写法

public class Flink03_Transform_select {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStreamSource<String> fileDS = env.readTextFile("input/sensor_data.log");

        // 把数据转换为javaBean类
        SingleOutputStreamOperator<WaterSensor> mapDS = fileDS.map(new MapFunction<String, WaterSensor>() {
            @Override
            public WaterSensor map(String value) throws Exception {
                String[] datas = value.split(",");
                return new WaterSensor(datas[0], Long.valueOf(datas[1]), Integer.valueOf(datas[2]));
            }
        });

        // 把一个流中的元素按照指定规则进行切分(其实就是给符合规则的元素贴上标签)
        SplitStream<WaterSensor> splitDS = mapDS.split(new MyOutputSeletor());

//        splitDS.select("normal").print("正常");
//        splitDS.select("warning").print("报警");
//        splitDS.select("alarm").print("警告");

        splitDS.select("balance").print("正常");
        splitDS.select("balance").print("报警");

        env.execute();

    }

    private static class MyOutputSeletor implements OutputSelector<WaterSensor> {
        /*
            定义规则:
                正常: 水位小于95
                报警: 水位大于95,小于98
                警告: 水位大于98
         */
        @Override
        public Iterable<String> select(WaterSensor value) {
            // 返回值类型是String类型的迭代器,说明,每一个元素都可以贴上多个标签进行说明
            if(value.getVc() <= 95){
                return Arrays.asList("normal","balance");
            }else if(value.getVc() < 98){
                return Arrays.asList("warning","balance");
            }else if(value.getVc() >= 98){
                return Arrays.asList("alarm");
            }else {
                return Arrays.asList("。。。。");
            }
        }
    }
}

2. Lambda表达式写法

// 奇数一个流, 偶数一个流
SplitStream<Integer> splitStream = env
  .fromElements(10, 3, 5, 9, 20, 8)
  .split(value -> value % 2 == 0
    ? Collections.singletonList("偶数")
    : Collections.singletonList("奇数"));
splitStream
  .select("偶数")
  .print("偶数");

splitStream
  .select("奇数")
  .print("奇数");
env.execute();

5.3.8 connect

作用
在某些情况下,我们需要将两个不同来源的数据流进行连接,实现数据匹配,比如订单支付和第三方交易信息,这两个信息的数据就来自于不同数据源,连接后,将订单支付和第三方交易信息进行对账,此时,才能算真正的支付完成。
Flink中的connect算子可以连接两个保持他们类型的数据流,两个数据流被connect之后,只是被放在了一个同一个流中,内部依然保持各自的数据和形式不发生任何变化,两个流相互独立。

扩展:

  1. 以FIFO队列的方式进入到合流中。
  2. connect算子只能合并两条数据流,但是这两条数据流的数据类型可以不一致

参数
另外一个流

返回
DataStream[A], DataStream[B] -> ConnectedStreams[A,B]

示例 : 如下

DataStreamSource<Integer> intStream = env.fromElements(1, 2, 3, 4, 5);
DataStreamSource<String> stringStream = env.fromElements("a", "b", "c");
// 把两个流连接在一起: 貌合神离
ConnectedStreams<Integer, String> cs = intStream.connect(stringStream);
cs.getFirstInput().print("first");
cs.getSecondInput().print("second");
env.execute();

注意:

  1. 两个流中存储的数据类型可以不同
  2. 只是机械的合并在一起, 内部仍然是分离的2个流
  3. 只能2个流进行connect, 不能有第3个参与

5.3.9 union

作用
两个或者两个以上DataStream流进行**union(合并)**操作,产生的结果是一个包含所有DataStream流的元素的新DataStream流。

扩展:

  1. 以FIFO队列的方式进入到合流中
  2. 不去重
  3. union算子可以合并多条流 ,但是保证每一条流的元素的类型必须一致。

示例 : 如下

DataStreamSource<Integer> stream1 = env.fromElements(1, 2, 3, 4, 5);
DataStreamSource<Integer> stream2 = env.fromElements(10, 20, 30, 40, 50);
DataStreamSource<Integer> stream3 = env.fromElements(100, 200, 300, 400, 500);

// 把多个流union在一起成为一个流, 这些流中存储的数据类型必须一样: 水乳交融
stream1
  .union(stream2)
  .union(stream3)
  .print();

connect与 union 区别:

  1. union之前两个流的类型必须是一样,connect可以不一样。
  2. connect只能操作两个流,union可以操作多个。
  3. connect连接后的两个流其实也是两条流,并没有真正的合并为一条流(因为流的类型不一致)具体查看源码,而union是真的把多条流合并在了一起,组成了一条流。

5.4 Operator

5.4.1 滚动聚合算子RollingAgg

1、简言之:数据来一条,聚合一条,输出一次!!!

2、在适用Operarot算子之前,必须先适用keyBy算子对数据进行分组

常见的滚动聚合算子

  • sum
  • min
  • ma
  • minBy
  • maxBy

作用
KeyedStream的每一个支流做聚合。执行完成后,会将聚合的结果合成一个流返回,所以结果都是DataStream

参数
如果流中存储的是POJO或者scala的样例类, 参数使用字段名
如果流中存储的是元组, 参数就是位置(基于0…)

返回
KeyedStream -> SingleOutputStreamOperator

示例 : 如下

  • 示例1
DataStreamSource<Integer> stream = env.fromElements(1, 2, 3, 4, 5);
KeyedStream<Integer, String> kbStream = stream.keyBy(ele -> ele % 2 == 0 ? "奇数" : "偶数");
kbStream.sum(0).print("sum");
kbStream.max(0).print("max");
kbStream.min(0).print("min");
  • 示例2
ArrayList<WaterSensor> waterSensors = new ArrayList<>();
waterSensors.add(new WaterSensor("sensor_1", 1607527992000L, 20));
waterSensors.add(new WaterSensor("sensor_1", 1607527994000L, 50));
waterSensors.add(new WaterSensor("sensor_1", 1607527996000L, 30));
waterSensors.add(new WaterSensor("sensor_2", 1607527993000L, 10));
waterSensors.add(new WaterSensor("sensor_2", 1607527995000L, 30));

KeyedStream<WaterSensor, String> kbStream = env
  .fromCollection(waterSensors)
  .keyBy(WaterSensor::getId);

kbStream
  .sum("vc")
  .print("maxBy...");

image-20210610184723784

注意: **
分组聚合后, 理论上只
能取分组字段和聚合结果**, 但是Flink允许其他的字段也可以取出来, 其他字段默认情况是取的是这个组内第一个元素的字段值

  • 示例3:
ArrayList<WaterSensor> waterSensors = new ArrayList<>();
waterSensors.add(new WaterSensor("sensor_1", 1607527992000L, 20));
waterSensors.add(new WaterSensor("sensor_1", 1607527994000L, 50));
waterSensors.add(new WaterSensor("sensor_1", 1607527996000L, 50));
waterSensors.add(new WaterSensor("sensor_2", 1607527993000L, 10));
waterSensors.add(new WaterSensor("sensor_2", 1607527995000L, 30));

KeyedStream<WaterSensor, String> kbStream = env
  .fromCollection(waterSensors)
  .keyBy(WaterSensor::getId);

kbStream
  .maxBy("vc", false)
  .print("max...");

env.execute();

注意: **
maxBy和minBy可以指定当出现相同值的时候,其他字段
是否取第一个**. true表示取第一个, false表示取最后一个.

5.4.2 process

作用
process算子在Flink算是一个比较底层的算子,很多类型的流上都可以调用,可以从流中获取更多的信息(不仅仅数据本身)

1. 示例1: 在keyBy之前的流上使用(ProcessFunction)

env
  .fromCollection(waterSensors)
  .process(new ProcessFunction<WaterSensor, Tuple2<String, Integer>>() {
      @Override
      public void processElement(WaterSensor value,
                                 Context ctx,
                                 Collector<Tuple2<String, Integer>> out) throws Exception {
          out.collect(new Tuple2<>(value.getId(), value.getVc()));
      }
  })
  .print();

image-20210607130701149

2. 示例2: 在keyBy之后的流上使用 (KeyedProcessFunction)

env
  .fromCollection(waterSensors)
  .keyBy(WaterSensor::getId)
  .process(new KeyedProcessFunction<String, WaterSensor, Tuple2<String, Integer>>() {
      @Override
      public void processElement(WaterSensor value, Context ctx, Collector<Tuple2<String, Integer>> out) throws Exception {
          out.collect(new Tuple2<>("key是:" + ctx.getCurrentKey(), value.getVc()));
      }
  })
  .print();

3. 综合案例:

public class Flink06_Transform_process {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(1);

        //DataStreamSource<String> fileDS = env.socketTextStream("localhost", 9999);

        DataStreamSource<String> fileDS = env.readTextFile("input/sensor_data.log");
        // 转换为对象
        SingleOutputStreamOperator<WaterSensor> beanDS = fileDS.map((MapFunction<String, WaterSensor>) r -> {
            String[] datas = r.split(",");
            return new WaterSensor(datas[0], Long.valueOf(datas[1]), Integer.valueOf(datas[2]));
        });

        // 获取对象中的各个属性,针对于属性进行分组
        SingleOutputStreamOperator<Tuple3<String, Long, Integer>> mapDS = beanDS.map(new MapFunction<WaterSensor, Tuple3<String, Long, Integer>>() {
            @Override
            public Tuple3<String, Long, Integer> map(WaterSensor value) throws Exception {
                return new Tuple3<>(value.getId(), value.getTs(), value.getVc());
            }
        });

        // 分组
        // 适用lambda方式
        KeyedStream<Tuple3<String, Long, Integer>, String> keyByDS = mapDS.keyBy(r -> r.f0);

        keyByDS.process(new KeyedProcessFunction<String, Tuple3<String, Long, Integer>, String>() {
            /**
             * processElement 处理数据的方法:来一条,处理一条
             * @param value 一条数据
             * @param ctx 上下文对象
             * @param out 采集器
             * @throws Exception
             */
            @Override
            public void processElement(Tuple3<String, Long, Integer> value, Context ctx, Collector<String> out) throws Exception {
                out.collect("当前key : " + ctx.getCurrentKey() +  ",当前时间 :" + ctx.timestamp() + ",数据 :" + value);
            }
        }).print("process方法——>");

        env.execute();
    }
}

KeyedProcessFunction类中的Context上下文对象的来源(源码追踪):

  1. KeyedProcessFunction继承自AbstractRichFunction类
public abstract class KeyedProcessFunction<K, I, O> extends AbstractRichFunction {
  1. AbstractRichFunction类中定义了上下文对象
public abstract class AbstractRichFunction implements RichFunction, Serializable {

   private transient RuntimeContext runtimeContext;

所以KeyedProcessFunction类中的Context上下文对象来源于AbstractRichFunction类

5.4.3 reduce

作用
(简化规约 — 聚合)一个分组数据流的聚合操作,合并当前的元素和上次聚合的结果,产生一个新的值,返回的流中包含每一次聚合的结果,而不是只返回最后一次聚合的最终结果。
为什么还要把中间值也保存下来? 考虑流式数据的特点: 没有终点, 也就没有最终的概念了. 任何一个中间的聚合结果都是值!(也就是flink中的状态)

参数
interface ReduceFunction

返回
KeyedStream -> SingleOutputStreamOperator

示例 : 如下

1. 匿名内部类写法

ArrayList<WaterSensor> waterSensors = new ArrayList<>();
waterSensors.add(new WaterSensor("sensor_1", 1607527992000L, 20));
waterSensors.add(new WaterSensor("sensor_1", 1607527994000L, 50));
waterSensors.add(new WaterSensor("sensor_1", 1607527996000L, 50));
waterSensors.add(new WaterSensor("sensor_2", 1607527993000L, 10));
waterSensors.add(new WaterSensor("sensor_2", 1607527995000L, 30));

KeyedStream<WaterSensor, String> kbStream = env
  .fromCollection(waterSensors)
  .keyBy(WaterSensor::getId);

kbStream
  .reduce(new ReduceFunction<WaterSensor>() {
      /*
          参数说明:
             - 参数1 - value1:累加器,原始数据
             - 参数2 - value2:新添加的数据
       */
      @Override
      public WaterSensor reduce(WaterSensor value1, WaterSensor value2) throws Exception {
          System.out.println("reducer function ...");
          return new WaterSensor(value1.getId(), value1.getTs(), value1.getVc() + value2.getVc());
      }
  })
  .print("reduce...");

env.execute();

2. Lambda表达式写法

kbStream
  .reduce((value1, value2) -> {
      System.out.println("reducer function ...");
      return new WaterSensor(value1.getId(), value1.getTs(), value1.getVc() + value2.getVc());
  })
  .print("reduce...");

示例3:

public class Flink05_Transform_reduce {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(1);

        DataStreamSource<String> fileDS = env.socketTextStream("localhost", 9999);

        // 转换为对象
        SingleOutputStreamOperator<WaterSensor> beanDS = fileDS.map((MapFunction<String, WaterSensor>) r -> {
            String[] datas = r.split(",");
            return new WaterSensor(datas[0], Long.valueOf(datas[1]), Integer.valueOf(datas[2]));
        });

        // 获取对象中的各个属性,针对于属性进行分组
        SingleOutputStreamOperator<Tuple3<String, Long, Integer>> mapDS = beanDS.map(new MapFunction<WaterSensor, Tuple3<String, Long, Integer>>() {
            @Override
            public Tuple3<String, Long, Integer> map(WaterSensor value) throws Exception {
                return new Tuple3<>(value.getId(), value.getTs(), value.getVc());
            }
        });

        // 分组
        // 适用lambda方式
        KeyedStream<Tuple3<String, Long, Integer>, String> keyByDS = mapDS.keyBy(r -> r.f0);

        // reduce聚合方法
        /*
            1. 输入的类型要一致,输出的类型也一致(构建的新值)。
            2. 首次进入的数据,不会进入reduce(因为reduce必须满足两条数据)
            3. 保存了中间的状态(值)
         * */
        SingleOutputStreamOperator<Tuple3<String, Long, Integer>> result = keyByDS.reduce(new ReduceFunction<Tuple3<String, Long, Integer>>() {
            @Override
            public Tuple3<String, Long, Integer> reduce(Tuple3<String, Long, Integer> value1, Tuple3<String, Long, Integer> value2) throws Exception {
                // 求水位的聚合值
                //return value1.f2 + value2.f2;
                return Tuple3.of("result", 123456789L, value1.f2 + value2.f2);
            }
        });
        result.print("result");

        env.execute();
    }
}

注意:

  1. 聚合后结果的类型, 必须和原来流中元素的类型保持一致!

image-20210610191241823

5.4.4 对流重新分区的几个算子

1. KeyBy

先按照key分组, 按照key的双重hash来选择后面的分区

2. shuffle

对流中的元素随机分区

3. reblance

对流中的元素平均分布到每个区.当处理倾斜数据的时候, 进行性能优化

4. rescale

同 rebalance一样, 也是平均循环的分布数据。但是要比rebalance更高效, 因为rescale不需要通过网络, 完全走的"管道"。

5.5 Sink

Sink有下沉的意思,在Flink中所谓的Sink其实可以表示为将数据存储起来的意思,也可以将范围扩大,表示将处理完的数据发送到指定的存储系统的输出操作。类似于Spark中的行动算子。

  1. 之前我们一直在使用的print方法其实就是一种Sink
public DataStreamSink<T> print(String sinkIdentifier) {
   PrintSinkFunction<T> printFunction = new PrintSinkFunction<>(sinkIdentifier, false);
   return addSink(printFunction).name("Print to Std. Out");
}

  1. Flink内置了一些Sink, 除此之外的Sink需要用户自定义!

    image-20210607131158454

    image-20210607131234832

5.5.1 KafkaSink

kafka常用目录技巧:

1. topic --zookeeper
2. producer --broker-list
3. consumer --bootstrap-server
  • 添加Kafka Connector依赖
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>1.12.0</version>
</dependency>
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.75</version>
</dependency>
  • 启动Kafka集群

    [heather@hadoop102 bin]$ kafka.sh start

  • Sink到Kafka的示例代码

public class Flink07_Sink_kafka {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStreamSource<String> inputDS = env.readTextFile("input/sensor_data.log");

        inputDS.addSink(new FlinkKafkaProducer011<String>(
                "hadoop102:9092",
                "sensor110", new SimpleStringSchema())
        );

        // 启动执行程序
        env.execute();
    }
}
  • 在linux启动一个消费者, 查看是否收到数据
    kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic sensor110
    image-20210610211715402

5.5.2 RedisSink

  • 添加Redis Connector依赖
  <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-redis -->
  <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-redis_2.11</artifactId>
      <version>1.1.5</version>
  </dependency>
  • 启动Redis服务器

  • Sink到Redis的示例代码

  package com.heather.flink.java.chapter_5.sink;
  
  import com.alibaba.fastjson.JSON;
  import com.atguigu.flink.java.chapter_5.WaterSensor;
  import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  import org.apache.flink.streaming.connectors.redis.RedisSink;
  import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
  import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
  import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
  import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
  
  import java.util.ArrayList;
  
  public class Flink02_Sink_Redis {
      public static void main(String[] args) throws Exception {
          ArrayList<WaterSensor> waterSensors = new ArrayList<>();
          waterSensors.add(new WaterSensor("sensor_1", 1607527992000L, 20));
          waterSensors.add(new WaterSensor("sensor_1", 1607527994000L, 50));
          waterSensors.add(new WaterSensor("sensor_1", 1607527996000L, 50));
          waterSensors.add(new WaterSensor("sensor_2", 1607527993000L, 10));
          waterSensors.add(new WaterSensor("sensor_2", 1607527995000L, 30));
  
          // 连接到Redis的配置
          FlinkJedisPoolConfig redisConfig = new FlinkJedisPoolConfig.Builder()
            .setHost("hadoop102")
            .setPort(6379)
            .setMaxTotal(100)
            .setTimeout(1000 * 10)
            .build();
          StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
          env
            .fromCollection(waterSensors)
            .addSink(new RedisSink<>(redisConfig, new RedisMapper<WaterSensor>() {
                /*
                  key                 value(hash)
                  "sensor"            field           value
                                      sensor_1        {"id":"sensor_1","ts":1607527992000,"vc":20}
                                      ...             ...
                 */
  
                @Override
                public RedisCommandDescription getCommandDescription() {
                    // 返回存在Redis中的数据类型  存储的是Hash, 第二个参数是外面的key
                    return new RedisCommandDescription(RedisCommand.HSET, "sensor");
                }
  
                @Override
                public String getKeyFromData(WaterSensor data) {
                    // 从数据中获取Key: Hash的Key
                    return data.getId();
                }
  
                @Override
                public String getValueFromData(WaterSensor data) {
                    // 从数据中获取Value: Hash的value
                    return JSON.toJSONString(data);
                }
            }));
  
          env.execute();
      }
  }
  • Redis查看是否收到数据
    redis-cli --raw
    ![](https://img-blog.csdnimg.cn/img_convert/b9db9d87df1dd9d8136b02e5b76b2b1e.png#align=left&display=inline&height=352&margin=[object Object]&originHeight=352&originWidth=1184&status=done&style=none&width=1184)

    **注意: **
    发送了5条数据, redis中只有2条数据. 原因是hash的field的重复了, 后面的会把前面的覆盖掉·

5.5.3 ElasticsearchSink

  • 添加Elasticsearch Connector依赖
<!-- [https://mvnrepository.com/artifact/org.apache.flink/flink-connector-elasticsearch6](https://mvnrepository.com/artifact/org.apache.flink/flink-connector-elasticsearch6) -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch6_2.11</artifactId>
    <version>1.12.0</version>
</dependency>
  • 启动Elasticsearch集群
  • Sink到Elasticsearch的示例代码
package com.heather.flink.java.chapter_5.sink;

import com.alibaba.fastjson.JSON;
import com.atguigu.flink.java.chapter_5.WaterSensor;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;
import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;
import org.elasticsearch.common.xcontent.XContentType;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class Flink03_Sink_ES {
    public static void main(String[] args) throws Exception {
        ArrayList<WaterSensor> waterSensors = new ArrayList<>();
        waterSensors.add(new WaterSensor("sensor_1", 1607527992000L, 20));
        waterSensors.add(new WaterSensor("sensor_1", 1607527994000L, 50));
        waterSensors.add(new WaterSensor("sensor_1", 1607527996000L, 50));
        waterSensors.add(new WaterSensor("sensor_2", 1607527993000L, 10));
        waterSensors.add(new WaterSensor("sensor_2", 1607527995000L, 30));

        List<HttpHost> esHosts = Arrays.asList(
          new HttpHost("hadoop102", 9200),
          new HttpHost("hadoop103", 9200),
          new HttpHost("hadoop104", 9200));
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
        env
          .fromCollection(waterSensors)
          .addSink(new ElasticsearchSink.Builder<WaterSensor>(esHosts, new ElasticsearchSinkFunction<WaterSensor>() {

              @Override
              public void process(WaterSensor element, RuntimeContext ctx, RequestIndexer indexer) {
                  // 1. 创建es写入请求
                  IndexRequest request = Requests
                    .indexRequest("sensor")
                    .type("_doc")
                    .id(element.getId())
                    .source(JSON.toJSONString(element), XContentType.JSON);
                  // 2. 写入到es
                  indexer.add(request);
              }
          }).build());

        env.execute();
    }
}
  • Elasticsearch查看是否收到数据

    注意:

    如果出现如下错误:

  • 添加log4j2的依赖:

    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-to-slf4j</artifactId>
        <version>2.14.0</version>
    </dependency>
    
  • 如果是无界流, 需要配置bulk的缓存

esSinkBuilder.setBulkFlushMaxActions(1);

5.5.4 自定义Sink

如果Flink没有提供给我们可以直接使用的连接器,那我们如果想将数据存储到我们自己的存储设备中,怎么办?
我们自定义一个到Mysql的Sink

  • 在mysql中创建数据库和表
create database test;
use test;
CREATE TABLE `sensor` (
  `id` varchar(20) NOT NULL,
  `ts` bigint(20) NOT NULL,
  `vc` int(11) NOT NULL,
  PRIMARY KEY (`id`,`ts`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
  • 导入Mysql驱动
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.49</version>
</dependency>
  • 写到Mysql的自定义Sink示例代码
package com.heather.flink.java.chapter_5.sink;

import com.atguigu.flink.java.chapter_5.WaterSensor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.util.ArrayList;

public class Flink04_Sink_Custom {
    public static void main(String[] args) throws Exception {
        ArrayList<WaterSensor> waterSensors = new ArrayList<>();
        waterSensors.add(new WaterSensor("sensor_1", 1607527992000L, 20));
        waterSensors.add(new WaterSensor("sensor_1", 1607527994000L, 50));
        waterSensors.add(new WaterSensor("sensor_1", 1607527996000L, 50));
        waterSensors.add(new WaterSensor("sensor_2", 1607527993000L, 10));
        waterSensors.add(new WaterSensor("sensor_2", 1607527995000L, 30));

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
        env.fromCollection(waterSensors)
          .addSink(new RichSinkFunction<WaterSensor>() {

             private PreparedStatement ps;
              private Connection conn;

              @Override
              public void open(Configuration parameters) throws Exception {
                  conn = DriverManager.getConnection("jdbc:mysql://hadoop102:3306/test?useSSL=false", "root", "000000");
                  ps = conn.prepareStatement("insert into sensor values(?, ?, ?)");
              }

              @Override
              public void close() throws Exception {
                ps.close();
                  conn.close();
              }

              @Override
              public void invoke(WaterSensor value, Context context) throws Exception {
                ps.setString(1, value.getId());
                  ps.setLong(2, value.getTs());
                  ps.setInt(3, value.getVc());
                  ps.execute();
              }
          });


        env.execute();
    }
}

5.6 执行模式(Execution Mode)

? Flink在1.12.0上对流式API新增一项特性:可以根据你的使用情况和Job的特点, 可以选择不同的运行时执行模式(runtime execution modes)。
? 流式API的传统执行模式我们称之为STREAMING执行模式, 这种模式一般用于无界流, 需要持续的在线处理。
? 1.12.0新增了一个BATCH执行模式, 这种执行模式在执行方式上类似于MapReduce框架. 这种执行模式一般用于有界数据。
? 默认是使用的STREAMING执行模式。

5.6.1 选择执行模式

BATCH执行模式仅仅用于有界数据, 而STREAMING 执行模式可以用在有界数据和无界数据.

一个公用的规则就是: 当你处理的数据是有界的就应该使用BATCH执行模式, 因为它更加高效. 当你的数据是无界的, 则必须使用STREAMING 执行模式, 因为只有这种模式才能处理持续的数据流.

5.6.2 配置BATH执行模式

执行模式有3个选择可配:

  1. STREAMING(默认)
  2. BATCH
  3. AUTOMATIC
  • 通过命令行配置

bin/flink run -Dexecution.runtime-mode=BATCH ...

  • 通过代码配置
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.BATCH);

建议:

不要在运行时配置(代码中配置), 而是使用命令行配置, 引用这样会灵活: 同一个应用即可以用于无界数据也可以用于有界数据

5.6.3 有界数据用STREAMING和BATCH的区别

STREAMING模式下, 数据是来一条输出一次结果

BATCH模式下, 数据处理完之后, 一次性输出结果

下面展示WordCount的程序读取文件内容在不同执行模式下的执行结果对比 :

  • 流式模式

// 默认流式模式, 可以不用配置
env.setRuntimeMode(RuntimeExecutionMode.STREAMING);

  • 批处理模式

env.setRuntimeMode(RuntimeExecutionMode.BATCH);

  • 自动模式

env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

5. 7 综合案例练习

5.7.1 基埋点日志数据的网络流量统计

5.7.1.1 网站总浏览量(PV)的统计

衡量网站流量一个最简单的指标,就是网站的页面浏览量(Page View,PV)。用户每次打开一个页面便记录1次PV,多次打开同一页面则浏览量累计。一般来说,PV与来访者的数量成正比,但是PV并不直接决定页面的真实来访者数量,如同一个来访者通过不断的刷新页面,也可以制造出非常高的PV。接下来我们就用咱们之前学习的Flink算子来实现在PV的统计

  1. 数据准备
在咱们当前的案例中,给大家准备了某电商网站的用户行为日志数据UserBehavior.csv,本日志数据文件中包含了某电商网站一天近五十万随机用户的所有行为(包括点击、购买、收藏、喜欢)。数据集的每一行表示一条用户行为,由用户ID、商品ID、商品类目ID、行为类型和时间戳组成,并以逗号分隔。将数据文件放置在指定目录中,便于读取到Flink中使用
  1. 读取日志数据转换为JavaBean对象方便操作
@Data
@NoArgsConstructor
@AllArgsConstructor
public class UserBehavior {

    // 用户ID
    private Long userId;

    // 商品ID
    private Long itemId;

    //品类ID
    private Integer categoryId;

    //用户的行为类型:pv(浏览) 、 buy(购买) 、 fav(收藏) 、cart(喜欢)
    private String behavior;

    //时间戳
    private Long timestamp;
}
  1. 核心代码 - WordCount思想
public class Flink10_Case_PV {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        
        // 1. 获取数据源
        DataStreamSource<String> inputDS = env.readTextFile("input/UserBehavior.csv");
        
        // 2. 数据处理
        // 2.1 把数据转换为javaBean对象
        SingleOutputStreamOperator<UserBehavior> userBehaviorDS = inputDS.map(new MapFunction<String, UserBehavior>() {
            @Override
            public UserBehavior map(String value) throws Exception {
                String[] datas = value.split(",");
                // 通过数组中的各个元素构建对象
                return new UserBehavior(
                        Long.valueOf(datas[0]),
                        Long.valueOf(datas[1]),
                        Integer.valueOf(datas[2]),
                        datas[3],
                        Long.valueOf(datas[4])
                );
            }
        });

        // 2.2 数据清洗阶段(过滤出所有数据中用户行为是浏览的【pv】)
        SingleOutputStreamOperator<UserBehavior> pvDS = userBehaviorDS.filter(new FilterFunction<UserBehavior>() {
            @Override
            public boolean filter(UserBehavior value) throws Exception {
                return "pv".equals(value.getBehavior());
            }
        });

        // 2.3 TODO WordCount求和操作即可
        // 把过滤后的数据转换为二元组(pv,1)
        SingleOutputStreamOperator<Tuple2<String, Integer>> pvTuple2DS = pvDS.map(new MapFunction<UserBehavior, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(UserBehavior value) throws Exception {
                return new Tuple2<>("pv", 1);
            }
        });

        // 分组
        KeyedStream<Tuple2<String, Integer>, Tuple> kedByDS = pvTuple2DS.keyBy(0);
        // 聚合
        SingleOutputStreamOperator<Tuple2<String, Integer>> pvSum = kedByDS.sum(1);

        // 3. 输出、保存...
        pvSum.print("PV总量");

        // 启动执行
        env.execute();
    }
}
  1. 核心代码 - 统计的维度 思想 进行分组【重点 ! ! !】
public class Flink10_Case_PV_Process_DIM {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        
        // 1. 获取数据源
        DataStreamSource<String> inputDS = env.readTextFile("input/UserBehavior.csv");
        
        // 2. 数据处理
        // 2.1 把数据转换为javaBean对象
        SingleOutputStreamOperator<UserBehavior> userBehaviorDS = inputDS.map(new MapFunction<String, UserBehavior>() {
            @Override
            public UserBehavior map(String value) throws Exception {
                String[] datas = value.split(",");
                // 通过数组中的各个元素构建对象
                return new UserBehavior(
                        Long.valueOf(datas[0]),
                        Long.valueOf(datas[1]),
                        Integer.valueOf(datas[2]),
                        datas[3],
                        Long.valueOf(datas[4])
                );
            }
        });

        // 2.2 数据清洗阶段(过滤出所有数据中用户行为是浏览的【pv】)
        SingleOutputStreamOperator<UserBehavior> pvDS = userBehaviorDS.filter(new FilterFunction<UserBehavior>() {
            @Override
            public boolean filter(UserBehavior value) throws Exception {
                return "pv".equals(value.getBehavior());
            }
        });

        // 2.3 TODO 按照 统计的维度 分组 :pv行为
        // Lambda表达式
        //KeyedStream<UserBehavior, String> userBehaviorPvDS = pvDS.keyBy(data -> data.getBehavior());
        // 匿名内部对象方式
        KeyedStream<UserBehavior, String> userBehaviorPvDS = pvDS.keyBy(new KeySelector<UserBehavior, String>() {
            @Override
            public String getKey(UserBehavior value) throws Exception {
                return value.getBehavior();
            }
        });
        // 求和 => 实现 计数 的功能,没有count这种聚合算子
        // 般找不到现成的算子,那就调用底层的 process
        SingleOutputStreamOperator<Long> pvSum = userBehaviorPvDS.process(new KeyedProcessFunction<String, UserBehavior, Long>() {
            // 定义变量,用来统计数据
            private Long pvSum = 0L;

            /**
             * process 算子的特点:来一条数据处理一条数据,就执行一次算子。【也就是循环】
             * @param value  数据
             * @param ctx    上下文对象
             * @param out     采集器
             * @throws Exception
             */
            @Override
            public void processElement(UserBehavior value, Context ctx, Collector<Long> out) throws Exception {
                pvSum++;
                out.collect(pvSum);
            }
        });
        // 3. 输出、保存...
        pvSum.print("PV总量");
        // 启动执行
        env.execute();
    }
}
  1. 核心代码 - flatMap算子(一进多出或0出)【重点 ! ! !】
public class Flink10_Case_PV_FlatMap {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        
        // 1. 获取数据源
        env.readTextFile("input/UserBehavior.csv")
                .flatMap(new FlatMapFunction<String, Tuple2<String,Long>>() {
                        @Override
                        public void flatMap(String value, Collector<Tuple2<String, Long>> out) throws Exception {
                            String[] datas = value.split(",");
                            if("pv".equals(datas[3])){
                                out.collect(Tuple2.of("pv",1L));
                            }
                        }
                    })
                .keyBy(0)
                .sum(1)
                .print("PV总量");
        
        // 启动执行
        env.execute();
    }
}

5.7.1.2 网站独立访客数(UV)的统计

以上案例中,我们统计的是所有用户对页面的所有浏览行为,也就是同一用户的浏览行为会被重复统计。而在实际应用中,我们往往还会关注,到底有多少不同的用户访问了网站,所以另外一个统计流量的重要指标是网站的独立访客数(Unique Visitor,UV)

  1. 数据准备

    对于UserBehavior数据源来说,我们直接可以根据userId来区分不同的用户
    
  2. 核心代码

public class Flink10_Case_UV {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        
        // 1. 获取数据源
        DataStreamSource<String> inputDS = env.readTextFile("input/UserBehavior.csv");
        
        // 2. 数据处理
        // 2.1 把数据转换为javaBean对象
        SingleOutputStreamOperator<UserBehavior> userBehaviorDS = inputDS.map(new MapFunction<String, UserBehavior>() {
            @Override
            public UserBehavior map(String value) throws Exception {
                String[] datas = value.split(",");
                // 通过数组中的各个元素构建对象
                return new UserBehavior(
                        Long.valueOf(datas[0]),
                        Long.valueOf(datas[1]),
                        Integer.valueOf(datas[2]),
                        datas[3],
                        Long.valueOf(datas[4])
                );
            }
        });

        // 2.2 TODO UV就是PV去重后的结果
        // 实现:对userId进行去重操作。(把userId存储到set集合中,最后获取set集合的长度即可)
        //  1. 数据清洗阶段(过滤出所有数据中用户行为是浏览的【pv】)
        SingleOutputStreamOperator<UserBehavior> pvDS = userBehaviorDS.filter((FilterFunction<UserBehavior>) value -> "pv".equals(value.getBehavior()));

        // 2. 把数据转换为二元组(uv,userId)
        //  二元组的组成元素:
        // 第一个位置:uv:是为了分组,方便后续的sum求和和process操作
        // 第二个位置:userId,是为了把获取到的数据存储到set集合中进行去重
        SingleOutputStreamOperator<Tuple2<String, Long>> uvTuple2 = pvDS.map(new MapFunction<UserBehavior, Tuple2<String, Long>>() {
            @Override
            public Tuple2<String, Long> map(UserBehavior value) throws Exception {
                return Tuple2.of("uv", value.getUserId());
            }
        });

        // 3. 分组
        KeyedStream<Tuple2<String, Long>, String> keyedUvDS = uvTuple2.keyBy(datas -> datas.f0);
        // 4. 此时到了需要求和的步骤了,但是flink明显没有提供求和操作,所以可以使用process操作
        SingleOutputStreamOperator<Integer> uvSum = keyedUvDS.process(new KeyedProcessFunction<String, Tuple2<String, Long>, Integer>() {
            Set<Long> userIdSet = new HashSet<>();

            @Override
            public void processElement(Tuple2<String, Long> value, Context ctx, Collector<Integer> out) throws Exception {
                userIdSet.add(value.f1);
                out.collect(userIdSet.size());
            }
        });

        // 3. 输出、保存...
        uvSum.print("UV总量");

        // 启动执行
        env.execute();
    }
}

六、Flink中四大基石

Flink之所以能这么流行,离不开它最重要的四个基石:Checkpoint、State、Time、Window。

image-20210628183654483

  1. 窗口 : window

  2. 时间语义 & 水位线 : time & watermark

  3. 状态管理 : state

  4. 容错机制 : checkPoint & savePoint

6.1 Window

6.1 为什么使用window

在Flink的流处理应用中,数据是源源不断的,但是有时候我们需要对数据最一些聚合类的操作,例如:①统计在过去的1分钟内有多少用户点击了网页?②在过去的5分钟内有多少用户进行了下单等等。。

此时,我们就需要定义一个窗口(window)用来收集最近1分钟5分钟的数据,并且对此窗口内的所有数据进行计算。

6.2 窗口划分标准

6.2.1 按照time和count分类

  1. time-window : 根据时间划分窗口,如:每隔 1分钟统计最近1分钟的数据。

  2. count-window : 根据数量划分窗口,如:每隔100个数据统计最近100个数据

image-20210628190349619

6.2.2 按照slide和size分类

  1. tumbling-window : 滚动窗口,size=slide,如:每隔5s统计最近5s的数据

    image-20210628191625022

  2. sliding-window : 滑动窗口 : size>slide,如:每隔5s统计最近10s的数据

    image-20210628191726785

注意:

  • 当size<slide时,如:每隔15s统计最近10s的数据,这种需求会丢失5s的数据,所以,开发中不用。
  • 滚动窗口其实就是一种特殊的滑动窗口。

滑动窗口细节:

  • 如果窗口长度为10s,而滑动步长为2s,那么一个数据会计算(10 / 2)5次

    如果 窗口长度为10s,而滑动步长为3s,那么一个数据会计算(10 / 3) 3次或者4次

  • 滑动窗口的计算是由后向前进行计算,具体详情请查看图。

    滑动窗口在计算时,先计算时间戳较大的,也就是window4 -> window3 -> window2 -> window1的顺序

    image-20210628232031298

6.2.3 会话

Flink还支持一个特殊的窗口:Session会话窗口,需要设置一个会话超时时间,如30s,则表示30s内没有数据到来,则触发上个窗口的计算

窗口分类的总结:

  1. 基于时间的滚动窗口tumbling-time-window–用的较多

  2. 基于时间的滑动窗口sliding-time-window–用的较多

  3. 基于数量的滚动窗口tumbling-count-window–用的较少

  4. 基于数量的滑动窗口sliding-count-window–用的较少

6.3 window API

window 和 windowAll

  • 使用keyby的流,应该使用window方法

  • 未使用keyby的流,应该调用windowAll方法

6.3.1 window

keyed windows(按照key分组后对窗口内的数据进行聚合)

stream
    .keyBy(...)
    .window(...)
    .reduce/aggregate/fold/apply()

6.3.2 windowAll

no keyed window(未分组)

stream 
    .windowAll(...)
    .reduce/aggregate/fold/apply()

6.3.3 windowAssigner

window 和 windowAll 这两个方法都需要一个windowAssigner作为参数进行使用。

windowAssigner(窗口分配器) : 负责将每条输入的数据分发到正确的window窗口中进行计算。

**哈哈 :**Flink为我们提供了很多的现成的windowAssigner 。

image-20210628195520416

如果需要自己定制数据分发策略,则可以实现一个 class,继承自 WindowAssigner。

6.4 API 实战:

1. time 滚动/滑动

需求:
nc -lk 9999
有如下数据表示: 信号灯编号和通过该信号灯的车的数量
9,3
9,2
9,7
4,9
2,6
1,5
2,3
5,7
5,4
需求1:每5秒钟统计一次,最近5秒钟内,各个路口通过红绿灯汽车的数量–基于时间的滚动窗口
需求2:每5秒钟统计一次,最近10秒钟内,各个路口通过红绿灯汽车的数量–基于时间的滑动窗口
各个路口:分组

public class WindowDemo01_TimeWindow {

    public static void main(String[] args) throws Exception {

        // 基本环境准备
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 获取socket数据源
        DataStreamSource<String> socketDS = env.socketTextStream("localhost", 9999);
        // 数据的格式转换
        SingleOutputStreamOperator<CartInfo> mapDS = socketDS.map(new MapFunction<String, CartInfo>() {
            @Override
            public CartInfo map(String value) throws Exception {
                String[] datas = value.split(",");
                return new CartInfo(datas[0], Integer.parseInt(datas[1]));
            }
        });

        // 把数据进行分组
        KeyedStream<CartInfo, String> keyByDS = mapDS.keyBy(CartInfo::getSensorId);

        // 把数据分发到不同的window窗口中,使用windwoAssigner,此处因为已经分组,所以使用window()方法
        // WindowedStream<CartInfo, String, TimeWindow> windowDS = keyByDS.window(TumblingEventTimeWindows.of(Time.seconds(5)));
        // 需求1:每5秒钟统计一次,最近5秒钟内,各个路口通过红绿灯汽车的数量--基于时间的滚动窗口
        WindowedStream<CartInfo, String, TimeWindow> tumbliWindowDS = keyByDS
                .window(TumblingProcessingTimeWindows.of(Time.seconds(5)));
        // 聚合结果
        SingleOutputStreamOperator<CartInfo> tumblCount = tumbliWindowDS.sum("count");
        // 需求2:每5秒钟统计一次,最近10秒钟内,各个路口通过红绿灯汽车的数量--基于时间的滑动窗口
        WindowedStream<CartInfo, String, TimeWindow> slideWindowDS = keyByDS
                .window(SlidingProcessingTimeWindows.of(Time.seconds(5), Time.seconds(10)));
        // 聚合结果
        SingleOutputStreamOperator<CartInfo> slidCount = slideWindowDS.sum("count");

        // 输出结果
        tumblCount.print("滚动");
        slidCount.print("滑动");

        env.execute();

    }


    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class CartInfo {
        private String sensorId;//信号灯id
        private Integer count;//通过该信号灯的车的数量
    }

}

2. count 滚动/滑动

public class WindowDemo02_CountWindow {

    public static void main(String[] args) throws Exception {

        // 基本环境准备
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 获取socket数据源
        DataStreamSource<String> socketDS = env.socketTextStream("localhost", 9999);
        // 数据的格式转换
        SingleOutputStreamOperator<CartInfo> mapDS = socketDS.map(new MapFunction<String, CartInfo>() {
            @Override
            public CartInfo map(String value) throws Exception {
                String[] datas = value.split(",");
                return new CartInfo(datas[0], Integer.parseInt(datas[1]));
            }
        });

        // 把数据进行分组
        KeyedStream<CartInfo, String> keyByDS = mapDS.keyBy(CartInfo::getSensorId);
        // 按照数据数量滚动窗口计算
        SingleOutputStreamOperator<CartInfo> tumdDS = keyByDS.countWindow(5L).sum("count");
        // 按照数据数量滑动窗口计算
        SingleOutputStreamOperator<CartInfo> slidDS = keyByDS.countWindow(6L, 2L).sum("count");


        // 输出结果
        // tumdDS.print("滚动");
        slidDS.print("滑动");

        env.execute();

    }


    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class CartInfo {
        private String sensorId;//信号灯id
        private Integer count;//通过该信号灯的车的数量
    }

}

6.5 会话窗口

和web领域的session会话一个意思,收集会话中的数据并进行分析。

API : 设置会话的时间间隔:ProcessingTimeSessionWindows.withGap

需求:

设置会话超时时间为10s,10s内没有数据到来,则触发上个窗口的计算

public class WindowDemo03_SessionWindow {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStreamSource<String> socketDS = env.socketTextStream("localhost", 9999);

        SingleOutputStreamOperator<CartInfo> mapDS = socketDS.map(new MapFunction<String, CartInfo>() {
            @Override
            public CartInfo map(String value) throws Exception {
                String[] datas = value.split(",");
                return new CartInfo(datas[0], Integer.valueOf(datas[1]));
            }
        });

        // 分组
        KeyedStream<CartInfo, String> keyedDS = mapDS.keyBy(CartInfo::getSensorId);

        // 会话窗口
        WindowedStream<CartInfo, String, TimeWindow> sessionDS = keyedDS.window(ProcessingTimeSessionWindows.withGap(Time.seconds(10)));

        SingleOutputStreamOperator<CartInfo> result = sessionDS.sum("count");
        result.print("会话窗口");

        env.execute();

    }


    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class CartInfo {
        private String sensorId;//信号灯id
        private Integer count;//通过该信号灯的车的数量
    }
}

6.2 Time & Watermark

6.2.1 Time

在Flink的流式处理中,会涉及到时间的不同概念,如下所示:

  1. EventTime : 事件时间
  2. IngestTime : 摄入时间(注入Flink时间)
  3. ProcessingTime :处理时间(机器时间)

6.2.1.1 三个时间的区别

  1. EventTime
    • 事件生成的时间,在进入Flink之前就已经存在,此时间可以从event的字段中进行抽取。
    • 使用时必须指定watermark的生成方式
    • 优势:确定性:乱序、延时、或者数据重复等情况,都能够进行正确的处理。
    • 弱点:处理无序事件时,性能和延迟会受到影响。
  2. IngestTime
    • 时间进入Flink的时间,即在Source阶段获取的当前系统的时间。
    • 使用时不需要指定watermark的生成方式(自动生成)
    • 弱点:不能处理无序事件和延时数据(不推荐使用)。
  3. ProcessingTime
    • 指定操作的机器的当前系统时间(每个算子都不一样)。
    • 不需要流和机器之间的协调。
    • 优势:最佳的性能和最低的延迟。
    • 弱点:不确定性,容易收到各种因素影响(envet产生的速度、到达Flink的速度、在算子之间的传输速度等),在处理上根本不考虑事件的顺序和延迟,所以(不推荐使用)。

总结:

在实际开发中,我们希望基于事件时间来处理数据,但是因为网络延迟等原因,出现了乱序或者延迟到达等问题,此种情况下处理的结果就不是我们想要的,甚至于出现数据丢失的情况,所以就需要一种机制来解决一定问题上的数据丢失或延迟到达的问题!因此,watermark应运而生(水印机制 / 水位线)!!!

6.2.2 Watermark

6.2.2.1 什么是Watermark

Watermaker就是给数据再额外的加的一个时间列。

也就是Watermaker是个时间戳 !

6.2.2.2 [Flink中的watermark详情](# 7.3.3 Flink中的WaterMark)

6.3 State

6.4 checkpoint

七、Flink流处理高阶编程【重点】

在上一个章节中,我们已经学习了Flink的基础编程API的使用,接下来,我们来学习Flink编程的高阶部分。所谓的高阶部分内容,其实就是Flink与其他计算框架不相同且占优势的地方,比如Window和Exactly-Once,接下来我们就对这些内容进行详细的学习。

7.1 Flink的window机制

7.1.1 窗口概述

? 在流处理应用中,数据是连续不断的,因此我们不可能等到所有数据都到了才开始处理。当然我们可以每来一个消息就处理一次,但是有时我们需要做一些聚合类的处理,例如:在过去的1分钟内有多少用户点击了我们的网页。在这种情况下,我们必须定义一个窗口,用来收集最近一分钟内的数据,并对这个窗口内的数据进行计算。
? 流式计算是一种被设计用于处理无限数据集的数据处理引擎,而无限数据集是指一种不断增长的本质上无限的数据集,而Window窗口是一种切割无限数据为有限块进行处理的手段。
? 在Flink中, 窗口(window)是处理无界流的核心。 窗口把流切割成有限大小的多个"存储桶"(***bucket)***, 我们在这些桶上进行计算。

7.1.2 窗口的分类

窗口分为2类:

  1. 基于时间的窗口(时间驱动)
    • 滚动窗口
    • 滑动窗口
    • 会话窗口
  2. 基于元素个数的(数据驱动)
    • 滚动窗口
    • 滑动窗口

7.1.2.1 基于时间的窗口

时间窗口包含一个开始时间戳(包括)和结束时间戳(不包括), 这两个时间戳一起限制了窗口的尺寸。
在代码中, Flink使用TimeWindow这个类来表示基于时间的窗口, 这个类提供了key查询开始时间戳和结束时间戳的方法, 还提供了针对给定的窗口获取它允许的最大时间差的方法(maxTimestamp())。

时间窗口又分4种 :

  1. 滚动窗口
  2. 滑动窗口
  3. 会话话窗
  4. 全局窗口
1. 滚动窗口(Tumbling Windows)

滚动窗口有固定的大小, 窗口与窗口之间不会重叠也没有缝隙,比如,如果指定一个长度为5分钟的滚动窗口, 当前窗口开始计算, 每5分钟启动一个新的窗口。
滚动窗口能将数据流切分成不重叠的窗口,每一个事件只能属于一个窗口

API说明:

DataStream可以直接调用开窗的方法,但是都带“all”,这种情况下所有的数据不分组,都在窗口中。

所以在使用API时,应该尽可能的先对数据进行分组,分组后再调用开窗的API 。

**示例代码 一: **

public class Flink01_Window_TimeWindow_roll {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 1. 获取数据源
        DataStreamSource<String> socketDS = env.socketTextStream("localhost", 9999);

        KeyedStream<Tuple2<String, Integer>, String> keyByDS = socketDS
                .map(new MapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> map(String value) throws Exception {
                        return Tuple2.of(value, 1);
                    }
                })
                .keyBy(r -> r.f0);

        WindowedStream<Tuple2<String, Integer>, String, TimeWindow> dataDS = keyByDS
            	//滚动窗口 =》 一个参数 :窗口长度
                .timeWindow(Time.seconds(5)); 
        		// 下面的方法是底层Process的API的实现
        		//.window(TumblingProcessingTimeWindows.of(Time.seconds(5)));

        dataDS.sum(1).print("滚动窗口");

        env.execute();
    }
}

代码示例 二:

public class Flink01_Window_TimeTumbling {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStreamSource<String> socketDS = env.socketTextStream("localhost", 9999);

        SingleOutputStreamOperator<Tuple2<String, Integer>> mapDS = socketDS.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String value) throws Exception {
                String[] datas = value.split(" ");
                return Tuple2.of(datas[0], 1);
            }
        });

        // 分组
        KeyedStream<Tuple2<String, Integer>, String> keyByDS = mapDS.keyBy(data -> data.f0);

        // 开窗
        WindowedStream<Tuple2<String, Integer>, String, TimeWindow> windowDS = keyByDS
                .window(TumblingProcessingTimeWindows.of(Time.seconds(5)));

        // 聚合计算 sum算子
         SingleOutputStreamOperator<Tuple2<String, Integer>> result = windowDS.sum(1);

        result.print();

        // 启动执行
        env.execute();
    }
}

说明:

  1. 时间间隔可以通过: Time.milliseconds(x),Time.seconds(x),Time.minutes(x),等等来指定.
  2. 我们传递给window函数的对象叫窗口分配器.
2. 滑动窗口(Sliding Windows)

与滚动窗口一样, 滑动窗口也是有固定的长度. 另外一个参数我们叫滑动步长, 用来控制滑动窗口启动的频率.
所以, 如果滑动步长小于窗口长度, 滑动窗口会重叠. 这种情况下, 一个元素可能会被分配到多个窗口中
例如, 滑动窗口长度10分钟, 滑动步长5分钟, 则, 每5分钟会得到一个包含最近10分钟的数据。

特点:

一个数据最多可以属于多少个窗口? ==> (窗口长度 / 滑动步长)


**示例代码: **

用法和滚动窗口类似,只不过是滚动窗口需要一个参数,而滑动窗口需要两个参数

WindowedStream<Tuple2<String, Integer>, String, TimeWindow> dataDS = keyByDS
   //滑动窗口 => 两个参数 :第一个是 窗口长度 ; 第二个是 滑动步长
   .timeWindow(Time.seconds(5), Time.seconds(2)); 
   // 下面的方法是底层Process的API的实现
   //.window(SlidingProcessingTimeWindows.of(Time.seconds(5), Time.seconds(2)));
3. 会话窗口(Session Windows)

会话窗口分配器会根据活动的元素进行分组. 会话窗口不会有重叠, 与滚动窗口和滑动窗口相比, 会话窗口也没有固定的开启和关闭时间.
如果会话窗口有一段时间没有收到数据, 会话窗口会自动关闭, 这段没有收到数据的时间就是会话窗口的gap(间隔)
我们可以配置静态的gap, 也可以通过一个gap extractor 函数来定义gap的长度. 当时间超过了这个gap, 当前的会话窗口就会关闭, 后序的元素会被分配到一个新的会话窗口。


**示例代码: **

  1. 静态gap
WindowedStream<Tuple2<String, Integer>, String, TimeWindow> dataDS = keyByDS
    .window(ProcessingTimeSessionWindows.withGap(Time.seconds(3)));
  1. 动态gap
.window(ProcessingTimeSessionWindows.withDynamicGap(new 	     		 	SessionWindowTimeGapExtractor<Tuple2<String, Long>>() {
    @Override
    public long extract(Tuple2<String, Long> element) {// 返回 gap值, 单位毫秒
        return element.f0.length()  1000;
    }
}))

创建原理:
因为会话窗口没有固定的开启和关闭时间, 所以会话窗口的创建和关闭与滚动,滑动窗口不同. 在Flink内部, 每到达一个新的元素都会创建一个新的会话窗口, 如果这些窗口彼此相距比较定义的gap小, 则会对他们进行合并. 为了能够合并, 会话窗口算子需要合并触发器和合并窗口函数: ReduceFunction, AggregateFunction, or ProcessWindowFunction

4. 全局窗口(Global Windows)

全局窗口分配器会分配相同key的所有元素进入同一个 Global window. 这种窗口机制只有指定自定义的触发器时才有用. 否则, 不会做任务计算, 因为这种窗口没有能够处理聚集在一起元素的结束点。


**示例代码: **

.window(GlobalWindows.create());

7.1.2.2 基于元素个数的窗口

按照指定的数据条数生成一个Window,与时间无关。

计算窗口分2类 :

  1. 滚动窗口
  2. 滑动窗口
1. 滚动窗口

默认的CountWindow是一个滚动窗口,只需要指定窗口大小即可,当元素数量达到窗口大小时,就会触发窗口的执行。

实例代码

.countWindow(3)

说明:那个窗口先达到3个元素, 哪个窗口就关闭. 不影响其他的窗口.

2. 滑动窗口

滑动窗口和滚动窗口的函数名是完全一致的,只是在传参数时需要传入两个参数,一个是window_size,一个是sliding_size。下面代码中的sliding_size设置为了2,也就是说,每收到两个相同key的数据就计算一次,每一次计算的window范围最多是3个元素。

实例代码

.countWindow(3, 2)

7.1.3 Window Function

分为两类:

  1. 增量聚合函数
    • reduceFunction()函数
    • aggregateFunction()函数
    • sum()函数
  2. 全窗口函数
    • apply()函数
    • processWindowFunction()函数

前面指定了窗口的分配器, 接着我们需要来指定如何计算, 这事由window function来负责. 一旦窗口关闭, window function 去计算处理窗口中的每个元素。
window function 可以是ReduceFunction,AggregateFunction或者 ProcessWindowFunction中的任意一种。

  1. ReduceFunction,AggregateFunction更加高效, 原因就是Flink可以对到来的元素进行增量聚合
  2. ProcessWindowFunction可以得到一个包含这个窗口中所有元素的迭代器及这些元素所属窗口的一些元数据信息
  3. ProcessWindowFunction不能被高效执行是因为Flink在执行这个函数之前, 需在内部缓存这个窗口上所有的元素。

ReduceFunction和AggregateFunction区别:

  • ReduceFunction不能够改变数据的类型。
  • AggregateFunction可以改变数据的类型(更灵活)。
1. ReduceFunction(增量聚合函数)
public class Flink01_Window_ReduceFunction {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(1);

        DataStreamSource<String> socketDS = env.socketTextStream("localhost", 9999);

        // 分组操作
        KeyedStream<Tuple2<String, Integer>, String> keyByDS = socketDS
                .map(new MapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> map(String value) throws Exception {
                        return Tuple2.of(value, 1);
                    }
                })
                .keyBy(data -> data.f0);
        // 开窗
        WindowedStream<Tuple2<String, Integer>, String, TimeWindow> dataDS = keyByDS.window(TumblingProcessingTimeWindows.of(Time.seconds(5)));

        // WindowedStream<Tuple2<String, Integer>, String, TimeWindow> dataDS = keyByDS.timeWindow(Time.seconds(5));

        // 聚合操作 reduce
        dataDS.reduce(new ReduceFunction<Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {
				System.out.println(value1 + "<---->" + value2);
                return Tuple2.of(value1.f0, value1.f1 + value2.f1);
            }
        }).print();

        env.execute();

    }
}
2. AggregateFunction(增量聚合函数)
public class Flink01_Window_AggregateFunction {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(1);

        DataStreamSource<String> socketDS = env.socketTextStream("localhost", 9999);

        // 分组操作
        KeyedStream<Tuple2<String, Integer>, String> keyByDS = socketDS
                .map(new MapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> map(String value) throws Exception {
                        return Tuple2.of(value, 1);
                    }
                })
                .keyBy(data -> data.f0);
        // 开窗
        WindowedStream<Tuple2<String, Integer>, String, TimeWindow> dataDS = keyByDS.window(TumblingProcessingTimeWindows.of(Time.seconds(5)));

        // WindowedStream<Tuple2<String, Integer>, String, TimeWindow> dataDS = keyByDS.timeWindow(Time.seconds(5));

        // 聚合操作 aggregate
        SingleOutputStreamOperator<Integer> agregateResult = dataDS.aggregate(new AggregateFunction<Tuple2<String, Integer>, Integer, Integer>() {

            // 创建累加器 —> 初始化 —>初始值为0
            @Override
            public Integer createAccumulator() {
                return 0;
            }

            // 累加操作 —> 当进来数据后,如何进行累加,定义累加规则
            @Override
            public Integer add(Tuple2<String, Integer> value, Integer acc) {
                System.out.println("add ... ");
                return acc + 1;
            }

            // 获取最终结果
            @Override
            public Integer getResult(Integer acc) {
                System.out.println("getResult ... ");
                return acc;
            }

            // 会话窗口 才会调用:合并累加器的结果
            @Override
            public Integer merge(Integer acc1, Integer acc2) {
                return acc1 + acc2;
            }
        });

        agregateResult.print();
        env.execute();
    }
}
3. ProcessWindowFunction(全窗口函数)

先把窗口所有数据收集起来,等到计算的时候会遍历所有数据。

ProcessWindowFunction就是一个对整个窗口中数据处理的函数。

容易引起OOM异常

  /**
    * 全窗口函数:整个窗口的本组数据,存起来,关窗的时候一次性一起计算
    */
.process(new ProcessWindowFunction<Tuple2<String, Long>, Tuple2<String, Long>, String, TimeWindow>  () {
    // 参数1: key 
    // 参数2: 上下文对象 
    // 参数3: 这个窗口内所有的元素 
    // 参数4: 收集器, 用于向下游传递数据
    @Override
      public void process(
          	  String key,
              Context context,
              Iterable<Tuple2<String, Long>> elements,
              Collector<Tuple2<String, Long>> out) throws Exception{
        System.out.println(context.window().getStart())  ;
        long sum = 0L;
        for(Tuple2<String, Long> t : elements) {
             sum += t.  f1  ;
        }
        out.collect(Tuple2.of(key, sum));
    }
})  

7.2 Keyed vs Non-Keyed Windows

其实, 在用window前首先需要确认应该是在keyBy后的流上用, 还是在没有keyBy的流上使用.
在keyed streams上使用窗口, 窗口计算被并行的运用在多个task上, 可以认为每个task都有自己单独窗口. 正如前面的代码所示。
在非non-keyed stream上使用窗口, 流的并行度只能是1, 所有的窗口逻辑只能在一个单独的task上执行。

.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(10)))

需要注意的是: 非key分区的流, 即使把并行度设置为大于1 的数, 窗口也只能在某个分区上使用

7.3 Flik中的时间语义WaterMark

7.3.1 Flink中的时间语义

在Flink的流式操作中, 会涉及不同的时间概念,在Flink中使用时间语义用来处理乱序数据迟到数据

需要结合watermark水位线一起使用才有价值。

image-20210628210552068

7.3.1.1 事件时间(event time)

事件时间是指的这个事件发生的时间。例如采集的日志数据中,每一条日志都会记录自己的生成时间,Flink通过时间戳分配器访问事件时间戳。

1.12 版本后默认的事件版本就是事件时间

在event进入Flink之前, 通常被嵌入到了event中, 一般作为这个event的时间戳存在.
	在事件时间体系中, 时间的进度依赖于数据本身, 和任何设备的时间无关.  事件时间程序必须制定如何产生Event Time Watermarks(水印) 。在事件时间体系中, 水印是表示时间进度的标志(作用就相当于现实时间的时钟)。
	在理想情况下,不管事件时间何时到达或者他们的到达的顺序如何, 事件时间处理将产生完全一致且确定的结果。事件时间处理会在等待无序事件(迟到事件)时产生一定的延迟。由于只能等待有限的时间,因此这限制了确定性事件时间应用程序的可使用性。
	假设所有数据都已到达,事件时间操作将按预期方式运行,即使在处理无序或迟到的事件或重新处理历史数据时,也会产生正确且一致的结果。例如,每小时事件时间窗口将包含带有事件时间戳的所有记录,该记录落入该小时,无论它们到达的顺序或处理时间。
	在使用窗口的时候, 如果使用事件时间, 就指定时间分配器为事件时间分配器
注意:
	在1.12之前默认的时间语义是处理时间, 从1.12开始, Flink内部已经把默认的语义改成了事件时间

7.3.1.2 处理时间(process time)

处理时间是指的执行算子操作的各个设备的本地时间,与机器相关,1.12 前的版本默认的时间属性就是process time。

对于运行在处理时间上的流程序, 所有的基于时间的操作(比如时间窗口)都是使用的设备时钟.比如, 一个长度为1个小时的窗口将会包含设备时钟表示的1个小时内所有的数据.  假设应用程序在 9:15am分启动, 第1个小时窗口将会包含9:15am到10:00am所有的数据, 然后下个窗口是10:00am-11:00am, 等等
处理时间是最简单时间语义, 数据流和设备之间不需要做任何的协调. 他提供了最好的性能和最低的延迟. 但是, 在分布式和异步的环境下, 处理时间没有办法保证确定性, 容易受到数据传递速度的影响: 事件的延迟和乱序
在使用窗口的时候, 如果使用处理时间, 就指定时间分配器为处理时间分配器

7.3.1.3 注入时间(Ingestion time)

是数据进入Flink的时间。

7.3.2 哪种时间更重要

![](https://img-blog.csdnimg.cn/img_convert/67dab82ff3b33858ef8974509731197f.png#align=left&display=inline&height=563&margin=[object Object]&originHeight=563&originWidth=1269&status=done&style=none&width=1269)
![](https://img-blog.csdnimg.cn/img_convert/f15a2992872f7a44fd30502669194a1e.png#align=left&display=inline&height=630&margin=[object Object]&originHeight=630&originWidth=1005&status=done&style=none&width=1005)

7.3.3 Flink中的WaterMark

处理迟到、乱序数据的方法:

  • 设置水位线
  • 设置窗口允许迟到数据
  • 侧输出流

概念:WaterMark 就是给数据额外的增加的一个时间列,其实就是一个时间戳,一个特殊的时间戳

获取:WaterMark = 数据的事件时间 - 最大的延迟时间 或 乱序时间

注意:通过源码可知,WaterMark = 当前窗口的最大事件时间 - 最大的延迟时间 或 乱序时间

这样可以保证WaterMark水位线会一直上升(变大),不会下降。

作用:

之前的窗口都是按照系统时间来触发计算的,如:[10:00:00 ~ 10:00:10)的窗口,一旦系统时间到了10:00:10就会触发计算,那么就有可能会导致延迟到达的数据丢失。
现在可以借助于waterMark水位线进行解决,窗口就可以按照waterMark来触发计算,也就是说waterMark就是用来触发计算的。

触发条件:

  1. 窗口中有数据
  2. waterMark >= 窗口的结束时间

因为前面说到

? Watermaker = 当前窗口的最大的事件时间 - 最大允许的延迟时间 或 乱序时间

? 也就是说只要不断有数据来,就可以保证Watermaker水位线是会一直上升/变大的,不会下降/减小的,所以最终一定是会触发窗口计算的。

上面的触发公式进行如下变形:

Watermaker >= 窗口的结束时间

Watermaker = 当前窗口的最大的事件时间 - 最大允许的延迟时间或乱序时间

当前窗口的最大的事件时间 - 最大允许的延迟时间或乱序时间 >= 窗口的结束时间

当前窗口的最大的事件时间 >= 窗口的结束时间 + 最大允许的延迟时间或乱序时间

WaterMark工作图解:

image-20210616191432107

image-20210616191508920

7.3.3.0 常见问题

  1. 怎么知道是乱序?怎么知道是迟到的数据?

    基于*事件时间*,可以得知数据是否是乱序以及数据是否是迟到时间。
    
  2. 已经知道了数据有乱序,做一个窗口的操作,用EventTime来触发窗口的关闭和计算,合不合适?

    不合适,因为数据有乱序,不使用EventTime来触发窗口的关闭和计算 ——> 所以需要另外的技术来衡量*时间的进展*WaterMark 
    	=> 表示 时间的进度
    	=> 用来触发窗口的 关闭、计算
    	=> 用来解决 数据乱序 的问题(等一会)
    	=> 特点:单调递增 的。
    	=> 源码:
        /* Watermark tells operators that no elements with a timestamp older or equal to the 	watermark timestamp should arrive at the operator*/
            public final class Watermark extends StreamElement {
            private final long timestamp;
            public Watermark(long timestamp) {
                this.timestamp = timestamp;
            }
        通过源码发现:watermark其实就是指的一个特殊的时间戳,通过注释知道,在这个时间戳之前的数据都已经到齐了。
    

    image-20210616102404024

  3. 怎么知道当前来的数据,属于哪个窗口?(也就是窗口是如何换分的)

    窗口划分 :TumblingEventTimeWindows 类的 assignWindows()方法
        => 窗口开始时间:timestamp - (timespamp + windowSize) % windowSize
        => 窗口结束时间:new TimeWindow(start,start + size) —> start + size
        => 窗口左闭右开:属于窗口额最大时间戳为:maxTimestamp = end -1 
    窗口触发条件:window.maxTimestamp() <= ctx.getCurrentWatermark()
    	=> 由watermark触发窗口的计算,当 watermark >= 窗口数据的最大时间 
    
  4. 乱序的场景下,怎么进行处理?

    计算由 watermark 触发。
    既然是乱序,就等待乱序后的数据都来后再进行计算。
    
  5. watermark设定了等待的时间,如果超过了等待的时间,还有数据没到齐,怎么处理?[7.4 窗口允许迟到的数据](# 7.4 窗口允许迟到的数据)

    窗口设置 —> 运行迟到 => allowedLateness
    
  6. 如果窗口设置了延迟时间,但是到了真正关窗的时间,后面还要属于这个窗口的数据来,怎么处理?

    放到 侧输出流 中存起来。
    
  7. 在多并行度的时候,怎么确定watermark的取值?

    以最小的为准,参考木桶原理。
    

7.3.3.1 有序流中的水印

AscendingTimestampExtractor

在下面的这个图中, 事件是有序的(按照他们自己的时间戳来看), watermark是流中一个简单的周期性的标记。
![](https://img-blog.csdnimg.cn/img_convert/a4705b06d0102ac7fb4f3a097f688fbb.png#align=left&display=inline&height=375&margin=[object Object]&originHeight=375&originWidth=1269&status=done&style=none&width=1269)

升序的简单入门案例:

public class Flink02_EventTime {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // TODO 1. 设置时间语义
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        SingleOutputStreamOperator<WaterSensor> dataDS = env
                .socketTextStream("localhost", 9999)
                .map(new MapFunction<String, WaterSensor>() {
                    @Override
                    public WaterSensor map(String value) throws Exception {
                        String[] datas = value.split(",");
                        return new WaterSensor(datas[0], Long.valueOf(datas[1]), Integer.valueOf(datas[2]));
                    }
                })
                // TODO 2. 设置数据中的哪个字段(数据)可以当做事件时间【指定时间戳提取器】
                // AscendingTimestampExtractor : 数据的 事件时间 是 升序的。(升序的时间戳提取器)
                /*
                    升序的场景下,watermark就是当前的时间戳 - 1
                	public final Watermark getCurrentWatermark() {
                        return new Watermark(currentTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : currentTimestamp - 1);
                    }

                * */
                .assignTimestampsAndWatermarks(
                        new AscendingTimestampExtractor<WaterSensor>() {
                    @Override
                    public long extractAscendingTimestamp(WaterSensor element) {
                        return element.getTs() * 1000L;
                    }
                });

        // 分组、开窗、聚合
        dataDS
                .keyBy(data -> data.getId())
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))// 事件时间
                //.window(TumblingProcessingTimeWindows.of(Time.seconds(5))) // 系统时间
                /*
                    泛型说明:
                        1 : 输入的数据的类型
                        2 : 输出的数据的类型
                        3 : 分组的key的类型
                        4 : 时间窗口对象
                 */
                .process(new ProcessWindowFunction<WaterSensor, Long, String, TimeWindow>() {
                    /*
                        参数说明:
                        参数1 : 分组的key的类型
                        参数2 : 上下文对象
                        参数3 : 窗口内的所有元素
                        参数4 : 数据收集器 》用于向下游发送数据
                     */
                    @Override
                    public void process(String s, Context context, Iterable<WaterSensor> elements, Collector<Long> out) throws Exception {
                        out.collect(elements.spliterator().estimateSize());
                    }
                }).print();

        env.execute();
    }
}

7.3.3.2 乱序流中的水印

BoundedOutOfOrdernessTimestampExtractor

在下图中, 按照他们时间戳来看, 这些事件是乱序的, 则watermark对于这些乱序的流来说至关重要.
通常情况下, 水印是一种标记, 是流中的一个点, 所有在这个时间戳(水印中的时间戳)前的数据应该已经全部到达. 一旦水印到达了算子, 则这个算子会提高他内部的时钟的值为这个水印的值.
![](https://img-blog.csdnimg.cn/img_convert/5a09751dd764240933d862590166865f.png#align=left&display=inline&height=371&margin=[object Object]&originHeight=371&originWidth=1269&status=done&style=none&width=1269)

乱序的简单入门案例:

public class Flink03_OutofOrderness {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // TODO 1. 设置时间语义
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        SingleOutputStreamOperator<WaterSensor> dataDS = env
                .socketTextStream("localhost", 9999)
                .map(new MapFunction<String, WaterSensor>() {
                    @Override
                    public WaterSensor map(String value) throws Exception {
                        String[] datas = value.split(",");
                        return new WaterSensor(datas[0], Long.valueOf(datas[1]), Integer.valueOf(datas[2]));
                    }
                })
                /*
                    官方提供了BoundedOutOfOrdernessTimestampExtractor抽象类用来 在乱序情况下提取 事件时间 和生成 watermark水位线
                    使用规则:
                        - 参数:最大乱序程度,是等待时间(5秒以内)
                        - 需要重写extractTimestamp()方法,如何从数据中抽取出事件时间
                    何为乱序:时间戳大的先到了Flink中。
                    假设数据是 1,2,3,4,5,6 秒生成的,开3s的滚动窗口 [0,3),[3,6),[6,9)
                    来的数据是 1,6,3,2,4,5 =》 最大乱序程度是 4s
                    等4s再关闭窗口 => [0,3),本应该再eventtime >= 3s时关窗,等待之后就是 7s后关窗 。

                    watermark表示时间进展、触发窗口的计算、关窗 —> 也就是watermark = 3s时,[0,3)关闭并计算
                    【watermark = EventTime = awaitTime = 7 - 4 = 3s。】
                    为了实现单调递增,上面公式的 EventTime ,应该是当前为止,最大的时间戳。

                 */
                .assignTimestampsAndWatermarks(
                        new BoundedOutOfOrdernessTimestampExtractor<WaterSensor>(Time.seconds(3)) {
                            @Override
                            public long extractTimestamp(WaterSensor element) {
                                return element.getTs();
                            }
                        }
                );

        // 分组、开窗、聚合
        dataDS
                .keyBy(data -> data.getId())
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))// 事件时间
                //.window(TumblingProcessingTimeWindows.of(Time.seconds(5))) // 系统时间
                /*
                    泛型说明:
                        1 : 输入的数据的类型
                        2 : 输出的数据的类型
                        3 : 分组的key的类型
                        4 : 时间窗口对象
                 */
                .process(new ProcessWindowFunction<WaterSensor, Long, String, TimeWindow>() {
                    /*
                        参数说明:
                        参数1 : 分组的key的类型
                        参数2 : 上下文对象
                        参数3 : 窗口内的所有元素
                        参数4 : 数据收集器 》用于向下游发送数据
                     */
                    @Override
                    public void process(String s, Context context, Iterable<WaterSensor> elements, Collector<Long> out) throws Exception {
                        out.collect(elements.spliterator().estimateSize());
                    }
                }).print();

        env.execute();
    }
}

7.3.4 Flink中如何产生水印

在 Flink 中, 水印(watermark水位线)由应用程序开发人员生成, 这通常需要对相应的领域有 一定的了解。完美的水印永远不会错:时间戳小于水印标记时间的事件不会再出现。在特殊情况下(例如非乱序事件流),最近一次事件的时间戳就可能是完美的水印。
启发式水印则相反,它只估计时间,因此有可能出错, 即迟到的事件 (其时间戳小于水印标记时间)晚于水印出现。针对启发式水印, Flink 提供了处理迟到元素的机制。
设定水印通常需要用到领域知识。举例来说,如果知道事件的迟到时间不会超过 5 秒, 就可以将水印标记时间设为收到的最大时间戳减去 5 秒。 另 一种做法是,采用一个 Flink 作业监控事件流,学习事件的迟到规律,并以此构建水印生成模型。

7.3.5 EventTime和WaterMark的使用

Flink内置了两个WaterMark生成器:

  1. Monotonously Increasing Timestamps(时间戳单调增长:其实就是允许的延迟为0)

    WatermarkStrategy.forMonotonousTimestamps();

  2. Fixed Amount of Lateness(允许固定时间的延迟)

    WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(10));

package com.atheather.guigu.flink.java.chapter_7;

import com.atguigu.flink.java.chapter_5.WaterSensor;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.time.Duration;

public class Flink10_Chapter07_OrderedWaterMark {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);

        SingleOutputStreamOperator<WaterSensor> stream = env
          .socketTextStream("hadoop102", 9999)  // 在socket终端只输入毫秒级别的时间戳
          .map(new MapFunction<String, WaterSensor>() {
              @Override
              public WaterSensor map(String value) throws Exception {
                  String[] datas = value.split(",");
                  return new WaterSensor(datas[0], Long.valueOf(datas[1]), Integer.valueOf(datas[2]));
              }
          });

        // 创建水印生产策略
        WatermarkStrategy<WaterSensor> wms = WatermarkStrategy
          .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3)) // // 最大容忍的延迟时间
          .withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() { // 指定时间戳
              @Override 
              public long extractTimestamp(WaterSensor element, long recordTimestamp) {
                  return element.getTs() * 1000;
              }
          });

        stream
          .assignTimestampsAndWatermarks(wms) // 指定水印和时间戳
          .keyBy(WaterSensor: :getId)
          .window(TumblingEventTimeWindows.of(Time.seconds(5)))
          .process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {
              @Override
              public void process(String key, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {
                  String msg = "当前key: " + key
                    + "窗口: [" + context.window().getStart() / 1000 + "," + context.window().getEnd()/1000 + ") 一共有 "
                    + elements.spliterator().estimateSize() + "条数据 ";
                  out.collect(msg);
              }
          })
          .print();
        env.execute();
    }
}

7.3.6 自定义WatermarkStrategy

有2种风格的WaterMark生产方式: periodic(周期性) and punctuated(间歇性).都需要继承接口: WatermarkGenerator

7.3.6.1 周期性

package com.heather.flink.java.chapter_7;

import com.atguigu.flink.java.chapter_5.WaterSensor;
import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class Flink11_Chapter07_Period {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);


        SingleOutputStreamOperator<WaterSensor> stream = env
          .socketTextStream("localhost", 9999)  // 在socket终端只输入毫秒级别的时间戳
          .map(new MapFunction<String, WaterSensor>() {
              @Override
              public WaterSensor map(String value) throws Exception {
                  String[] datas = value.split(",");
                  return new WaterSensor(datas[0], Long.valueOf(datas[1]), Integer.valueOf(datas[2]));

              }
          });

        // 创建水印生产策略
        WatermarkStrategy<WaterSensor> myWms = new WatermarkStrategy<WaterSensor>() {
            @Override
            public WatermarkGenerator<WaterSensor> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
                System.out.println("createWatermarkGenerator ....");
                return new MyPeriod(3);
            }
        }.withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() {
            @Override
            public long extractTimestamp(WaterSensor element, long recordTimestamp) {
                System.out.println("recordTimestamp  " + recordTimestamp);
                return element.getTs() * 1000;
            }
        });

        stream
          .assignTimestampsAndWatermarks(myWms)
          .keyBy(WaterSensor::getId)
          .window(SlidingEventTimeWindows.of(Time.seconds(5), Time.seconds(5)))
          .process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {
              @Override
              public void process(String key, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {
                  String msg = "当前key: " + key
                    + "窗口: [" + context.window().getStart() / 1000 + "," + context.window().getEnd() / 1000 + ") 一共有 "
                    + elements.spliterator().estimateSize() + "条数据 ";
                  out.collect(context.window().toString());
                  out.collect(msg);
              }
          })
          .print();
        env.execute();
    }

    public static class MyPeriod implements WatermarkGenerator<WaterSensor> {

        private long maxTs = Long.MIN_VALUE;
        // 允许的最大延迟时间 ms
        private final long maxDelay;

        public MyPeriod(long maxDelay) {
            this.maxDelay = maxDelay * 1000;
		this.maxTs = Long.MIN_VALUE + this.maxDelay + 1;
        }

        // 每收到一个元素, 执行一次. 用来生产WaterMark中的时间戳
        @Override
        public void onEvent(WaterSensor event, long eventTimestamp, WatermarkOutput output) {
            System.out.println("onEvent..." + eventTimestamp);
            //有了新的元素找到最大的时间戳
            maxTs = Math.max(maxTs, eventTimestamp);
            System.out.println(maxTs);
        }

        // 周期性的把WaterMark发射出去, 默认周期是200ms
        @Override
        public void onPeriodicEmit(WatermarkOutput output) {

//            System.out.println("onPeriodicEmit...");
            // 周期性的发射水印: 相当于Flink把自己的时钟调慢了一个最大延迟
            output.emitWatermark(new Watermark(maxTs - maxDelay - 1));
        }
    }
}

7.3.6.2 间歇性

public class Flink12_Chapter07_punctuated {
    public static void main(String[] args) throws Exception {
    // 省略....

    public static class MyPunctuated implements WatermarkGenerator<WaterSensor> {
        private long maxTs;
        // 允许的最大延迟时间 ms
        private final long maxDelay;

        public MyPunctuated(long maxDelay) {
            this.maxDelay = maxDelay * 1000;
		this.maxTs = Long.MIN_VALUE + this.maxDelay + 1;
        }

        // 每收到一个元素, 执行一次. 用来生产WaterMark中的时间戳
        @Override
        public void onEvent(WaterSensor event, long eventTimestamp, WatermarkOutput output) {
            System.out.println("onEvent..." + eventTimestamp);
            //有了新的元素找到最大的时间戳
            maxTs = Math.max(maxTs, eventTimestamp);
            output.emitWatermark(new Watermark(maxTs - maxDelay - 1));
        }

        @Override
        public void onPeriodicEmit(WatermarkOutput output) {
            // 不需要实现
        }
    }
}

7.3.7 多并行度下WaterMark的传递

总结: 多并行度的条件下, 向下游传递WaterMark的时候, 总是以最小的那个WaterMark为准! 木桶原理!

7.4 窗口允许迟到的数据

已经添加了wartemark之后, 仍有数据会迟到怎么办?Flink的窗口, 也允许迟到数据。

  1. 当waterMark >= 窗口的结束时间的时候,会正常的触发计算,但是不会关闭窗口。
  2. 当waterMark >= 窗口的结束时间 + 窗口的等待时间,会真正的关闭窗口。
  3. 当 窗口的结束时间 <= waterMark <= 窗口结束时间 + 窗口等待时间,每来一条迟到数据,就会计算一次。

注意:
允许迟到 只能运用在event time上

图解

image-20210617115835558

public class Flink05_Watermark_AllowedLateness {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);

        DataStreamSource<String> socketDS = env.socketTextStream("localhost", 9999);

        SingleOutputStreamOperator<WaterSensor> waterSensorDS = socketDS
                .map(new MapFunction<String, WaterSensor>() {
                    @Override
                    public WaterSensor map(String value) throws Exception {
                        String[] datas = value.split(",");
                        return new WaterSensor(datas[0], Long.valueOf(datas[1]), Integer.valueOf(datas[2]));
                    }
                })
                // 设置数据中的哪个字段(数据)可以当做事件时间【指定时间戳提取器】
                .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<WaterSensor>(Time.seconds(3)) {
                    @Override
                    public long extractTimestamp(WaterSensor element) {
                        return element.getTs() * 1000L;
                    }
                });

        // 分组、开窗、聚合
        waterSensorDS
                .keyBy(data -> data.getId())
                .timeWindow(Time.seconds(5))
                .allowedLateness(Time.seconds(2))
                // 全窗口函数:整个窗口的本组数据,存起来,关窗的时候一次性一起计算
                .process(new ProcessWindowFunction<WaterSensor, Long, String, TimeWindow>() {
                    @Override
                    public void process(String s, Context context, Iterable<WaterSensor> elements, Collector<Long> out) throws Exception {
                        out.collect(elements.spliterator().estimateSize());
                    }
                })
                .print();

        env.execute();

    }
}

7.5 侧输出流(sideOutput)

7.5.1 处理窗口关闭之后的迟到数据

允许迟到数据,,窗口也会真正的关闭, 如果还有迟到的数据怎么办? Flink提供了一种叫做侧输出流的来处理关窗之后到达的数据。

作用:

  1. 处理乱序数据、迟到数据
  2. 对不同的数据进行分流
public class Flink07_ProcessFunction_SideOutput {

    public static void main(String[] args) throws Exception {
        // 0 执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // TODO 1.env指定时间语义
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // 1.
        SingleOutputStreamOperator<WaterSensor> sensorDS = env
                .socketTextStream("localhost", 9999)
                .map(new MapFunction<String, WaterSensor>() {
                    @Override
                    public WaterSensor map(String value) throws Exception {
                        String[] datas = value.split(",");
                        return new WaterSensor(datas[0], Long.valueOf(datas[1]), Integer.valueOf(datas[2]));

                    }
                })
                .assignTimestampsAndWatermarks(
                        new AssignerWithPunctuatedWatermarks<WaterSensor>() {
                            private Long maxTs = Long.MIN_VALUE;

                            @Nullable
                            @Override
                            public Watermark checkAndGetNextWatermark(WaterSensor lastElement, long extractedTimestamp) {
                                maxTs = Math.max(maxTs, extractedTimestamp);
                                return new Watermark(maxTs);
                            }

                            @Override
                            public long extractTimestamp(WaterSensor element, long previousElementTimestamp) {
                                return element.getTs() * 1000L;
                            }
                        }
                );

        //TODO 使用侧输出流
        // 1.定义一个OutputTag,给定一个 名称
        // 2.使用 ctx.output(outputTag对象,放入侧输出流的数据)
        // 3.获取侧输出流 => DataStream.getSideOutput(outputTag对象)
        OutputTag<String> outputTag = new OutputTag<String>("vc alarm") {
        };

        SingleOutputStreamOperator<WaterSensor> processDS = sensorDS
                .keyBy(data -> data.getId())
                .process(
                        new KeyedProcessFunction<String, WaterSensor, WaterSensor>() {

                            /**
                             * 来一条数据,处理一条
                             * @param value
                             * @param ctx
                             * @param out
                             * @throws Exception
                             */
                            @Override
                            public void processElement(WaterSensor value, Context ctx, Collector<WaterSensor> out) throws Exception {
                                if (value.getVc() > 5) {
                                   /*
                                        TODO 侧输出流 水位高于阈值,用侧输出流告警
                                    */
                                    ctx.output(outputTag, "水位高于阈值5!!!");
                                }
                                out.collect(value);
                            }
                        }
                );

        processDS.print();
        /*
            TODO 侧输出流只保存数据,不对数据进行计算,在此处,我们调用侧输出流中的数据 ===> 进行计算
         */
        processDS.getSideOutput(outputTag).print("alarm");

        env.execute();
    }
}

7.5.2 使用侧输出流把一个流拆成多个流

split算子可以把一个流分成两个流, 从1.12开始已经被移除了. 官方建议我们用侧输出流来替换split算子的功能。

**需求: **

? 采集监控传感器水位值,将水位值高于5cm的值输出到side output

SingleOutputStreamOperator<WaterSensor> result =
  env
    .socketTextStream("hadoop102", 9999)  // 在socket终端只输入毫秒级别的时间戳
    .map(new MapFunction<String, WaterSensor>() {
        @Override
        public WaterSensor map(String value) throws Exception {
            String[] datas = value.split(",");
            return new WaterSensor(datas[0], Long.valueOf(datas[1]), Integer.valueOf(datas[2]));

        }
    })
    .keyBy(ws -> ws.getTs())
    .process(new KeyedProcessFunction<Long, WaterSensor, WaterSensor>() {
        @Override
        public void processElement(WaterSensor value, Context ctx, Collector<WaterSensor> out) throws Exception {
            out.collect(value);
            if (value.getVc() > 5) { //水位大于5的写入到侧输出流
                ctx.output(new OutputTag<WaterSensor>("警告") {}, value);
            }
        }
    });

result.print("主流");
result.getSideOutput(new OutputTag<WaterSensor>("警告"){}).print("警告");

7.6 ProcessFunction API(底层API)

我们之前学习的转换算子是无法访问事件的时间戳信息和水位线信息的。而这在一些应用场景下,极为重要。例如MapFunction这样的map转换算子就无法访问时间戳或者当前事件的事件时间。
基于此,DataStream API提供了一系列的Low-Level转换算子。可以访问时间戳、watermark以及注册定时事件。还可以输出特定的一些事件,例如超时事件等。Process Function用来构建事件驱动的应用以及实现自定义的业务逻辑(使用之前的window函数和转换算子无法实现)。例如,Flink SQL就是使用Process Function实现的。

Flink提供了8个Process Function:

? ProcessFunction
? KeyedProcessFunction : keyBy分组后使用
? CoProcessFunction : connect 连接后使用
? ProcessJoinFunction
? BroadcastProcessFunction
? KeyedBroadcastProcessFunction
? ProcessWindowFunction : window开窗后使用
? ProcessAllWindowFunction

7.6.1 ProcessFunction

env
  .socketTextStream("hadoop102", 9999)
  .map(line -> {
      String[] datas = line.split(",");
      return new WaterSensor(datas[0], Long.valueOf(datas[1]), Integer.valueOf(datas[2]));
  })
  .process(new ProcessFunction<WaterSensor, String>() {
      @Override
      public void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {
          out.collect(value.toString());
      }
  })
  .print();

7.6.2 KeyedProcessFunction

env
  .socketTextStream("hadoop102", 9999)
  .map(line -> {
      String[] datas = line.split(",");
      return new WaterSensor(datas[0], Long.valueOf(datas[1]), Integer.valueOf(datas[2]));
  })
  .keyBy(ws -> ws.getId())
  .process(new KeyedProcessFunction<String, WaterSensor, String>() { // 泛型1:key的类型 泛型2:输入类型 泛型3:输出类型
      @Override
      public void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {
          System.out.println(ctx.getCurrentKey());
          out.collect(value.toString());
      }
  })
  .print();

7.6.3 CoProcessFunction

DataStreamSource<Integer> intStream = env.fromElements(1, 2, 3, 4, 5);
DataStreamSource<String> stringStream = env.fromElements("a", "b", "c");

ConnectedStreams<Integer, String> cs = intStream.connect(stringStream);
cs
  .process(new CoProcessFunction<Integer, String, String>() {
      @Override
      public void processElement1(Integer value, Context ctx, Collector<String> out) throws Exception {
          out.collect(value.toString());
      }

      @Override
      public void processElement2(String value, Context ctx, Collector<String> out) throws Exception {
          out.collect(value);
      }
  })
  .print();

7.6.4 ProcessJoinFunction

SingleOutputStreamOperator<WaterSensor> s1 = env
  .socketTextStream("hadoop102", 8888)  // 在socket终端只输入毫秒级别的时间戳
  .map(value -> {
      String[] datas = value.split(",");
      return new WaterSensor(datas[0], Long.valueOf(datas[1]), Integer.valueOf(datas[2]));

  });
SingleOutputStreamOperator<WaterSensor> s2 = env
  .socketTextStream("hadoop102", 9999)  // 在socket终端只输入毫秒级别的时间戳
  .map(value -> {
      String[] datas = value.split(",");
      return new WaterSensor(datas[0], Long.valueOf(datas[1]), Integer.valueOf(datas[2]));

  });

s1.join(s2)
  .where(WaterSensor::getId)
  .equalTo(WaterSensor::getId)
  .window(TumblingProcessingTimeWindows.of(Time.seconds(5))) // 必须使用窗口
  .apply(new JoinFunction<WaterSensor, WaterSensor, String>() {
      @Override
      public String join(WaterSensor first, WaterSensor second) throws Exception {
          return "first: " + first + ", second: " + second;
      }
  })
  .print();

7.6.5 BroadcastProcessFunction

后面专门讲解

7.6.6 KeyedBroadcastProcessFunction

keyBy之后使用

7.6.7 ProcessWindowFunction

添加窗口之后使用

7.6.8 ProcessAllWindowFunction

全窗口函数之后使用

7.7 定时器

基于处理时间或者时间时间处理过一个元素之后, 注册一个定时器, 然后指定的时间执行

Context和OnTimerContext所持有的TimerService对象拥有以下方法:

  • currentProcessingTime(): Long 返回当前处理时间
  • currentWatermark(): Long 返回当前watermark的时间戳
  • registerProcessingTimeTimer(timestamp: Long): Unit 会注册当前key的processing time的定时器。当processing time到达定时时间时,触发timer。
  • registerEventTimeTimer(timestamp: Long): Unit 会注册当前key的event time 定时器。当水位线大于等于定时器注册的时间时,触发定时器执行回调函数。
  • deleteProcessingTimeTimer(timestamp: Long): Unit 删除之前注册处理时间定时器。如果没有这个时间戳的定时器,则不执行。
  • deleteEventTimeTimer(timestamp: Long): Unit 删除之前注册的事件时间定时器,如果没有此时间戳的定时器,则不执行。

7.7.1 定时器(处理时间&事件时间)

public class Flink04_ProcessFunc_KeyBy {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
        // 指定时间语义
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        DataStreamSource<String> socketDS = env.socketTextStream("localhost", 9999);

        SingleOutputStreamOperator<WaterSensor> waterSensorDS = socketDS
                .map(new MapFunction<String, WaterSensor>() {
                    @Override
                    public WaterSensor map(String value) throws Exception {
                        String[] datas = value.split(",");
                        return new WaterSensor(datas[0], Long.valueOf(datas[1]), Integer.valueOf(datas[2]));
                    }
                })
                .assignTimestampsAndWatermarks(
//                        new AscendingTimestampExtractor<WaterSensor>() {
//                            @Override
//                            public long extractAscendingTimestamp(WaterSensor element) {
//                                return element.getTs() * 1000L;
//                            }
//                        }
                        // 自定义周期型水位线
                        new AssignerWithPunctuatedWatermarks<WaterSensor>() {
                            private Long maxTs = Long.MIN_VALUE;

                            @Nullable
                            @Override
                            public Watermark checkAndGetNextWatermark(WaterSensor lastElement, long extractedTimestamp) {
                                maxTs = Math.max(maxTs, extractedTimestamp);
                                return new Watermark(maxTs);
                            }

                            @Override
                            public long extractTimestamp(WaterSensor element, long previousElementTimestamp) {
                                return element.getTs() * 1000L;
                            }
                        }
                );

        SingleOutputStreamOperator<Long> result = waterSensorDS
                .keyBy(r -> r.getId())
                .process(new KeyedProcessFunction<String, WaterSensor, Long>() {

                    private Long triggerTS = 0L;

                    @Override
                    public void processElement(WaterSensor value, Context ctx, Collector<Long> out) throws Exception {
                        // 获取当前数据的分组
                        ctx.getCurrentKey();
                        // 获取当前数据的时间戳
                        ctx.timestamp();
                        // 将数据放入侧输出流
                        //ctx.output();
                        // 定时器 : 注册 、删除
                        //ctx.timerService().registerEventTimeTimer();
                        //ctx.timerService().registerProcessingTimeTimer(

                        // 为了避免重复注册、重复创建对象,注册定时器的时候,判断一下是否已经注册过了
                        if(triggerTS == 0L){
                            ctx.timerService().registerEventTimeTimer(
                                    // ctx.timerService().currentProcessingTime() + 5000L //处理时间
                                    value.getTs() * 1000L + 5000L                       // 事件时间
                            );
                            triggerTS = value.getTs() * 1000L + 5000L;
                        }
                    }

                    /**
                     * 定时器到时候,触发行为
                     * @param timestamp  注册的定时器的时间
                     * @param ctx       上下文
                     * @param out       采集器
                     * @throws Exception
                     */
                    @Override
                    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Long> out) throws Exception {
                        // System.out.println(new Timestamp(timestamp) + "定时器触发。。。"); // 处理时间
                        System.out.println(timestamp + "定时器触发");                       // 事件时间
                    }
                });

        result.print();
        env.execute();
    }
}

7.7.2 定时器练习

需求 :
监控水位传感器的水位值,如果水位值在五秒之内(processing time)连续上升,则报警。

public class Flink06_ProcessFunction_TimerPractice {

    public static void main(String[] args) throws Exception {
        // 0 执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // TODO 1.env指定时间语义
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // 1.
        SingleOutputStreamOperator<WaterSensor> sensorDS = env
                .socketTextStream("localhost", 9999)
                .map(new MapFunction<String, WaterSensor>() {
                    @Override
                    public WaterSensor map(String value) throws Exception {
                        String[] datas = value.split(",");
                        return new WaterSensor(datas[0], Long.valueOf(datas[1]), Integer.valueOf(datas[2]));

                    }
                })
                .assignTimestampsAndWatermarks(
                        new AssignerWithPunctuatedWatermarks<WaterSensor>() {
                                private Long maxTs = Long.MIN_VALUE;

                                @Nullable
                                @Override
                                public Watermark checkAndGetNextWatermark(WaterSensor lastElement, long extractedTimestamp) {
                                    maxTs = Math.max(maxTs, extractedTimestamp);
                                    return new Watermark(maxTs);
                                }

                                @Override
                                public long extractTimestamp(WaterSensor element, long previousElementTimestamp) {
                                return element.getTs() * 1000L;
                            }
                        }
                );


        SingleOutputStreamOperator<String> processDS = sensorDS
                .keyBy(data -> data.getId())
                .process(
                        new KeyedProcessFunction<String, WaterSensor, String>() {

                            // 定义一个变量,保存上一次的水位值
                            private Integer lastVC = 0;
                            private Long triggerTs = 0L;

                            /**
                             * 来一条数据,处理一条
                             * @param value
                             * @param ctx
                             * @param out
                             * @throws Exception
                             */
                            @Override
                            public void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {
                                // 判断是上升还是下降
                                if (value.getVc() > lastVC) {
                                    // 1.水位上升
                                    if (triggerTs == 0) {
                                        // 第一条数据来的时候,注册定时器
                                        ctx.timerService().registerEventTimeTimer(value.getTs() * 1000L + 5000L);
                                        triggerTs = value.getTs() * 1000L + 5000L;
                                    }
                                } else {
                                    // 2.水位下降
                                    // 2.1 删除注册的定时器
                                    ctx.timerService().deleteEventTimeTimer(triggerTs);
                                    // 2.2 重新注册定时器(或 把保存的时间清空)
                                    triggerTs = 0L;
                                }

                                // 不管上升还是下降,都要保存水位值,供下条数据使用,进行比较
                                lastVC = value.getVc();
                            }

                            /**
                             * 定时器触发
                             * @param timestamp 注册的定时器的时间
                             * @param ctx   上下文
                             * @param out   采集器
                             * @throws Exception
                             */
                            @Override
                            public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
                                // 定时器触发,说明已经满足 连续5s 水位上升
                                out.collect(ctx.getCurrentKey() + "在" + timestamp + "监测到水位连续5s上升");
                                // 将保存的注册时间清空
                                triggerTs = 0L;
                            }
                        }
                );


        processDS.print();

        env.execute();
    }
}

7.8 Flink状态编程

有状态的计算是流处理框架要实现的重要功能,因为稍复杂的流处理场景都需要记录状态,然后在新流入数据的基础上不断更新状态。
SparkStreaming在状态管理这块做的不好, 很多时候需要借助于外部存储(例如Redis)来手动管理状态, 增加了编程的难度。
Flink的状态管理是它的优势之一。状态就是保存的上一次的老数据

各个类型的状态的使用范围:

算子状态:Source 端和 Sink端。

键控状态:keyBy之后使用。

7.8.1 什么是状态

在流式计算中有些操作一次处理一个独立的事件(比如解析一个事件), 有些操作却需要记住多个事件的信息(比如窗口操作)。
那些需要记住多个事件信息的操作就是有状态的。

  • 在Flink中,状态始终是与特定的算子相关联的。

  • 为了使运行时的Flink了解算子的状态,算子需要预先注册其状态。

总的来说,在Flink中,状态分为两种:

  • 算子状态(Operator State)

    算子状态的作用范围限定为算子任务。

  • 键控状态

    根据输入数据流中定义的键(key)来维护和访问。

流式计算分为 无状态计算 和 有状态计算 两种情况。

  • 无状态的计算观察每个独立事件,并根据最后一个事件输出结果。例如,流处理应用程序从传感器接收水位数据,并在水位超过指定高度时发出警告。
  • 有状态的计算则会基于多个事件输出结果。以下是一些例子。例如,计算过去一小时的平均水位,就是有状态的计算。所有用于复杂事件处理的状态机。例如,若在一分钟内收到两个相差20cm以上的水位差读数,则发出警告,这是有状态的计算。流与流之间的所有关联操作,以及流与静态表或动态表之间的关联操作,都是有状态的计算。

7.8.2 为什么需要管理状态

下面的几个场景都需要使用流处理的状态功能 :

  • 去重

    数据流中的数据有重复,我们想对重复数据去重,需要记录哪些数据已经流入过应用,当新数据流入时,根据已流入过的数据来判断去重。

  • 检测

    检查输入流是否符合某个特定的模式,需要将之前流入的元素以状态的形式缓存下来。比如,判断一个温度传感器数据流中的温度是否在持续上升。

  • 聚合

    对一个时间窗口内的数据进行聚合分析,分析一个小时内水位的情况。

  • 更新机器学习模型

    在线机器学习场景下,需要根据新流入数据不断更新机器学习的模型参数。

7.8.3 Flink中的状态分类

Flink包括两种基本类型的状态Managed State和Raw State :

Managed StateRaw State
状态管理方式Flink Runtime托管, 自动存储, 自动恢复, 自动伸缩用户自己管理
状态数据结构Flink提供多种常用数据结构, 例如:ListState, MapState等字节数组: byte[]
使用场景绝大数Flink算子所有算子

注意:

  1. 从具体使用场景来说,绝大多数的算子都可以通过继承Rich函数类或其他提供好的接口类,在里面使用Managed State。Raw State一般是在已有算子和Managed State不够用时,用户自定义算子时使用。
  2. 在我们平时的使用中Managed State已经足够我们使用, 下面重点学习Managed State

7.8.4 Managed State的分类

对Managed State继续细分,它又有两种类型

  1. Operator State(算子状态)。
  2. Keyed State(键控状态)。
Operator StateKeyed State
适用用算子类型可用于所有算子: 常用于source, 例如 FlinkKafkaConsumer只适用于KeyedStream上的算子
状态分配一个算子的子任务对应一个状态一个Key对应一个State: 一个算子会处理多个Key, 则访问相应的多个State
创建和访问方式实现CheckpointedFunction或ListCheckpointed(已经过时)接口重写RichFunction, 通过里面的RuntimeContext访问
横向扩展并发改变时有多重重写分配方式可选: 均匀分配和合并后每个得到全量并发改变, State随着Key在实例间迁移
支持的数据结构ListState和BroadCastStateValueState, ListState,MapState ReduceState, AggregatingState

7.8.5 算子状态的使用

Operator State可以用在所有算子上,每个算子子任务或者说每个算子实例共享一个状态,流入这个算子子任务的数据可以访问和更新这个状态。


注意: 算子子任务之间的状态不能互相访问

Operator State的实际应用场景不如Keyed State多,它经常被用在Source或Sink等算子上,用来保存流入数据的偏移量或对输出数据做缓存,以保证Flink应用的Exactly-Once语义。

Flink为算子状态提供三种基本数据结构:

数据结构特点
列表状态(List state)将状态表示为一组数据的列表
联合列表状态(Union list state)也将状态表示为数据的列表。它与常规列表状态的区别在于,在发生故障时,或者从保存点(savepoint)启动应用程序时如何恢复。
一种是均匀分配(List state),另外一种是将所有 State 合并为全量 State 再分发给每个实例(Union list state)。
广播状态(Broadcast state)是一种特殊的算子状态. 如果一个算子有多项任务,而它的每项任务状态又都相同,那么这种特殊情况最适合应用广播状态。

7.8.5.1 案例1: 列表状态

在map算子中计算数据的个数

public class Flink01_OperatorState {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);

        DataStreamSource<String> socketDS = env.socketTextStream("localhost", 9999);

        SingleOutputStreamOperator<WaterSensor> mapDS = socketDS.map(lines -> {
            String[] datas = lines.split(",");
            return new WaterSensor(datas[0], Long.valueOf(datas[1]), Integer.valueOf(datas[2]));
        });

        // 定义一个有状态的map操作,统计当前分区数据的个数
        SingleOutputStreamOperator<Integer> countDS = mapDS.map(new MyCountMapper());

        countDS.print();
        env.execute();
    }

    // 自定义MapFunction
    // 我们需要把状态保存在检查点中,所以需要实现ListCheckpointed接口
    private static class MyCountMapper implements MapFunction<WaterSensor,Integer>, ListCheckpointed<Integer> {
        // 定义一个本地变量,作为算子状态
        private Integer count = 0;

        @Override
        public Integer map(WaterSensor value) throws Exception {
            count ++;
            return count;
        }

        // 把状态保存到快照中【存储到集合中】
        @Override
        public List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception {
            return Collections.singletonList(count);
        }

        // 故障恢复方法
        @Override
        public void restoreState(List<Integer> state) throws Exception {
            for (Integer num : state) {
                count +=num;
            }
        }
    }
}

7.8.5.2 案例2: 广播状态

从版本1.5.0开始,Apache Flink具有一种新的状态,称为广播状态
广播状态被引入以支持这样的用例:来自一个流的一些数据需要广播到所有下游任务,在那里它被本地存储,并用于处理另一个流上的所有传入元素。作为广播状态自然适合出现的一个例子,我们可以想象一个低吞吐量流,其中包含一组规则,我们希望根据来自另一个流的所有元素对这些规则进行评估。考虑到上述类型的用例,广播状态与其他操作符状态的区别在于:

  1. 它是一个map格式。
  2. 它只对输入有广播流和无广播流的特定操作符可用。
  3. 这样的操作符可以具有具有不同名称的多个广播状态。

7.8.6 键控状态的使用

键控状态是根据输入数据流中定义的键(key)来维护和访问的。
Flink为每个键值维护一个状态实例,并将具有相同键的所有数据,都分区到同一个算子任务中,这个任务会维护和处理这个key对应的状态。当任务处理一条数据时,它会自动将状态的访问范围限定为当前数据的key。因此,具有相同key的所有数据都会访问相同的状态。
Keyed State很类似于一个分布式的key-value map数据结构,只能用于KeyedStream(keyBy算子处理之后)。

7.8.6.1键控状态支持的数据类型

数据类型特点
值状态(Value State)将状态表示为单个的值
列表状态(List State)将状态表示为一组数据的列表
映射状态(Map State<UK, UV>)将状态表示为一组key-value对
聚合状态(ReducingState & AggregatingState)将状态表示为一个用于聚合操作的列表

具体描述如下:

  • ValueState

保存单个值. 每个有key有一个状态值. 设置使用 update(T), 获取使用 T value()

  • ListState:

保存元素列表.
添加元素: add(T) addAll(List)
获取元素: Iterable get()
覆盖所有元素: update(List)

  • ReducingState:

存储单个值, 表示把所有元素的聚合结果添加到状态中. 与ListState类似, 但是当使用add(T)的时候ReducingState会使用指定的ReduceFunction进行聚合.

  • AggregatingState<IN, OUT>:

存储单个值. 与ReducingState类似, 都是进行聚合. 不同的是, AggregatingState的聚合的结果和元素类型可以不一样.

  • MapState<UK, UV>:

存储键值对列表.
添加键值对: put(UK, UV) or putAll(Map<UK, UV>)
根据key获取值: get(UK)
获取所有: entries(), keys() and values()
检测是否为空: isEmpty()
注意:

  • 所有的类型都有clear(), 清空当前key的状态
  • 这些状态对象仅用于用户与状态进行交互.
  • 状态不是必须存储到内存, 也可以存储在磁盘或者任意其他地方
  • 从状态获取的值与输入元素的key相关
基本操作代码
public class Flink02_KeyedState {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);

        DataStreamSource<String> socketDS = env.socketTextStream("localhost", 9999);

        SingleOutputStreamOperator<WaterSensor> mapDS = socketDS.map(lines -> {
            String[] datas = lines.split(",");
            return new WaterSensor(datas[0], Long.valueOf(datas[1]), Integer.valueOf(datas[2]));
        });

       // 按照ID分组
        KeyedStream<WaterSensor, String> keyedDS = mapDS.keyBy(WaterSensor::getId);

        // 演示状态的使用
        SingleOutputStreamOperator<WaterSensor> process = keyedDS.process(new MyKeyedProcessFunction());
        // 打印
        process.print();

        // 启动执行
        env.execute();
    }


    private static class MyKeyedProcessFunction extends KeyedProcessFunction<String, WaterSensor, WaterSensor> {
        // 1. 定义状态
        private ValueState<Long> valueState;
        private ListState<Long> listState;
        private MapState<String,Long> mapState;
        private ReducingState<WaterSensor> reducerState;
        private AggregatingState<WaterSensor,WaterSensor> aggregatingState;

        // 2. 初始化
        @Override
        public void open(Configuration parameters) throws Exception {
            valueState = getRuntimeContext().getState(new ValueStateDescriptor<Long>("value-state", Long.class));
            listState = getRuntimeContext().getListState(new ListStateDescriptor<Long>("list-state", Long.class));
            mapState = getRuntimeContext().getMapState(new MapStateDescriptor<String, Long>("map-state", String.class, Long.class));
            // reducerState = getRuntimeContext().getReducingState(new ReducingStateDescriptor<WaterSensor>("reduce-state", , WaterSensor.class));
        }

        /*
            3. 状态的操作
                 - 增
                 - 删
                 - 改
                 - 查
         */

        @Override
        public void processElement(WaterSensor value, Context ctx, Collector<WaterSensor> out) throws Exception {
            // 3.1 value状态
            Long value1 = valueState.value();// 获取状态中的值
            valueState.update(122L); // 更新状态的值
            valueState.clear();             // 清空状态值到原始状态

            // 3.2 listState状态
            Iterable<Long> longs = listState.get();  // 获取状态中的值
            listState.add(122L);                    // 添加状态值
            listState.update(new ArrayList<>());   // 替换整个的状态的值
            listState.clear();                    // 清空状态值到原始状态

            // 3.3 mapState状态
            Iterator<Map.Entry<String, Long>> it = mapState.iterator();  // 获取所有的状态【然后迭代器进行迭代】
            Long aLong = mapState.get("");                              // 获取指定的某个key的状态
            // ....

            // 3.4 reduceState状态
            WaterSensor waterSensor = reducerState.get();   // 获取状态中的值
            reducerState.add(new WaterSensor());           // 添加状态值
            reducerState.clear();                         // 清空状态值到原始状态

            // 3.5 aggregateState状态
            WaterSensor waterSensor1 = aggregatingState.get();// 获取状态中的值
            aggregatingState.add(new WaterSensor());          // 添加状态值
            aggregatingState.clear();                         // 清空状态值到原始状态

        }
    }
}

7.8.6.2 案例1:ValueState

检测传感器的水位线值,如果连续的两个水位线差值超过10,就输出报警。

public class Flink03_State_ValueState {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);

        DataStreamSource<String> socketDS = env.socketTextStream("localhost", 9999);

        SingleOutputStreamOperator<WaterSensor> mapDS = socketDS.map(lines -> {
            String[] datas = lines.split(",");
            return new WaterSensor(datas[0], Long.valueOf(datas[1]), Integer.valueOf(datas[2]));
        });

        // 按照ID分组
        KeyedStream<WaterSensor, String> keyedDS = mapDS.keyBy(r -> r.getId());

        // 使用RichFunction实现水位线跳变报警需求
        SingleOutputStreamOperator<String> stateDS = keyedDS.flatMap(new RichFlatMapFunction<WaterSensor, String>() {
            // 定义状态
            private ValueState<Integer> vcState;

            @Override
            public void open(Configuration parameters) throws Exception {
                // 初始化
                vcState = getRuntimeContext().getState(new ValueStateDescriptor<Integer>("vc-state", Integer.class));
            }

            @Override
            public void flatMap(WaterSensor value, Collector<String> out) throws Exception {
                // 取出状态中的值
                Integer lastVc = vcState.value();

                // 更新状态值
                vcState.update(value.getVc());

                // 当上一次的水位线不为NULL,并且出现水位跳变的时候进行报警
                if (lastVc != null && Math.abs(lastVc - value.getVc()) > 10) {
                    out.collect("报警!!!");
                }
            }
        });
        stateDS.print();

        // 打印
        // 启动执行
        env.execute();
    }
}

7.8.6.3 案例2:ListState

针对每个传感器输出最高的3个水位值

public class Flink04_State_ListState {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);

        DataStreamSource<String> socketDS = env.socketTextStream("localhost", 9999);

        SingleOutputStreamOperator<WaterSensor> mapDS = socketDS.map(lines -> {
            String[] datas = lines.split(",");
            return new WaterSensor(datas[0], Long.valueOf(datas[1]), Integer.valueOf(datas[2]));
        });

        // 按照ID分组
        KeyedStream<WaterSensor, String> keyedDS = mapDS.keyBy(r -> r.getId());

        keyedDS.map(new RichMapFunction<WaterSensor, List<WaterSensor>>() {

            // 定义状态
            private ListState<WaterSensor> top3State;

            // 初始化
            @Override
            public void open(Configuration parameters) throws Exception {
                top3State = getRuntimeContext().getListState(new ListStateDescriptor<WaterSensor>("list-state", WaterSensor.class));
            }

            @Override
            public List<WaterSensor> map(WaterSensor value) throws Exception {
                // 将当前数据加入状态
                top3State.add(value);

                // 取出状态中的数据并且排序【倒序】
                ArrayList<WaterSensor> waterSensors = Lists.newArrayList(top3State.get().iterator());
                waterSensors.sort((o1,o2) -> o2.getVc() - o1.getVc());

                // 判断档期数据是否超过3条,如果超过,则删除最后一条
                if(waterSensors.size() > 3){
                    waterSensors.remove(3);
                }

                // 更新状态
                top3State.update(waterSensors);

                // 返回数据
                return waterSensors;
            }
        }).print();

        // 打印
        // 启动执行
        env.execute();
    }
}

7.8.6.4 案例3:ReducingState

计算每个传感器的水位之和(分组)

public class Flink05_State_ReducingState {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);

        DataStreamSource<String> socketDS = env.socketTextStream("localhost", 9999);

        SingleOutputStreamOperator<WaterSensor> mapDS = socketDS.map(lines -> {
            String[] datas = lines.split(",");
            return new WaterSensor(datas[0], Long.valueOf(datas[1]), Integer.valueOf(datas[2]));
        });

        // 按照ID分组
        KeyedStream<WaterSensor, String> keyedDS = mapDS.keyBy(r -> r.getId());

        // 逻辑处理
        keyedDS.process(new KeyedProcessFunction<String,WaterSensor,WaterSensor>() {
            // 定义状态
            private ReducingState<WaterSensor> reducingState;
            // 初始化
            @Override
            public void open(Configuration parameters) throws Exception {
                reducingState = getRuntimeContext().getReducingState(new ReducingStateDescriptor<WaterSensor>(
                        "reducing-state", new ReduceFunction<WaterSensor>() {
                    @Override
                    public WaterSensor reduce(WaterSensor value1, WaterSensor value2) throws Exception {
                        return new WaterSensor(value1.getId(), value2.getTs(), value1.getVc() + value2.getVc());
                    }
                }, WaterSensor.class));
            }

            //
            @Override
            public void processElement(WaterSensor value, Context ctx, Collector<WaterSensor> out) throws Exception {

                // 将当前数据聚合到状态中
                reducingState.add(value);

                // 取出状态中的数据
                WaterSensor waterSensor = reducingState.get();

                // 输出数据
                out.collect(waterSensor);
            }
        }).print();


        // 打印
        // 启动执行
        env.execute();
    }
}

7.8.6.5 案例4:AggregatingState

计算每个传感器的平均水位

.process(new KeyedProcessFunction<String, WaterSensor, Double>() {

    private AggregatingState<Integer, Double> avgState;

    @Override
    public void open(Configuration parameters) throws Exception {
        AggregatingStateDescriptor<Integer, Tuple2<Integer, Integer>, Double> aggregatingStateDescriptor = new AggregatingStateDescriptor<>("avgState", new AggregateFunction<Integer, Tuple2<Integer, Integer>, Double>() {
            @Override
            public Tuple2<Integer, Integer> createAccumulator() {
                return Tuple2.of(0, 0);
            }

            @Override
            public Tuple2<Integer, Integer> add(Integer value, Tuple2<Integer, Integer> accumulator) {
                return Tuple2.of(accumulator.f0 + value, accumulator.f1 + 1);
            }

            @Override
            public Double getResult(Tuple2<Integer, Integer> accumulator) {
                return accumulator.f0 * 1D / accumulator.f1;
            }

            @Override
            public Tuple2<Integer, Integer> merge(Tuple2<Integer, Integer> a, Tuple2<Integer, Integer> b) {
                return Tuple2.of(a.f0 + b.f0, a.f1 + b.f1);
            }
        }, Types.TUPLE(Types.INT, Types.INT));
        avgState = getRuntimeContext().getAggregatingState(aggregatingStateDescriptor);
    }

    @Override
    public void processElement(WaterSensor value, Context ctx, Collector<Double> out) throws Exception {
        avgState.add(value.getVc());
        out.collect(avgState.get());
    }
})

7.8.6.6 案例5:MapState

去重: 去掉重复的水位值. 思路: 把水位值作为MapState的key来实现去重, value随意

.process(new KeyedProcessFunction<String, WaterSensor, WaterSensor>() {
    private MapState<Integer, String> mapState;
    @Override
    public void open(Configuration parameters) throws Exception {
        mapState = this
          .getRuntimeContext()
          .getMapState(new MapStateDescriptor<Integer, String>("mapState", Integer.class, String.class));
    }
    @Override
    public void processElement(WaterSensor value, Context ctx, Collector<WaterSensor> out) throws Exception {
        if (!mapState.contains(value.getVc())) {
            out.collect(value);
            mapState.put(value.getVc(), "随意");
        }
    }
})

7.8.7 状态后端

每传入一条数据,有状态的算子任务都会读取和更新状态。由于有效的状态访问对于处理数据的低延迟至关重要,因此每个并行任务(子任务)都会在本地维护其状态,以确保快速的状态访问。
状态的存储、访问以及维护,由一个可插入的组件决定,这个组件就叫做状态后端(state backend)
状态后端主要负责两件事:

  • 本地的状态管理
  • 将检查点(checkpoint)状态写入远程存储

7.8.7.1 状态后端的分类

将状态保存到何处,配置状态保存的位置。结合checkPoint检查点一起使用。

状态后端作为一个可插入的组件, 没有固定的配置, 我们可以根据需要选择一个合适的状态后端。
Flink提供了3中状态后端:

  1. MemoryStateBackend ——>
  2. FsStateBackend
  3. RocksDBStateBackend
1. MemoryStateBackend
内存级别的状态后端, 
存储方式:本地状态存储在JobManager的内存中, checkpoint 存储在JobManager的内存中。
特点:快速, 低延迟, 但不稳定
使用场景: 
		1. 本地测试 
		2. 几乎无状态的作业(ETL) 
		3. JobManager不容易挂, 或者挂了影响不大. 
		4. 不推荐在生产环境下使用
2. FsStateBackend
存储方式: 本地状态在JobManager内存, Checkpoint存储在文件系统中
特点: 拥有内存级别的本地访问速度, 和更好的容错保证
使用场景: 
		1. 常规使用状态的作业. 例如分钟级别窗口聚合, join等 
		2. 需要开启HA的作业 	
		3. 可以应用在生产环境中
3. RocksDBStateBackend
将所有的状态序列化之后, 存入本地的RocksDB数据库中.(一种NoSql数据库, KV形式存储)
存储方式:
		1. 本地状态存储在TaskManager的RocksDB数据库中(实际是内存+磁盘) 
		2. Checkpoint在外部文件系统中.
使用场景: 
		1. 超大状态的作业, 例如天级的窗口聚合 
		2. 需要开启HA的作业 
		3. 对读写状态性能要求不高的作业 
		4. 可以使用在生产环境

7.8.7.2 配置状态后端

1. 全局配置状态后端

在flink-conf.yaml文件中设置默认的全局后端
![](https://img-blog.csdnimg.cn/img_convert/ba3e7fed930899b43e1ceb09a60c4447.png#align=left&display=inline&height=189&margin=[object Object]&originHeight=189&originWidth=1269&status=done&style=none&width=1269)

2. 在代码中配置状态后端

可以在代码中单独为这个Job设置状态后端;

如何要使用RocksDBBackend, 需要先引入依赖:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
// 定义状态后端,保存状态的位置
env.setStateBackend(new MemoryStateBackend());
env.setStateBackend(new FsStateBackend("hdfs://hadoop102:8020/flink/checkpoints/fs"));
env.setStateBackend(new RocksDBStateBackend("hdfs://hadoop102:8020/flink/checkpoints/rocksdb"));

// 开启CheckPoint
env.getCheckpointConfig().enableUnalignedCheckpoints();

7.9 Flink的容错机制

7.9.1 状态的一致性

当在分布式系统中引入状态时,自然也引入了一致性问题。
一致性实际上是"正确性级别"的另一种说法,也就是说在成功处理故障并恢复之后得到的结果,与没有发生任何故障时得到的结果相比,前者到底有多正确?举例来说,假设要对最近一小时登录的用户计数。在系统经历故障之后,计数结果是多少?如果有偏差,是有漏掉的计数还是重复计数?

7.9.1.1 一致性级别

在流处理中,一致性可以分为3个级别:

  • **at-most-once(最多变一次): **

    这其实是没有正确性保障的委婉说法——故障发生之后,计数结果可能丢失。

  • **at-least-once(至少一次): **

    这表示计数结果可能大于正确值,但绝不会小于正确值。也就是说,计数程序在发生故障后可能多算,但是绝不会少算。

  • exactly-once(严格变一次):

    这指的是系统保证在发生故障后得到的计数结果与正确值一致.既不多算也不少算。
    曾经,at-least-once非常流行。第一代流处理器(如Storm和Samza)刚问世时只保证at-least-once,原因有二:

  1. 保证exactly-once的系统实现起来更复杂。这在基础架构层(决定什么代表正确,以及exactly-once的范围是什么)和实现层都很有挑战性

  2. 流处理系统的早期用户愿意接受框架的局限性,并在应用层想办法弥补(例如使应用程序具有幂等性,或者用批量计算层再做一遍计算)。

    最先保证exactly-once的系统(Storm Trident和Spark Streaming)在性能和表现力这两个方面付出了很大的代价。为了保证exactly-once,这些系统无法单独地对每条记录运用应用逻辑,而是同时处理多条(一批)记录,保证对每一批的处理要么全部成功,要么全部失败。这就导致在得到结果前,必须等待一批记录处理结束。因此,用户经常不得不使用两个流处理框架(一个用来保证exactly-once,另一个用来对每个元素做低延迟处理),结果使基础设施更加复杂。曾经,用户不得不在保证exactly-once与获得低延迟和效率之间权衡利弊。Flink避免了这种权衡。
    Flink的一个重大价值在于,它保证了exactly-once具有低延迟和高吞吐的处理能力。
    从根本上说,Flink通过使自身满足所有需求来避免权衡,它是业界的一次意义重大的技术飞跃。尽管这在外行看来很神奇,但是一旦了解,就会恍然大悟。

7.9.1.2 端到端的状态一致性

? 目前我们看到的一致性保证都是由流处理器实现的,也就是说都是在 Flink 流处理器内部保证的;而在真实应用中,流处理应用除了流处理器以外还包含了数据源(例如 Kafka)和输出到持久化系统。
? 端到端的一致性保证,意味着结果的正确性贯穿了整个流处理应用的始终;每一个组件都保证了它自己的一致性,整个端到端的一致性级别取决于所有组件中一致性最弱的组件。

具体划分如下:

image-20210619005557778

1. source端

需要外部源可重设数据的读取位置.
目前我们使用的Kafka Source具有这种特性: 读取数据的时候可以指定offset

2. flink内部

依赖checkpoint机制

3. sink端

需要保证从故障恢复时,数据不会重复写入外部系统或丢失数据。

有2种实现形式:

  1. 幂等(Idempotent)写入
所谓幂等操作,是说一个操作,可以重复执行很多次,但只导致一次结果更改,也就是说,后面再重复执行就不起作用了。
  1. 事务性(Transactional)写入

需要构建事务来写入外部系统,构建的事务对应着 checkpoint,等到 checkpoint 真正完成的时候,才把所有对应的结果写入 sink 系统中。对于事务性写入,具体又有两种实现方式:预写日志(WAL)和两阶段提交(2PC)

image-20210619002927125

IMG_256

7.9.2 Checkpoint原理

概念:Flink具体如何保证exactly-once呢? 它使用一种被称为"检查点"(checkpoint)的特性,在出现故障时将系统重置回正确状态。下面通过简单的类比来解释检查点的作用。

理解:相当于是状态的备份,用来存储状态的终端。

假设你和两位朋友正在数项链上有多少颗珠子,如下图所示。你捏住珠子,边数边拨,每拨过一颗珠子就给总数加一。你的朋友也这样数他们手中的珠子。当你分神忘记数到哪里时,怎么办呢? 如果项链上有很多珠子,你显然不想从头再数一遍,尤其是当三人的速度不一样却又试图合作的时候,更是如此(比如想记录前一分钟三人一共数了多少颗珠子,回想一下一分钟滚动窗口)。

![](https://img-blog.csdnimg.cn/img_convert/6570c6aebd73a5e7bee4e76c3c465964.png#align=left&display=inline&height=517&margin=[object Object]&originHeight=517&originWidth=760&status=done&style=none&width=760)
于是,你想了一个更好的办法: 在项链上每隔一段就松松地系上一根有色皮筋,将珠子分隔开; 当珠子被拨动的时候,皮筋也可以被拨动; 然后,你安排一个助手,让他在你和朋友拨到皮筋时记录总数。用这种方法,当有人数错时,就不必从头开始数。相反,你向其他人发出错误警示,然后你们都从上一根皮筋处开始重数,助手则会告诉每个人重数时的起始数值,例如在粉色皮筋处的数值是多少。
Flink检查点的作用就类似于皮筋标记。数珠子这个类比的关键点是: 对于指定的皮筋而言,珠子的相对位置是确定的; 这让皮筋成为重新计数的参考点。总状态(珠子的总数)在每颗珠子被拨动之后更新一次,助手则会保存与每根皮筋对应的检查点状态,如当遇到粉色皮筋时一共数了多少珠子,当遇到橙色皮筋时又是多少。当问题出现时,这种方法使得重新计数变得简单。

7.9.2.1 Flink的检查点算法

checkpoint机制是Flink可靠性的基石,可以保证Flink集群在某个算子因为某些原因(如 异常退出)出现故障时,能够将整个应用流图的状态恢复到故障之前的某一状态,保证应用流图状态的一致性.
快照的实现算法:

  1. 简单算法–暂停应用, 然后开始做检查点, 再重新恢复应用

  2. Flink的改进Checkpoint算法. Flink的checkpoint机制原理来自"Chandy-Lamport algorithm"算法(分布式快照算)的一种变体: 异步 barrier 快照(asynchronous barrier snapshotting)

    每个需要checkpoint的应用在启动时,Flink的JobManager为其创建一个 CheckpointCoordinator,CheckpointCoordinator全权负责本应用的快照制作。

7.9.2.2 理解Barrier

流的barrier是Flink的Checkpoint中的一个核心概念. 多个barrier被插入到数据流中, 然后作为数据流的一部分随着数据流动(有点类似于Watermark).这些barrier不会跨越流中的数据.
每个barrier会把数据流分成两部分: 一部分数据进入**当前的快照** , 另一部分数据进入**下一个快照 ****.** 每个barrier携带着快照的id. barrier 不会暂停数据的流动, 所以非常轻量级.  在流中, 同一时间可以有来源于多个不同快照的多个barrier, 这个意味着可以并发的出现不同的快照.

![](https://img-blog.csdnimg.cn/img_convert/88ec309466654b8b8ff8b9bad5dfadf6.png#align=left&display=inline&height=402&margin=[object Object]&originHeight=402&originWidth=1048&status=done&style=none&width=1048)

7.9.2.3 Flink的检查点制作过程

  • 第一步: Checkpoint Coordinator 向所有 source 节点 trigger Checkpoint. 然后Source Task会在数据流中安插CheckPoint barrier

![](https://img-blog.csdnimg.cn/img_convert/87c4ad4bdaa70d8dab6cbfbcda54a42c.png#align=left&display=inline&height=567&margin=[object Object]&originHeight=567&originWidth=1761&status=done&style=none&width=1761)

  • 第二步: source 节点向下游广播 barrier,这个 barrier 就是实现 Chandy-Lamport 分布式快照算法的核心,下游的 task 只有收到所有 input 的 barrier 才会执行相应的 Checkpoint![](https://img-blog.csdnimg.cn/img_convert/a8120e50a03d165811649f95b18c2b33.png#align=left&display=inline&height=619&margin=[object Object]&originHeight=619&originWidth=1613&status=done&style=none&width=1613)
  • 第三步: 当 task 完成 state 备份后,会将备份数据的地址(state handle)通知给 Checkpoint coordinator。

![](https://img-blog.csdnimg.cn/img_convert/9d1e199592be3f5afbd7ed4f07263579.png#align=left&display=inline&height=635&margin=[object Object]&originHeight=635&originWidth=1574&status=done&style=none&width=1574)

  • 第四步: 下游的 sink 节点收集齐上游两个 input 的 barrier 之后,会执行本地快照,这里特地展示了 RocksDB incremental Checkpoint 的流程,首先 RocksDB 会全量刷数据到磁盘上(红色大三角表示),然后 Flink 框架会从中选择没有上传的文件进行持久化备份(紫色小三角)。

![](https://img-blog.csdnimg.cn/img_convert/db5acd4f9aa5160a9c99843111ee58ab.png#align=left&display=inline&height=659&margin=[object Object]&originHeight=659&originWidth=1516&status=done&style=none&width=1516)

  • 第五步: 同样的,sink 节点在完成自己的 Checkpoint 之后,会将 state handle 返回通知 Coordinator。

![](https://img-blog.csdnimg.cn/img_convert/6f4050ba2efbea2358d4e38ea1d7bace.png#align=left&display=inline&height=664&margin=[object Object]&originHeight=664&originWidth=1504&status=done&style=none&width=1504)

  • 第六步: 最后,当 Checkpoint coordinator 收集齐所有 task 的 state handle,就认为这一次的 Checkpoint 全局完成了,向持久化存储中再备份一个 Checkpoint meta 文件。

![](https://img-blog.csdnimg.cn/img_convert/def4c56ecf3794f1588454188eb9cb8e.png#align=left&display=inline&height=662&margin=[object Object]&originHeight=662&originWidth=1509&status=done&style=none&width=1509)

7.9.2.4 严格一次语义: barrier对齐

在多并行度下, 如果要实现严格一次, 则要执行**barrier对齐**。
当 job graph 中的每个 operator 接收到 barriers 时,它就会记录下其状态。拥有两个输入流的 Operators(例如 CoProcessFunction)会执行** barrier 对齐**(barrier alignment) 以便当前快照能够包含消费两个输入流 barrier 之前(但不超过)的所有 events 而产生的状态。

https://ci.apache.org/projects/flink/flink-docs-release-1.12/fig/stream_aligning.svg
![](https://img-blog.csdnimg.cn/img_convert/f408e2d97ceafabb980ba6b008cdeb2b.png#align=left&display=inline&height=282&margin=[object Object]&originHeight=282&originWidth=1269&status=done&style=none&width=1269)

  1. 当operator收到数字流的barrier n时, 它就**不能处理(但是可以接收)**来自该流的任何数据记录,直到它从字母流所有输入接收到 barrier n 为止。否则,它会混合属于快照 n 的记录和属于快照 n + 1 的记录。
  2. 接收到 barrier n 的流(数字流)暂时被搁置。从这些流接收的记录入输入缓冲区, 不会被处理。
  3. 图一中的 Checkpoint barrier n之后的数据 123已结到达了算子, 存入到输入缓冲区没有被处理, 只有等到字母流的Checkpoint barrier n到达之后才会开始处理.
  4. 一旦最后所有输入流都接收到 barrier n,Operator 就会把缓冲区中 pending 的输出数据发出去,然后把 CheckPoint barrier n 接着往下游发送。这里还会对自身进行快照。

7.9.2.5 至少一次语义: barrier不对齐

前面介绍了barrier对齐, 如果barrier不对齐会怎么样?  会重复消费, 就是**至少一次**语义.

![](https://img-blog.csdnimg.cn/img_convert/9f169abfcd2b5d1db0349842601b2172.png#align=left&display=inline&height=282&margin=[object Object]&originHeight=282&originWidth=1269&status=done&style=none&width=1269)
假设不对齐, 在字母流的Checkpoint barrier n到达前, 已经处理了1 2 3. 等字母流Checkpoint barrier n到达之后, 会做Checkpoint n. 假设这个时候程序异常错误了, 则重新启动的时候会Checkpoint n之后的数据重新计算. 1 2 3 会被再次被计算, 所以123出现了重复计算.

7.9.3 Savepoint原理

  1. Flink 还提供了可以自定义的镜像保存功能,就是保存点(savepoints)
  2. 原则上,创建保存点使用的算法与检查点完全相同,因此保存点可以认为就是具有一些额外元数据的检查点
  3. Flink不会自动创建保存点,因此用户(或外部调度程序)必须明确地触发创建操作
  4. 保存点是一个强大的功能。除了故障恢复外,保存点可以用于:有计划的手动备份,更新应用程序,版本迁移,暂停和重启应用,等等

7.9.4 checkpoint和savepoint的区别

SavepointCheckpoint
Savepoint是由命令触发, 由用户创建和删除Checkpoint被保存在用户指定的外部路径中
保存点存储在标准格式存储中,并且可以升级作业版本并可以更改其配置。当作业失败或被取消时,将保留外部存储的检查点。
用户必须提供用于还原作业状态的保存点的路径。用户必须提供用于还原作业状态的检查点的路径。

7.9.5 Flink+Kafka 实现端到端严格一次

我们知道,端到端的状态一致性的实现,需要每一个组件都实现,对于Flink + Kafka的数据管道系统(Kafka进、Kafka出)而言,各组件怎样保证exactly-once语义呢?
  • 内部 —— 利用checkpoint机制,把状态存盘,发生故障的时候可以恢复,保证部的状态一致性

  • source —— kafka consumer作为source,可以将偏移量保存下来,如果后续任务出现了故障,恢复的时候可以由连接器重置偏移量,重新消费数据,保证一致性

  • sink —— kafka producer作为sink,采用两阶段提交 sink,需要实现一个 TwoPhaseCommitSinkFunction

    内部的checkpoint机制我们已经有了了解,那source和sink具体又是怎样运行的呢?接下来我们逐步做一个分析。

具体的两阶段提交步骤总结如下:

  1. 第一条数据来了之后,开启一个 kafka 的事务(transaction),正常写入 kafka 分区日志但标记为未提交,这就是“预提交”
  2. jobmanager 触发 checkpoint 操作,barrier 从 source 开始向下传递,遇到 barrier 的算子将状态存入状态后端,并通知 jobmanagerr
  3. sink 连接器收到 barrier,保存当前状态,存入 checkpoint,通知 jobmanager,并开启下一阶段的事务,用于提交下个检查点的数据
  4. jobmanager 收到所有任务的通知,发出确认信息,表示 checkpoint 完成
  5. sink 任务收到 jobmanager 的确认信息,正式提交这段时间的数据
  6. 外部kafka关闭事务,提交的数据可以正常消费了

7.9.6 检查点的基本使用代码

public class BaseDBApp {

    public static void main(String[] args) {

       // TODO 1. 基本环境准别,获取流执行环境、设置并行度
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(4);

       // TODO 2. 设置检查点
        // 2.1 开启检查点
        env.enableCheckpointing(5000L, CheckpointingMode.EXACTLY_ONCE);
        // 2.2 设置检查点失效时间
        env.getCheckpointConfig().setCheckpointTimeout(60000L);
        // 2.3 设置job任务取消时,保留检查点
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        // 2.4 设置状态后端
        env.setStateBackend(new FsStateBackend("hdfs://hadoop102:8020/gmall/ck"));
        // 2.5 设置检查点的重启策略
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 3000L));
        // 2.6 设置操作hdfs的用户
        System.setProperty("HADOOP_USER_NAME", "atguigu");
        
    }
    
}  

7.9.7 在代码中测试Checkpoint

package com.atguigu.flink.java.chapter_7.state;

import com.atguigu.flink.java.chapter_5.WaterSensor;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.util.Collector;

import java.util.Properties;

public class Flink04_State_Checkpoint {
    public static void main(String[] args) throws Exception {
        System.setProperty("HADOOP_USER_NAME", "atguigu");

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "hadoop102:9092,hadoop103:9092,hadoop104:9092");
        properties.setProperty("group.id", "Flink01_Source_Kafka");
        properties.setProperty("auto.offset.reset", "latest");

        StreamExecutionEnvironment env = StreamExecutionEnvironment
          .createLocalEnvironmentWithWebUI(new Configuration())
          .setParallelism(3);
        env.setStateBackend(new RocksDBStateBackend("hdfs://hadoop102:8020/flink/checkpoints/rocksdb"));
        // 每 1000ms 开始一次 checkpoint
        env.enableCheckpointing(1000);
        // 高级选项:
        // 设置模式为精确一次 (这是默认值)
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

        // 确认 checkpoints 之间的时间会进行 500 ms
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);

        // Checkpoint 必须在一分钟内完成,否则就会被抛弃
        env.getCheckpointConfig().setCheckpointTimeout(60000);

        // 同一时间只允许一个 checkpoint 进行
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

        // 开启在 job 中止后仍然保留的 externalized checkpoints
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        
        env
          .addSource(new FlinkKafkaConsumer<>("sensor", new SimpleStringSchema(), properties))
          .map(value -> {
              String[] datas = value.split(",");
              return new WaterSensor(datas[0], Long.valueOf(datas[1]), Integer.valueOf(datas[2]));

          })
          .keyBy(WaterSensor::getId)
          .process(new KeyedProcessFunction<String, WaterSensor, String>() {
              private ValueState<Integer> state;

              @Override
              public void open(Configuration parameters) throws Exception {
                  state = getRuntimeContext().getState(new ValueStateDescriptor<Integer>("state", Integer.class));

              }

              @Override
              public void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {
                  Integer lastVc = state.value() == null ? 0 : state.value();
                  if (Math.abs(value.getVc() - lastVc) >= 10) {
                      out.collect(value.getId() + " 红色警报!!!");
                  }
                  state.update(value.getVc());
              }
          })
          .addSink(new FlinkKafkaProducer<String>("hadoop102:9092", "alert", new SimpleStringSchema()));

        env.execute();
    }
}

从SavePoint和CK恢复任务

  1. //启动任务
    bin/flink -c com.atguigu.WordCount xxx.jar

  2. //保存点(只能手动)
    bin/flink savepoint -m hadoop102:8081 JobId hdfs://hadoop102:8020/flink/save

  3. //关闭任务并从保存点恢复任务
    bin/flink -s hdfs://hadoop102:8020/flink/save/... -m hadoop102:8081 -c com.atguigu.WordCount xxx.jar

  4. //从CK位置恢复数据
    env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

    bin/flink run -s hdfs://hadoop102:8020/flink/ck/Jobid/chk-960 -m hadoop102:8081 -c com.atguigu.WordCount xxx.jar

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-07-15 16:15:11  更:2021-07-15 16:17:30 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年5日历 -2024/5/21 3:43:04-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码