Hi experts,

I am going through the Ververica Flink training, and the windows lab
(https://training.ververica.com/exercises/windows) asks us to compute,
for each hour, which driver earned the most tips.

The logic is to:
0. keyBy driverId
1. create a 1-hour window based on event time
2. sum up all the tips for each driver within that 1-hour window
3. create a 1-hour window across all drivers (windowAll)
4. find the max tips
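
To make sure I have the steps right, here is a minimal plain-Java sketch of the same logic without Flink. The Fare class, the hourlyMax method, and the sample fares are made up for illustration and are not part of the training code. It also assumes that the reported timestamp is the end of the hour containing the event, and that timestamps are non-negative.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class HourlyMaxSketch {
    static final long HOUR_MS = 60 * 60 * 1000L;

    // Hypothetical stand-in for the training's fare type: only the fields used here.
    static final class Fare {
        final long driverId;
        final long eventTimeMs;
        final float tip;

        Fare(long driverId, long eventTimeMs, float tip) {
            this.driverId = driverId;
            this.eventTimeMs = eventTimeMs;
            this.tip = tip;
        }
    }

    // Returns one "(windowEnd,driverId,totalTips)" string per hour, ordered by window end.
    static List<String> hourlyMax(List<Fare> fares) {
        // Steps 0-2: group by hour and driverId, summing the tips.
        Map<Long, Map<Long, Float>> sumsPerHour = new TreeMap<>();
        for (Fare f : fares) {
            // Assumption: the window end is the next full hour after the event time.
            long windowEnd = (f.eventTimeMs / HOUR_MS + 1) * HOUR_MS;
            sumsPerHour.computeIfAbsent(windowEnd, k -> new TreeMap<>())
                       .merge(f.driverId, f.tip, Float::sum);
        }

        // Steps 3-4: within each hour, look across all drivers and keep the max.
        List<String> result = new ArrayList<>();
        for (Map.Entry<Long, Map<Long, Float>> hour : sumsPerHour.entrySet()) {
            long bestDriver = -1;
            float bestTips = Float.NEGATIVE_INFINITY;
            for (Map.Entry<Long, Float> e : hour.getValue().entrySet()) {
                if (e.getValue() > bestTips) {
                    bestDriver = e.getKey();
                    bestTips = e.getValue();
                }
            }
            result.add("(" + hour.getKey() + "," + bestDriver + "," + bestTips + ")");
        }
        return result;
    }

    public static void main(String[] args) {
        // Made-up sample: driver 1 gets two tips in hour 1; drivers 1 and 2 tip in hour 2.
        List<Fare> fares = Arrays.asList(
            new Fare(1, 0L, 1.0f),
            new Fare(1, 15 * 60 * 1000L, 5.0f),
            new Fare(1, 90 * 60 * 1000L, 10.0f),
            new Fare(2, 90 * 60 * 1000L, 20.0f));
        System.out.println(hourlyMax(fares));
    }
}
```

The point of the sketch is that each hour produces exactly one result, which is what I expect from the windowed version.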

The sample Flink code is shown below.

SingleOutputStreamOperator<Tuple3<Long, Long, Float>> aggregatedTipsPerDriver =
 fares.keyBy(rides -> rides.driverId)
 .window(TumblingEventTimeWindows.of(Time.hours(1)))
 .process(new SumTipsFunction());

// Tuple3: the timestamp for the end of the hour, the driverId,
// and the total of that driver's tips for that hour
SingleOutputStreamOperator<Tuple3<Long, Long, Float>> hourlyMax =
 aggregatedTipsPerDriver.windowAll(TumblingEventTimeWindows.of(Time.hours(1)))
 .maxBy(2);


The question, shown on the 4th slide, is: why can't we key by the hour?

If I change the implementation to keyBy the hour and run HourlyTipsTest,
testMaxAcrossDrivers fails:

// (946688400000,1,6.0) -> for the window with timestamp 946688400000,
// driverId 1 earned the most tips: 6.0

Expected :[(946688400000,1,6.0), (946692000000,2,20.0)]
Actual   :[(946688400000,1,6.0), (946692000000,2,20.0), (946692000000,2,20.0)]



Thanks a lot!
Eleanore
