Hi,

If you use an AggregateFunction in this way (i.e. for a window), the ACC 
should in fact be kept in the state backend. Did you configure the job to use 
RocksDB? How are the memory problems manifesting?

Best,
Aljoscha

> On 6. Dec 2017, at 14:57, Fabian Hueske <fhue...@gmail.com> wrote:
> 
> Hi Vishal,
> 
> you are right, it is not possible to use state in an AggregateFunction 
> because windows need to be mergeable. 
> An AggregateFunction knows how to merge its accumulators but merging generic 
> state is not possible. 
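A plain-Java sketch (not Flink code) may help show why merge matters here. When an event bridges two open sessions, the operator collapses them into one window, and the only way it can combine their contents is the accumulator's merge. Any state kept outside the accumulator would have no such merge rule. The Agg interface is a hypothetical stand-in modeled loosely on AggregateFunction's four methods. SumAgg, SessionMergeSketch and the overlap rule are illustrative assumptions, not Flink's exact implementation.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in, modeled loosely on the four methods of Flink's AggregateFunction.
interface Agg<IN, ACC, OUT> {
    ACC createAccumulator();
    ACC add(IN value, ACC acc);
    OUT getResult(ACC acc);
    ACC merge(ACC a, ACC b);
}

// Illustrative aggregation: sums values in a mutable one-element array.
class SumAgg implements Agg<Long, long[], Long> {
    public long[] createAccumulator() { return new long[] {0L}; }
    public long[] add(Long value, long[] acc) { acc[0] += value; return acc; }
    public Long getResult(long[] acc) { return acc[0]; }
    public long[] merge(long[] a, long[] b) { a[0] += b[0]; return a; }
}

class SessionMergeSketch {
    // A session window [start, end) together with its accumulator.
    static final class Session {
        long start;
        long end;
        long[] acc;
        Session(long start, long end, long[] acc) { this.start = start; this.end = end; this.acc = acc; }
    }

    // Assigns (timestamp, value) events to sessions with the given gap. Overlapping
    // sessions are merged, and the accumulators are the only thing that gets combined.
    static List<Session> assign(long[][] events, long gap, Agg<Long, long[], Long> agg) {
        List<Session> sessions = new ArrayList<>();
        for (long[] e : events) {
            // Each event first gets its own window [ts, ts + gap) and a fresh accumulator.
            Session current = new Session(e[0], e[0] + gap, agg.add(e[1], agg.createAccumulator()));
            List<Session> kept = new ArrayList<>();
            for (Session s : sessions) {
                if (s.start < current.end && current.start < s.end) {
                    // Overlap: widen the window and merge the two accumulators.
                    current.start = Math.min(current.start, s.start);
                    current.end = Math.max(current.end, s.end);
                    current.acc = agg.merge(current.acc, s.acc);
                } else {
                    kept.add(s);
                }
            }
            kept.add(current);
            sessions = kept;
        }
        return sessions;
    }

    public static void main(String[] args) {
        SumAgg agg = new SumAgg();
        // With a gap of 10, t=0 and t=15 open separate sessions; t=8 overlaps both.
        long[][] events = {{0, 1}, {15, 2}, {8, 3}};
        for (Session s : assign(events, 10, agg)) {
            System.out.println("[" + s.start + ", " + s.end + ") -> " + agg.getResult(s.acc));
        }
    }
}
```

Running main prints `[0, 25) -> 6`. The events at t=0 and t=15 start out as separate sessions, and the event at t=8 overlaps both, so all three accumulators end up merged into one.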
> 
> I am not aware of an efficient and easy workaround for this. 
> If you want to use the provided session window logic, you can use a 
> WindowFunction that performs all computations when the window is triggered. 
> This means that aggregations do not happen eagerly and all events for a 
> window are collected and held in state.
> Another approach could be to implement the whole logic (incl. the session 
> windowing) using a ProcessFunction. This would be a major effort though.
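The trade-off in the WindowFunction route can be sketched in plain Java. This is again not Flink code: BufferedSession is a hypothetical name, and the sum stands in for whatever the real computation would be. Buffered windows can always be merged, because merging just concatenates the element lists. The cost is that every element stays in state until the window fires, instead of one accumulator per window.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch of the "collect everything, compute on trigger" approach.
// Hypothetical class; it models the idea only, not Flink's window operator.
class BufferedSession {
    // Every element of the session is retained until the window fires.
    final List<Long> buffer = new ArrayList<>();

    void add(long value) { buffer.add(value); }

    // Merging two buffered sessions is always possible: concatenate the element lists.
    BufferedSession merge(BufferedSession other) {
        buffer.addAll(other.buffer);
        return this;
    }

    // Runs only when the window fires: the whole computation sees all buffered elements.
    long fire() {
        long sum = 0;
        for (long v : buffer) {
            sum += v;
        }
        return sum;
    }

    public static void main(String[] args) {
        BufferedSession a = new BufferedSession();
        a.add(1);
        a.add(2);
        BufferedSession b = new BufferedSession();
        b.add(3);
        BufferedSession merged = a.merge(b);
        System.out.println(merged.buffer.size() + " elements held, result " + merged.fire());
    }
}
```

Here main prints `3 elements held, result 6`. The held-element count grows with the session, which is exactly the memory cost of not aggregating eagerly.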
> 
> Best,
> Fabian
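For the ProcessFunction route, here is a plain-Java model of the per-key bookkeeping such a function might do. It keeps one open session and one running aggregate per key, extends the session on each event, and closes it once the watermark passes its end. In a real job the per-key fields would live in keyed state (e.g. ValueState), and closing would be driven by event-time timers registered through the TimerService. ManualSessions and its methods are hypothetical names. The sketch assumes events for a key arrive in timestamp order, so sessions never need merging. Handling out-of-order events would bring merging back and is not covered here.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of hand-rolled session windowing, as a keyed ProcessFunction might do it.
// Assumption: per key, events arrive in timestamp order, so sessions never need merging.
class ManualSessions {
    // Per-key "state": end of the open session and its running sum (one accumulator, no buffer).
    static final class Open {
        long end;
        long sum;
        Open(long end, long sum) { this.end = end; this.sum = sum; }
    }

    final long gap;
    final Map<String, Open> open = new HashMap<>();
    final List<String> emitted = new ArrayList<>();

    ManualSessions(long gap) { this.gap = gap; }

    void onElement(String key, long ts, long value) {
        Open s = open.get(key);
        if (s != null && ts < s.end) {
            // Same session: aggregate eagerly and push the session end (the "timer") out.
            s.sum += value;
            s.end = Math.max(s.end, ts + gap);
        } else {
            if (s != null) {
                emit(key, s); // the gap was exceeded, so the previous session is complete
            }
            open.put(key, new Open(ts + gap, value));
        }
    }

    // Stands in for event-time timers: close every session whose end the watermark has reached.
    void onWatermark(long watermark) {
        open.entrySet().removeIf(e -> {
            if (e.getValue().end <= watermark) {
                emit(e.getKey(), e.getValue());
                return true;
            }
            return false;
        });
    }

    private void emit(String key, Open s) { emitted.add(key + "=" + s.sum); }

    public static void main(String[] args) {
        ManualSessions op = new ManualSessions(10);
        op.onElement("a", 0, 1);
        op.onElement("a", 5, 2);   // within the gap: same session
        op.onElement("a", 30, 4);  // gap exceeded: emits a=3 and opens a new session
        op.onWatermark(100);       // closes the remaining session: emits a=4
        System.out.println(op.emitted);
    }
}
```

Running main prints `[a=3, a=4]`.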
> 
> 2017-12-06 3:52 GMT+01:00 Vishal Santoshi <vishal.santo...@gmail.com>:
> It seems that this has to do with session windows being mergeable? I 
> tried the RichWindowFunction, and that seems to suggest that one cannot use 
> state? Any ideas, folks?
> 
> On Dec 1, 2017 10:38 AM, "Vishal Santoshi" <vishal.santo...@gmail.com> wrote:
> I have a simple aggregation with one caveat: for some reason I have to keep a 
> large amount of state until the window is garbage-collected. The state lives 
> within the accumulator (ACC). I am hitting a memory bottleneck and would like 
> to offload the state to the state backend (RocksDB), keeping the state 
> between checkpoints in memory (this seems to be an obvious fix). However, I 
> am not allowed to pass a RichAggregateFunction to the aggregate method of a 
> windowed stream. That raises two questions:
> 
> 1. Why?
> 2. Is there an alternative for stateful window aggregation where we manage 
> the state ourselves?
> 
> Thanks,
> Vishal
> 
> 
> Here is the code (generic, but it works):
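> // keySelector, gap, accumulator (a prototype cloned for each new window), accType
> // and aggregationResultType are defined elsewhere in the job.
> SingleOutputStreamOperator<OUT> retVal = input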
>         .keyBy(keySelector)
>         .window(EventTimeSessionWindows.withGap(gap))
>         .aggregate(
>                 new AggregateFunction<IN, ACC, OUT>() {
> 
>                     @Override
>                     public ACC createAccumulator() {
>                         ACC newInstance = (ACC) accumulator.clone();
>                         newInstance.resetLocal();
>                         return newInstance;
>                     }
> 
>                     @Override
>                     public void add(IN value, ACC accumulator) {
>                         accumulator.add(value);
> 
>                     }
> 
>                     @Override
>                     public OUT getResult(ACC accumulator) {
>                         return accumulator.getLocalValue();
>                     }
> 
>                     @Override
>                     public ACC merge(ACC a, ACC b) {
>                         a.merge(b);
>                         return a;
>                     }
>                 }, new WindowFunction<OUT, OUT, KEY, TimeWindow>() {
>                     @Override
>                     public void apply(KEY s, TimeWindow window, Iterable<OUT> input,
>                                       Collector<OUT> out) throws Exception {
>                         // After pre-aggregation the iterable holds exactly one element: the aggregate result.
>                         out.collect(input.iterator().next());
>                     }
>                 }, accType, aggregationResultType, aggregationResultType);
> 
