Hi David,

Thanks a lot for the explanation!
Eleanore

On Thu, Jul 2, 2020 at 6:30 AM David Anderson <[email protected]> wrote:

> Eleanore,
>
> Yes, if you change the implementation in the way that is suggested by the
> slide, the tests will fail. But it's more interesting to observe the
> behavior in the console.
>
> The notes that go with that slide explain the situation in more detail.
> (Use alt-p or option-p to see the notes.) But to recap here, there are two
> related effects:
>
> (1) Instead of producing a single result at the end of the window, this
> alternative implementation produces a result for every event. In other
> words, it produces a stream that eventually arrives at the same maximum
> value produced by the timeWindowAll.
>
> (2) With timeWindowAll, once the results for a given hour have been
> produced, Flink frees the state associated with the window for that hour.
> It knows, based on the watermarking, that no more events are expected, so
> the state is no longer needed and can be cleared. But with maxBy, the state
> for each key (each hour) is kept forever. This is why this is not a good
> approach: the keyspace is unbounded, and we can't intervene to clean up
> stale state.
>
> Regards,
> David
>
> On Wed, Jul 1, 2020 at 2:26 AM Eleanore Jin <[email protected]>
> wrote:
>
>> Hi experts,
>>
>> I am going through the Ververica Flink training, and when doing the lab on
>> windows (https://training.ververica.com/exercises/windows), it basically
>> asks you to compute, for each hour, which driver earned the most tips.
>>
>> The logic is to:
>> 0. keyBy driverId
>> 1. create a 1-hour window based on event time
>> 2. sum up all the tips for this driver within this 1-hour window
>> 3. create a 1-hour windowAll (non-keyed) window across all drivers
>> 4. find the max tips
>>
>> Sample code is shown below.
>>
>> SingleOutputStreamOperator<Tuple3<Long, Long, Float>> aggregatedTipsPerDriver = fares
>>         .keyBy(fare -> fare.driverId)
>>         .window(TumblingEventTimeWindows.of(Time.hours(1)))
>>         .process(new SumTipsFunction());
>>
>> // Tuple3: the timestamp for the end of the hour, the driverId,
>> // and the total of that driver's tips for that hour
>> SingleOutputStreamOperator<Tuple3<Long, Long, Float>> hourlyMax = aggregatedTipsPerDriver
>>         .windowAll(TumblingEventTimeWindows.of(Time.hours(1)))
>>         .maxBy(2);
>>
>> The question on the 4th slide is: why can't we key by the hour?
>>
>> If I change the implementation to keyBy the hour and run HourlyTipsTest,
>> testMaxAcrossDrivers fails:
>>
>> // (946688400000,1,6.0) -> for the window ending at 946688400000, driverId 1
>> // earned the most tips: 6.0
>>
>> Expected :[(946688400000,1,6.0), (946692000000,2,20.0)]
>> Actual   :[(946688400000,1,6.0), (946692000000,2,20.0), (946692000000,2,20.0)]
>>
>> Thanks a lot!
>> Eleanore
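To make effect (1) concrete, here is a minimal plain-Java model of the second pipeline stage, written without any Flink dependency. The names `HourlyMaxModel`, `HourlySum`, `windowedMax` and `rollingMaxBy` are hypothetical, and the input records are made up for illustration (they are not the data in HourlyTipsTest). `windowedMax` mimics `windowAll(...).maxBy(2)` by emitting one result per hour, while `rollingMaxBy` mimics keying by the hour and calling `maxBy(2)`, a rolling aggregation that emits the current maximum after every incoming record. When driver 2's 20.0 arrives before a smaller total for the same hour, the rolling version repeats `(946692000000,2,20.0)`, which has the same shape as the Actual output in the failing test.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of the second pipeline stage; no Flink classes are used.
public class HourlyMaxModel {

    // Hypothetical stand-in for Tuple3<Long, Long, Float>: (window end, driverId, summed tips).
    public static final class HourlySum {
        final long windowEnd;
        final long driverId;
        final float tips;

        public HourlySum(long windowEnd, long driverId, float tips) {
            this.windowEnd = windowEnd;
            this.driverId = driverId;
            this.tips = tips;
        }

        @Override
        public String toString() {
            return "(" + windowEnd + "," + driverId + "," + tips + ")";
        }
    }

    // Keeps the larger record; on a tie the earlier record wins (like maxBy's default).
    private static HourlySum larger(HourlySum current, HourlySum next) {
        return next.tips > current.tips ? next : current;
    }

    // Models windowAll(...).maxBy(2): exactly one result per hour. Here all windows
    // "fire" at the end of input, in the order each hour was first seen.
    public static List<HourlySum> windowedMax(List<HourlySum> input) {
        Map<Long, HourlySum> maxPerHour = new LinkedHashMap<>();
        for (HourlySum s : input) {
            maxPerHour.merge(s.windowEnd, s, HourlyMaxModel::larger);
        }
        return new ArrayList<>(maxPerHour.values());
    }

    // Models keyBy(hour).maxBy(2): a rolling aggregate that emits the current
    // maximum for the record's hour after every single input record.
    public static List<HourlySum> rollingMaxBy(List<HourlySum> input) {
        Map<Long, HourlySum> maxPerHour = new LinkedHashMap<>();
        List<HourlySum> out = new ArrayList<>();
        for (HourlySum s : input) {
            out.add(maxPerHour.merge(s.windowEnd, s, HourlyMaxModel::larger));
        }
        return out;
    }

    public static void main(String[] args) {
        // Hypothetical input, NOT the data used by HourlyTipsTest.
        List<HourlySum> sums = List.of(
                new HourlySum(946688400000L, 1, 6.0f),
                new HourlySum(946692000000L, 2, 20.0f),
                new HourlySum(946692000000L, 1, 4.0f));
        System.out.println(windowedMax(sums));  // [(946688400000,1,6.0), (946692000000,2,20.0)]
        System.out.println(rollingMaxBy(sums)); // [(946688400000,1,6.0), (946692000000,2,20.0), (946692000000,2,20.0)]
    }
}
```

The strict `>` comparison is a deliberate choice so that, among equal maxima, the first record is kept, which is what `maxBy` does by default.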

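Effect (2) concerns state rather than output. The following is a simplified plain-Java model, again with hypothetical names (`StateRetentionModel`, `windowedStateSize`, `rollingStateSize`) and made-up input. It assumes records arrive in event-time order and that the watermark advances to the start of each record's hour; in a real job the watermarks come from the source's timestamp and watermark assignment, so treat this only as an illustration. The windowed version drops an hour's entry once the watermark has passed that hour's end, so it holds only the current hour. The keyed rolling version keeps one entry per hour key indefinitely, so its state grows with the (unbounded) number of hours.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified plain-Java model of per-hour state; no Flink classes are used.
// Assumption: records arrive in event-time order and the watermark advances to the
// start of each record's hour (a stand-in for the job's real watermark assignment).
public class StateRetentionModel {

    private static final long HOUR_MS = 3_600_000L;

    // Models windowAll(...).maxBy(2): once the watermark passes a window's end,
    // the window fires and its state is freed.
    public static int windowedStateSize(long[] windowEnds, float[] tips) {
        Map<Long, Float> maxPerHour = new HashMap<>();
        for (int i = 0; i < windowEnds.length; i++) {
            maxPerHour.merge(windowEnds[i], tips[i], Math::max);
            long watermark = windowEnds[i] - HOUR_MS; // assumed watermark: start of this hour
            maxPerHour.keySet().removeIf(end -> end <= watermark);
        }
        return maxPerHour.size();
    }

    // Models keyBy(hour).maxBy(2): one state entry per hour key, never cleared.
    public static int rollingStateSize(long[] windowEnds, float[] tips) {
        Map<Long, Float> maxPerHour = new HashMap<>();
        for (int i = 0; i < windowEnds.length; i++) {
            maxPerHour.merge(windowEnds[i], tips[i], Math::max);
        }
        return maxPerHour.size();
    }

    public static void main(String[] args) {
        int hours = 24;
        long[] ends = new long[hours];
        float[] tips = new float[hours];
        for (int h = 0; h < hours; h++) {
            ends[h] = 946688400000L + h * HOUR_MS; // hypothetical consecutive hourly windows
            tips[h] = h;                           // hypothetical tip totals
        }
        System.out.println(windowedStateSize(ends, tips)); // 1
        System.out.println(rollingStateSize(ends, tips));  // 24
    }
}
```

Run over more hours, the windowed count stays at 1 while the rolling count keeps climbing, which is the unbounded-keyspace problem described in (2).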