Hi,

If you use an AggregateFunction in this way (i.e. for a window), the ACC should in fact already be kept in the state backend. Did you configure the job to use RocksDB? How are the memory problems manifesting?

Best,
Aljoscha

> On 6. Dec 2017, at 14:57, Fabian Hueske <fhue...@gmail.com> wrote:
>
> Hi Vishal,
>
> you are right, it is not possible to use state in an AggregateFunction
> because windows need to be mergeable.
> An AggregateFunction knows how to merge its accumulators but merging generic
> state is not possible.
>
> I am not aware of an efficient and easy workaround for this.
> If you want to use the provided session window logic, you can use a
> WindowFunction that performs all computations when the window is triggered.
> This means that aggregations do not happen eagerly and all events for a
> window are collected and held in state.
> Another approach could be to implement the whole logic (incl. the session
> windowing) using a ProcessFunction. This would be a major effort though.
>
> Best,
> Fabian
>
> 2017-12-06 3:52 GMT+01:00 Vishal Santoshi <vishal.santo...@gmail.com>:
> It seems that this has to do with session windows that are mergeable? I
> tried the RichWindowFunction and that seems to suggest that one cannot use
> state? Any ideas, folks...
>
> On Dec 1, 2017 10:38 AM, "Vishal Santoshi" <vishal.santo...@gmail.com> wrote:
> I have a simple aggregation with one caveat. For some reason I have to keep a
> large amount of state till the window is GCed. The state is within the
> accumulator (ACC). I am hitting a memory bottleneck and would like to
> offload the state to the state backend (RocksDB), keeping the state between
> checkpoints in memory (seems to be an obvious fix). I am not, though,
> allowed to have a RichAggregateFunction in the aggregate method of a windowed
> stream. That begs 2 questions:
>
> 1. Why?
> 2. Is there an alternative for stateful window aggregation where we manage
>    the state?
>
> Thanks,
> Vishal
>
> Here is the code (generics, but it works):
>
> SingleOutputStreamOperator<OUT> retVal = input
>         .keyBy(keySelector)
>         .window(EventTimeSessionWindows.withGap(gap))
>         .aggregate(
>                 new AggregateFunction<IN, ACC, OUT>() {
>
>                     @Override
>                     public ACC createAccumulator() {
>                         ACC newInstance = (ACC) accumulator.clone();
>                         newInstance.resetLocal();
>                         return newInstance;
>                     }
>
>                     @Override
>                     public void add(IN value, ACC accumulator) {
>                         accumulator.add(value);
>                     }
>
>                     @Override
>                     public OUT getResult(ACC accumulator) {
>                         return accumulator.getLocalValue();
>                     }
>
>                     @Override
>                     public ACC merge(ACC a, ACC b) {
>                         a.merge(b);
>                         return a;
>                     }
>                 }, new WindowFunction<OUT, OUT, KEY, TimeWindow>() {
>                     @Override
>                     public void apply(KEY s, TimeWindow window, Iterable<OUT> input,
>                                       Collector<OUT> out) throws Exception {
>                         out.collect(input.iterator().next());
>                     }
>                 }, accType, aggregationResultType, aggregationResultType);
>
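
For illustration, Fabian's point about mergeable windows can be sketched in plain Java, without any Flink dependency. Everything here is hypothetical: SumAcc, Session and addEvent are names made up for this example, the sum/count accumulator stands in for the ACC in the thread, and the overlap check only approximates how session windows get merged. It is not the Flink implementation. The sketch only shows that when an event bridges two sessions, the operator needs nothing more than merge() on the accumulators, which it could not do for arbitrary user-managed state.

```java
import java.util.ArrayList;
import java.util.List;

class SessionMergeSketch {

    /** Hypothetical stand-in for the ACC in the thread: a running sum and count. */
    static final class SumAcc {
        long sum;
        long count;

        void add(long value) {
            sum += value;
            count++;
        }

        /** Folds another accumulator into this one. */
        void merge(SumAcc other) {
            sum += other.sum;
            count += other.count;
        }
    }

    /** One session window: its time bounds plus its accumulator. */
    static final class Session {
        long start;
        long end;
        final SumAcc acc = new SumAcc();

        Session(long timestamp, long gap) {
            start = timestamp;
            end = timestamp + gap;
        }

        /** Touching bounds count as overlapping (an assumption of this sketch). */
        boolean overlaps(Session other) {
            return start <= other.end && other.start <= end;
        }
    }

    /** Adds one event, then merges every existing session that overlaps it. */
    static void addEvent(List<Session> sessions, long timestamp, long value, long gap) {
        Session merged = new Session(timestamp, gap);
        merged.acc.add(value);

        List<Session> remaining = new ArrayList<>();
        for (Session s : sessions) {
            if (s.overlaps(merged)) {
                // Merging two windows only requires merging their accumulators.
                merged.start = Math.min(merged.start, s.start);
                merged.end = Math.max(merged.end, s.end);
                merged.acc.merge(s.acc);
            } else {
                remaining.add(s);
            }
        }
        remaining.add(merged);

        sessions.clear();
        sessions.addAll(remaining);
    }

    public static void main(String[] args) {
        List<Session> sessions = new ArrayList<>();
        addEvent(sessions, 0, 1, 10);   // first session
        addEvent(sessions, 20, 2, 10);  // second, separate session
        addEvent(sessions, 10, 4, 10);  // bridges both, so all three merge
        Session s = sessions.get(0);
        System.out.println("sessions=" + sessions.size()
                + " sum=" + s.acc.sum + " count=" + s.acc.count);
    }
}
```

In this sketch the third event lands between the two existing sessions and touches both, so the three windows collapse into one and the accumulators are combined via merge(). State kept outside the accumulator would have no such merge hook, which is the limitation described above.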