Hi David,

Thanks a lot for the explanation!

Eleanore

On Thu, Jul 2, 2020 at 6:30 AM David Anderson <[email protected]> wrote:

> Eleanore,
>
> Yes, if you change the implementation in the way that is suggested by the
> slide, the tests will fail. But it's more interesting to observe the
> behavior in the console.
>
> The notes that go with that slide explain the situation in more detail.
> (Use alt-p or option-p to see the notes). But to recap here, there are two
> related effects:
>
> (1) Instead of producing a single result at the end of the window, this
> alternative implementation produces a result for every event. In other
> words, it produces a stream that eventually arrives at the same maximum
> value produced by the timeWindowAll.
>
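A minimal plain-Java model of effect (1), without Flink. It assumes each input is one per-driver hourly sum (hourEnd, driverId, tips), which is the shape the first stage emits. It also assumes that a keyed maxBy emits the current maximum for its key on every input, while the windowed version emits once per hour. The class and method names (RollingVsWindowedMax, rollingMaxBy, windowedMax) are hypothetical, and the model ignores watermarks and arrival order.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical plain-Java model (no Flink) of the second stage of the exercise.
final class RollingVsWindowedMax {

    // One per-driver hourly sum, as produced by the first (per-driver) window
    static final class HourlySum {
        final long hourEnd;
        final long driverId;
        final float tips;

        HourlySum(long hourEnd, long driverId, float tips) {
            this.hourEnd = hourEnd;
            this.driverId = driverId;
            this.tips = tips;
        }

        @Override
        public String toString() {
            return "(" + hourEnd + "," + driverId + "," + tips + ")";
        }
    }

    // Models keyBy(hour).maxBy(2): every input emits the current maximum for its
    // key, so there is one output per input, and the map is never cleared.
    static List<HourlySum> rollingMaxBy(List<HourlySum> inputs) {
        Map<Long, HourlySum> maxPerHour = new HashMap<>();
        List<HourlySum> out = new ArrayList<>();
        for (HourlySum in : inputs) {
            HourlySum current = maxPerHour.get(in.hourEnd);
            if (current == null || in.tips > current.tips) {
                current = in;
                maxPerHour.put(in.hourEnd, current);
            }
            out.add(current);
        }
        return out;
    }

    // Models windowAll(1 hour).maxBy(2): one output per hour, emitted only
    // after all of that hour's inputs have been seen.
    static List<HourlySum> windowedMax(List<HourlySum> inputs) {
        Map<Long, HourlySum> maxPerHour = new LinkedHashMap<>();
        for (HourlySum in : inputs) {
            HourlySum current = maxPerHour.get(in.hourEnd);
            if (current == null || in.tips > current.tips) {
                maxPerHour.put(in.hourEnd, in);
            }
        }
        return new ArrayList<>(maxPerHour.values());
    }
}
```

With the hypothetical inputs (946688400000,1,6.0), (946692000000,2,20.0) and (946692000000,3,5.0), rollingMaxBy returns three results, the last two both (946692000000,2,20.0). windowedMax returns one result per hour. This is the same shape as the Expected/Actual pair in the failing test.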
> (2) With timeWindowAll, once the results for a given hour have been
> produced, Flink frees the state associated with the window for that hour.
> It knows, based on the watermark, that no more events are expected for that
> hour, so the state is no longer needed and can be cleared. But with a keyed
> maxBy, the state for each key (each hour) is kept forever. This is why keying
> by the hour is not a good approach: the keyspace grows without bound (one new
> key every hour), and we can't intervene to clean up stale state.
>
> Regards,
> David
>
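To make effect (2) concrete, here is a similarly hypothetical plain-Java model (no Flink) that only counts how many per-hour entries each approach keeps. It assumes in-order input and a simplified firing rule: a window's state is dropped once the watermark reaches the window end. The real operators differ in detail, but the growth pattern is the point. The names StateRetentionModel, onSum and onWatermark are illustrative only.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical model of per-hour state held by the two approaches.
final class StateRetentionModel {
    // keyBy(hour).maxBy(2): one entry per hour ever seen, never removed
    private final Map<Long, Float> keyedState = new HashMap<>();
    // windowAll(1 hour): one entry per open window, removed once the window fires
    private final TreeMap<Long, Float> windowState = new TreeMap<>();

    // A new per-driver hourly sum arrives; both approaches track the running max.
    void onSum(long hourEnd, float tips) {
        keyedState.merge(hourEnd, tips, Float::max);
        windowState.merge(hourEnd, tips, Float::max);
    }

    // Simplified firing rule: every window ending at or before the watermark is
    // complete. Returns the number of windows fired (and freed).
    int onWatermark(long watermark) {
        NavigableMap<Long, Float> complete = windowState.headMap(watermark, true);
        int fired = complete.size();
        complete.clear(); // headMap is a live view, so this removes entries from windowState
        return fired;
    }

    int keyedStateSize() { return keyedState.size(); }

    int windowStateSize() { return windowState.size(); }
}
```

Feeding it one sum per hour for 24 hours and advancing the watermark to each hour's end leaves keyedStateSize() at 24 and windowStateSize() at 0. The keyed count keeps growing for as long as the job runs.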
> On Wed, Jul 1, 2020 at 2:26 AM Eleanore Jin <[email protected]> wrote:
>
>> Hi experts,
>>
>> I am going through the Ververica Flink training, and the windows lab
>> (https://training.ververica.com/exercises/windows) asks you to compute,
>> for each hour, which driver earned the most tips.
>>
>> The logic is:
>> 0. keyBy driverId
>> 1. create a 1-hour window based on event time
>> 2. sum up all the tips for each driver within that 1-hour window
>> 3. create a 1-hour non-keyed window (windowAll) across all drivers
>> 4. find the max tips
>>
>> Sample code is shown below.
>>
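>> import org.apache.flink.api.java.tuple.Tuple3;
>> import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
>> import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
>> import org.apache.flink.streaming.api.windowing.time.Time;
>>
>> SingleOutputStreamOperator<Tuple3<Long, Long, Float>> aggregatedTipsPerDriver = fares
>>     .keyBy(fare -> fare.driverId)
>>     .window(TumblingEventTimeWindows.of(Time.hours(1)))
>>     .process(new SumTipsFunction());
>>
>> // Tuple3: reporting the timestamp for the end of the hour, the driverId,
>> // and the total of that driver's tips for that hour
>> SingleOutputStreamOperator<Tuple3<Long, Long, Float>> hourlyMax = aggregatedTipsPerDriver
>>     .windowAll(TumblingEventTimeWindows.of(Time.hours(1)))
>>     .maxBy(2);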
>>
>>
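For reference, the two windowed stages above can be modelled in plain Java (no Flink) roughly as follows. This is a hypothetical sketch: the Fare class, the HourlyTipsModel name and the string output format are assumptions. Hours are bucketed as tumbling 1-hour windows with zero offset, and tie-breaking between drivers with equal totals (lowest driverId wins here) may not match what Flink does.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical plain-Java model of the two-stage "hourly tips" computation.
final class HourlyTipsModel {
    static final long HOUR_MS = 3_600_000L;

    // Hypothetical stand-in for a taxi fare: event time (ms), driver, tip
    static final class Fare {
        final long eventTime;
        final long driverId;
        final float tip;

        Fare(long eventTime, long driverId, float tip) {
            this.eventTime = eventTime;
            this.driverId = driverId;
            this.tip = tip;
        }
    }

    // End of the tumbling 1-hour window (zero offset) containing eventTime, for eventTime >= 0
    static long hourEnd(long eventTime) {
        return eventTime - (eventTime % HOUR_MS) + HOUR_MS;
    }

    // Returns one "(hourEnd,driverId,tips)" entry per hour, in hour order
    static List<String> hourlyMax(List<Fare> fares) {
        // Steps 0-2: keyBy(driverId) + 1-hour window + sum: hourEnd -> driverId -> total tips
        Map<Long, Map<Long, Float>> sums = new TreeMap<>();
        for (Fare f : fares) {
            sums.computeIfAbsent(hourEnd(f.eventTime), k -> new TreeMap<>())
                .merge(f.driverId, f.tip, Float::sum);
        }
        // Steps 3-4: windowAll(1 hour) + maxBy(2): the best driver in each hour
        List<String> out = new ArrayList<>();
        for (Map.Entry<Long, Map<Long, Float>> hour : sums.entrySet()) {
            long bestDriver = -1L;
            float bestTips = Float.NEGATIVE_INFINITY;
            for (Map.Entry<Long, Float> driver : hour.getValue().entrySet()) {
                if (driver.getValue() > bestTips) {
                    bestDriver = driver.getKey();
                    bestTips = driver.getValue();
                }
            }
            out.add("(" + hour.getKey() + "," + bestDriver + "," + bestTips + ")");
        }
        return out;
    }
}
```

The nested map plays the role of the per-driver window state; comparing drivers only after an hour's sums are complete is what yields exactly one result per hour.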
>> The question on the 4th slide is: why can't we key by the hour?
>>
>> If I change the implementation to key by the hour and run HourlyTipsTest,
>> testMaxAcrossDrivers fails:
>>
>> // (946688400000,1,6.0) -> for the hour ending at 946688400000, driverId 1
>> // earns the most tips: 6.0
>>
>> Expected :[(946688400000,1,6.0), (946692000000,2,20.0)]
>> Actual   :[(946688400000,1,6.0), (946692000000,2,20.0), (946692000000,2,20.0)]
>>
>>
>>
>> Thanks a lot!
>> Eleanore
>>
>>
