If you were to use per-partition watermarking, which you can do by
calling assignTimestampsAndWatermarks directly on the Flink Kafka consumer
[1], then I believe the idle partition(s) would consistently hold back the
overall watermark.
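
For reference, per-partition watermarking looks roughly like this (a minimal
sketch against the 1.10 APIs; the topic name, deserialization schema, and
properties are placeholders, and MyWatermarker is your assigner from below):

```scala
import java.util.Properties

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

val env = StreamExecutionEnvironment.getExecutionEnvironment

// Placeholder topic, schema and properties -- substitute your own.
val kafkaProps = new Properties()
val consumer = new FlinkKafkaConsumer[MyEvent](
  "my-topic", new MyEventDeserializationSchema, kafkaProps)

// Attaching the assigner to the consumer itself makes it run per partition
// inside the source; each source task then emits the minimum of its
// per-partition watermarks.
consumer.assignTimestampsAndWatermarks(new MyWatermarker())

val events: DataStream[MyEvent] = env.addSource(consumer)
```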

With per-partition watermarking, each Kafka source task will apply the
watermarking separately to each partition it is handling, and then emit as
its watermark the minimum of those per-partition watermarks. At least one
of the Kafka source tasks will therefore have a watermark of 0, and
assuming you have a keyBy after the watermarking and before the process
function, that will hold back the watermark at the process function.
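
Concretely, with something like the following after the source (the key
selector and process function names are placeholders for yours), each
process-function subtask tracks the minimum watermark over all of its input
channels, so a single source task stuck at 0 keeps it at 0:

```scala
// Continuing the sketch above: keyBy redistributes the events, so every
// process-function subtask receives watermarks from every source task.
events
  .keyBy(_.id)                     // placeholder key selector
  .process(new MyProcessFunction)  // placeholder for your KeyedProcessFunction
```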

Otherwise, if you apply watermarking to the output of the Kafka source
tasks, then whether the watermarking tasks have a watermark of 0 or not
depends on whether their corresponding Kafka source task has any non-idle
partitions. If the assignment of partitions to instances isn't
deterministic, this could explain why you are seeing different results in
IntelliJ.
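
That alternative, i.e. assigning watermarks on the stream coming out of the
source rather than on the consumer itself, would look roughly like this
(again just a sketch, with the same placeholders and a consumer built as
above but without the per-partition assigner):

```scala
// Watermarks are generated downstream of the source, one generator per
// watermarking subtask, over whatever mix of partitions that subtask's
// source happened to read. A subtask whose source saw only idle partitions
// stays at watermark 0; the others advance.
env
  .addSource(consumer)
  .assignTimestampsAndWatermarks(new MyWatermarker())
  .keyBy(_.id)                     // placeholder key selector
  .process(new MyProcessFunction)  // placeholder for your KeyedProcessFunction
```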

Note that the handling of idle sources has been reworked in Flink 1.11 [2],
and bug fixes related to that are still pending [3].
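
For what it's worth, in 1.11 that would look roughly like this (a sketch
only; the idleness timeout is arbitrary, and it assumes MyEvent exposes the
same timestamp field your assigner reads):

```scala
import java.time.Duration

import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}

val strategy = WatermarkStrategy
  .forBoundedOutOfOrderness[MyEvent](Duration.ZERO)  // analogous to maxTimeLag = 0
  .withTimestampAssigner(new SerializableTimestampAssigner[MyEvent] {
    override def extractTimestamp(e: MyEvent, recordTs: Long): Long = e.timestamp
  })
  .withIdleness(Duration.ofMinutes(1))  // mark the source idle after 1 minute without events

consumer.assignTimestampsAndWatermarks(strategy)
```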

Regards,
David

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/connectors/kafka.html#kafka-consumers-and-timestamp-extractionwatermark-emission
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/event_timestamps_watermarks.html#dealing-with-idle-sources
[3] https://issues.apache.org/jira/browse/FLINK-18934

On Thu, Oct 1, 2020 at 12:10 PM Salva Alcántara <salcantara...@gmail.com>
wrote:

> I am considering this watermarker:
>
> ```scala
> import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
> // Assuming WatermarkOld is an alias for the (deprecated) Watermark class:
> import org.apache.flink.streaming.api.watermark.{Watermark => WatermarkOld}
>
> class MyWatermarker(val maxTimeLag: Long = 0)
>     extends AssignerWithPeriodicWatermarks[MyEvent] {
>   var maxTs: Long = 0
>
>   override def extractTimestamp(e: MyEvent, previousElementTimestamp: Long): Long = {
>     val timestamp = e.timestamp
>     maxTs = maxTs.max(timestamp)
>     timestamp
>   }
>
>   override def getCurrentWatermark: WatermarkOld = {
>     println(s"event watermark: ${maxTs - maxTimeLag}")
>     new WatermarkOld(maxTs - maxTimeLag)
>   }
> }
> ```
>
> The underlying events come from a Kafka source and are then handed to a
> process function. Its implementation is irrelevant for the question, so I
> will just share the relevant bit:
>
> ```scala
>   override def processElement(
>     event: MyEvent,
>     ctx: KeyedProcessFunction[String, MyEvent, MyEvent]#Context,
>     out: Collector[MyEvent]
>   ): Unit = {
>     println(
>       s"In process function, got event: $event, ctx.timestamp: ${ctx.timestamp}, " +
>         s"currentWatermark: ${ctx.timerService.currentWatermark}"
>     )
>     ...
>   }
> ```
>
> When I run this app on a real Kubernetes cluster, reading from a Kafka
> topic that has idle partitions, the watermark is held back at 0 as
> expected:
>
> ```
> In process function, got event: xxx, ctx.timestamp: 1601475710619,
> currentWatermark: 0
> ```
>
> I can also see these logs generated by the watermarker:
>
> ```
> event watermark: 1601475710619
> event watermark: 0
> event watermark: 1601475710619
> event watermark: 0
> ```
>
> The funny thing is that when I run the same application locally in
> IntelliJ, also with idle Kafka partitions for the same topic, I get the
> same logs from the watermarker, with the watermark oscillating between 0
> and the timestamp of the latest received element, since `maxTimeLag = 0`.
> However, quite unexpectedly for me, the logs from the process function
> show that the watermark is nevertheless advancing:
>
> ```
> In process function, got event: xxx, ctx.timestamp: 1601475710619,
> currentWatermark: 1601475710618
> ```
>
> Why is this happening? FYI, I am using Flink 1.10 with the environment
> parallelism set to 2 and event time semantics in both cases.
