Hi - how are watermarks passed along parallel tasks where there is a
repartition? For example, say I have a simple streaming job computing
hourly counts per key, something like this:

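import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

val environment = StreamExecutionEnvironment.getExecutionEnvironment
environment.setParallelism(2)
environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
environment.getConfig.enableTimestamps()
environment
  .addSource(...)
  // timestamps are assumed to be ascending within each source partition
  .assignAscendingTimestamps(_.timestamp)
  .keyBy("someField")
  .timeWindow(Time.hours(1))
  // count the elements per key in each hourly window
  .fold(0) { (count, element) => count + 1 }
  .addSink(...)
environment.execute("example")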

Say the source has 2 parallel partitions (e.g. a Kafka topic) and the events
from the source contain timestamps, but over time the 2 source tasks
diverge in event time (maybe one Kafka topic partition has many more events
than the other).

The job graph looks like this: http://imgur.com/hxEpF6b

From what I can tell, the execution graph, with parallelism=2, would look
like this: http://imgur.com/pSX8ov5. The keyBy causes a hash partition to
be used, so that events with the same key end up at the same window
subtask, regardless of which source partition they came from.
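To make the routing I have in mind concrete, here is a rough sketch. The
selectChannel function is hypothetical and much simpler than whatever Flink
actually does internally; it only shows that a hash of the key, and not the
source partition, decides which window subtask receives an event:

```scala
object HashPartitionSketch {
  // Hypothetical channel selector (an assumption, not Flink's code):
  // the same key always maps to the same downstream subtask index.
  def selectChannel(key: Any, parallelism: Int): Int =
    Math.floorMod(key.hashCode, parallelism)

  def main(args: Array[String]): Unit = {
    val parallelism = 2
    // (source partition, key) pairs; the source partition plays no role in routing
    val events = Seq((0, "a"), (1, "a"), (0, "b"), (1, "c"))
    events.foreach { case (sourcePartition, key) =>
      val target = selectChannel(key, parallelism)
      println(s"source $sourcePartition, key $key -> window subtask $target")
    }
  }
}
```

If this is roughly right, every window subtask can receive events from both
source subtasks, and so presumably their watermarks as well.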

Since the watermarks are skewed between the parallel pipelines, what
happens when differing watermarks are sent to the window count operators?
Is something tracking the min incoming watermark there? Could anyone point
me to Flink code that implements this? I'd really like to learn more about
how this works.
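
To be clear about what I mean by "tracking the min incoming watermark", here
is a small sketch of the behavior I am imagining. MinWatermarkTracker and
onWatermark are names I made up; this is not taken from Flink's source, just
my guess at the logic:

```scala
// Hypothetical sketch, not Flink's actual implementation.
// Remembers the latest watermark seen on each input channel and only
// advances the combined watermark when the minimum across channels increases.
final class MinWatermarkTracker(numChannels: Int) {
  private val channelWatermarks = Array.fill(numChannels)(Long.MinValue)
  private var current = Long.MinValue

  /** Returns Some(newWatermark) if the combined watermark advanced, else None. */
  def onWatermark(channel: Int, timestamp: Long): Option[Long] = {
    if (timestamp > channelWatermarks(channel)) channelWatermarks(channel) = timestamp
    val min = channelWatermarks.min
    if (min > current) {
      current = min
      Some(min)
    } else None
  }
}

object MinWatermarkExample {
  def main(args: Array[String]): Unit = {
    val tracker = new MinWatermarkTracker(numChannels = 2)
    println(tracker.onWatermark(0, 1000L)) // None: channel 1 has not reported yet
    println(tracker.onWatermark(1, 400L))  // Some(400)
    println(tracker.onWatermark(0, 5000L)) // None: still held back by channel 1
    println(tracker.onWatermark(1, 1200L)) // Some(1200)
  }
}
```

With logic like this, the hourly windows would only fire once the slower
source task has passed the end of the window.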

Thanks,
Zach
