Hi Fabian & Stefan,

Thanks, and yes that does work more like what I’d expect.
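
For anyone finding this thread later, here is a rough sketch of why the split works. It is plain Java with no Flink dependency, so it is an illustration under stated assumptions rather than a real job. The method names createAccumulator(), add() and getResult() mirror Flink's AggregateFunction, but the signatures are simplified (merge() is omitted) and may not match any particular Flink release. The class name MovingAverageSketch and the helper attachKey() are hypothetical; attachKey() stands in for the ProcessWindowFunction, which is the place where Flink hands you the key.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Map;

// Plain-Java sketch, not Flink code. Names other than createAccumulator,
// add and getResult are hypothetical and chosen for illustration only.
final class MovingAverageSketch {

    /** Holds at most `size` recent values and their running sum. No key is stored. */
    static final class Accumulator {
        final int size;
        final Deque<Double> values = new ArrayDeque<>();
        double sum = 0.0;

        Accumulator(int size) {
            this.size = size;
        }
    }

    private final int size;

    MovingAverageSketch(int size) {
        this.size = size;
    }

    Accumulator createAccumulator() {
        return new Accumulator(size);
    }

    /** Adds a value and evicts the oldest one once more than `size` are held. */
    Accumulator add(double value, Accumulator acc) {
        acc.values.addLast(value);
        acc.sum += value;
        if (acc.values.size() > acc.size) {
            acc.sum -= acc.values.removeFirst();
        }
        return acc;
    }

    /** Returns the average of the held values, or 0.0 if none have been added. */
    double getResult(Accumulator acc) {
        return acc.values.isEmpty() ? 0.0 : acc.sum / acc.values.size();
    }

    /**
     * Stand-in for the ProcessWindowFunction step: it receives the key
     * separately and pairs it with the already-aggregated average.
     */
    static Map.Entry<String, Double> attachKey(String key, double average) {
        return new SimpleEntry<>(key, average);
    }
}
```

In a real job the second step would be a ProcessWindowFunction passed as the second argument to aggregate(...), so the accumulator never needs to carry the key.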

Regards,

— Ken

PS - Just FYI, the Java code examples in the documentation referenced below have 
a number of bugs; see FLINK-9299 
<https://issues.apache.org/jira/browse/FLINK-9299>.


> On May 4, 2018, at 7:35 AM, Fabian Hueske <fhue...@gmail.com> wrote:
> 
> Hi Ken,
> 
> You can also use an additional ProcessWindowFunction [1] that is applied on 
> the result of the AggregateFunction to set the key.
> Since the PWF is only applied on the final result, there is no overhead 
> (actually, it might even be slightly cheaper because the AggregateFunction 
> can be simpler).
> 
> If you don't want to use a PWF, your approach is the right one.
> 
> Best, Fabian
> 
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/operators/windows.html#processwindowfunction-with-incremental-aggregation
> 
> 2018-05-03 19:53 GMT+02:00 Ken Krugler <kkrugler_li...@transpac.com>:
> Hi list,
> 
> I was trying different ways to implement a moving average (count based, not 
> time based).
> 
> The blunt instrument approach is to create a custom FlatMapFunction that 
> keeps track of the last N values.
> 
> It seemed like using an AggregateFunction would be most consistent with the 
> Flink API, along the lines of...
> 
>             .keyBy(new MyKeySelector())
>             .window(GlobalWindows.create())
>             .trigger(CountTrigger.of(1))
>             .aggregate(new MovingAverageAggregator(10))
> 
> This works, but the API for the AggregateFunction (MovingAverageAggregator) 
> feels a bit odd.
> 
> Specifically, I want to emit a <key, moving average> result from getResult(), 
> but the key isn’t passed to the createAccumulator() method, nor is it passed 
> to the getResult() method. So in the add() method I check if the accumulator 
> I’ve created has a key set, and if not then I extract the key from the record 
> and set it on the accumulator, so I can use it in the getResult() call.
> 
> Is this expected, or am I misusing the functionality?
> 
> Thanks,
> 
> — Ken

--------------------------------------------
http://about.me/kkrugler
+1 530-210-6378
