IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> 29.flink table api 之 time Attributes -> 正文阅读

[大数据]29.flink table api 之 time Attributes

目录

1.分类

2.flink中的TIMESTAMP 和TIMESTAMP_LTZ

?3.event? time?Attribute 指定的两种方式

3.1创建table DDL 的时候指定event time attribute 在创建DDL的时候用?

3.2 DataStream-to-table的时候定义

3.3不用DataStream中的分配好的时间个水位线

4.processing time

4.1 ddl指定

4.2 datastream-to-table 指定


1.分类

time Attributes主要分为:processing time? 和event? time这个概念相比大家并不陌生。

2.flink中的TIMESTAMP 和TIMESTAMP_LTZ

先来聊聊mysql中的timetamp和datetime区别:

  1. 相同点:两者都可用来表示?YYYY-MM-DD HH:MM:SS?类型的日期。
  2. 不同点:对于timestamp它把客户端插入的时间从?当前时区?转化为UTC(世界标准时间)进行存储。查询时,将其又转化为客户端当前时区进行返回。对于不做任何改变,基本上是原样输入和输出。

再来聊聊flink中的TIMESTAMP 和TIMESTAMP_LTZ:

  1. flink中的TIMESTAMP行为和mysql中的datetime类型,用于不携带时区
  2. flink中的TIMESTAMP_LTZ行为和mysql中的timestamp类型,用于携带时区
  3. 二者都要求字段必须是时间类型才能自动转化为flink的TIMESTAMP 和TIMESTAMP_LTZ,但是实际开发中并非如此,我们可能会看到用字符串表示的时间:'2022-09-30 18:34:33'或者是用long类型表示的时间:1664532356103。? 这种时候直接用是一定会抛出异常的,因为flink无法将非时间字段转化为flink内部的:TIMESTAMP 和TIMESTAMP_LTZ,在这种情况下我们可以用TO_TIMESTAMP 或者是TO_TIMSTAMP_LTZ来进行映射转化。下一小节就会有对应的例子。
  4. 二者都接受一个小于9的参数: TIMSTAMP_LTZ(num),TO_TIMESTAMP(num)? 指的是时间的精确度,开发中经常用到TIMESTAMP(3)或者是TIMSTAMP_LTZ(3)意思是保留三位,实际含义就是精确到毫秒,TIMESTAMP(6)和TIMSTAMP_LTZ(6)表示微秒,TIMESTAMP(9)和TIMSTAMP_LTZ(9)表示纳秒。? 一般用TIMESTAMP(3)或者是TIMSTAMP_LTZ(3)就够了。
  5. 经验:如果流数据中的时间字段不携带时区信息则用TIMESTAMP(3),否则的话用TIMESTAMP_LTZ(3),? 如果流数据中时间字段类型是String:'2022-09-30 18:34:33'用:TIMESTAMP(3), 如果如果流数据中时间字段类型是long:1664532356103,? 则强烈建议:TIMESTAMP_LTZ(3), 因为这可以将时间戳转化为本地时间。

?3.event? time?Attribute 指定的两种方式

3.1创建table DDL 的时候指定event time attribute 在创建DDL的时候用?

WATERMARK statement(水位线语句) 定义, 水位线 语句将其使用的时间字段段标记为 event time attribute,这是一个自动标记的过程。WATERMARK statement使用到的时间字段类型必须是TIMESTAMP 或者是 TIMESTAMP_LTZ?

CREATE TABLE user_actions (
  user_name STRING,
  data STRING,
  user_action_time TIMESTAMP(3),//user_action_time 本身是时间类型,
                                //所以可以直接映射为flink的TIMESTAMP

  -- declare user_action_time as event time attribute and use 5 seconds delayed watermark strategy
  WATERMARK FOR user_action_time AS user_action_time - INTERVAL '5' SECOND
) WITH (
  ...
);


CREATE TABLE user_actions (
  user_name STRING,
  data STRING,
  ts BIGINT,
  time_ltz AS TIMESTAMP(3),//ts 在数据流中不是时间类型,而是long数字类型
                           //因此必须转换为flink内置的时间类型
  -- declare time_ltz as event time attribute and use 5 seconds delayed watermark strategy
  WATERMARK FOR time_ltz AS time_ltz - INTERVAL '5' SECOND
) WITH (
  ...
);

3.2 DataStream-to-table的时候定义

默认来说DataStream-to-table的时候不会传播DataStream中已经定义好的事件时间和水位线,因此我们必须显示的用Schema来定义水位线的传播。

前提条件:首先先在DataStream中定义好事件时间和水位线。

Flink always derives rowtime attribute as TIMESTAMP WITHOUT TIME ZONE, because DataStream doesn’t have time zone notion, and treats all event time values as in UTC.

通过rowtime 提取Datastream 已有的事件时间属性,因为DataStream中已经存在了,直接拿出来用即可。

DataStream<Tuple2<String, String>> stream = inputStream.assignTimestampsAndWatermarks(...);

// declare an additional logical field as an event time attribute
//第一种写法
Table table = tEnv.fromDataStream(
stream, 
$("user_name"), $("data"), 
$("user_action_time").rowtime()
);

下面是rowtime()方法的源码,请注意看其注释部分:

    /**
     * Declares a field as the rowtime attribute for indicating, 
     * accessing, and working in Flink's event time.
     */public OutType rowtime() {
        return toApiSpecificExpression(unresolvedCall(ROWTIME, toExpr()));
    }


第二种写法:
Schema schema = Schema.newBuilder()
            .columnByMetadata("rowtime", "TIMESTAMP_LTZ(3)")
            .watermark("rowtime", "SOURCE_WATERMARK()")
            .build());

Table table = tEnv.fromDataStream(stream,schema )

3.3不用DataStream中的分配好的时间个水位线

如果DataStream中没有事件时间和水位线,我们可以在table中重新定义写法如下:

Schema.newBuilder()
        .columnByExpression("myrowtime", "CAST(event_time AS TIMESTAMP_LTZ(3))")
        .watermark("myrowtime", "myrowtime- INTERVAL '10' SECOND")
        .build());

myrowtime是随便定义的一个字段用来保存时间,
CAST是因为数据中的事件是string必须转成flink支持的
TIMESTAMP,如果你的数据中的时间本身是时间类型
则可以直接: .columnByExpression("rowtime", "TIMESTAMP_LTZ(3)")
当 .watermark("myrowtime", "rowtime - INTERVAL '10' SECOND")运行的时候
意味着myrowtime字段被当成了record time.

4.processing time

processing? time使用的时候不需要水位线

4.1 ddl指定

CREATE TABLE user_actions (
  user_name STRING,
  data STRING,
  user_action_time AS PROCTIME() -- declare an additional field as a processing time attribute
) WITH (
  ...
);

4.2 datastream-to-table 指定

DataStream<Tuple2<String, String>> stream = ...;

// declare an additional logical field as a processing time attribute
Table table = tEnv.fromDataStream(stream, $("user_name"), $("data"), $("user_action_time").proctime());


//第二种写法
Schema schema = Schema.newBuilder()
                .columnByExpression("proc_time", "PROCTIME()")
                .build();
Table table = tEnv.fromDataStream(stream,schema );

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-10-08 20:48:45  更:2022-10-08 20:51:15 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年5日历 -2025/5/1 19:22:56-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码