Hi,

Regrettably, I must admit that WatermarkStrategy is not very Scala-friendly :(


1) After a couple of tries, the approach I found most reliable is to
pass the generator supplier and the timestamp assigner as anonymous
classes:


    .assignTimestampsAndWatermarks(
      WatermarkStrategy.forGenerator[(String, Long)](
        new WatermarkGeneratorSupplier[(String, Long)] {
          override def createWatermarkGenerator(
              context: WatermarkGeneratorSupplier.Context): WatermarkGenerator[(String, Long)] =
            new MyPeriodicGenerator
        }
      )
        .withTimestampAssigner(new SerializableTimestampAssigner[(String, Long)] {
          override def extractTimestamp(t: (String, Long), l: Long): Long = t._2
        })
    )
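
For completeness, here is a rough sketch of the same idea as a full program, with the imports it needs and with named classes in place of the anonymous ones. The names MyGeneratorSupplier, SecondFieldAssigner and WatermarkExample are made up for illustration, the sample elements are invented, and MyPeriodicGenerator is the class from the question with minor tidying. It assumes Flink 1.11 with the Scala DataStream API on the classpath; treat it as a starting point rather than a verified recipe.

```scala
import org.apache.flink.api.common.eventtime.{
  SerializableTimestampAssigner, Watermark, WatermarkGenerator,
  WatermarkGeneratorSupplier, WatermarkOutput, WatermarkStrategy
}
import org.apache.flink.streaming.api.scala._

// Generator from the original question, lightly tidied.
class MyPeriodicGenerator extends WatermarkGenerator[(String, Long)] {
  private val maxOutOfOrderness = 60 * 1000L // 60 seconds, in milliseconds
  private var currentMaxTimestamp = 0L

  // Track the largest event timestamp seen so far.
  override def onEvent(event: (String, Long), eventTimestamp: Long,
                       output: WatermarkOutput): Unit =
    currentMaxTimestamp = math.max(currentMaxTimestamp, eventTimestamp)

  // Called periodically: emit a watermark lagging behind the max timestamp.
  override def onPeriodicEmit(output: WatermarkOutput): Unit =
    output.emitWatermark(new Watermark(currentMaxTimestamp - maxOutOfOrderness))
}

// Hypothetical named supplier, replacing the anonymous class.
class MyGeneratorSupplier extends WatermarkGeneratorSupplier[(String, Long)] {
  override def createWatermarkGenerator(
      context: WatermarkGeneratorSupplier.Context): WatermarkGenerator[(String, Long)] =
    new MyPeriodicGenerator
}

// Hypothetical named assigner: uses the tuple's second field as the timestamp.
class SecondFieldAssigner extends SerializableTimestampAssigner[(String, Long)] {
  override def extractTimestamp(element: (String, Long), recordTimestamp: Long): Long =
    element._2
}

object WatermarkExample {
  val strategy: WatermarkStrategy[(String, Long)] =
    WatermarkStrategy
      .forGenerator[(String, Long)](new MyGeneratorSupplier)
      .withTimestampAssigner(new SecondFieldAssigner)

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Invented sample data, only to make the sketch self-contained.
    val input: DataStream[(String, Long)] =
      env.fromElements(("a", 1000L), ("b", 2000L))
    input.assignTimestampsAndWatermarks(strategy).print()
    env.execute("watermark-example")
  }
}
```

Named classes behave the same as the anonymous ones here; they are simply easier to reuse across jobs and to unit-test on their own.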


2) With Scala 2.12 you can try the automatic conversion of Scala
lambdas to Java SAM types. Unfortunately, when I tried it, it failed for
the timestamp assigner with errors in the serialization stack. I have
not been able to identify the root cause yet, so I cannot fully
recommend it.


    .assignTimestampsAndWatermarks(
      WatermarkStrategy.forGenerator[(String, Long)](
        _ => new MyPeriodicGenerator
      )
        .withTimestampAssigner((e, _) => e._2)
    )


I created a ticket to improve the situation here:
https://issues.apache.org/jira/browse/FLINK-18873


Best,

Dawid


On 08/08/2020 10:18, Lu Weizheng wrote:
> Hi there,
>
> Flink 1.11 comes with the new WatermarkStrategy API to assign
> timestamps and watermarks. I find there is no example in Scala. I have
> a (String, Long) stream; can anyone help implement a WatermarkStrategy?
> I would be really grateful!
>
> val input: DataStream[(String, Long)] = ...
> val watermark = input.assignTimestampsAndWatermarks(
>     WatermarkStrategy.forGenerator(...)
>     )
>
> class MyPeriodicGenerator extends WatermarkGenerator[(String, Long)] {
>   final private val maxOutOfOrderness = 60 * 1000
>   private var currentMaxTimestamp = 0L
>
>   override def onEvent(event: (String, Long), eventTimestamp: Long, output: WatermarkOutput): Unit = {
>     currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTimestamp)
>   }
>
>   override def onPeriodicEmit(output: WatermarkOutput): Unit = {
>     output.emitWatermark(new Watermark(currentMaxTimestamp - maxOutOfOrderness))
>   }
> }
>
>
