-
在企业实际应用中, 往往会面对大量类似的处理逻辑,所以一般会将底层 API?包装?成更加具体的应用级接口。 作为大数据工程师,我们最?为熟悉的数据统计方式, 当然就是写 SQL?。SQL?是结构化查询语言,是我们对关系型数据库进行查询和修改的编程语言。无论是MySQL 、PostgreSQL,还是大数据应用中的 Hive,都少不了 SQL 的身影;而 Spark?作为大数?据处理引擎,为了更好地支持在 Hive?中的 SQL 查询,也提供了Spark SQL。Flink?同样提供了对于“表”处理的支持,这就是更高层级的应用 API,在 Flink 中被称为?Table?API 和 SQL。
 在 Flink?中这两种 API 被集成在一起。Flink?是批流统一的处理框架,无论是批处理(DataSet?API) 还是流处理(DataStream?API),在上层应用中都可以直接使用 Table?API?或者 SQL?来实现;这两种 API?对于一张表执行相同的查询操作,得到的结果是完全一样?的。注意Table API 和 SQL 最初并不完善, 在 Flink 1.9 版本合并阿里巴巴内部版本?Blink??之后发生了非常大的改变, 此后也一直处在快速开发和完善的过程中, 直到 Flink ?1.12?版本才基本上做到了功能上的完善。但依然不算稳定,接口用法还在不停调整和更新。
1)程序架构: ?在?Flink 中, Table API 和 SQL 可以看作联结在一起的一套 API,这套 API 的核心概念就?是“表”(Table)。在程序中输入数据可以定义成一张表;然后对这张表进行查询, 就可以得到新的表, 相当于流数据的转换操作;程序的整体处理流程与 DataStream API 非常相似, 也可以分为读取数据?源(Source)、转换(Transform)、输出数据(Sink) 三部分;只不过这里的输入输出操作不需?要额外定义,只需将用于输入和输出的表定义出来,进行转换查询就可以了。 // 创建表环境
TableEnvironment tableEnv = ...;
// 创建输入表,连接外部系统读取数据
tableEnv.executeSql("CREATE TEMPORARY TABLE sourceTable ... WITH ( 'connector' = ... )");
// 注册一个表,连接到外部系统,用于输出
tableEnv.executeSql("CREATE TEMPORARY TABLE sinkTable ... WITH ( 'connector' = ... )");
// 执行 SQL 对表进行查询转换,得到一个新的表
Table table1 = tableEnv.sqlQuery("SELECT ... FROM sourceTable... ");
// 使用 Table API 对表进行查询转换,得到一个新的表
Table table2 = tableEnv.from("sourceTable").select(...);
// 将得到的结果写入输出表
TableResult tableResult = table1.executeInsert("sinkTable"); 2)创建表环境: 对于?Flink 来说,数据流和表在结构上还是有所区别的。所以使用 Table?API?和 SQL?需要一个特别的运行时环境, 这就是所谓的“表环境”(TableEnvironment)。它主?要负责: (1)?注册 Catalog?和表; (2) 执行 SQL?查询; (3) 注册用户自定义函数(UDF); (4) DataStream?和表之间的转换。 这里的 Catalog?就是“目录”,与标准 SQL 中的概念是一致的, 主要用来管理所有数据库? (database)和表(table) 的元数据。通过 Catalog?可以方便地对数据库和表进行查询的管理。?在表环境中可以由用户自定义 Catalog,并在其中注册表和自定义函数(UDF)。默认的 Catalog?就叫作 default_catalog。每个表和 SQL 的执行,都必须绑定在一个表环境(TableEnvironment)中。TableEnvironment?是 Table?API?中提供的基本接口类, 可以通过调用静态的 create()方法来创建一个表环境实例。?可以指定当前表环境的执行模式和?计划器(planner)。执行模式有批处理和流处理两种选择, 默认是流处理模式;计划器默认使用 blink?planner。
3)创建表: Flink 中的表概念并不特殊,是由多个“行”数据构成的,每个行(Row)?又可以有定义好的多个列(Column);为了方便查询, 表环境中会维护一个目录(Catalog) 和表的对应关系。所以表都是 通过 Catalog?来进行注册创建的。表在环境中有一个唯一的 ID,由三部分组成:目录(catalog)名,?数据库(database) 名,以及表名。默认目录名为 default_catalog,数据库名为?default_database。所以如果直接创建一个叫作 MyTable 的表,它的 ID 就是:default_catalog.default_database.MyTable 具体创建表的方式, 有通过连接器(connector)和虚拟表(virtual?tables) 两种。 (1)连接器表(Connector?Tables) 最直观的创建表的方式, 就是通过连接器(connector) 连接到一个外部系统,然后定义出对应的表结构。当我们在表环境中读取这张表,连接器就会从外部系统读取数据并进行转换;而当我们向这张表写入数据,??连接器就会将数据输出(Sink)到外部系统中。在代码中, 可以调用表环境的 executeSql()方法,传入一个 DDL ?作为参数执行?SQL 操作。这里传入一个 CREATE 语句进行表的创建,并通过 WITH?关键字指定连接到?外部系统的连接器: tableEnv.executeSql("CREATE [TEMPORARY] TABLE MyTable ... WITH ( 'connector' = ... )"); (2)虚拟表: 在环境中注册之后, 就可以在 SQL 中直接使用这张表进行查询转换了。 Table?newTable?= tableEnv.sqlQuery("SELECT ... FROM MyTable... ");?这里调用表环境的 sqlQuery()方法, 直接传入一条SQL 语句作为参数执行查询,得到的结果是一个 Table 对象。得到的 newTable 是一个中间转换结果, 如果之后又希望直接使用这个表执行 SQL,又该?怎么做呢? 由于 newTable 是一个 Table?对象,并没有在表环境中注册; 所以还需要将这?个中间结果表注册到环境中,才能使用SQL。tableEnv.createTemporaryView("NewTable", newTable);??这里的注册其实是创建了一个“虚拟表”(Virtual Table)。这个概念与 SQL 语法中的视图(View)非常类似,所以也叫作创建“虚拟视图”(createTemporaryView)。?
4)综合应用示例: 现在将?API 整合起来, 写出一段完整的代码。统计用户的?一组点击事件, 例如可以查询出某个用户点击的 url 列表, 也可以统计出每个用户累计的点击次数,用两句 SQL 来分别实现。具体代码如下: public static void main(String[] args) throws Exception {
// 获取流环境
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// 读取数据源
SingleOutputStreamOperator<Event> eventStream = env
.fromElements(
new Event("zhangsan", "./home", 1000L),
new Event("lisi", "./cart", 1000L),
new Event("zhangsan", "./prod?id=1", 5 * 1000L),
new Event("lisi", "./home", 60 * 1000L),
new Event("wangwu", "./prod?id=3", 90 * 1000L),
new Event("zhangsan", "./prod?id=7", 105 * 1000L)
);
// 获取表环境
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// 将数据流转换成表
tableEnv.createTemporaryView("eventTable", eventStream);
// 查询 zhangsan 访问的 url 列表
Table aliceVisitTable = tableEnv.sqlQuery("SELECT url, user FROM eventTable WHERE user = 'zhangsan'");
// 统计每个用户的点击次数
Table urlCountTable = tableEnv.sqlQuery("SELECT user, COUNT(url) FROM eventTable GROUP BY user");
// 将表转换成数据流,在控制台打印输出
tableEnv.toDataStream(aliceVisitTable).print("zhangsan visit");
tableEnv.toChangelogStream(urlCountTable).print("count");
// 执行程序
env.execute();
} -
流处理中的表: 1)动态表和持续查询: 流处理面对的数据是连续不断的,这导致了流处理中的“表”跟我们熟悉的关系型数据库?中的表完全不同;而基于表查询的操作也就有了新的含义。如果我们希望把流数据转换成表的形式,那么这表中的数据就会不断增长;如果执行 SQL?查询,那么得到的结果就不是一成不变的,而是会随着新数据的到来持续更新。
(1)动态表(Dynamic?Tables) 当流中有新数据到来,初始的表中会插入一行;而基于这个表定义的 SQL 查询,就应该在之前的基础上更新结果。这样得到的表就会不断地动态变化,被称为“动态表”(Dynamic Tables)。动态表是Flink?在Table API 和 SQL 中的核心概念,它为流数据处理提供了表和 SQL 支持。?我们熟悉的表一般用来做批处理, 面向的是固定的数据集, 可以认为是“静态表”;而动态表则完全不同, 它里面的数据会随时间变化。动态表的概念, 在传统的关系型数据库中数据库中的表, 其实是一系列 INSERT 、UPDATE 和 DELETE?语句执行的结果; 在关系型数据库中,我们一般把它?称为更新日志流(changelog?stream)。如果我们保存了表在某一时刻的快照(snapshot),那么?接下来只要读取更新日志流,就可以得到表之后的变化过程和最终结果了。在很多高级关系型?数据库(比如 Oracle?、DB2)中都有“物化视图”(Materialized?Views)的概念,可以用来缓?存 SQL?查询的结果; 它的更新其实就是不停地处理更新日志流的过程。Flink?中的动态表, 就借鉴了物化视图的思想。 (2)持续查询: 动态表可以像静态的批处理表一样进行查询操作。由于数据在不断变化,因此基于它定义?的 SQL?查询也不可能执行一次就得到最终结果。这样一来,我们对动态表的查询也就永远不会停止,一直在随着新数据的到来而继续执行。这样的查询就被称作“持续查询”(Continuous Query)。对动态表定义的查询操作, 都是持续查询;而持续查询的结果也会是一个动态表。由于每次数据到来都会触发查询操作,因此可以认为一次查询面对的数据集,就是当前输?入动态表中收到的所有数据。这相当于是对输入动态表做了一个“快照”(snapshot),当作有限数据集进行批处理;流式数据的到来会触发连续不断的快照查询,像动画一样连贯起来, 就构成了“持续查询”。如图所示:
 ?持续查询的步骤如下: (1) 流(stream) 被转换为动态表(dynamic?table); (2)?对动态表进行持续查询(continuous?query),生成新的动态表; (3) 生成的动态表被转换成流。 这样,只要 API?将流和动态表的转换封装起来,我们就可以直接在数据流上执行 SQL 查?询,用处理表的方式来做流处理。
2)?用 SQL 持续查询: (1)更新查询: Table eventCountTable = tableEnv.sqlQuery("SELECT user, COUNT(url) as cnt FROM EventTable GROUP BY user"); 当原始动态表不停地插入新的数据时, 查询得到的 table?会持续地进行更改。由于 count 数量可能会叠加增长,因此这里的更改操作可以是简单的插入(Insert),?也可以是对之前数据的更新(Update)。换句话说用来定义结果表的更新日志(changelog)?流中,包含了?INSERT 和 UPDATE 两种操作。这种持续查询被称为更新查询(Update Query),?更新查询得到的结果表如果想要转换成 DataStream,必须调用 toChangelogStream()方法。 具体步骤解释如下: (1) 当查询启动时,原始动态表 EventTable 为空; (2)当第一行 zhangsan?的点击数据插入 EventTable 表时,查询开始计算结果表,eventCountTable中插入一行数据[zhangsan,1]。 (3) 当第二行 list?点击数据插入 EventTable 表时,查询将更新结果表并插入新行[lisi, 1]。 (4)?第三行数据到来, 同样是 zhangsan?的点击事件, 这时不会插入新行, 而是生成一个针对已有行的更新操作。这样将结果表中第一行[zhangsan,1]就更新为[zhangsan?,2]。 (5) 当第四行 wangwu?的点击数据插入到 EventTable?表时, 查询将第三行[wangwu?,1]插入到结果表中。 (2)追加(Append)查询: 更新查询过程用到了分组聚合,结果表中就会产生更新操作。如果执行一?个简单的条件查询, 结果表中就会像原始表?EventTable 一样,只有插入(Insert)操作了。 Table resultTable = tableEnv.sqlQuery("SELECT url, user FROM EventTable WHERE user = 'Cary'"); 这样的持续查询,就被称为追加查询(Append??Query), 它定义的结果表的更新日志? (changelog) 流中只有 INSERT 操作。追加查询得到的结果表, 转换成 DataStream 调用方法没有限制, 可以直接用 toDataStream(),也可以像更新查询一样调用toChangelogStream()。什么时候聚合的结果会保持不变呢? 一个典型的例子就是窗口聚合。我们考虑开一个滚动窗口,统计每一小时内所有用户的点击次数,并在结果表中增加一个?endT?字段,表示当前统计窗口的结束时间。代码如下: public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// 读取数据源,并分配时间戳、生成水位线
SingleOutputStreamOperator<Event> eventStream = env
.fromElements(
new Event("zhangsan", "./home", 1000L),
new Event("lisi", "./cart", 1000L),
new Event("zhangsan", "./prod?id=1", 25 * 60 * 1000L),
new Event("zhangsan", "./prod?id=4", 55 * 60 * 1000L),
new Event("lisi", "./prod?id=5", 3600 * 1000L + 60 * 1000L),
new Event("wangwu", "./home", 3600 * 1000L + 30 * 60 * 1000L),
new Event("wangwu", "./prod?id=7", 3600 * 1000L + 59 * 60 * 1000L)
)
.assignTimestampsAndWatermarks(
WatermarkStrategy.<Event>forMonotonousTimestamps()
.withTimestampAssigner((SerializableTimestampAssigner<Event>) (element, recordTimestamp) -> element.timestamp)
);
// 创建表环境
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// 将数据流转换成表,并指定时间属性
Table eventTable = tableEnv.fromDataStream(
eventStream,
$("user"),
$("url"),
$("timestamp").rowtime().as("ts")
// 将 timestamp 指定为事件时间,并命名为 ts
);
// 为方便在 SQL 中引用,在环境中注册表 EventTable
tableEnv.createTemporaryView("EventTable", eventTable);
// 设置 1 小时滚动窗口,执行 SQL 统计查询
Table result = tableEnv
.sqlQuery(
"SELECT " +
"user, " + "window_end AS "COUNT(url)AS
"FROM TABLE( " + "TUMBLE( TABLE
endT, " + // 窗口结束时间
cnt" + // 统计 url 访问次数
EventTable, " + // 1 小时滚动窗口
"DESCRIPTOR(ts), " +
"INTERVAL '1' HOUR)) " +
"GROUP BY user, window_start, window_end "
);
tableEnv.toDataStream(result).print();
env.execute();
} 由于窗口的统计结果是一次性写入结果表的,所以结果表的更新日志流中只会包含插入 INSERT 操作,而没有更新 UPDATE 操作。所以这里的持续查询,依然是一个?追加(Append)查询。结果表 result?如果转换成 DataStream,可以直接调用 toDataStream()方法。 -
时间属性和窗口: 1)事件时间: 在实际应用中, 最常用的就是事件时间。在事件时间语义下, 允许表处理程序根据每?个数据中包含的时间戳(也就是事件发生的时间) 来生成结果。事件时间语义最大的用途就是处理乱序事件或者延迟事件的场景。通过设置水位线?(watermark)来表示事件时间的进展,而水位线可以根据数据的最大时间戳设置一个延迟时间。这样即使在出现乱序的情况下, 对数据的处理也可以获得正确的结果。事件时间属性可以在创建表 DDL?中定义,也可以在数据流和表的转换中定义。
2)窗口: 从 1.13 ?版本开始,Flink??开始使用窗口表值函数(Windowing??table-valued??functions?,?Windowing??TVFs)来定义窗口。窗口表值函数是 Flink?定义的多态表函数(PTF),可以将表进行扩展后返回。目前?Flink 提供了以下几个窗口 : (1)滚动窗口(Tumbling Windows); (2)滑动窗口(Hop?Windows,跳跃窗口); (3)累积窗口(Cumulate?Windows); (4)会话窗口(Session?Windows)。 窗口表值函数可以支持基于窗口的复杂计算,例如窗口 Top-N、窗口联结(window?join) 等等。?????在窗口?TVF 的返回值中, 除去原始表中的所有列,还增加了用来描述窗口的额外 3 个列:?“窗口起始点”(window_start)、“窗口结束点”(window_end)、“窗口时间”(window_time)。?起始点和结束点比较好理解,这里的“窗口时间”指的是窗口中的时间属性,它的值等于?window_end - 1ms ,相当于窗口中能够包含数据的最大时间戳。在 SQL??中的声明方式直接调用 TUMBLE() 、HOP() 、 CUMULATE()就可以实现滚动、滑动和累积窗口。
(1) 滚动窗口(TUMBLE): 滚动窗口在 SQL 中的概念与 DataStream API 中的定义完全一样,是长度固定、时间对齐、?无重叠的窗口, 一般用于周期性的统计计算。在 SQL?中通过调用 TUMBLE()函数就可以声明一个滚动窗口, 只有一个核心参数就是窗口大小(size)。?窗口 TVF 本质上是表函数,可以对表进行扩展,所以还应该把当前查询的表作为参数整体传入。具体声明如下:TUMBLE(TABLE EventTable, DESCRIPTOR(ts), INTERVAL '1' HOUR)??这里基于时间字段 ts,对表 EventTable 中的数据开了大小为 1 小时的滚动窗口。窗口会将?表中的每一行数据, 按照它们 ts?的值分配到一个指定的窗口中。
(2) 滑动窗口(HOP): 滑动窗口的使用与滚动窗口类似,可以通过设置滑动步长来控制统计输出的频率。在 SQL????中通过调用 HOP()来声明滑动窗口;除了也要传入表名、时间属性外,还需要传入窗口大小(size)和滑动步长(slide) 两个参数。HOP(TABLE EventTable, DESCRIPTOR(ts), INTERVAL '5' MINUTES, INTERVAL '1' HOURS));这里我们基于时间属性 ts,在表 EventTable 上创建了大小为 1 小时的滑动窗口,每 5 分钟?滑动一次。
(3) 累积窗口(CUMULATE): 例如按天来统计网站的 PV ?(Page View,页面浏览量),如果用 1 天的滚动窗口,?那需要到每天 24 点才会计算一次,输出频率太低; 如果用滑动窗口,计算频率可以更高,但统计的就变成了“过去 24 小时的 PV”。所以我们真正希望的是,还是按照自然日统计每天的?PV,不过需要每隔 1?小时就输出一次当天到目前为止的 PV 值。这种特殊的窗口就叫作“累 积窗口”(Cumulate Window)。具体声明如下:CUMULATE(TABLE EventTable, DESCRIPTOR(ts), INTERVAL '1' HOURS, INTERVAL '1' DAYS)) ?
-
聚合(Aggregation)查询: 1)分组聚合: 从概念上讲,SQL?中的分组聚?合可以对应 DataStream?API?中 keyBy?之后的聚合转换,它们都是按照某个 key?对数据进行了?划分,?各自维护状态来进行聚合统计的。在流处理中, 分组聚合同样是一个持续查询, 而且是?一个更新查询, 得到的是一个动态表;每当流中有一个新的数据到来时,都会导致结果表的更新操作。在持续查询的过程中, 由于用于分组的 key 可能会不断增加,因此计算结果所需要?维护的状态也会持续增长。为了防止状态无限增长耗尽资源,Flink Table API 和 SQL 可以在表?环境中配置状态的生存时间(TTL)。
2)窗口聚合: 在流处理中,往往需要将无限数据流划分成有界数据集, 这就是所谓的“窗口”。在 Flink 的 Table API 和 SQL 中, 窗口的计算是通过“窗口聚合”(window aggregation)?来实现的。与分组聚合类似, 窗口聚合也需要调用 SUM()、MAX() 、MIN()、COUNT()一类的?聚合函数,?通过 GROUP BY 子句来指定分组的字段。只不过窗口聚合时,需要将窗口信息作?为分组 key?的一部分定义出来。在 Flink?1.12 版本之前,是直接把窗口自身作为分组 key 放在?GROUP?BY?之后的, 所以也叫“分组窗口聚合”。而 1.13 版本开始使用了?“窗口表值函数”(Windowing TVF),窗口本身返回的是就是一个表,所以窗口会出现在 FROM?后面, GROUP BY 后面的则是窗口新增的字段window_start 和window_end??下面是一段窗口聚合的完整代码, 以累积窗口为例: public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// 读取数据源,并分配时间戳、生成水位线
SingleOutputStreamOperator<Event> eventStream = env
.fromElements(
new Event("zhangsan", "./home", 1000L),
new Event("lisi", "./cart", 1000L),
new Event("zhangsan", "./prod?id=1", 25 * 60 * 1000L),
new Event("zhangsan", "./prod?id=4", 55 * 60 * 1000L),
new Event("lisi", "./prod?id=5", 3600 * 1000L + 60 * 1000L),
new Event("wangwu", "./home", 3600 * 1000L + 30 * 60 * 1000L),
new Event("wangwu", "./prod?id=7", 3600 * 1000L + 59 * 60 * 1000L)
)
.assignTimestampsAndWatermarks(
WatermarkStrategy.<Event>forMonotonousTimestamps()
.withTimestampAssigner((SerializableTimestampAssigner<Event>) (element, recordTimestamp) -> element.timestamp)
);
// 创建表环境
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// 将数据流转换成表,并指定时间属性
Table eventTable = tableEnv.fromDataStream(
eventStream,
$("user"),
$("url"),
$("timestamp").rowtime().as("ts")
);
// 为方便在 SQL 中引用,在环境中注册表 EventTable
tableEnv.createTemporaryView("EventTable", eventTable);
// 设置累积窗口,执行 SQL 统计查询
Table result = tableEnv
.sqlQuery(
"SELECT " +
"user, " +
"window_end AS endT, " +
"COUNT(url) AS cnt " +
"FROM TABLE( " +
"CUMULATE( TABLE EventTable, " + // 定义累积窗口
"DESCRIPTOR(ts), " +
"INTERVAL '30' MINUTE, " +
"INTERVAL '1' HOUR)) " +
"GROUP BY user, window_start, window_end "
);
tableEnv.toDataStream(result).print();
env.execute();
} 应用案例TopN: 统计一段时间内的热门商品。这就需要先开窗口,在窗口中统计每个商品的点击量;然后将统计数据收集起来, 按窗口进行分组,并按?点击量大小降序排序,选取前 N 个作为结果返回。 package com.demo.main;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import static org.apache.flink.table.api.Expressions.$;
public class WindowTopNExample {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// 读取数据源,并分配时间戳、生成水位线
SingleOutputStreamOperator<Event> eventStream = env
.fromElements(
new Event("zhangsan", "./home", 1000L),
new Event("lisi", "./cart", 1000L),
new Event("zhangsan", "./prod?id=1", 25 * 60 * 1000L),
new Event("zhangsan", "./prod?id=4", 55 * 60 * 1000L),
new Event("lisi", "./prod?id=5", 3600 * 1000L + 60 * 1000L),
new Event("wangwu", "./home", 3600 * 1000L + 30 * 60 * 1000L),
new Event("wangwu", "./prod?id=7", 3600 * 1000L + 59 * 60 * 1000L)
)
.assignTimestampsAndWatermarks(
WatermarkStrategy.<Event>forMonotonousTimestamps()
.withTimestampAssigner((SerializableTimestampAssigner<Event>) (element, recordTimestamp) -> element.timestamp)
);
// 创建表环境
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// 将数据流转换成表,并指定时间属性
Table eventTable = tableEnv.fromDataStream(
eventStream,
$("user"),
$("url"),
$("timestamp").rowtime().as("ts")
// 将 timestamp 指定为事件时间,并命名为 ts
);
// 在环境中注册表 EventTable
tableEnv.createTemporaryView("EventTable", eventTable);
// 定义子查询,进行窗口聚合,得到包含窗口信息、用户以及访问次数的结果表
String subQuery = "SELECT window_start, window_end, user, COUNT(url) as cnt " + "FROM TABLE ( " +
"TUMBLE( TABLE EventTable, DESCRIPTOR(ts), INTERVAL '1' HOUR )) GROUP BY window_start, window_end, user ";
// 定义 Top N 的外层查询
String topNQuery =
"SELECT * " +
"FROM (" +
"SELECT *, " +
"ROW_NUMBER() OVER ( " +
"PARTITION BY window_start, window_end " +
"ORDER BY cnt desc " +
") AS row_num " +
"FROM (" + subQuery + ")) " +
"WHERE row_num <= 2";
Table result = tableEnv.sqlQuery(topNQuery);
tableEnv.toDataStream(result).print();
env.execute();
}
}
|