Hi,

A BoundedOutOfOrdernessTimestampExtractor can only send a watermark (WM)
after it has received at least one element.
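
To illustrate, here is a minimal stand-alone model of a
bounded-out-of-orderness watermark generator. It is a sketch, not Flink's
class: the name `BoundedWatermarkModel` and its methods are made up for this
example, and it only assumes that the watermark trails the largest timestamp
seen so far by a fixed bound.

```java
/** Simplified model of a bounded-out-of-orderness watermark generator (hypothetical, not Flink's class). */
final class BoundedWatermarkModel {
    private final long maxOutOfOrdernessMs;
    private long maxTimestampSeen;

    BoundedWatermarkModel(long maxOutOfOrdernessMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
        // Start low enough that the initial watermark is Long.MIN_VALUE without underflow.
        this.maxTimestampSeen = Long.MIN_VALUE + maxOutOfOrdernessMs;
    }

    /** Records the event timestamp of an incoming element. */
    void onElement(long eventTimestampMs) {
        maxTimestampSeen = Math.max(maxTimestampSeen, eventTimestampMs);
    }

    /** The watermark trails the largest timestamp seen by the allowed lateness. */
    long currentWatermark() {
        return maxTimestampSeen - maxOutOfOrdernessMs;
    }

    public static void main(String[] args) {
        BoundedWatermarkModel idle = new BoundedWatermarkModel(1_000L);
        BoundedWatermarkModel busy = new BoundedWatermarkModel(1_000L);
        busy.onElement(5_000L);

        // The instance that never saw an element is still at the initial watermark.
        System.out.println(idle.currentWatermark() == Long.MIN_VALUE); // true
        System.out.println(busy.currentWatermark());                   // 4000
    }
}
```

An instance that never receives an element never moves past the initial
watermark, which is the situation described in the rest of this mail.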

After keyBy:
Flink uses the hash code of the key and the parallelism of the downstream
operator to decide which subtask receives each element. This means that if
your key is always the same, all the sources send their elements to the same
downstream subtask, for example only BoundedOutOfOrdernessTimestampExtractor
no. 3.
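
A rough sketch of that routing, assuming a plain `hashCode() % parallelism`
in place of Flink's real assignment (the runtime goes through a murmur hash
and key groups, but it is just as deterministic per key). `KeyRoutingSketch`
and its methods are hypothetical names used only for illustration.

```java
import java.util.Arrays;

/** Simplified stand-in for key-to-subtask routing (hypothetical, not Flink's implementation). */
final class KeyRoutingSketch {

    /** Same key and same parallelism always give the same subtask index. */
    static int subtaskFor(Object key, int downstreamParallelism) {
        return Math.floorMod(key.hashCode(), downstreamParallelism);
    }

    /** Counts how many elements each downstream subtask would receive. */
    static int[] countPerSubtask(Object[] keys, int downstreamParallelism) {
        int[] counts = new int[downstreamParallelism];
        for (Object key : keys) {
            counts[subtaskFor(key, downstreamParallelism)]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        Object[] constantKeys = {0L, 0L, 0L, 0L, 0L, 0L}; // keyBy(trip -> 0L)
        Object[] userIds = {1L, 2L, 3L, 4L, 5L, 6L};      // keyBy(trip -> trip.userId)

        // With a constant key, one subtask gets everything and the others get nothing.
        System.out.println(Arrays.toString(countPerSubtask(constantKeys, 4))); // [6, 0, 0, 0]
        System.out.println(Arrays.toString(countPerSubtask(userIds, 4)));      // [1, 2, 2, 1]
    }
}
```

With few distinct keys, or a constant key, some downstream subtasks may see
no elements at all.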

Before keyBy:
In your case, the source and the BoundedOutOfOrdernessTimestampExtractors
are chained together, which means every
BoundedOutOfOrdernessTimestampExtractor receives elements.
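
Why one silent extractor is enough to block the window: an operator with
several input channels follows the minimum of the latest watermarks on those
channels, and an event-time window fires only once that watermark reaches the
window's last timestamp. The sketch below models just that rule;
`WatermarkMergeSketch` and its methods are hypothetical names, not Flink API.

```java
/** Simplified model of how a multi-input operator combines watermarks (hypothetical, not Flink API). */
final class WatermarkMergeSketch {

    /** The operator's event-time clock is the minimum watermark over all input channels. */
    static long combinedWatermark(long[] latestWatermarkPerChannel) {
        long min = Long.MAX_VALUE; // returned unchanged if there are no channels
        for (long watermark : latestWatermarkPerChannel) {
            min = Math.min(min, watermark);
        }
        return min;
    }

    /** An event-time window fires once the watermark reaches its last timestamp (end - 1). */
    static boolean windowFires(long windowEndMs, long combinedWatermark) {
        return combinedWatermark >= windowEndMs - 1;
    }

    public static void main(String[] args) {
        // After keyBy with a single key: only one extractor ever emitted a watermark.
        long[] afterKeyBy = {10_000L, Long.MIN_VALUE, Long.MIN_VALUE, Long.MIN_VALUE};
        // Before keyBy: every extractor is chained to a source and sees elements.
        long[] beforeKeyBy = {10_000L, 9_000L, 12_000L, 11_000L};

        System.out.println(windowFires(5_000L, combinedWatermark(afterKeyBy)));  // false
        System.out.println(windowFires(5_000L, combinedWatermark(beforeKeyBy))); // true
    }
}
```

This is also why `shuffle()` helps: it spreads elements over all extractor
instances, so each of them can advance its watermark.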

Best,
Guowei


an0 <an0...@gmail.com> wrote on Fri, Apr 19, 2019 at 10:41 PM:

> Hi,
>
> First of all, thank you for the `shuffle()` tip. It works. However, I
> still don't understand why it doesn't work without calling `shuffle()`.
>
> Why would not all BoundedOutOfOrdernessTimestampExtractors receive trips?
> All the trips have keys and timestamps. As I said in my reply to Paul, I see
> the same watermarks being extracted.
>
> How could calling `assignTimestampsAndWatermarks` before vs. after `keyBy`
> matter? My understanding is that any specific window for a specific key
> always receives exactly the same data, and the calling order of
> `assignTimestampsAndWatermarks` and `keyBy` shouldn't affect that.
>
> To make `keyBy` as irrelevant as possible, I tried letting it always
> return the same key so that there is only 1 keyed stream and it is exactly
> the same as the original unkeyed stream. It still doesn't trigger windows:
> ```java
> DataStream<Trip> trips = env.addSource(consumer);
> KeyedStream<Trip, Long> userTrips = trips.keyBy(trip -> 0L);
> DataStream<Trip> featurizedUserTrips = userTrips
>         .map(trip -> trip)
>         .assignTimestampsAndWatermarks(
>                 new BoundedOutOfOrdernessTimestampExtractor<Trip>(Time.days(1)) {
>                     @Override
>                     public long extractTimestamp(Trip trip) {
>                         return trip.endTime.getTime();
>                     }
>                 });
> AllWindowedStream<Trip, TimeWindow> windowedUserTrips =
>         featurizedUserTrips.timeWindowAll(Time.days(7), Time.days(1));
> ```
>
> It makes no sense to me. Please help me understand why it doesn't work.
> Thanks!
>
> On 2019/04/19 04:14:31, Guowei Ma <guowei....@gmail.com> wrote:
> > Hi,
> > After keyBy, maybe only some of the
> > BoundedOutOfOrdernessTimestampExtractors receive elements (trips). If
> > that is the case, a BoundedOutOfOrdernessTimestampExtractor that does not
> > receive any element does not send a WM, so the timeWindowAll operator
> > cannot be triggered.
> > You could add a shuffle() before assignTimestampsAndWatermarks in your
> > second case and check whether the window is triggered. If it is, you
> > could then check the distribution of elements generated by the source.
> >
> > Best,
> > Guowei
> >
> >
> > an0...@gmail.com <an0...@gmail.com> wrote on Fri, Apr 19, 2019 at 4:10 AM:
> >
> > > I don't think it is the watermark. I see the same watermarks from the
> > > two versions of code.
> > >
> > > The processing on the keyed stream doesn't change event time at all. I
> > > can simply change my code to use `map` on the keyed stream to return
> > > the input data, so that the window operator receives exactly the same
> > > data. The only difference is when I do `assignTimestampsAndWatermarks`.
> > > The result is the same; `assignTimestampsAndWatermarks` before `keyBy`
> > > works:
> > > ```java
> > > DataStream<Trip> trips = env
> > >         .addSource(consumer)
> > >         .assignTimestampsAndWatermarks(
> > >                 new BoundedOutOfOrdernessTimestampExtractor<Trip>(Time.days(1)) {
> > >                     @Override
> > >                     public long extractTimestamp(Trip trip) {
> > >                         return trip.endTime.getTime();
> > >                     }
> > >                 });
> > > KeyedStream<Trip, Long> userTrips = trips.keyBy(trip -> trip.userId);
> > > DataStream<Trip> featurizedUserTrips = userTrips.map(trip -> trip);
> > > AllWindowedStream<Trip, TimeWindow> windowedUserTrips =
> > >         featurizedUserTrips.timeWindowAll(Time.days(7), Time.days(1));
> > > ```
> > >
> > > `assignTimestampsAndWatermarks` after `keyBy` doesn't work:
> > > ```java
> > > DataStream<Trip> trips = env.addSource(consumer);
> > > KeyedStream<Trip, Long> userTrips = trips.keyBy(trip -> trip.userId);
> > > DataStream<Trip> featurizedUserTrips = userTrips
> > >         .map(trip -> trip)
> > >         .assignTimestampsAndWatermarks(
> > >                 new BoundedOutOfOrdernessTimestampExtractor<Trip>(Time.days(1)) {
> > >                     @Override
> > >                     public long extractTimestamp(Trip trip) {
> > >                         return trip.endTime.getTime();
> > >                     }
> > >                 });
> > > AllWindowedStream<Trip, TimeWindow> windowedUserTrips =
> > >         featurizedUserTrips.timeWindowAll(Time.days(7), Time.days(1));
> > > ```
> > >
> > > It feels like a bug to me, but I want to confirm it before I file a
> > > bug report.
> > >
> > > On 2019/04/18 03:38:34, Paul Lam <paullin3...@gmail.com> wrote:
> > > > Hi,
> > > >
> > > > Could you check the watermark of the window operator? One possible
> > > > situation would be that some of the keys are not getting enough
> > > > inputs, so their watermarks remain below the window end time and hold
> > > > the window operator watermark back. IMO, it’s good practice to assign
> > > > watermarks earlier in the data pipeline.
> > > >
> > > > Best,
> > > > Paul Lam
> > > >
> > > > > On Apr 17, 2019, at 23:04, an0...@gmail.com wrote:
> > > > >
> > > > > `assignTimestampsAndWatermarks` before `keyBy` works:
> > > > > ```java
> > > > > > DataStream<Trip> trips = env
> > > > > >         .addSource(consumer)
> > > > > >         .assignTimestampsAndWatermarks(
> > > > > >                 new BoundedOutOfOrdernessTimestampExtractor<Trip>(Time.days(1)) {
> > > > > >                     @Override
> > > > > >                     public long extractTimestamp(Trip trip) {
> > > > > >                         return trip.endTime.getTime();
> > > > > >                     }
> > > > > >                 });
> > > > > > KeyedStream<Trip, Long> userTrips = trips.keyBy(trip -> trip.userId);
> > > > > > DataStream<FeaturizedTrip> featurizedUserTrips =
> > > > > >         userTrips.process(new Featurization());
> > > > > > AllWindowedStream<FeaturizedTrip, TimeWindow> windowedUserTrips =
> > > > > >         featurizedUserTrips.timeWindowAll(Time.days(7), Time.days(1));
> > > > > ```
> > > > >
> > > > > But not after `keyBy` and `process`:
> > > > > ```java
> > > > > DataStream<Trip> trips = env.addSource(consumer);
> > > > > > KeyedStream<Trip, Long> userTrips = trips.keyBy(trip -> trip.userId);
> > > > > > DataStream<FeaturizedTrip> featurizedUserTrips = userTrips
> > > > > >         .process(new Featurization())
> > > > > >         .assignTimestampsAndWatermarks(
> > > > > >                 new BoundedOutOfOrdernessTimestampExtractor<FeaturizedTrip>(Time.days(1)) {
> > > > > >                     @Override
> > > > > >                     public long extractTimestamp(FeaturizedTrip trip) {
> > > > > >                         return trip.endTime.getTime();
> > > > > >                     }
> > > > > >                 });
> > > > > > AllWindowedStream<FeaturizedTrip, TimeWindow> windowedUserTrips =
> > > > > >         featurizedUserTrips.timeWindowAll(Time.days(7), Time.days(1));
> > > > > ```
> > > > > Windows are never triggered.
> > > > >
> > > > > > Is it a bug or expected behavior? If the latter, where is it
> > > > > > documented?
> > > > >
> > > >
> > > >
> > >
> >
>
