Hi Fabian & Stefan, Thanks, and yes that does work more like what I’d expect.
Regards, — Ken PS - Just FYI the Java code examples in the documentation referenced below have a number of bugs, see FLINK-9299 <https://issues.apache.org/jira/browse/FLINK-9299>. > On May 4, 2018, at 7:35 AM, Fabian Hueske <fhue...@gmail.com> wrote: > > Hi Ken, > > You can also use an additional ProcessWindowFunction [1] that is applied on > the result of the AggregateFunction to set the key. > Since the PWF is only applied on the final result, there no overhead > (actually, it might even be slightly cheaper because the AggregateFunction > can be simpler). > > If you don't want to use a PWF, your approach is the right one. > > Best, Fabian > > [1] > https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/operators/windows.html#processwindowfunction-with-incremental-aggregation > > <https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/operators/windows.html#processwindowfunction-with-incremental-aggregation> > > 2018-05-03 19:53 GMT+02:00 Ken Krugler <kkrugler_li...@transpac.com > <mailto:kkrugler_li...@transpac.com>>: > Hi list, > > I was trying different ways to implement a moving average (count based, not > time based). > > The blunt instrument approach is to create a custom FlatMapFunction that > keeps track of the last N values. > > It seemed like using an AggregateFunction would be most consistent with the > Flink API, along the lines of... > > .keyBy(new MyKeySelector()) > .window(GlobalWindows.create()) > .trigger(CountTrigger.of(1)) > .aggregate(new MovingAverageAggregator(10)) > > This works, but the API for the AggregateFunction (MovingAverageAggregator) > feels a bit odd. > > Specifically, I want to emit a <key, moving average> result from getResult(), > but the key isn’t passed to the createAccumulator() method, nor is it passed > to the getResult() method. So in the add() method I check if the accumulator > I’ve created has a key set, and if not then I extract the key from the record > and set it on the accumulator, so I can use it in the getResult() call. > > Is this expected, or am I miss-using the functionality? > > Thanks, > > — Ken -------------------------------------------- http://about.me/kkrugler +1 530-210-6378