Hi,

First of all, thank you for the `shuffle()` tip. It works. However, I still
don't understand why the windows don't fire without calling `shuffle()`.

Why wouldn't all the BoundedOutOfOrdernessTimestampExtractors receive trips? All
the trips have keys and timestamps. As I said in my reply to Paul, I see the
same watermarks being extracted.
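A rough model may show why identical watermarks from the extractors that *do* receive trips are not enough. `WatermarkSketch` and its `Extractor` are hypothetical names, not Flink classes. The sketch assumes four parallel extractor instances and the bounded-out-of-orderness rule (watermark = highest timestamp seen minus the bound). It ignores periodic emission timing and assumes the downstream `timeWindowAll` instance takes the minimum watermark over its inputs.

```java
import java.util.Arrays;

// Simplified model, NOT Flink code: each parallel extractor instance keeps its
// own watermark (highest timestamp seen minus the out-of-orderness bound), and
// the downstream timeWindowAll instance can only advance to the minimum of the
// watermarks arriving on its input channels.
class WatermarkSketch {
    static final long DAY = 24L * 60 * 60 * 1000;

    static class Extractor {
        private final long bound;
        private long maxTimestamp;
        private long lastWatermark = Long.MIN_VALUE;

        Extractor(long bound) {
            this.bound = bound;
            // Start low enough that "maxTimestamp - bound" cannot underflow.
            this.maxTimestamp = Long.MIN_VALUE + bound;
        }

        // Called for every element this instance receives.
        void onElement(long timestamp) {
            maxTimestamp = Math.max(maxTimestamp, timestamp);
        }

        // Watermarks never move backwards.
        long currentWatermark() {
            lastWatermark = Math.max(lastWatermark, maxTimestamp - bound);
            return lastWatermark;
        }
    }

    // Event-time clock of an operator fed by all the given extractor instances.
    static long downstreamWatermark(Extractor[] inputs) {
        return Arrays.stream(inputs)
                .mapToLong(Extractor::currentWatermark)
                .min()
                .orElse(Long.MIN_VALUE);
    }

    public static void main(String[] args) {
        Extractor[] extractors = new Extractor[4];
        for (int i = 0; i < extractors.length; i++) {
            extractors[i] = new Extractor(DAY);
        }
        // Every trip lands on instance 0; instances 1..3 stay idle.
        extractors[0].onElement(10 * DAY);
        System.out.println(extractors[0].currentWatermark() == 9 * DAY);      // true
        System.out.println(downstreamWatermark(extractors) == Long.MIN_VALUE); // true
    }
}
```

In this model, the busy instance reports a perfectly normal watermark, yet the operator downstream stays at `Long.MIN_VALUE` as long as any input instance has seen nothing.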

How could calling `assignTimestampsAndWatermarks` before vs. after `keyBy`
matter? My understanding is that any specific window for a specific key always
receives exactly the same data, and the order of
`assignTimestampsAndWatermarks` and `keyBy` shouldn't affect that.
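The order can matter even when every window receives identical records, because *when* a window fires depends on the window operator's watermark, not on its contents. Below is a minimal sketch of the firing rule for `timeWindowAll(Time.days(7), Time.days(1))`. It assumes zero window offset and epoch-millisecond timestamps. `SlidingWindowSketch` is a hypothetical name; the sketch illustrates the rule and does not reproduce Flink's code.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified sketch of event-time sliding windows (no offset) and of the
// condition under which a window may fire. Not Flink's implementation.
class SlidingWindowSketch {
    static final long DAY = 24L * 60 * 60 * 1000;

    // Start timestamps of every [start, start + size) window that contains ts.
    static List<Long> windowStarts(long ts, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        long lastStart = ts - Math.floorMod(ts, slide);
        for (long start = lastStart; start > ts - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }

    // A window [start, start + size) can fire once the watermark reaches
    // its last millisecond, i.e. start + size - 1.
    static boolean canFire(long start, long size, long watermark) {
        return watermark >= start + size - 1;
    }

    public static void main(String[] args) {
        long ts = 10 * DAY + 5; // a trip ending just after 10 days past the epoch
        List<Long> starts = windowStarts(ts, 7 * DAY, DAY);
        System.out.println(starts.size()); // 7 overlapping windows
        long earliest = starts.get(starts.size() - 1); // window [4d, 11d)
        System.out.println(canFire(earliest, 7 * DAY, Long.MIN_VALUE)); // false
        System.out.println(canFire(earliest, 7 * DAY, 11 * DAY));       // true
    }
}
```

The same trip sits in the same seven windows either way; only the watermark decides whether any of them is ever emitted.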

To make `keyBy` as irrelevant as possible, I tried having it always return the
same key, so that there is only one keyed stream and it is exactly the same as
the original unkeyed stream. It still doesn't trigger windows:
```java
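import org.apache.flink.streaming.api.datastream.AllWindowedStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

DataStream<Trip> trips = env.addSource(consumer);
// Constant key: every trip gets the same key.
KeyedStream<Trip, Long> userTrips = trips.keyBy(trip -> 0L);
// Identity map, then timestamps and watermarks are assigned after keyBy.
DataStream<Trip> featurizedUserTrips =
        userTrips.map(trip -> trip).assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<Trip>(Time.days(1)) {
                    @Override
                    public long extractTimestamp(Trip trip) {
                        return trip.endTime.getTime();
                    }
                });
// 7-day windows sliding by 1 day over the whole (non-keyed) stream.
AllWindowedStream<Trip, TimeWindow> windowedUserTrips =
        featurizedUserTrips.timeWindowAll(Time.days(7), Time.days(1));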
```
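The constant key is worth looking at closely. `keyBy` decides which parallel instance of the `map` (and of the extractor right after it) receives each record. A single key therefore routes every trip to one instance and leaves the others idle, whereas `shuffle()` sends each record to a randomly chosen instance. The sketch below is a toy model: `RoutingSketch` is hypothetical, the parallelism of 4 is assumed, and `floorMod(hashCode, parallelism)` stands in for Flink's real key-group assignment. In a real job, key `0L` may land on a different instance, but still on only one.

```java
import java.util.Arrays;
import java.util.Random;

// Toy model of how records reach the parallel map/extractor instances.
// Flink's keyBy really hashes keys into key groups; floorMod(hashCode, p) is a
// simplification that keeps the relevant property: equal keys always go to
// the same instance.
class RoutingSketch {
    // Records per instance when every record has the key 0L.
    static int[] keyByConstant(int records, int parallelism) {
        int[] counts = new int[parallelism];
        Long key = 0L; // trips.keyBy(trip -> 0L)
        for (int i = 0; i < records; i++) {
            counts[Math.floorMod(key.hashCode(), parallelism)]++;
        }
        return counts;
    }

    // Records per instance when each record goes to a random instance.
    static int[] shuffled(int records, int parallelism, long seed) {
        int[] counts = new int[parallelism];
        Random random = new Random(seed);
        for (int i = 0; i < records; i++) {
            counts[random.nextInt(parallelism)]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(keyByConstant(1000, 4))); // [1000, 0, 0, 0]
        System.out.println(Arrays.toString(shuffled(1000, 4, 42L)));
    }
}
```

With the constant key, three of the four instances never see a record, so their extractors never advance their watermarks. With random partitioning, every instance gets a share of the trips.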

It makes no sense to me. Please help me understand why it doesn't work. Thanks!

On 2019/04/19 04:14:31, Guowei Ma <guowei....@gmail.com> wrote: 
> Hi,
> After keyBy, maybe only some of the BoundedOutOfOrdernessTimestampExtractors
> receive elements (trips). If that is the case, a
> BoundedOutOfOrdernessTimestampExtractor that does not receive any element
> would not send a watermark (WM), so the timeWindowAll operator could not be
> triggered.
> You could add a shuffle() before the assignTimestampsAndWatermarks in your
> second case and check whether the window is triggered. If it is, you could
> check the distribution of elements generated by the source.
> 
> Best,
> Guowei
> 
> 
> On Fri, Apr 19, 2019 at 4:10 AM, an0...@gmail.com <an0...@gmail.com> wrote:
> 
> > I don't think the watermark is the problem. I see the same watermarks from
> > the two versions of the code.
> >
> > The processing on the keyed stream doesn't change event time at all. I can
> > simply change my code to use `map` on the keyed stream to return the input
> > data unchanged, so that the window operator receives exactly the same data.
> > The only difference is where I call `assignTimestampsAndWatermarks`. The
> > result is the same: `assignTimestampsAndWatermarks` before `keyBy` works:
> > ```java
> > DataStream<Trip> trips =
> >         env.addSource(consumer).assignTimestampsAndWatermarks(
> >                 new BoundedOutOfOrdernessTimestampExtractor<Trip>(Time.days(1)) {
> >                     @Override
> >                     public long extractTimestamp(Trip trip) {
> >                         return trip.endTime.getTime();
> >                     }
> >                 });
> > KeyedStream<Trip, Long> userTrips = trips.keyBy(trip -> trip.userId);
> > DataStream<Trip> featurizedUserTrips = userTrips.map(trip -> trip);
> > AllWindowedStream<Trip, TimeWindow> windowedUserTrips =
> >         featurizedUserTrips.timeWindowAll(Time.days(7), Time.days(1));
> > ```
> >
> > `assignTimestampsAndWatermarks` after `keyBy` doesn't work:
> > ```java
> > DataStream<Trip> trips = env.addSource(consumer);
> > KeyedStream<Trip, Long> userTrips = trips.keyBy(trip -> trip.userId);
> > DataStream<Trip> featurizedUserTrips =
> >         userTrips.map(trip -> trip).assignTimestampsAndWatermarks(
> >                 new BoundedOutOfOrdernessTimestampExtractor<Trip>(Time.days(1)) {
> >                     @Override
> >                     public long extractTimestamp(Trip trip) {
> >                         return trip.endTime.getTime();
> >                     }
> >                 });
> > AllWindowedStream<Trip, TimeWindow> windowedUserTrips =
> >         featurizedUserTrips.timeWindowAll(Time.days(7), Time.days(1));
> > ```
> >
> > It feels like a bug to me, but I want to confirm that before I file a bug
> > report.
> >
> > On 2019/04/18 03:38:34, Paul Lam <paullin3...@gmail.com> wrote:
> > > Hi,
> > >
> > > Could you check the watermark of the window operator? One possible
> > > situation is that some of the keys are not getting enough input, so their
> > > watermarks remain below the window end time and hold the window operator's
> > > watermark back. IMO, it's good practice to assign watermarks early in the
> > > data pipeline.
> > >
> > > Best,
> > > Paul Lam
> > >
> > > > On Apr 17, 2019, at 23:04, an0...@gmail.com wrote:
> > > >
> > > > `assignTimestampsAndWatermarks` before `keyBy` works:
> > > > ```java
> > > > DataStream<Trip> trips =
> > > >         env.addSource(consumer).assignTimestampsAndWatermarks(
> > > >                 new BoundedOutOfOrdernessTimestampExtractor<Trip>(Time.days(1)) {
> > > >                     @Override
> > > >                     public long extractTimestamp(Trip trip) {
> > > >                         return trip.endTime.getTime();
> > > >                     }
> > > >                 });
> > > > KeyedStream<Trip, Long> userTrips = trips.keyBy(trip -> trip.userId);
> > > > DataStream<FeaturizedTrip> featurizedUserTrips =
> > > >         userTrips.process(new Featurization());
> > > > AllWindowedStream<FeaturizedTrip, TimeWindow> windowedUserTrips =
> > > >         featurizedUserTrips.timeWindowAll(Time.days(7), Time.days(1));
> > > > ```
> > > >
> > > > But not after `keyBy` and `process`:
> > > > ```java
> > > > DataStream<Trip> trips = env.addSource(consumer);
> > > > KeyedStream<Trip, Long> userTrips = trips.keyBy(trip -> trip.userId);
> > > > DataStream<FeaturizedTrip> featurizedUserTrips =
> > > >         userTrips.process(new Featurization()).assignTimestampsAndWatermarks(
> > > >                 new BoundedOutOfOrdernessTimestampExtractor<FeaturizedTrip>(Time.days(1)) {
> > > >                     @Override
> > > >                     public long extractTimestamp(FeaturizedTrip trip) {
> > > >                         return trip.endTime.getTime();
> > > >                     }
> > > >                 });
> > > > AllWindowedStream<FeaturizedTrip, TimeWindow> windowedUserTrips =
> > > >         featurizedUserTrips.timeWindowAll(Time.days(7), Time.days(1));
> > > > ```
> > > > Windows are never triggered.
> > > >
> > > > Is it a bug or expected behavior? If the latter, where is it documented?
> > > >
> > >
> > >
> >
> 
