Thank you Dawid, I am using Scala 2.11. I came up with the same solution as 1), but it did not seem very Scala friendly, so I came here to ask. I hope the new API will not change significantly.

Best Regards,
Weizheng

On Aug 10, 2020, at 8:29 PM, Dawid Wysakowicz <[email protected]> wrote:

Hi,

Regrettably, I must admit the WatermarkStrategy is not very Scala friendly :(

1) After a couple of tries, what I'd recommend as the most reliable is to pass it through anonymous classes:

    .assignTimestampsAndWatermarks(
      WatermarkStrategy.forGenerator[(String, Long)](
        new WatermarkGeneratorSupplier[(String, Long)] {
          override def createWatermarkGenerator(context: WatermarkGeneratorSupplier.Context): WatermarkGenerator[(String, Long)] =
            new MyPeriodicGenerator
        }
      )
      .withTimestampAssigner(new SerializableTimestampAssigner[(String, Long)] {
        override def extractTimestamp(t: (String, Long), l: Long): Long = t._2
      })
    )

2) With Scala 2.12 you can try the automatic conversion of Scala's lambdas to Java's SAM types, but unfortunately when I tried it, it failed for the timestamp assigner with some problems in the serialization stack. I have not been able to identify the root cause yet, so I cannot fully recommend it.

    .assignTimestampsAndWatermarks(
      WatermarkStrategy.forGenerator[(String, Long)](
        _ => new MyPeriodicGenerator
      )
      .withTimestampAssigner((e, _) => e._2)
    )

I created a ticket to improve the situation here: https://issues.apache.org/jira/browse/FLINK-18873

Best,
Dawid

On 08/08/2020 10:18, Lu Weizheng wrote:

Hi there,

Flink 1.11 comes with the new WatermarkStrategy API to assign timestamps and watermarks. I could not find any example in Scala. I have a (String, Long) stream; can anyone help me implement a WatermarkStrategy? I would be really grateful!

    val input: DataStream[(String, Long)] = ...
    val watermark = input.assignTimestampsAndWatermarks(
      WatermarkStrategy.forGenerator(...)
    )

    class MyPeriodicGenerator extends WatermarkGenerator[(String, Long)] {
      final private val maxOutOfOrderness = 60 * 1000
      private var currentMaxTimestamp = 0L

      override def onEvent(event: (String, Long), eventTimestamp: Long, output: WatermarkOutput): Unit = {
        currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTimestamp)
      }

      override def onPeriodicEmit(output: WatermarkOutput): Unit = {
        output.emitWatermark(new Watermark(currentMaxTimestamp - maxOutOfOrderness))
      }
    }
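
For readers who want to try option 1) end to end, here is a minimal sketch that combines the generator from the original question with the anonymous-class approach and adds the imports. It assumes Flink 1.11 with flink-streaming-scala on the classpath. The object name, the sample elements and the job name are made up for illustration, and the call that enables event time is my assumption about what a 1.11 job needs for periodic emission, not something stated in the thread.

```scala
import org.apache.flink.api.common.eventtime.{
  SerializableTimestampAssigner, Watermark, WatermarkGenerator,
  WatermarkGeneratorSupplier, WatermarkOutput, WatermarkStrategy
}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._

// Periodic generator from the question: the watermark trails the highest
// timestamp seen so far by a fixed bound.
class MyPeriodicGenerator extends WatermarkGenerator[(String, Long)] {
  private val maxOutOfOrderness = 60 * 1000L // 60 seconds, in milliseconds
  private var currentMaxTimestamp = 0L

  override def onEvent(event: (String, Long), eventTimestamp: Long, output: WatermarkOutput): Unit = {
    // Only track the maximum here; the watermark itself is emitted periodically.
    currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTimestamp)
  }

  override def onPeriodicEmit(output: WatermarkOutput): Unit = {
    output.emitWatermark(new Watermark(currentMaxTimestamp - maxOutOfOrderness))
  }
}

// Hypothetical driver object, for illustration only.
object WatermarkStrategySketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Assumption: in 1.11 event time has to be enabled explicitly so that
    // onPeriodicEmit is called at the auto-watermark interval.
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // Made-up sample data: (key, event timestamp in milliseconds).
    val input: DataStream[(String, Long)] =
      env.fromElements(("a", 1000L), ("b", 2000L), ("a", 1500L))

    val withWatermarks = input.assignTimestampsAndWatermarks(
      WatermarkStrategy
        .forGenerator[(String, Long)](
          // Anonymous class instead of a lambda, as recommended in 1).
          new WatermarkGeneratorSupplier[(String, Long)] {
            override def createWatermarkGenerator(
                context: WatermarkGeneratorSupplier.Context): WatermarkGenerator[(String, Long)] =
              new MyPeriodicGenerator
          }
        )
        .withTimestampAssigner(new SerializableTimestampAssigner[(String, Long)] {
          // The second tuple field carries the event timestamp.
          override def extractTimestamp(element: (String, Long), recordTimestamp: Long): Long =
            element._2
        })
    )

    withWatermarks.print()
    env.execute("watermark-strategy-sketch")
  }
}
```

The explicit anonymous classes are verbose, but they sidestep the lambda-to-SAM conversion problem described in 2), and they work on both Scala 2.11 and 2.12.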
