Hi, this should be covered here: https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/event_time.html#watermarks-in-parallel-streams
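A rough way to picture the rule from that section: each operator task remembers the latest watermark it received on every input channel and advances its own event time to the minimum of them. The class below is a toy model written for illustration, not Flink's internal code; `InputWatermarkTracker` and its methods are made-up names.

```java
// Toy model of how an operator with several input channels derives its event time.
// Assumption: names and structure are hypothetical and are not Flink classes.
final class InputWatermarkTracker {
    private final long[] channelWatermarks;

    InputWatermarkTracker(int inputChannels) {
        channelWatermarks = new long[inputChannels];
        for (int i = 0; i < inputChannels; i++) {
            // A channel that has not delivered any watermark yet sits at the lowest value.
            channelWatermarks[i] = Long.MIN_VALUE;
        }
    }

    /** Records a watermark from one upstream task; a channel's watermark never moves backwards. */
    void onWatermark(int channel, long watermark) {
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], watermark);
    }

    /** The operator's event time is the minimum over all of its input channels. */
    long currentWatermark() {
        long min = Long.MAX_VALUE;
        for (long w : channelWatermarks) {
            min = Math.min(min, w);
        }
        return min;
    }

    public static void main(String[] args) {
        InputWatermarkTracker window = new InputWatermarkTracker(2);
        window.onWatermark(0, 1_000L);
        // Channel 1 is still silent, so the operator cannot advance.
        System.out.println(window.currentWatermark() == Long.MIN_VALUE);
        window.onWatermark(1, 500L);
        // Both channels have reported; the slower one decides.
        System.out.println(window.currentWatermark());
    }
}
```

In this model a single silent input channel is enough to keep event-time windows from firing, which matches the symptom reported in the thread.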
Best, Fabian

On Thu, May 2, 2019 at 17:48, an0 <an0...@gmail.com> wrote:

> This explanation is exactly what I'm looking for, thanks! Is such an
> important rule documented anywhere in the official documentation?
>
> On 2019/04/30 08:47:29, Fabian Hueske <fhue...@gmail.com> wrote:
> > An operator task broadcasts its current watermark to all downstream tasks
> > that might receive its records.
> > If you have the following code:
> >
> > DataStream<X> a = ...
> > a.map(A).map(B).keyBy(....).window(C)
> >
> > and execute it with parallelism 2, your plan looks like this:
> >
> > A.1 -- B.1 --\--/-- C.1
> >               X
> > A.2 -- B.2 --/--\-- C.2
> >
> > A.1 will propagate its watermarks to B.1 because only B.1 will receive
> > its output events.
> > However, B.1 will propagate its watermarks to C.1 and C.2 because the
> > output of B.1 is partitioned and all C tasks might receive output events
> > from B.1.
> >
> > Best, Fabian
> >
> > On Mon, Apr 29, 2019 at 20:06, an0 <an0...@gmail.com> wrote:
> >
> > > Thanks very much. It definitely explains the problem I'm seeing. However,
> > > there is something I need to confirm:
> > > You say "Watermarks are broadcasted/forwarded anyway." Do you mean that, in
> > > assignTimestampsAndWatermarks.keyBy.window, it doesn't matter what data
> > > flows through a specific key's stream, and all key streams have the same
> > > watermarks? So time-wise, `window` behaves as if `keyBy` were not there at
> > > all?
> > >
> > > On 2019/04/26 06:34:10, Dawid Wysakowicz <dwysakow...@apache.org> wrote:
> > > > Hi,
> > > >
> > > > Watermarks are meta events that travel independently of data events.
> > > >
> > > > 1) If you assignTimestampsAndWatermarks before keyBy, all parallel
> > > > instances of trips have some data (this is my assumption), so Watermarks
> > > > can be generated. Afterwards, even if some of the keyed partitions have
> > > > no data, Watermarks are broadcasted/forwarded anyway.
> > > > In other words, if at some point Watermarks were generated for all
> > > > partitions of a single stage, they will be forwarded beyond this point.
> > > >
> > > > 2) If you assignTimestampsAndWatermarks after keyBy, you try to assign
> > > > watermarks for an empty partition, which produces no Watermarks at all
> > > > for this partition; therefore there is no progress beyond this point.
> > > >
> > > > I hope this clarifies it a bit.
> > > >
> > > > Best,
> > > >
> > > > Dawid
> > > >
> > > > On 25/04/2019 16:49, an0 wrote:
> > > > > If my understanding is correct, then why does
> > > > > `assignTimestampsAndWatermarks` before `keyBy` work? The `timeWindowAll`
> > > > > stream's input streams are task 1 and task 2, with task 2 idling, no matter
> > > > > whether `assignTimestampsAndWatermarks` is before or after `keyBy`, because
> > > > > whether task 2 receives elements depends only on the key distribution and has
> > > > > nothing to do with timestamp assignment, right?
> > > > >
> > > > >                                                    /key 1 trips\
> > > > >                                                   /             \
> > > > > (A) trips--> assignTimestampsAndWatermarks-->keyBy               timeWindowAll
> > > > >                                                   \    idle     /
> > > > >                                                    \key 2 trips/
> > > > >
> > > > >                   /key 1 trips--> assignTimestampsAndWatermarks\
> > > > >                  /                                              \
> > > > > (B) trips-->keyBy                                                timeWindowAll
> > > > >                  \    idle                                      /
> > > > >                   \key 2 trips--> assignTimestampsAndWatermarks/
> > > > >
> > > > > How are things different between A and B from `timeWindowAll`'s
> > > > > perspective?
> > > > >
> > > > > BTW, thanks for the webinar link, I'll check it later.
> > > > >
> > > > > On 2019/04/25 08:30:20, Dawid Wysakowicz <dwysakow...@apache.org> wrote:
> > > > >> Hi,
> > > > >>
> > > > >> Yes I think your explanation is correct.
> > > > >> I can also recommend Seth's webinar, where he talks about
> > > > >> debugging Watermarks [1].
> > > > >>
> > > > >> Best,
> > > > >>
> > > > >> Dawid
> > > > >>
> > > > >> [1] https://www.ververica.com/resources/webinar/webinar/debugging-flink-tutorial
> > > > >>
> > > > >> On 22/04/2019 22:55, an0 wrote:
> > > > >>> Thanks, I feel I'm getting closer to the truth.
> > > > >>>
> > > > >>> So parallelism is the cause? Say my parallelism is 2. Does that mean
> > > > >>> I get 2 tasks running after `keyBy`, even if all elements have the same key
> > > > >>> and so go to 1 downstream task (say task 1)? And it is the other task (task 2),
> > > > >>> with no incoming data, that makes the `timeWindowAll` stream unable to progress?
> > > > >>> Because both task 1 and task 2 are its input streams, and one is idling, so
> > > > >>> its event time cannot make progress?
> > > > >>>
> > > > >>> On 2019/04/22 01:57:39, Guowei Ma <guowei....@gmail.com> wrote:
> > > > >>>> Hi,
> > > > >>>>
> > > > >>>> A BoundedOutOfOrdernessTimestampExtractor can send a WM only after it
> > > > >>>> has received at least one element.
> > > > >>>>
> > > > >>>> For after keyBy:
> > > > >>>> Flink uses the hash code of the key and the parallelism of the downstream
> > > > >>>> operator to decide which subtask will receive the element. This means that
> > > > >>>> if your key is always the same, all the sources will send their elements to
> > > > >>>> the same downstream task only, for example only
> > > > >>>> BoundedOutOfOrdernessTimestampExtractor no. 3.
> > > > >>>>
> > > > >>>> For before keyBy:
> > > > >>>> In your case, the Source and the BoundedOutOfOrdernessTimestampExtractors
> > > > >>>> would be chained together, which means every
> > > > >>>> BoundedOutOfOrdernessTimestampExtractor will receive elements.
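The routing described for the after-`keyBy` case can be sketched as follows. This is a deliberately simplified model that maps `key.hashCode()` straight onto a subtask index; Flink itself goes through an additional key-group step, so real subtask numbers will generally differ. `KeyRoutingSketch` and its methods are hypothetical names, not Flink API.

```java
// Simplified model of hash partitioning after keyBy.
// Assumption: plain hashCode modulo parallelism; Flink's real assignment is more involved.
final class KeyRoutingSketch {

    /** Picks the downstream subtask for a key from its hash code and the downstream parallelism. */
    static int targetSubtask(Object key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    /** Counts how many elements each downstream subtask would receive. */
    static int[] countPerSubtask(long[] keys, int parallelism) {
        int[] counts = new int[parallelism];
        for (long key : keys) {
            counts[targetSubtask(Long.valueOf(key), parallelism)]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        // keyBy(trip -> 0L): every element carries the same key.
        int[] constantKey = countPerSubtask(new long[] {0L, 0L, 0L, 0L}, 2);
        System.out.println("constant key: subtask0=" + constantKey[0] + ", subtask1=" + constantKey[1]);

        // Several distinct keys: both subtasks may receive data, but nothing guarantees it.
        int[] mixedKeys = countPerSubtask(new long[] {1L, 2L, 3L, 4L}, 2);
        System.out.println("mixed keys:   subtask0=" + mixedKeys[0] + ", subtask1=" + mixedKeys[1]);
    }
}
```

With a constant key, one subtask receives everything and the other stays idle, so a timestamp extractor placed after `keyBy` on the idle subtask never sees an element.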
> > > > >>>>
> > > > >>>> Best,
> > > > >>>> Guowei
> > > > >>>>
> > > > >>>>
> > > > >>>> On Fri, Apr 19, 2019 at 10:41 PM, an0 <an0...@gmail.com> wrote:
> > > > >>>>
> > > > >>>>> Hi,
> > > > >>>>>
> > > > >>>>> First of all, thank you for the `shuffle()` tip. It works. However, I
> > > > >>>>> still don't understand why it doesn't work without calling `shuffle()`.
> > > > >>>>>
> > > > >>>>> Why would not all BoundedOutOfOrdernessTimestampExtractors receive trips?
> > > > >>>>> All the trips have keys and timestamps. As I said in my reply to Paul, I see
> > > > >>>>> the same watermarks being extracted.
> > > > >>>>>
> > > > >>>>> How could calling `assignTimestampsAndWatermarks` before vs. after `keyBy`
> > > > >>>>> matter? My understanding is that any specific window for a specific key always
> > > > >>>>> receives exactly the same data, and the calling order of
> > > > >>>>> `assignTimestampsAndWatermarks` and `keyBy` shouldn't affect that.
> > > > >>>>>
> > > > >>>>> To make `keyBy` as irrelevant as possible, I tried letting it always
> > > > >>>>> return the same key so that there is only 1 keyed stream and it is exactly
> > > > >>>>> the same as the original unkeyed stream.
> > > > >>>>> It still doesn't trigger windows:
> > > > >>>>> ```java
> > > > >>>>> DataStream<Trip> trips = env.addSource(consumer);
> > > > >>>>> KeyedStream<Trip, Long> userTrips = trips.keyBy(trip -> 0L);
> > > > >>>>> DataStream<Trip> featurizedUserTrips =
> > > > >>>>>     userTrips.map(trip -> trip).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Trip>(Time.days(1)) {
> > > > >>>>>         @Override
> > > > >>>>>         public long extractTimestamp(Trip trip) {
> > > > >>>>>             return trip.endTime.getTime();
> > > > >>>>>         }
> > > > >>>>>     });
> > > > >>>>> AllWindowedStream<Trip, TimeWindow> windowedUserTrips =
> > > > >>>>>     featurizedUserTrips.timeWindowAll(Time.days(7), Time.days(1));
> > > > >>>>> ```
> > > > >>>>>
> > > > >>>>> It makes no sense to me. Please help me understand why it doesn't work.
> > > > >>>>> Thanks!
> > > > >>>>>
> > > > >>>>> On 2019/04/19 04:14:31, Guowei Ma <guowei....@gmail.com> wrote:
> > > > >>>>>> Hi,
> > > > >>>>>> After keyBy, maybe only some of the BoundedOutOfOrdernessTimestampExtractors
> > > > >>>>>> receive elements (trips). If that is the case, a
> > > > >>>>>> BoundedOutOfOrdernessTimestampExtractor that does not receive any element
> > > > >>>>>> would not send a WM, and so the timeWindowAll operator could not be
> > > > >>>>>> triggered.
> > > > >>>>>> You could add a shuffle() before the assignTimestampsAndWatermarks in your
> > > > >>>>>> second case and check whether the window is triggered. If it is, you could
> > > > >>>>>> check the distribution of elements generated by the source.
> > > > >>>>>>
> > > > >>>>>> Best,
> > > > >>>>>> Guowei
> > > > >>>>>>
> > > > >>>>>>
> > > > >>>>>> On Fri, Apr 19, 2019 at 4:10 AM, an0...@gmail.com <an0...@gmail.com> wrote:
> > > > >>>>>>
> > > > >>>>>>> I don't think it is the watermark.
> > > > >>>>>>> I see the same watermarks from the two versions of the code.
> > > > >>>>>>>
> > > > >>>>>>> The processing on the keyed stream doesn't change event time at all. I can
> > > > >>>>>>> simply change my code to use `map` on the keyed stream to return the input
> > > > >>>>>>> data unchanged, so that the window operator receives exactly the same data.
> > > > >>>>>>> The only difference is when I do `assignTimestampsAndWatermarks`. The result
> > > > >>>>>>> is the same: `assignTimestampsAndWatermarks` before `keyBy` works:
> > > > >>>>>>> ```java
> > > > >>>>>>> DataStream<Trip> trips =
> > > > >>>>>>>     env.addSource(consumer).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Trip>(Time.days(1)) {
> > > > >>>>>>>         @Override
> > > > >>>>>>>         public long extractTimestamp(Trip trip) {
> > > > >>>>>>>             return trip.endTime.getTime();
> > > > >>>>>>>         }
> > > > >>>>>>>     });
> > > > >>>>>>> KeyedStream<Trip, Long> userTrips = trips.keyBy(trip -> trip.userId);
> > > > >>>>>>> DataStream<Trip> featurizedUserTrips = userTrips.map(trip -> trip);
> > > > >>>>>>> AllWindowedStream<Trip, TimeWindow> windowedUserTrips =
> > > > >>>>>>>     featurizedUserTrips.timeWindowAll(Time.days(7), Time.days(1));
> > > > >>>>>>> ```
> > > > >>>>>>>
> > > > >>>>>>> `assignTimestampsAndWatermarks` after `keyBy` doesn't work:
> > > > >>>>>>> ```java
> > > > >>>>>>> DataStream<Trip> trips = env.addSource(consumer);
> > > > >>>>>>> KeyedStream<Trip, Long> userTrips = trips.keyBy(trip -> trip.userId);
> > > > >>>>>>> DataStream<Trip> featurizedUserTrips =
> > > > >>>>>>>     userTrips.map(trip -> trip).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Trip>(Time.days(1)) {
> > > > >>>>>>>         @Override
> > > > >>>>>>>         public long extractTimestamp(Trip trip) {
> > > > >>>>>>>             return trip.endTime.getTime();
> > > > >>>>>>>         }
> > > > >>>>>>>     });
> > > > >>>>>>> AllWindowedStream<Trip, TimeWindow> windowedUserTrips =
> > > > >>>>>>>     featurizedUserTrips.timeWindowAll(Time.days(7), Time.days(1));
> > > > >>>>>>> ```
> > > > >>>>>>>
> > > > >>>>>>> It feels like a bug to me, but I want to confirm it before I file the bug
> > > > >>>>>>> report.
> > > > >>>>>>>
> > > > >>>>>>> On 2019/04/18 03:38:34, Paul Lam <paullin3...@gmail.com> wrote:
> > > > >>>>>>>> Hi,
> > > > >>>>>>>>
> > > > >>>>>>>> Could you check the watermark of the window operator? One possible
> > > > >>>>>>>> situation would be that some of the keys are not getting enough inputs, so
> > > > >>>>>>>> their watermarks remain below the window end time and hold the window
> > > > >>>>>>>> operator's watermark back. IMO, it's good practice to assign watermarks
> > > > >>>>>>>> earlier in the data pipeline.
> > > > >>>>>>>>
> > > > >>>>>>>> Best,
> > > > >>>>>>>> Paul Lam
> > > > >>>>>>>>
> > > > >>>>>>>>> On Apr 17, 2019, at 23:04, an0...@gmail.com wrote:
> > > > >>>>>>>>>
> > > > >>>>>>>>> `assignTimestampsAndWatermarks` before `keyBy` works:
> > > > >>>>>>>>> ```java
> > > > >>>>>>>>> DataStream<Trip> trips =
> > > > >>>>>>>>>     env.addSource(consumer).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Trip>(Time.days(1)) {
> > > > >>>>>>>>>         @Override
> > > > >>>>>>>>>         public long extractTimestamp(Trip trip) {
> > > > >>>>>>>>>             return trip.endTime.getTime();
> > > > >>>>>>>>>         }
> > > > >>>>>>>>>     });
> > > > >>>>>>>>> KeyedStream<Trip, Long> userTrips = trips.keyBy(trip -> trip.userId);
> > > > >>>>>>>>> DataStream<FeaturizedTrip> featurizedUserTrips = userTrips.process(new Featurization());
> > > > >>>>>>>>> AllWindowedStream<FeaturizedTrip, TimeWindow> windowedUserTrips =
> > > > >>>>>>>>>     featurizedUserTrips.timeWindowAll(Time.days(7), Time.days(1));
> > > > >>>>>>>>> ```
> > > > >>>>>>>>>
> > > > >>>>>>>>> But not after `keyBy` and `process`:
> > > > >>>>>>>>> ```java
> > > > >>>>>>>>> DataStream<Trip> trips = env.addSource(consumer);
> > > > >>>>>>>>> KeyedStream<Trip, Long> userTrips = trips.keyBy(trip -> trip.userId);
> > > > >>>>>>>>> DataStream<FeaturizedTrip> featurizedUserTrips =
> > > > >>>>>>>>>     userTrips.process(new Featurization()).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<FeaturizedTrip>(Time.days(1)) {
> > > > >>>>>>>>>         @Override
> > > > >>>>>>>>>         public long extractTimestamp(FeaturizedTrip trip) {
> > > > >>>>>>>>>             return trip.endTime.getTime();
> > > > >>>>>>>>>         }
> > > > >>>>>>>>>     });
> > > > >>>>>>>>> AllWindowedStream<FeaturizedTrip, TimeWindow> windowedUserTrips =
> > > > >>>>>>>>>     featurizedUserTrips.timeWindowAll(Time.days(7), Time.days(1));
> > > > >>>>>>>>> ```
> > > > >>>>>>>>> Windows are never triggered.
> > > > >>>>>>>>>
> > > > >>>>>>>>> Is it a bug or expected behavior? If the latter, where is it
> > > > >>>>>>>>> documented?
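
The behavior discussed in this thread follows from how a bounded-out-of-orderness watermark generator behaves on an instance that never receives data. The sketch below is a plain-Java model, not Flink's `BoundedOutOfOrdernessTimestampExtractor`; `BoundedLatenessModel` is a made-up name. It assumes the watermark is the largest timestamp seen so far minus the allowed lateness, and that it stays at `Long.MIN_VALUE` until the first element arrives.

```java
// Plain-Java model of a bounded-out-of-orderness watermark generator.
// Assumption: hypothetical class for illustration; it does not use or mirror Flink's API.
final class BoundedLatenessModel {
    private final long maxOutOfOrdernessMs;
    private long maxTimestampSeen;
    private boolean seenElement = false;

    BoundedLatenessModel(long maxOutOfOrdernessMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
    }

    /** Tracks the largest event timestamp observed by this parallel instance. */
    void onElement(long timestampMs) {
        maxTimestampSeen = seenElement ? Math.max(maxTimestampSeen, timestampMs) : timestampMs;
        seenElement = true;
    }

    /** Watermark trails the largest timestamp by the allowed lateness; no data means no progress. */
    long currentWatermark() {
        return seenElement ? maxTimestampSeen - maxOutOfOrdernessMs : Long.MIN_VALUE;
    }

    public static void main(String[] args) {
        long oneDayMs = 24L * 60 * 60 * 1000;

        // Instance that receives every trip because all keys hash to its subtask.
        BoundedLatenessModel busy = new BoundedLatenessModel(oneDayMs);
        busy.onElement(200_000_000L);
        busy.onElement(150_000_000L); // out-of-order element does not lower the watermark

        // Instance on a subtask that never receives a trip.
        BoundedLatenessModel idle = new BoundedLatenessModel(oneDayMs);

        System.out.println("busy watermark: " + busy.currentWatermark());
        System.out.println("idle advanced:  " + (idle.currentWatermark() != Long.MIN_VALUE));
    }
}
```

Under these assumptions, placing the generator after `keyBy` leaves the idle instance stuck at the lowest value, and an operator such as `timeWindowAll` that waits for all of its inputs cannot fire. Placing it right after the source avoids this as long as every source instance receives data.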