Hi Vino, Kindly check below flink code.
package org.apache.flink.streaming.api.datastream.WindowedStream @PublicEvolving public <ACC, R> SingleOutputStreamOperator<R> aggregate(AggregateFunction<T, ACC, R> function) { checkNotNull(function, "function"); if (*function instanceof RichFunction*) { throw new *UnsupportedOperationException("This aggregation function cannot be a RichFunction.")*; } TypeInformation<ACC> accumulatorType = TypeExtractor.getAggregateFunctionAccumulatorType( function, input.getType(), null, false); TypeInformation<R> resultType = TypeExtractor.getAggregateFunctionReturnType( function, input.getType(), null, false); return aggregate(function, accumulatorType, resultType); } Kindly, check above snapshot of flink;s aggregate() method, that got applied on windowed stream. Thanks & Regards Gaurav Luthra Mob:- +91-9901945206 On Fri, Sep 28, 2018 at 1:40 PM vino yang <yanghua1...@gmail.com> wrote: > Hi Gaurav, > > This is very strange, can you share your code and specific exceptions? > Under normal circumstances, it should not throw an exception. > > Thanks, vino. > > Gaurav Luthra <gauravluthra6...@gmail.com> 于2018年9月28日周五 下午3:27写道: > >> Hi Vino, >> >> RichAggregateFunction can surely access the state. But the problem is, In >> aggregate() method we can not use RichAggregateFunction. >> If we use then it throws exception. >> >> So, the option is to use AggregateFunction (not Rich) with aggregate() >> method on windowed stream. Now, In AggregateFunction, we cannot access >> RuntimeContext. Hence we can not use state. >> >> Thanks & Regards >> Gaurav >> >> >> >> On Fri, 28 Sep, 2018, 12:40 PM vino yang, <yanghua1...@gmail.com> wrote: >> >>> Hi Gaurav, >>> >>> Why do you think the RichAggregateFunction cannot access the State API? >>> RichAggregateFunction inherits from AbstractRichFunction (it provides a >>> RuntimeContext that allows you to access the state API). >>> >>> Thanks, vino. >>> >>> Gaurav Luthra <gauravluthra6...@gmail.com> 于2018年9月28日周五 下午1:38写道: >>> >>>> Hi, >>>> >>>> As we are aware, Currently we cannot use RichAggregateFunction in >>>> aggregate() method upon windowed stream. So, To access the state in your >>>> customAggregateFunction, you can implement it using a ProcessFuntion. >>>> This issue is faced by many developers. >>>> So, someone must have implemented or tried to implement it. So, kindly >>>> share >>>> your feedback on this. >>>> As I need to implement this. >>>> >>>> Thanks & Regards >>>> Gaurav Luthra >>>> >>>> >>>> >>>> -- >>>> Sent from: >>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ >>>> >>>