Thanks for the confirmation, Aljoscha! I wrote up results from my little experiment: https://github.com/zcox/flink-repartition-watermark-example
-Zach

On Fri, Feb 26, 2016 at 2:58 AM Aljoscha Krettek <aljos...@apache.org> wrote:
> Hi,
> yes, your description is spot on!
>
> Cheers,
> Aljoscha
>
> On 26 Feb 2016, at 00:19, Zach Cox <zcox...@gmail.com> wrote:
> >
> > I think I found the information I was looking for:
> >
> > RecordWriter broadcasts each emitted watermark to all outgoing channels [1].
> >
> > StreamInputProcessor tracks the max watermark received on each incoming channel separately, and computes the task's watermark as the min of all incoming watermarks [2].
> >
> > Is this an accurate summary of Flink's watermark propagation?
> >
> > So in my previous example, each window count task builds up a count for each window based on incoming events' timestamps, and when all incoming watermarks have progressed beyond the end of the window, the count is emitted. So if one partition's watermark lags behind the other, it just means the window output is triggered based on this lagging watermark.
> >
> > -Zach
> >
> > [1] https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java#L103
> > [2] https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java#L147
> >
> >
> > On Thu, Feb 25, 2016 at 3:31 PM Zach Cox <zcox...@gmail.com> wrote:
> > Hi - how are watermarks passed along parallel tasks where there is a repartition? For example, say I have a simple streaming job computing hourly counts per key, something like this:
> >
> > val environment = StreamExecutionEnvironment.getExecutionEnvironment
> > environment.setParallelism(2)
> > environment.setStreamTimeCharacteristic(EventTime)
> > environment.getConfig.enableTimestamps()
> > environment
> >   .addSource(...)
> >   .assignAscendingTimestamps(_.timestamp)
> >   .keyBy("someField")
> >   .timeWindow(Time.hours(1))
> >   .fold(0, (count, element) => count + 1)
> >   .addSink(...)
> > environment.execute("example")
> >
> > Say the source has 2 parallel partitions (e.g. a Kafka topic) and the events from the source contain timestamps, but over time the 2 source tasks diverge in event time (maybe one Kafka topic partition has many more events than the other).
> >
> > The job graph looks like this: http://imgur.com/hxEpF6b
> >
> > From what I can tell, the execution graph, with parallelism=2, would look like this: http://imgur.com/pSX8ov5. The keyBy causes a hash partition to be used, so that events with the same key end up at the same window subtask, regardless of which source partition they came from.
> >
> > Since the watermarks are skewed between the parallel pipelines, what happens when differing watermarks are sent to the window count operators? Is something tracking the min incoming watermark there? Could anyone point me to Flink code that implements this? I'd really like to learn more about how this works.
> >
> > Thanks,
> > Zach
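
For anyone following along, here is a minimal, self-contained Scala sketch of the per-channel bookkeeping summarized above (illustrative only, not Flink's actual RecordWriter/StreamInputProcessor code; all names here are made up): the operator remembers the max watermark seen on each incoming channel and forwards the min over all channels whenever that min advances.

// Sketch of the idea described in this thread: track the max watermark per
// incoming channel, and advance the operator's own watermark to the min over
// all channels. Not Flink code - just an illustration of the mechanism.
object WatermarkTrackingSketch {

  final class ChannelWatermarkTracker(numChannels: Int) {
    // Max watermark seen so far on each incoming channel.
    private val channelWatermarks = Array.fill(numChannels)(Long.MinValue)
    // Last watermark this operator emitted downstream.
    private var lastEmitted = Long.MinValue

    // Record a watermark from one channel; return the new operator watermark
    // if it advanced, otherwise None.
    def onWatermark(channel: Int, watermark: Long): Option[Long] = {
      if (watermark > channelWatermarks(channel)) {
        channelWatermarks(channel) = watermark
      }
      val minAcrossChannels = channelWatermarks.min
      if (minAcrossChannels > lastEmitted) {
        lastEmitted = minAcrossChannels
        Some(minAcrossChannels) // would be broadcast to all outgoing channels
      } else {
        None
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val tracker = new ChannelWatermarkTracker(numChannels = 2)
    // Channel 0 races ahead; the operator watermark is held back by channel 1,
    // mirroring the skewed-source scenario in the original question.
    println(tracker.onWatermark(channel = 0, watermark = 100)) // None (channel 1 still unknown)
    println(tracker.onWatermark(channel = 1, watermark = 50))  // Some(50)
    println(tracker.onWatermark(channel = 0, watermark = 200)) // None (still waiting on channel 1)
    println(tracker.onWatermark(channel = 1, watermark = 150)) // Some(150)
  }
}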