IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Table API 和 SQL -> 正文阅读

[大数据]Table API 和 SQL

  • 在企业实际应用中, 往往会面对大量类似的处理逻辑,所以一般会将底层 API?包装?更加具体的应用级接口。 作为大数据工程师,我们最?为熟悉的数据统计方式, 当然就是写 SQL?SQL?是结构化查询语言,是我们对关系型数据库进行查询和改的编程语言。无论是MySQL PostgreSQL,还是大数据应用中的 Hive,都少不了 SQL 的身影;而 Spark?作为大数?据处理引擎,为了更好地支持在 Hive?SQL 查询,也提供了Spark SQL。Flink?同样提供了对于表”处理的支持,这就是更高层级的应用 API,在 Flink 中被称为?Table?API SQL

    Flink?中这两种 API 被集成在一起Flink?是批流统一的处理框架,无论是批处理(DataSet?API) 还是流处理(DataStream?API),在上层应用中都可以直接使用 Table?API?或者 SQL?来实现;这两种 API?对于一张表执行相同的查询操作,得到的结果是完全一样?的。注意Table API 和 SQL 最初并不完善, 在 Flink 1.9 版本合并阿里巴巴内部版本?Blink??之后发生了非常大的改变, 此后也一直处在快速开发和完善的过程中, 直到 Flink ?1.12?本才基本上做到了功能上的完善。但然不算稳定,接口用法还在不停调整和更新

    1)程序架构:

    ?在?Flink 中, Table API SQL 可以看作联结在一起的一套 API,这套 API 的核心概念就?是“表”(Table)。在程序中输入数据可以定义成一张表;然后对这张表进行查询, 就可以得到新的表, 相当于流数据的转换操作;程序整体处理流程与 DataStream API 非常相似, 也可以分为读取数据?(Source)、转换(Transform)、输出数据(Sink) 三部分;只不过这里的输入输出操作不需?要额外定义,只需将用于输入和输出的表定义出来,进行转换查询就可以了。

    // 创建表环境
    TableEnvironment tableEnv = ...;
    // 创建输入表,连接外部系统读取数据
    tableEnv.executeSql("CREATE TEMPORARY TABLE sourceTable ... WITH ( 'connector' = ... )");
    // 注册一个表,连接到外部系统,用于输出
    tableEnv.executeSql("CREATE TEMPORARY TABLE sinkTable ... WITH ( 'connector' = ... )");
    // 执行 SQL 对表进行查询转换,得到一个新的表
    Table table1 = tableEnv.sqlQuery("SELECT ... FROM sourceTable... ");
    // 使用 Table API 对表进行查询转换,得到一个新的表
    Table table2 = tableEnv.from("sourceTable").select(...);
    // 将得到的结果写入输出表
    TableResult tableResult = table1.executeInsert("sinkTable");

    2)创建表环境:
    对于?Flink 来说,数据流和表在结构上还是有所区别的。所以使用 Table?API?SQL?需要一个特别的运行时环境, 这就是所谓的“表环境”(TableEnvironment)。它主?负责:
    (1)?注册 Catalog?和表;
    (2) 执行 SQL?查询;
    (3) 注册用户自定义函数(UDF);
    (4) DataStream?和表之间的转换。
    这里的 Catalog?就是“目录”,与标准 SQL 中的概念是一致的, 主要用来管理所有数据库? (database)和表(table) 的元数据。通过 Catalog?可以方便地对数据库和表进查询的理。?在表环境中可以由用户自定义 Catalog,并在其中注册表和自定义函数(UDF)。默认的 Catalog?就叫作 default_catalog每个表和 SQL 的执行,都必须绑定在一个表环境(TableEnvironment)中。TableEnvironment?Table?API?中提供的基本接口类, 可以通过调用静态的 create()方法来创建一个表环境实例。?可以指定当前表环境的执行模式和?划器(planner)。执行模式有批处理和流处理两种选择, 默认是流处理模式;计划器默认使blink?planner

    3)创建表:
    Flink 中的表概念并不特殊,是由多个数据构成的,每个行(Row)?可以有定义好的多个列(Column);为了方便查询, 表环境中会维护一个目录(Catalog) 和表的对应关系。所以表都是 通过 Catalog?进行注册创建的。表在环境中有一个唯一的 ID,由三部分组成:目录(catalog)名,?数据库(database) 名,以及表名。默认目录名为 default_catalog,数据库名为?default_database。所以如直接创建一个叫作 MyTable 的表,它的 ID 就是:default_catalog.default_database.MyTable 具体创建表的方式, 有通过连接器(connector)和虚拟表(virtual?tables) 两种。
    (1)连接器表(Connector?Tables)
    最直观的创建表的方式, 就是通过连接器(connector
    ) 连接到一个外部系统,然后定义出对应的结构。当我们在表中读取这张表,连接器就会从外部系统读取数据并进行转换;而当我们向这张表写入数据,??连接器就会将数据输出(Sink)到外部系统中。在代码中, 可以调用表环executeSql()方法,传入一个 DDL ?作为参数执行?SQL 操作。这里传入一个 CREATE 语句进行表的创建,并通过 WITH?关键字指定连接到?外部统的连接器:

    tableEnv.executeSql("CREATE [TEMPORARY] TABLE MyTable ... WITH ( 'connector' = ... )");

    (2)虚拟表:
    在环境中注册后, 就可以在 SQL 中直接使用这张表进行查询转换了。
    Table
    ?newTable?= tableEnv.sqlQuery("SELECT ... FROM MyTable... ");?这里调用表环境的 sqlQuery()方法, 直接传入一条SQL 语句作为参数执行查询,得到的结果是Table 对象。得到的 newTable 是一个中间转换结果, 如果之后又希望直接使用这个表执行 SQL,又该?怎么做呢? 由于 newTable 是一个 Table?对象,并没有在表环境中注册; 所以还需要将这?间结果表注册到环境中,才能使用SQL。tableEnv.createTemporaryView("NewTable", newTable);??这里的注册其实是创建了一个“虚拟表”(Virtual Table)。这个概念与 SQL 法中的视图(View)非常类似,所以也叫作创建“虚拟视图”(createTemporaryView)。?

    4)综合应用示例
    现在将?API 整合起来, 写出一段完整的代码。统计用户的?一组点击事件, 例如可以查询出某个用户点击的 url 列表, 也可以统计出每个用户累计的点次数,用两句 SQL 来分别实现。具体代码如下:

    public static void main(String[] args) throws Exception {
            // 获取流环境
            StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
            // 读取数据源
            SingleOutputStreamOperator<Event> eventStream = env
                    .fromElements(
                            new Event("zhangsan", "./home", 1000L),
                            new Event("lisi", "./cart", 1000L), 
                            new Event("zhangsan", "./prod?id=1", 5 * 1000L), 
                            new Event("lisi", "./home", 60 * 1000L), 
                            new Event("wangwu", "./prod?id=3", 90 * 1000L), 
                            new Event("zhangsan", "./prod?id=7", 105 * 1000L)
                    );
            // 获取表环境
            StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
            // 将数据流转换成表
            tableEnv.createTemporaryView("eventTable", eventStream);
    
            // 查询 zhangsan 访问的 url 列表
            Table aliceVisitTable = tableEnv.sqlQuery("SELECT url, user FROM eventTable WHERE user = 'zhangsan'");
            // 统计每个用户的点击次数
            Table urlCountTable = tableEnv.sqlQuery("SELECT user, COUNT(url) FROM eventTable GROUP BY user");
            // 将表转换成数据流,在控制台打印输出
            tableEnv.toDataStream(aliceVisitTable).print("zhangsan visit");
            tableEnv.toChangelogStream(urlCountTable).print("count");
            // 执行程序
            env.execute();
        }

  • 流处理中的表:
    1)动态表和持续查询:
    流处理面对的数据是连续不断的,这导致了流处理中的“表”跟我们熟悉的关系型数据库?中的表完全同;而基于表查询的操作也就有了新的含义。如果我们希望把流数据转换成表的形式,那么这表中的数据就会不断增长;如果执行 SQL?查询,那么得到的结果就不是一成不变的,而是会随着新数据的到来持续更新。

    (1)动态表(Dynamic?Tables)
    当流中有新数据到来,初始的表中会插入一行;而基于这个表定义的 SQL 查询,就应该在之前的基础上更新结果。这样得到的表就会不断地动态变化,被称为“动态表”(Dynamic Tables)动态表是Flink?Table API SQL 中的核心概念,它为流数据处理提供了表和 SQL 支持。?我们熟悉的表一般用来做批处理, 面向的是固定的数据集, 可以认为是“静态表”;而动态表则完全不同, 它里面的数据会随时间变化动态表的概念, 在传统的关系型数据库中数据库中的表, 其实是一系列 INSERT UPDATE DELETE?语句执行的结果; 在关系型数据库中,我们一般把它?更新日志流(changelog?stream)。如果我们保存了表在某一时刻的快照(snapshot),那么?接下来只要读取更新日志流,就可以得到表之后的变化过程和最终结果了。在很多高级关系型?库(比如 Oracle?DB2)中都有“物化视图”(Materialized?Views)的概念,可以用来缓?SQL?查询的结果; 它的更新其实就是不停地处理更新日志流的过程。Flink?中的动态表, 就借鉴了物化视图的思想。
    (2)持续查询:

    动态表可以像静态的批处理表一样进行查询操作。由于数据在不断变化,因此基于它定义?SQL?查询也不可能执行一次就得到最终结果。这样一来,我们对动态表的查询也就永远不会停止,一直在随着新数据的到来而继续执行。这样的查询就被称作“持续查询”(Continuous Query)。对动态表定义的查询操作, 都是持续查询;而持续查询的结果也会是一个动态表。由于每次数据到来都会触发查询操作,因此可以认为一次查询面对的数据集,就是当前输?入动态表中收到的所有数据。这相当于是对输入动态表做了一个“快照”(snapshot),当作有数据集进行批处理;流式数据的到来会触发连续不断的快照查询,像动画一样连贯起来, 就构成了“持续查询”。如图所示:

    ?持续查询的步骤如下:
    (1) 流(stream) 被转换为动态表(dynamic?table);
    (2)?对动态表进行持续查询(continuous?query),生成新的动态表;
    (3) 生成的动态表被转换成流。
    这样,只要 API?将流和动态表的转换封装起来,我们就可以直接在数据流上执行 SQL ?询,用处理表的方式来做流处理。

    2)?SQL 持续查询:
    (1)更新查询:

    Table eventCountTable = tableEnv.sqlQuery("SELECT user, COUNT(url) as cnt FROM EventTable GROUP BY user");

    当原始动态表不停地插入新的数据时, 查询得到的 table?会持续行更改。由于 count 数量可能会叠加增长,因此这里的更改操作可以是简单的插入(Insert),?也可以是对之前数据更新(Update)。换句话说用来定义结果表的更新日志(changelog)?流中,包含?INSERT UPDATE 两种操作。这种持续查询被称为更新查询(Update Query),?更新查询得到的结果表如果想要转换成 DataStream,必须调用 toChangelogStream()方法。
    具体步骤解释如下:
    (1) 当查询启动时,原始动态表 EventTable 为空;
    (2)当第一行 zhangsan?的点击数据插入 EventTable 表时,查询开始计算结果表,eventCountTable中插入一行数据[zhangsan,1]。
    (3) 当第二行 list?击数据插入 EventTable 表时,查询将更新结果表并插入新行[lisi1]
    (4)
    ?第三行数据到来, 同样是 zhangsan?的点击事件, 这时不会插入新行, 而是生成一个针对已有行的更新操作。这样将结果表中第一行[zhangsan1]就更新为[zhangsan?2]
    (5) 当第四行 wangwu
    ?的点击数据插入到 EventTable?表时, 查询将第三行[wangwu?1]插入到结果表中。
    (2)追加(
    Append)查询:
    更新查询过程用到了分组聚合,结果表中就会产生更新操作。如果执行一?个简单的条件查询, 结果表中就会像原始表?EventTable 一样,只有插入(Insert)操作了。

    Table resultTable = tableEnv.sqlQuery("SELECT url, user FROM EventTable WHERE user = 'Cary'");

    样的持续查询,就被称为追加查询(Append??Query), 它定义的结果表的更新日志? (changelog) 中只有 INSERT 操作。追加查询得到的结果表, 转换成 DataStream 调用方法没有限制, 可以直接用 toDataStream(),也可以像更新查询一样调用toChangelogStream()什么时候聚合的结果会保持不变呢? 一个典型的例子就是窗口聚合。我们考虑开一个滚动窗口,统计每一小时内所有用户的点击次数,并在结果表中增加一个?endT?段,表示当前统计窗口的结束时间。代码如下:

    public static void main(String[] args) throws Exception {
    		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    		env.setParallelism(1);
    		// 读取数据源,并分配时间戳、生成水位线
    		SingleOutputStreamOperator<Event> eventStream = env
    				.fromElements(
    						new Event("zhangsan", "./home", 1000L),
    						new Event("lisi", "./cart", 1000L),
    						new Event("zhangsan", "./prod?id=1", 25 * 60 * 1000L),
    						new Event("zhangsan", "./prod?id=4", 55 * 60 * 1000L),
    						new Event("lisi", "./prod?id=5", 3600 * 1000L + 60 * 1000L),
    						new Event("wangwu", "./home", 3600 * 1000L + 30 * 60 * 1000L),
    						new Event("wangwu", "./prod?id=7", 3600 * 1000L + 59 * 60 * 1000L)
    				)
    				.assignTimestampsAndWatermarks(
    						WatermarkStrategy.<Event>forMonotonousTimestamps()
    								.withTimestampAssigner((SerializableTimestampAssigner<Event>) (element, recordTimestamp) -> element.timestamp)
    				);
    		// 创建表环境
    		StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
    		// 将数据流转换成表,并指定时间属性
    		Table eventTable = tableEnv.fromDataStream(
    				eventStream,
    				$("user"),
    				$("url"),
    				$("timestamp").rowtime().as("ts")
    		// 将 timestamp 指定为事件时间,并命名为 ts
    		);
    		// 为方便在 SQL 中引用,在环境中注册表 EventTable
    		tableEnv.createTemporaryView("EventTable", eventTable);
    		// 设置 1 小时滚动窗口,执行 SQL 统计查询
    		Table result = tableEnv
    				.sqlQuery(
    						"SELECT " +
    								"user, " + "window_end AS "COUNT(url)AS
    						"FROM TABLE( " + "TUMBLE( TABLE
    						endT, " +   // 窗口结束时间
    						cnt" +   // 统计 url 访问次数
    						EventTable, " +   // 1 小时滚动窗口
    						"DESCRIPTOR(ts), " +
    								"INTERVAL '1' HOUR)) " +
    								"GROUP BY user, window_start, window_end "
    				);
    		tableEnv.toDataStream(result).print();
    		env.execute();
    	}

    由于窗口的统计结果是一次性写入结果表的,所以结果表的更新日志流中只会包含插入 INSERT 操作,而没有更新 UPDATE 操作。所以这里的持续查询,依然是一个?追加(Append)查询。结果表 result?如果转换成 DataStream,可以直接调用 toDataStream()

  • 时间属性和窗口:
    1)件时间:
    在实际应用中, 最常用的就是事件时间。在事件时间语义下, 允许表处理程序根据每
    ?个数据中包的时间戳(也就是事件发生的时间) 来生成结果。事件时间语义最大的用途就是处理乱序事件或者延迟事件的场景。通过设置水位线?(watermark)来表示事件时间的进展,而水位线可以根据数据的最大时间戳设置一个延迟时间。这样即使在出现乱序的情况下, 对数据的处理也可以获得正确的结果。事件时间属性可以在创建表 DDL?中定义,也可以在数据流和表的转换中定义。

    2)窗口:

    从 1.13 ?版本开始,Flink??开始使用窗口表值函数(Windowing??table-valued??functions??Windowing??TVFs)来定义窗口。窗口表值函数是 Flink?定义的多态表函数(PTF),可以将进行扩展后回。目前?Flink 提供了以下几个窗口
    (1)滚动窗口(Tumbling Windows);
    (2)滑动窗口(Hop?Windows,跳跃窗口)
    (3)累积窗口(Cumulate?Windows);
    (4)会话窗口(Session?Windows)。
    窗口表值函数
    可以支持基于窗口的复杂计算,例如窗口 Top-N、窗口联结(window?join) 等等。?????
    在窗口?TVF 的返回值中, 除去原始表中的所有列,还增加了用来描述窗口的额外 3 个列:?“窗口起始点”(window_start)、“窗口结束点”(window_end)、“窗口时间”(window_time)。?点和结束点比较好理解,这里的“窗口时间”指的是窗口中的时间属性,它的值等于?window_end - 1ms ,相当于窗口中能够包含数据的最大时间戳。在 SQL??中的声明方式直接调TUMBLE() HOP() CUMULATE()就可以实现滚动、滑动和累积窗口

    (1) 滚动窗口(TUMBLE):
    滚动窗口在 SQL 中的概念与
    DataStream API 中的定义完全一样,是长度固定、时间对齐、?无重叠的窗口, 一般用于周期性的统计计算。在 SQL?中通过调用 TUMBLE()函数就可以声明一个滚动窗口, 只有一个核心参数就是窗口大小(size)。?窗口 TVF 本质上是表函数,可以对表进行扩展,所以还应该把当前查询的表作为参数整体传入。具体声明如下:TUMBLE(TABLE EventTable, DESCRIPTOR(ts), INTERVAL '1' HOUR)??这里基于间字段 ts,对表 EventTable 中的数据开了大小为 1 小时的滚动窗口。窗口会将?表中的每一行数据, 按照它们 ts?的值分配到一个指定的窗口中。

    (2
    ) 滑动窗口(HOP):

    滑动窗口的使用与滚动窗口类似,可以通过设置滑动步长来控制统计输出的频率。在 SQL????通过调用 HOP()来声明滑动窗口;除了也要传入表名、时间属性外,还需要传入窗口大小(size)和滑动步长(slide) 两个参数。HOP(TABLE EventTable, DESCRIPTOR(ts), INTERVAL '5' MINUTES, INTERVAL '1' HOURS));这里我们基于时间属性 ts,在表 EventTable 上创建了大小为 1 小时的滑动窗口,每 5 分钟?滑动一次。

    (3) 累积窗口(CUMULATE):
    例如按天来统计网站的 PV ?(Page View,页面浏览量),如果用 1 天的滚动窗口,?那需要到每天 24 才会计算一次,输出频率太低; 如果用滑动窗口,计算频率可以更高,但统计的就变成了“24 小时的 PV”。所以我们真正希望的是,还是按照自然日统计每天的?PV,不过需要每隔 1?小时就输出一次当天到目前为止的 PV 值。这种特殊的窗口就叫作“累 积窗口”(Cumulate Window)。具体声明如下:CUMULATE(TABLE EventTable, DESCRIPTOR(ts), INTERVAL '1' HOURS, INTERVAL '1' DAYS))
    ?

  • 聚合(Aggregation)查询:
    1)分组聚合:
    从概念上讲,
    SQL?中的分组聚?合可以对应 DataStream?API?keyBy?之后的聚合转换,它们都是按照某个 key?对数据进行了?划分?各自维护状态来进行聚合统计的。在流处理中, 分组聚合同样是一个持续查询, 而且是?个更新查询, 得到的是一个动态表;每当流中有一个新的数据到来时,都会导致结果表的更新操作。在持续查询的过程中, 由于用于分组的 key 可能会不断增加,因此计算结果所需要?维护的状态也会持续长。为了防止状态无限增长耗尽资源,Flink Table API SQL 可以在表?中配置状态的生存时间(TTL)。

    2)窗口聚合:

    在流处理中,往往需要将无限数据流划分成有界数据集, 这就是所谓的窗口在 Flink Table API SQL 中, 窗口的计算是通过“窗口聚合”(window aggregation)?实现的。与分组聚合类似, 窗口聚合也需要调用 SUM()MAX() MIN()COUNT()一类的?聚合函数?通过 GROUP BY 子句来指定分组的字段。只不过窗口聚合时,需要将窗口信息作?为分组 key?的一部分定义出来。在 Flink?1.12 版本之前,是直接把窗口自身作为分组 key 放在?GROUP?BY?之后的, 所以也叫“分组窗口聚合”。1.13 版本开始使用了?“窗口表值函”(Windowing TVF),窗口本身返回的是就是一个表,所以窗口会出现在 FROM?后面GROUP BY 后面的则是窗口新增的字段window_start window_end??下面是一段窗口聚合的完整代码, 以累积窗口为例:

    public static void main(String[] args) throws Exception {
    		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    		env.setParallelism(1);
    		// 读取数据源,并分配时间戳、生成水位线
    		SingleOutputStreamOperator<Event> eventStream = env
    				.fromElements(
    						new Event("zhangsan", "./home", 1000L),
    						new Event("lisi", "./cart", 1000L),
    						new Event("zhangsan", "./prod?id=1", 25 * 60 * 1000L),
    						new Event("zhangsan", "./prod?id=4", 55 * 60 * 1000L),
    						new Event("lisi", "./prod?id=5", 3600 * 1000L + 60 * 1000L),
    						new Event("wangwu", "./home", 3600 * 1000L + 30 * 60 * 1000L),
    						new Event("wangwu", "./prod?id=7", 3600 * 1000L + 59 * 60 * 1000L)
    				)
    				.assignTimestampsAndWatermarks(
    						WatermarkStrategy.<Event>forMonotonousTimestamps()
    								.withTimestampAssigner((SerializableTimestampAssigner<Event>) (element, recordTimestamp) -> element.timestamp)
    				);
    		// 创建表环境
    		StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
    		// 将数据流转换成表,并指定时间属性
    		Table eventTable = tableEnv.fromDataStream(
    				eventStream,
    				$("user"),
    				$("url"),
    				$("timestamp").rowtime().as("ts")
    		);
    		// 为方便在 SQL 中引用,在环境中注册表 EventTable
    		tableEnv.createTemporaryView("EventTable", eventTable);
    		// 设置累积窗口,执行 SQL 统计查询
    		Table result = tableEnv
    				.sqlQuery(
    						"SELECT " +
    								"user, " +
    								"window_end AS endT, " +
    								"COUNT(url) AS cnt " +
    								"FROM TABLE( " +
    								"CUMULATE( TABLE EventTable, " +   // 定义累积窗口
    								"DESCRIPTOR(ts), " +
    								"INTERVAL '30' MINUTE, " +
    								"INTERVAL '1' HOUR)) " +
    								"GROUP BY user, window_start, window_end "
    				);
    		tableEnv.toDataStream(result).print();
    		env.execute();
    	}

    应用案例TopN:
    统计一段时间内的热门商品。这就需要先窗口,在窗口中统计每个商品的点击量;然后将统计数据收集起来, 按窗口进行分组,并按?点击量大小降序排序,选取前 N 个作为结果返回。

    package com.demo.main;
    
    import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
    import static org.apache.flink.table.api.Expressions.$;
    
    public class WindowTopNExample {
    	public static void main(String[] args) throws Exception {
    		StreamExecutionEnvironment env =
    				StreamExecutionEnvironment.getExecutionEnvironment();
    		env.setParallelism(1);
    		// 读取数据源,并分配时间戳、生成水位线
    		SingleOutputStreamOperator<Event> eventStream = env
    				.fromElements(
    						new Event("zhangsan", "./home", 1000L),
    						new Event("lisi", "./cart", 1000L),
    						new Event("zhangsan", "./prod?id=1", 25 * 60 * 1000L),
    						new Event("zhangsan", "./prod?id=4", 55 * 60 * 1000L),
    						new Event("lisi", "./prod?id=5", 3600 * 1000L + 60 * 1000L),
    						new Event("wangwu", "./home", 3600 * 1000L + 30 * 60 * 1000L),
    						new Event("wangwu", "./prod?id=7", 3600 * 1000L + 59 * 60 * 1000L)
    				)
    				.assignTimestampsAndWatermarks(
    						WatermarkStrategy.<Event>forMonotonousTimestamps()
    								.withTimestampAssigner((SerializableTimestampAssigner<Event>) (element, recordTimestamp) -> element.timestamp)
    				);
    		// 创建表环境
    		StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
    		// 将数据流转换成表,并指定时间属性
    		Table eventTable = tableEnv.fromDataStream(
    				eventStream,
    				$("user"),
    				$("url"),
    				$("timestamp").rowtime().as("ts")
     		// 将 timestamp 指定为事件时间,并命名为 ts
    		);
    		// 在环境中注册表 EventTable
    		tableEnv.createTemporaryView("EventTable", eventTable);
    		// 定义子查询,进行窗口聚合,得到包含窗口信息、用户以及访问次数的结果表 
    		String subQuery = "SELECT window_start, window_end, user, COUNT(url) as cnt " + "FROM TABLE ( " +
    				"TUMBLE( TABLE EventTable, DESCRIPTOR(ts), INTERVAL '1' HOUR )) GROUP BY window_start, window_end, user ";
    
    		// 定义 Top N 的外层查询
    		String topNQuery =
    				"SELECT * " +
    						"FROM (" +
    						"SELECT *, " +
    						"ROW_NUMBER() OVER ( " +
    						"PARTITION BY window_start, window_end " +
    						"ORDER BY cnt desc " +
    						") AS row_num " +
    						"FROM (" + subQuery + ")) " +
    						"WHERE row_num <= 2";
    
    		Table result = tableEnv.sqlQuery(topNQuery);
    		tableEnv.toDataStream(result).print();
    
    		env.execute();
    	}
    }
  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-10-08 20:48:45  更:2022-10-08 20:50:56 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年5日历 -2025/5/1 13:46:15-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码