Hi,

First of all, thank you for the `shuffle()` tip. It works. However, I still don't understand why it doesn't work without calling `shuffle()`.

Why wouldn't all of the BoundedOutOfOrdernessTimestampExtractors receive trips? All the trips have keys and timestamps. As I said in my reply to Paul, I see the same watermarks being extracted. How could calling `assignTimestampsAndWatermarks` before vs. after `keyBy` matter? My understanding is that any specific window for a specific key always receives exactly the same data, and the order in which `assignTimestampsAndWatermarks` and `keyBy` are called shouldn't affect that.

To make `keyBy` as irrelevant as possible, I tried having it always return the same key, so that there is only one keyed stream and it is exactly the same as the original unkeyed stream. It still doesn't trigger windows:
```java
DataStream<Trip> trips = env.addSource(consumer);
KeyedStream<Trip, Long> userTrips = trips.keyBy(trip -> 0L);
DataStream<Trip> featurizedUserTrips =
    userTrips.map(trip -> trip).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Trip>(Time.days(1)) {
        @Override
        public long extractTimestamp(Trip trip) {
            return trip.endTime.getTime();
        }
    });
AllWindowedStream<Trip, TimeWindow> windowedUserTrips =
    featurizedUserTrips.timeWindowAll(Time.days(7), Time.days(1));
```

It makes no sense to me. Please help me understand why it doesn't work.

Thanks!

On 2019/04/19 04:14:31, Guowei Ma <guowei....@gmail.com> wrote:
> Hi,
> After keyBy, maybe only some of the BoundedOutOfOrdernessTimestampExtractors
> receive elements (trips). If that is the case, a BoundedOutOfOrdernessTimestampExtractor
> that does not receive any elements will not send a watermark, so the timeWindowAll
> operator cannot be triggered.
> You could add a shuffle() before the assignTimestampsAndWatermarks in your
> second case and check whether the window is triggered. If it is, you could then check
> the distribution of elements generated by the source.
>
> Best,
> Guowei
>
>
> an0...@gmail.com <an0...@gmail.com> wrote on Friday, April 19, 2019, at 4:10 AM:
>
> > I don't think it is the watermark.
> > I see the same watermarks from the two versions of the code.
> >
> > The processing on the keyed stream doesn't change event time at all. I can
> > simply change my code to use `map` on the keyed stream to return the input
> > data unchanged, so that the window operator receives exactly the same data.
> > The only difference is when I call `assignTimestampsAndWatermarks`. The result
> > is the same: `assignTimestampsAndWatermarks` before `keyBy` works:
> > ```java
> > DataStream<Trip> trips =
> >     env.addSource(consumer).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Trip>(Time.days(1)) {
> >         @Override
> >         public long extractTimestamp(Trip trip) {
> >             return trip.endTime.getTime();
> >         }
> >     });
> > KeyedStream<Trip, Long> userTrips = trips.keyBy(trip -> trip.userId);
> > DataStream<Trip> featurizedUserTrips = userTrips.map(trip -> trip);
> > AllWindowedStream<Trip, TimeWindow> windowedUserTrips =
> >     featurizedUserTrips.timeWindowAll(Time.days(7), Time.days(1));
> > ```
> >
> > `assignTimestampsAndWatermarks` after `keyBy` doesn't work:
> > ```java
> > DataStream<Trip> trips = env.addSource(consumer);
> > KeyedStream<Trip, Long> userTrips = trips.keyBy(trip -> trip.userId);
> > DataStream<Trip> featurizedUserTrips =
> >     userTrips.map(trip -> trip).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Trip>(Time.days(1)) {
> >         @Override
> >         public long extractTimestamp(Trip trip) {
> >             return trip.endTime.getTime();
> >         }
> >     });
> > AllWindowedStream<Trip, TimeWindow> windowedUserTrips =
> >     featurizedUserTrips.timeWindowAll(Time.days(7), Time.days(1));
> > ```
> >
> > It feels like a bug to me, but I want to confirm that before I file a bug
> > report.
> >
> > On 2019/04/18 03:38:34, Paul Lam <paullin3...@gmail.com> wrote:
> > > Hi,
> > >
> > > Could you check the watermark of the window operator?
> > > One possible situation would be that some of the keys are not getting enough
> > > input, so their watermarks remain below the window end time and hold back the
> > > window operator's watermark. IMO, it's good practice to assign watermarks
> > > earlier in the data pipeline.
> > >
> > > Best,
> > > Paul Lam
> > >
> > > > On April 17, 2019, at 23:04, an0...@gmail.com wrote:
> > > >
> > > > `assignTimestampsAndWatermarks` before `keyBy` works:
> > > > ```java
> > > > DataStream<Trip> trips =
> > > >     env.addSource(consumer).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Trip>(Time.days(1)) {
> > > >         @Override
> > > >         public long extractTimestamp(Trip trip) {
> > > >             return trip.endTime.getTime();
> > > >         }
> > > >     });
> > > > KeyedStream<Trip, Long> userTrips = trips.keyBy(trip -> trip.userId);
> > > > DataStream<FeaturizedTrip> featurizedUserTrips = userTrips.process(new Featurization());
> > > > AllWindowedStream<FeaturizedTrip, TimeWindow> windowedUserTrips =
> > > >     featurizedUserTrips.timeWindowAll(Time.days(7), Time.days(1));
> > > > ```
> > > >
> > > > But not after `keyBy` and `process`:
> > > > ```java
> > > > DataStream<Trip> trips = env.addSource(consumer);
> > > > KeyedStream<Trip, Long> userTrips = trips.keyBy(trip -> trip.userId);
> > > > DataStream<FeaturizedTrip> featurizedUserTrips =
> > > >     userTrips.process(new Featurization()).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<FeaturizedTrip>(Time.days(1)) {
> > > >         @Override
> > > >         public long extractTimestamp(FeaturizedTrip trip) {
> > > >             return trip.endTime.getTime();
> > > >         }
> > > >     });
> > > > AllWindowedStream<FeaturizedTrip, TimeWindow> windowedUserTrips =
> > > >     featurizedUserTrips.timeWindowAll(Time.days(7), Time.days(1));
> > > > ```
> > > >
> > > > Windows are never triggered.
> > > >
> > > > Is it a bug or expected behavior? If the latter, where is it documented?
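The mechanism Guowei and Paul describe can be illustrated without Flink at all. Below is a minimal plain-Java sketch, assuming a parallelism of 4, one trip per day, and a 1-day out-of-orderness bound. It models each parallel watermark-assigner instance as tracking its own maximum timestamp, and the single `timeWindowAll` task as only advancing to the minimum watermark of its inputs. The routing is a simplification: `Math.floorMod` of the key's hash is a hypothetical stand-in for Flink's key-group assignment, and round-robin is a deterministic stand-in for `shuffle()` (which actually picks channels randomly). The class and method names are illustrative only, not Flink API.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model (no Flink dependency) of why idle parallel watermark assigners
// stall a downstream timeWindowAll. Routing and watermark logic are simplified assumptions.
class WatermarkSkewDemo {

    static final long DAY = 24L * 60 * 60 * 1000;

    /** Simplified stand-in for one parallel instance of a bounded-out-of-orderness assigner. */
    static final class Extractor {
        private final long maxOutOfOrderness;
        private long currentMaxTimestamp;

        Extractor(long maxOutOfOrderness) {
            this.maxOutOfOrderness = maxOutOfOrderness;
            // Start so that an instance that never sees an element reports Long.MIN_VALUE.
            this.currentMaxTimestamp = Long.MIN_VALUE + maxOutOfOrderness;
        }

        void onElement(long timestamp) {
            currentMaxTimestamp = Math.max(currentMaxTimestamp, timestamp);
        }

        long currentWatermark() {
            return currentMaxTimestamp - maxOutOfOrderness;
        }
    }

    /** A single downstream task (like timeWindowAll) only advances to the minimum of its inputs. */
    static long combinedWatermark(List<Extractor> inputs) {
        long min = Long.MAX_VALUE;
        for (Extractor e : inputs) {
            min = Math.min(min, e.currentWatermark());
        }
        return min;
    }

    /**
     * Routes each element to one of `parallelism` assigner instances, either by key
     * (hypothetical hash-mod routing) or round-robin (stand-in for redistributing the stream).
     */
    static long run(long[] timestamps, long[] keys, int parallelism, boolean routeByKey) {
        List<Extractor> extractors = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            extractors.add(new Extractor(DAY));
        }
        for (int i = 0; i < timestamps.length; i++) {
            int target = routeByKey
                    ? Math.floorMod(Long.hashCode(keys[i]), parallelism)
                    : i % parallelism;
            extractors.get(target).onElement(timestamps[i]);
        }
        return combinedWatermark(extractors);
    }

    /** An event-time window [start, end) fires once the watermark reaches end - 1. */
    static boolean windowFires(long windowEnd, long watermark) {
        return watermark >= windowEnd - 1;
    }

    public static void main(String[] args) {
        long[] timestamps = new long[10];
        long[] keys = new long[10];
        for (int i = 0; i < timestamps.length; i++) {
            timestamps[i] = i * DAY; // one trip per day
            keys[i] = 0L;            // constant key, as in keyBy(trip -> 0L)
        }
        long keyed = run(timestamps, keys, 4, true);
        long spread = run(timestamps, keys, 4, false);
        System.out.println("constant key: watermark=" + keyed
                + ", window ending at day 1 fires=" + windowFires(DAY, keyed));
        System.out.println("round-robin:  watermark=" + spread
                + ", window ending at day 1 fires=" + windowFires(DAY, spread));
    }
}
```

In this model, the constant-key run sends every trip to a single instance, so the other three never advance and the combined watermark stays at `Long.MIN_VALUE` (no window fires), while round-robin routing gives every instance data and the combined watermark reaches five days. Note that with a constant key, any hash-based routing puts all records on one subtask, which is why making `keyBy` "irrelevant" does not help. The same reasoning suggests why assigning watermarks directly on the source works, as long as every parallel source instance actually reads data.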