I think I found the information I was looking for:

RecordWriter broadcasts each emitted watermark to all outgoing channels [1].

StreamInputProcessor tracks the max watermark received on each incoming
channel separately, and computes the task's watermark as the min of all
incoming watermarks [2].

Is this an accurate summary of Flink's watermark propagation?

So in my previous example, each window count task is building up a count
for each window based on incoming event's timestamp, and when all incoming
watermarks have progressed beyond the end of the window, the count is
emitted. So if one partition's watermark lags behind the other, it just
means the window output is triggered based on this lagging watermark.

-Zach

[1]
https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java#L103
[2]
https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java#L147


On Thu, Feb 25, 2016 at 3:31 PM Zach Cox <zcox...@gmail.com> wrote:

> Hi - how are watermarks passed along parallel tasks where there is a
> repartition? For example, say I have a simple streaming job computing
> hourly counts per key, something like this:
>
> val environment = StreamExecutionEnvironment.getExecutionEnvironment
> environment.setParallelism(2)
> environment.setStreamTimeCharacteristic(EventTime)
> environment.getConfig.enableTimestamps()
> environment
>   .addSource(...)
>   .assignAscendingTimestamps(_.timestamp)
>   .keyBy("someField")
>   .timeWindow(Time.hours(1))
>   .fold(0, (count, element) => count + 1)
>   .addSink(...)
> environment.execute("example")
>
> Say the source has 2 parallel partitions (e.g. Kafka topic) and the events
> from the source contain timestamps, but over time the 2 source tasks
> diverge in event time (maybe 1 Kafka topic partition has many more events
> than the other).
>
> The job graph looks like this: http://imgur.com/hxEpF6b
>
> From what I can tell, the execution graph, with parallelism=2, would look
> like this: http://imgur.com/pSX8ov5. The keyBy causes a hash partition to
> be used, so that events with the same key end up at the same window
> subtask, regardless of which source partition they came from.
>
> Since the watermarks are skewed between the parallel pipelines, what
> happens when differing watermarks are sent to the window count operators?
> Is something tracking the min incoming watermark there? Could anyone point
> me to Flink code that implements this? I'd really like to learn more about
> how this works.
>
> Thanks,
> Zach
>
>
>

Reply via email to