目录
1.分类
2.flink中的TIMESTAMP 和TIMESTAMP_LTZ
?3.event? time?Attribute 指定的两种方式
3.1创建table DDL 的时候指定event time attribute 在创建DDL的时候用?
3.2 DataStream-to-table的时候定义
3.3不用DataStream中的分配好的时间个水位线
4.processing time
4.1 ddl指定
4.2 datastream-to-table 指定
1.分类
time Attributes主要分为:processing time? 和event? time这个概念相比大家并不陌生。
2.flink中的TIMESTAMP 和TIMESTAMP_LTZ
先来聊聊mysql中的timetamp和datetime区别:
- 相同点:两者都可用来表示?YYYY-MM-DD HH:MM:SS?类型的日期。
- 不同点:对于timestamp:它把客户端插入的时间从?当前时区?转化为UTC(世界标准时间)进行存储。查询时,将其又转化为客户端当前时区进行返回。对于:不做任何改变,基本上是原样输入和输出。
再来聊聊flink中的TIMESTAMP 和TIMESTAMP_LTZ:
- flink中的TIMESTAMP行为和mysql中的datetime类型,用于不携带时区
- flink中的TIMESTAMP_LTZ行为和mysql中的timestamp类型,用于携带时区
- 二者都要求字段必须是时间类型才能自动转化为flink的TIMESTAMP 和TIMESTAMP_LTZ,但是实际开发中并非如此,我们可能会看到用字符串表示的时间:'2022-09-30 18:34:33'或者是用long类型表示的时间:1664532356103。? 这种时候直接用是一定会抛出异常的,因为flink无法将非时间字段转化为flink内部的:TIMESTAMP 和TIMESTAMP_LTZ,在这种情况下我们可以用TO_TIMESTAMP 或者是TO_TIMSTAMP_LTZ来进行映射转化。下一小节就会有对应的例子。
- 二者都接受一个小于9的参数: TIMSTAMP_LTZ(num),TO_TIMESTAMP(num)? 指的是时间的精确度,开发中经常用到TIMESTAMP(3)或者是TIMSTAMP_LTZ(3)意思是保留三位,实际含义就是精确到毫秒,TIMESTAMP(6)和TIMSTAMP_LTZ(6)表示微秒,TIMESTAMP(9)和TIMSTAMP_LTZ(9)表示纳秒。? 一般用TIMESTAMP(3)或者是TIMSTAMP_LTZ(3)就够了。
- 经验:如果流数据中的时间字段不携带时区信息则用TIMESTAMP(3),否则的话用TIMESTAMP_LTZ(3),? 如果流数据中时间字段类型是String:'2022-09-30 18:34:33'用:TIMESTAMP(3), 如果如果流数据中时间字段类型是long:1664532356103,? 则强烈建议:TIMESTAMP_LTZ(3), 因为这可以将时间戳转化为本地时间。
?3.event? time?Attribute 指定的两种方式
3.1创建table DDL 的时候指定event time attribute 在创建DDL的时候用?
WATERMARK statement(水位线语句) 定义, 水位线 语句将其使用的时间字段段标记为 event time attribute,这是一个自动标记的过程。WATERMARK statement使用到的时间字段类型必须是TIMESTAMP 或者是 TIMESTAMP_LTZ?
CREATE TABLE user_actions (
user_name STRING,
data STRING,
user_action_time TIMESTAMP(3),//user_action_time 本身是时间类型,
//所以可以直接映射为flink的TIMESTAMP
-- declare user_action_time as event time attribute and use 5 seconds delayed watermark strategy
WATERMARK FOR user_action_time AS user_action_time - INTERVAL '5' SECOND
) WITH (
...
);
CREATE TABLE user_actions (
user_name STRING,
data STRING,
ts BIGINT,
time_ltz AS TIMESTAMP(3),//ts 在数据流中不是时间类型,而是long数字类型
//因此必须转换为flink内置的时间类型
-- declare time_ltz as event time attribute and use 5 seconds delayed watermark strategy
WATERMARK FOR time_ltz AS time_ltz - INTERVAL '5' SECOND
) WITH (
...
);
3.2 DataStream-to-table的时候定义
默认来说DataStream-to-table的时候不会传播DataStream中已经定义好的事件时间和水位线,因此我们必须显示的用Schema来定义水位线的传播。
前提条件:首先先在DataStream中定义好事件时间和水位线。
Flink always derives rowtime attribute as TIMESTAMP WITHOUT TIME ZONE, because DataStream doesn’t have time zone notion, and treats all event time values as in UTC.
通过rowtime 提取Datastream 已有的事件时间属性,因为DataStream中已经存在了,直接拿出来用即可。
DataStream<Tuple2<String, String>> stream = inputStream.assignTimestampsAndWatermarks(...);
// declare an additional logical field as an event time attribute
//第一种写法
Table table = tEnv.fromDataStream(
stream,
$("user_name"), $("data"),
$("user_action_time").rowtime()
);
下面是rowtime()方法的源码,请注意看其注释部分:
/**
* Declares a field as the rowtime attribute for indicating,
* accessing, and working in Flink's event time.
*/public OutType rowtime() {
return toApiSpecificExpression(unresolvedCall(ROWTIME, toExpr()));
}
第二种写法:
Schema schema = Schema.newBuilder()
.columnByMetadata("rowtime", "TIMESTAMP_LTZ(3)")
.watermark("rowtime", "SOURCE_WATERMARK()")
.build());
Table table = tEnv.fromDataStream(stream,schema )
3.3不用DataStream中的分配好的时间个水位线
如果DataStream中没有事件时间和水位线,我们可以在table中重新定义写法如下:
Schema.newBuilder()
.columnByExpression("myrowtime", "CAST(event_time AS TIMESTAMP_LTZ(3))")
.watermark("myrowtime", "myrowtime- INTERVAL '10' SECOND")
.build());
myrowtime是随便定义的一个字段用来保存时间,
CAST是因为数据中的事件是string必须转成flink支持的
TIMESTAMP,如果你的数据中的时间本身是时间类型
则可以直接: .columnByExpression("rowtime", "TIMESTAMP_LTZ(3)")
当 .watermark("myrowtime", "rowtime - INTERVAL '10' SECOND")运行的时候
意味着myrowtime字段被当成了record time.
4.processing time
processing? time使用的时候不需要水位线
4.1 ddl指定
CREATE TABLE user_actions (
user_name STRING,
data STRING,
user_action_time AS PROCTIME() -- declare an additional field as a processing time attribute
) WITH (
...
);
4.2 datastream-to-table 指定
DataStream<Tuple2<String, String>> stream = ...;
// declare an additional logical field as a processing time attribute
Table table = tEnv.fromDataStream(stream, $("user_name"), $("data"), $("user_action_time").proctime());
//第二种写法
Schema schema = Schema.newBuilder()
.columnByExpression("proc_time", "PROCTIME()")
.build();
Table table = tEnv.fromDataStream(stream,schema );
|