Watermark
Watermark是一种衡量Event Time进展的机制。
Watermark用于处理乱序事件,而要正确地处理乱序事件,通常需要用Watermark机制结合window来实现。
数据流中的Watermark用于表示"timestamp小于Watermark的数据都已经到达",因此window的触发执行也是由Watermark来推动的。
Watermark可以理解成一个延迟触发机制:我们可以设置Watermark的延迟时长t,系统会不断校验已到达数据中的最大事件时间maxEventTime,并认定eventTime小于maxEventTime - t的数据都已经到达;如果某个窗口的停止时间等于maxEventTime - t,那么这个窗口就会被触发执行。例如,设t为2s、窗口为[0s, 5s),当已到达数据的最大时间戳达到7s时,watermark为5s,等于该窗口的停止时间,窗口即被触发计算。
watermark的引入
最常见的引入方式
dataStream.assignTimestampsAndWatermarks( new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.milliseconds(1000)) {
override def extractTimestamp(element: SensorReading): Long = {
element.timestamp * 1000
}
} )
AssignerWithPeriodicWatermarks
· 周期性地生成 watermark:系统会周期性地将 watermark 插入到流中
· 默认周期是200毫秒,可以使用 ExecutionConfig.setAutoWatermarkInterval() 方法进行设置
· 升序时间戳分配器以及前面处理乱序的 BoundedOutOfOrdernessTimestampExtractor,都是基于周期性 watermark 的
AssignerWithPunctuatedWatermarks
· 没有固定的时间周期规律,根据数据本身间断式(可打断)地生成 watermark
代码
package com.atguigu.aqitest
import java.util
import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.{AssignerWithPeriodicWatermarks, AssignerWithPunctuatedWatermarks}
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.assigners.{EventTimeSessionWindows, TumblingEventTimeWindows}
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.{TimeWindow, Window}
import org.apache.flink.util.Collector
object WindowTest {
def main(args: Array[String]): Unit = {
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.getConfig.setAutoWatermarkInterval(111)
val inputStream = env.socketTextStream("192.168.1.102", 7777)
val dataStream: DataStream[SensorReading] = inputStream
.map(
data => {
val dataArray = data.split(",")
SensorReading(dataArray(0), dataArray(1).toLong, dataArray(2).toDouble)
}
)
.assignTimestampsAndWatermarks(new MyWMAssigner)
val resultStream = dataStream
.keyBy("id")
.timeWindow(Time.seconds(15), Time.seconds(5))
.allowedLateness(Time.minutes(1))
.sideOutputLateData(new OutputTag[SensorReading]("late"))
.apply(new MyWindowFun)
dataStream.print("data")
resultStream.getSideOutput(new OutputTag[SensorReading]("late")).print("late")
resultStream.print("result")
env.execute("window test")
}
}
class MyWindowFun() extends WindowFunction[SensorReading, (Long, Int), Tuple, TimeWindow] {
override def apply(key: Tuple, window: TimeWindow, input: Iterable[SensorReading], out: Collector[(Long, Int)]): Unit = {
out.collect((window.getStart, input.size))
}
}
class MyWMAssigner extends AssignerWithPeriodicWatermarks[SensorReading] {
val lateness: Long = 1000L
var maxTs: Long = Long.MinValue + lateness
override def getCurrentWatermark: Watermark = {
new Watermark(maxTs - lateness)
}
override def extractTimestamp(element: SensorReading, previousElementTimestamp: Long): Long = {
maxTs = maxTs.max(element.timestamp * 1000L)
element.timestamp * 1000L
}
}
class MyWMAssigner2 extends AssignerWithPunctuatedWatermarks[SensorReading] {
override def checkAndGetNextWatermark(lastElement: SensorReading, extractedTimestamp: Long): Watermark = {
val lateness: Long = 1000L
if (lastElement.id == "sensor_1")
new Watermark(extractedTimestamp - lateness)
else null
}
override def extractTimestamp(element: SensorReading, previousElementTimestamp: Long): Long = {
element.timestamp * 1000L
}
}
总结
1、watermark本质上是插入到数据流中的一条特殊的时间戳数据,代表事件时间的进展
2、watermark主要用来处理乱序数据,一般就是直接定义一个延迟时间,延迟触发窗口操作
这里的延迟,是相对于已到达数据中的最大事件时间戳而言的
3、watermark延迟时间的设置,一般按照数据的最大乱序程度来定
如果按照最大乱序程度来定,就能保证所有的数据都被正确处理
要权衡正确性和实时性的话,可以不按最大乱序程度,而是给一个相对较小的watermark延迟
watermark延迟时间,完全是程序自己定义的
最好的处理方式,是先了解数据的分布情况(通过抽样、经验或机器学习算法),再指定一个比较小、又能覆盖绝大多数乱序情况的合理延迟
4、关窗操作,必须是时间进展到窗口关闭时间,事件时间语义下就是watermark达到窗口关闭时间
watermark = 当前最大时间戳 - 延迟时间,如果当前watermark大于等于窗口结束时间,就触发窗口计算(若设置了allowedLateness,则还要再等允许迟到的时间才真正关窗)
5、watermark代表的含义是:之后不会再有时间戳比watermark数值更小的数据到来了
如果上游有多个并行分区,当前任务会为每个上游分区维护一个分区watermark,当前任务的事件时钟取其中最小的那个
6、处理乱序数据,Flink有三重保证
watermark可以设置延迟时间
window的allowedLateness方法,可以设置窗口允许处理迟到数据的时间
window的sideOutputLateData方法,可以将迟到的数据写入侧输出流
窗口有两个重要操作:触发计算,清空状态(关闭窗口)
ProcessFunction API
Flink提供了8个Process Function:
· ProcessFunction
· KeyedProcessFunction
· CoProcessFunction
· ProcessJoinFunction
· BroadcastProcessFunction
· KeyedBroadcastProcessFunction
· ProcessWindowFunction
· ProcessAllWindowFunction
KeyedProcessFunction
· processElement(v: IN, ctx: Context, out: Collector[OUT]):流中的每一个元素都会调用这个方法,处理结果通过Collector输出。Context可以访问元素的时间戳、元素的key,以及TimerService时间服务;Context还可以将结果输出到别的流(side outputs)。
· onTimer(timestamp: Long, ctx: OnTimerContext, out: Collector[OUT])是一个回调函数。当之前注册的定时器触发时调用。参数timestamp为定时器所设定的触发的时间戳。Collector为输出结果的集合。OnTimerContext和processElement的Context参数一样,提供了上下文的一些信息,例如定时器触发的时间信息(事件时间或者处理时间)。
TimerService 和 定时器(Timers)
Context和OnTimerContext所持有的TimerService对象拥有以下方法:
· currentProcessingTime(): Long 返回当前处理时间
· currentWatermark(): Long 返回当前watermark的时间戳
· registerProcessingTimeTimer(timestamp: Long): Unit 会注册当前key的processing time的定时器。当processing time到达定时时间时,触发timer。
· registerEventTimeTimer(timestamp: Long): Unit 会注册当前key的event time 定时器。当水位线大于等于定时器注册的时间时,触发定时器执行回调函数。
· deleteProcessingTimeTimer(timestamp: Long): Unit 删除之前注册处理时间定时器。如果没有这个时间戳的定时器,则不执行。
· deleteEventTimeTimer(timestamp: Long): Unit 删除之前注册的事件时间定时器,如果没有此时间戳的定时器,则不执行。
当定时器timer触发时,会执行回调函数onTimer()。注意定时器timer只能在keyed streams上面使用。
代码
import com.atguigu.aqitest.SensorReading
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.api.java.tuple.{Tuple, Tuple1}
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector
object ProcessFunctionTest {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
val inputStream = env.socketTextStream("localhost", 7777)
val dataStream: DataStream[SensorReading] = inputStream
.map( data => {
val dataArray = data.split(",")
SensorReading( dataArray(0), dataArray(1).toLong, dataArray(2).toDouble )
} )
val warningStream: DataStream[String] = dataStream
.keyBy("id")
.process( new TempIncreWarning(10000L) )
warningStream.print()
env.execute("process function job")
}
}
class TempIncreWarning(interval: Long) extends KeyedProcessFunction[Tuple, SensorReading, String]{
lazy val lastTempState: ValueState[Double] = getRuntimeContext.getState( new ValueStateDescriptor[Double]("lastTemp", classOf[Double]))
lazy val curTimerTsState: ValueState[Long] = getRuntimeContext.getState( new ValueStateDescriptor[Long]("cur-timer-ts", classOf[Long]) )
override def processElement(value: SensorReading, ctx: KeyedProcessFunction[Tuple, SensorReading, String]#Context, out: Collector[String]): Unit = {
val lastTemp = lastTempState.value()
val curTimerTs = curTimerTsState.value()
lastTempState.update(value.temperature)
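// 温度上升且当前没有注册过定时器(curTimerTs为默认值0)时,注册一个interval毫秒之后的处理时间定时器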
if( value.temperature > lastTemp && curTimerTs == 0 ){
val ts = ctx.timerService().currentProcessingTime() + interval
ctx.timerService().registerProcessingTimeTimer(ts)
curTimerTsState.update(ts)
}
else if( value.temperature < lastTemp ){
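// 温度下降,删除之前注册的定时器,并清空保存的定时器时间戳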
ctx.timerService().deleteProcessingTimeTimer(curTimerTs)
curTimerTsState.clear()
}
}
override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Tuple, SensorReading, String]#OnTimerContext, out: Collector[String]): Unit = {
out.collect( "温度值连续" + interval/1000 + "秒上升" )
curTimerTsState.clear()
}
}
侧输出流(SideOutput)
大部分DataStream API算子的输出都是单一输出,也就是某种数据类型的流。split算子虽然可以把一条流分成多条流,但这些流的数据类型仍然相同。process function的side outputs功能则可以产生多条流,并且这些流的数据类型可以不一样。一个side output可以定义为OutputTag[X]对象,X是输出流的数据类型。process function可以通过Context对象发射一个事件到一个或者多个side outputs。
代码
package com.atguigu.aqitest
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector
object SideOutputTest {
def main(args: Array[String]): Unit = {
val env: StreamExecutionEnvironment =StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
val inputStream=env.socketTextStream("localhost",7777)
val dataStream:DataStream[SensorReading] =inputStream.map(
data=>{
val dataArray=data.split(",")
SensorReading(dataArray(0),dataArray(1).toLong,dataArray(2).toDouble)
})
val highTempStream: DataStream[SensorReading]= dataStream
.process(new SplitTempProcessor(30))
val lowTempStream: DataStream[(String, Double, Long)] =highTempStream.getSideOutput(new OutputTag[(String,Double,Long)]("low-temp"))
highTempStream.print("high")
lowTempStream.print("low")
env.execute("side output test")
}
}
class SplitTempProcessor(threshold:Int)extends ProcessFunction[SensorReading,SensorReading]{
override def processElement(value: SensorReading, ctx: ProcessFunction[SensorReading, SensorReading]#Context, out: Collector[SensorReading]): Unit = {
if(value.temperature>threshold){
out.collect(value)
}else{
ctx.output(new OutputTag[(String, Double, Long)]("low-temp"), (value.id, value.temperature, value.timestamp))
}
}
}
状态编程
流式计算分为无状态和有状态两种情况。无状态的计算独立地观察每个事件,只根据当前这一个事件输出结果;有状态的计算则会基于多个事件(累积的状态)输出结果。
算子状态(operator state)
在DataStream API中,自定义函数可以通过实现ListCheckpointed接口来使用算子状态(参见后面的MyMapper示例)。
Flink为算子状态提供三种基本数据结构:
列表状态(List state)
将状态表示为一组数据的列表。
联合列表状态(Union list state)
也将状态表示为数据的列表。它与常规列表状态的区别在于,在发生故障时,或者从保存点(savepoint)启动应用程序时如何恢复。
广播状态(Broadcast state)
如果一个算子有多项任务,而它的每项任务状态又都相同,那么这种特殊情况最适合应用广播状态。
键控状态(keyed state)
Flink的Keyed State支持以下数据类型:
**ValueState[T]**保存单个的值,值的类型为T。
· get操作: ValueState.value()
· set操作: ValueState.update(value: T)
**ListState[T]**保存一个列表,列表里的元素的数据类型为T。基本操作如下:
· ListState.add(value: T)
· ListState.addAll(values: java.util.List[T])
· ListState.get() 返回Iterable[T]
· ListState.update(values: java.util.List[T])
**MapState[K, V]**保存Key-Value对。
· MapState.get(key: K)
· MapState.put(key: K, value: V)
· MapState.contains(key: K)
· MapState.remove(key: K)
**ReducingState[T]**保存单个的值,表示添加到状态的所有值经ReduceFunction归约后的聚合结果。
**AggregatingState[I, O]**与ReducingState类似,但通过AggregateFunction聚合,输入类型I和输出状态类型O可以不同。
State.clear()是清空操作。
代码
package com.atguigu.aqitest
import java.util
import java.util.concurrent.TimeUnit
import org.apache.flink.api.common.functions.{ReduceFunction, RichFlatMapFunction, RichMapFunction}
import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.api.common.state._
import org.apache.flink.api.common.time.Time
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.checkpoint.ListCheckpointed
import org.apache.flink.util.Collector
object StateTest {
def main(args: Array[String]): Unit = {
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
env.enableCheckpointing(1000L)
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
env.getCheckpointConfig.setCheckpointTimeout(30000L)
env.getCheckpointConfig.setMaxConcurrentCheckpoints(2)
env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500L)
env.getCheckpointConfig.setPreferCheckpointForRecovery(true)
env.getCheckpointConfig.setTolerableCheckpointFailureNumber(3)
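// 重启策略配置:setRestartStrategy多次调用时,以最后一次设置为准,下面只是演示几种常见写法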
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3,10000L))
env.setRestartStrategy(RestartStrategies.failureRateRestart(5,Time.of(5,TimeUnit.MINUTES),Time.of(10,TimeUnit.SECONDS)))
env.setRestartStrategy(RestartStrategies.fallBackRestart())
val inputStream = env.socketTextStream("localhost", 7777)
val dataStream = inputStream
.map(
data => {
val dataArray = data.split(",")
SensorReading(dataArray(0), dataArray(1).toLong, dataArray(2).toDouble)
})
val resultStream = dataStream
.keyBy("id")
val warningStream: DataStream[(String, Double, Double)] = dataStream.keyBy("id").map(new TempChangeWarning(10.0))
warningStream.print("warning")
env.execute("state test job")
}
}
class TempChangeWarning(threshold: Double) extends RichMapFunction[SensorReading, (String, Double, Double)] {
private var lastTempState: ValueState[Double] = _
override def open(parameters: Configuration): Unit = {
lastTempState = getRuntimeContext.getState(new ValueStateDescriptor[Double]("last-temp", classOf[Double]))
}
override def map(value: SensorReading): (String, Double, Double) = {
val lastTemp = lastTempState.value()
lastTempState.update(value.temperature)
val diff = (value.temperature - lastTemp).abs
if (diff > threshold) {
(value.id, lastTemp, value.temperature)
} else {
null
}
}
}
class TempChangeWarningWithFlatmap(threshold: Double) extends RichFlatMapFunction[SensorReading, (String, Double, Double)] {
lazy val lastTempState: ValueState[Double] = getRuntimeContext.getState(new ValueStateDescriptor[Double]("last-temp", classOf[Double]))
override def flatMap(value: SensorReading, out: Collector[(String, Double, Double)]): Unit = {
val lastTemp: Double = lastTempState.value()
lastTempState.update(value.temperature)
val diff: Double = (value.temperature - lastTemp).abs
if (diff > threshold) {
out.collect((value.id, lastTemp, value.temperature))
}
}
}
class MyProcessor extends KeyedProcessFunction[String, SensorReading, Int] {
var myState: ValueState[Int] = _
override def open(parameters: Configuration): Unit = {
myState = getRuntimeContext.getState(new ValueStateDescriptor[Int]("my-state", classOf[Int]))
}
lazy val myListState: ListState[String] = getRuntimeContext.getListState(new ListStateDescriptor[String]("myList", classOf[String]))
lazy val myMapState: MapState[String, Double] = getRuntimeContext.getMapState(new MapStateDescriptor[String, Double]("my-mapList", classOf[String], classOf[Double]))
lazy val myReducingState: ReducingState[SensorReading] = getRuntimeContext
.getReducingState(new ReducingStateDescriptor[SensorReading]("my-reducingState", new ReduceFunction[SensorReading] {
override def reduce(value1: SensorReading, value2: SensorReading): SensorReading = {
SensorReading(value1.id, value1.timestamp.max(value2.timestamp), value1.temperature.min(value2.temperature))
}
}, classOf[SensorReading]))
override def processElement(value: SensorReading, ctx: KeyedProcessFunction[String, SensorReading, Int]#Context, out: Collector[Int]): Unit = {
myState.value()
myState.update(1)
myListState.add("hello")
myListState.addAll(new util.ArrayList[String]())
myMapState.put("sensor_1", 12)
myMapState.get("sensor_1")
myReducingState.add(value)
myReducingState.clear()
}
}
class MyMapper() extends RichMapFunction[SensorReading, Long] with ListCheckpointed[java.lang.Long] {
var count: Long = 0L
override def map(value: SensorReading): Long = {
count += 1
count
}
override def snapshotState(checkpointId: Long, timestamp: Long): util.List[java.lang.Long] = {
val stateList = new util.ArrayList[java.lang.Long]()
stateList.add(count)
stateList
}
override def restoreState(state: util.List[java.lang.Long]): Unit = {
val iter=state.iterator()
while (iter.hasNext()){
count+=iter.next()
}
}
}
状态后端
MemoryStateBackend
内存级的状态后端,会将键控状态作为内存中的对象进行管理,将它们存储在TaskManager的JVM堆上;而将checkpoint存储在JobManager的内存中。
FsStateBackend
将checkpoint存到远程的持久化文件系统(FileSystem)上。而对于本地状态,跟MemoryStateBackend一样,也会存在TaskManager的JVM堆上。
RocksDBStateBackend
将所有状态序列化后,存入本地的RocksDB中存储。
注意:RocksDB的支持并不直接包含在flink中,需要引入依赖:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-statebackend-rocksdb_2.11</artifactId>
<version>1.10.0</version>
</dependency>
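下面是一个在代码中配置状态后端的简要示意(基于Flink 1.10的API写法;其中的checkpoint路径只是假设的示例值,使用RocksDBStateBackend前需要先引入上面的依赖,三种后端实际使用时选其一即可):
import org.apache.flink.streaming.api.scala._
import org.apache.flink.runtime.state.memory.MemoryStateBackend
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
val env = StreamExecutionEnvironment.getExecutionEnvironment
// MemoryStateBackend:本地状态存在TaskManager的JVM堆上,checkpoint存在JobManager内存中(默认)
env.setStateBackend(new MemoryStateBackend())
// FsStateBackend:本地状态仍在TaskManager堆上,checkpoint写入文件系统(路径为假设值)
env.setStateBackend(new FsStateBackend("hdfs://namenode:9000/flink/checkpoints"))
// RocksDBStateBackend:状态序列化后存入本地RocksDB,checkpoint同样写入文件系统
env.setStateBackend(new RocksDBStateBackend("hdfs://namenode:9000/flink/checkpoints"))
// 多次调用setStateBackend时,以最后一次设置为准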
Table API和Flink SQL
介绍
Table API是集成在Scala和Java语言中的查询API,可以用非常直观的方式组合select、filter、join等关系型算子;Flink SQL则基于Apache Calcite实现了标准的SQL。两种API的查询结果都是Table,并且可以与DataStream相互转换。
TableExample
package Table
import com.atguigu.aqitest.SensorReading
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.{Table, TableEnvironment}
import org.apache.flink.table.api.scala._
object TableExample {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
val inputStream: DataStream[String] = env.readTextFile("D:\\code\\ideaWorkspace\\FLINKTutorial\\src\\main\\resources\\sensor.txt")
val dataStream: DataStream[SensorReading] = inputStream.map(data => {
val array: Array[String] = data.split(",")
SensorReading(array(0), array(1).toLong, array(2).toDouble)
})
val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)
val dataTable: Table = tableEnv.fromDataStream(dataStream)
val resultTable: Table = dataTable
.select("id,temperature")
.filter("id=='sensor_1'")
tableEnv.createTemporaryView("dataTable",dataTable)
val resultSqlTable:Table=tableEnv.sqlQuery("select id, temperature from dataTable where id = 'sensor_1'")
val resultStream: DataStream[(String, Double)] = resultTable.toAppendStream[(String, Double)]
resultStream.print("result")
resultTable.printSchema()
env.execute("table example job")
}
}
环境的创建
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
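// 1.1 基于老版本planner的流处理环境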
val settings: EnvironmentSettings = EnvironmentSettings.newInstance()
.useOldPlanner()
.inStreamingMode()
.build()
val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env, settings)
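// 1.2 基于老版本planner的批处理环境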
val batchEnv: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
val batchTableEnv: BatchTableEnvironment = BatchTableEnvironment.create(batchEnv)
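// 2.1 基于Blink planner的流处理环境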
val bsSettings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build()
val bsTableEnv = StreamTableEnvironment.create(env, bsSettings)
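// 2.2 基于Blink planner的批处理环境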
val bbSettings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inBatchMode()
.build()
val bbTableEnv = TableEnvironment.create(bbSettings)
连接到文件系统
val filePath = "D:\\code\\ideaWorkspace\\FLINKTutorial\\src\\main\\resources\\sensor.txt"
tableEnv.connect(new FileSystem().path(filePath))
.withFormat(new Csv())
.withSchema(new Schema()
.field("id", DataTypes.STRING())
.field("timestamp", DataTypes.BIGINT())
.field("temperature", DataTypes.DOUBLE())
)
.createTemporaryTable("inputTable")
tableEnv.connect(new Kafka()
.version("0.11")
.topic("sensor")
.property("bootstrap.servers", "localhost:9092")
.property("zookeeper.connect", "localhost:2181")
)
.withFormat(new Csv)
.withSchema(new Schema()
.field("id", DataTypes.STRING())
.field("timestamp", DataTypes.BIGINT())
.field("temperature", DataTypes.DOUBLE())
)
.createTemporaryTable("kafkaInputTable")
val sensorTable: Table = tableEnv.from("inputTable")
val resultTable: Table = sensorTable
.select('id, 'temperature)
.filter('id === "sensor_1")
val resultSqlTable: Table = tableEnv.sqlQuery(
"""
|select id,temperature
|from inputTable
|where id ='sensor_1'
|""".stripMargin
)
val aggResultTable:Table=sensorTable
.groupBy('id)
.select('id,'id.count as 'count)
val aggResultSqlTable:Table=tableEnv.sqlQuery("select id,count(id) as cnt from inputTable group by id")
val sensorTable1: Table = tableEnv.from("inputTable")
sensorTable1.toAppendStream[(String, Long, Double)].print()
env.execute("table api test job")
aggResultSqlTable.toRetractStream[(String,Double)].print("agg")
resultTable.toAppendStream[(String,Long)].print("result")
输出到文件
package Table
import com.atguigu.aqitest.SensorReading
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.{DataTypes, EnvironmentSettings, Table}
import org.apache.flink.table.api.scala._
import org.apache.flink.table.descriptors.{Csv, FileSystem, Schema}
object OutputTableTest {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
val inputStream = env.readTextFile("")
val dataStream = inputStream.map(
data => {
val array = data.split(",")
SensorReading(array(0), array(1).toLong, array(2).toDouble)
}
)
val settings: EnvironmentSettings = EnvironmentSettings.newInstance()
.useOldPlanner()
.inStreamingMode()
.build()
val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env, settings)
val sensorTable: Table = tableEnv.fromDataStream(dataStream, 'id, 'temperature as 'temp, 'timestamp as 'ts)
val resultTable: Table = sensorTable
.select('id, 'temp)
.filter('id === "sensor_1")
tableEnv.connect(new FileSystem().path(""))
.withFormat(new Csv)
.withSchema(new Schema()
.field("id", DataTypes.STRING())
.field("temp", DataTypes.DOUBLE())
).createTemporaryTable("outputTable")
resultTable.insertInto("outputTable")
sensorTable.printSchema()
sensorTable.toAppendStream[(String, Double, Long)].print()
env.execute("output table test")
}
}
写入kafka
package Table
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.{DataTypes, EnvironmentSettings, Table}
import org.apache.flink.table.api.scala._
import org.apache.flink.table.descriptors.{Csv, FileSystem, Kafka, Schema}
object KafkaTableTest {
def main(args: Array[String]): Unit = {
val env=StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
val settings:EnvironmentSettings=EnvironmentSettings.newInstance()
.useOldPlanner()
.inStreamingMode()
.build()
val tableEnv:StreamTableEnvironment=StreamTableEnvironment.create(env,settings)
tableEnv.connect(new Kafka()
.version("0.11")
.topic("sensor")
.property("bootstrap.servers", "localhost:9092")
.property("zookeeper.connect", "localhost:2181")
)
.withFormat(new Csv)
.withSchema(new Schema()
.field("id", DataTypes.STRING())
.field("timestamp", DataTypes.BIGINT())
.field("temperature", DataTypes.DOUBLE())
)
.createTemporaryTable("kafkaInputTable")
val sensorTable:Table=tableEnv.from("kafkaInputTable")
val resultTable: Table = sensorTable
.select('id, 'temperature)
.filter('id === "sensor_1")
val aggResultTable:Table=resultTable
.groupBy('id)
.select('id,'id.count as 'count)
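// 注意:aggResultTable是分组聚合得到的更新(回撤)结果,不能写入Kafka这类只支持追加(append-only)的Sink,这里仅作演示
// 定义Kafka输出表(sink)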
tableEnv.connect(new Kafka()
.version("0.11")
.topic("sinkTest")
.property("bootstrap.servers", "localhost:9092")
.property("zookeeper.connect", "localhost:2181")
)
.withFormat(new Csv)
.withSchema(new Schema()
.field("id", DataTypes.STRING())
.field("temperature", DataTypes.DOUBLE())
)
.createTemporaryTable("kafkaOutputTable")
resultTable.insertInto("kafkaOutputTable")
env.execute("")
}
}