Hi Vino,

Kindly check below flink code.

package org.apache.flink.streaming.api.datastream.WindowedStream

@PublicEvolving
public <ACC, R> SingleOutputStreamOperator<R>
aggregate(AggregateFunction<T, ACC, R> function) {
checkNotNull(function, "function");

if (*function instanceof RichFunction*) {
throw new *UnsupportedOperationException("This aggregation function cannot
be a RichFunction.")*;
}

TypeInformation<ACC> accumulatorType =
TypeExtractor.getAggregateFunctionAccumulatorType(
function, input.getType(), null, false);

TypeInformation<R> resultType =
TypeExtractor.getAggregateFunctionReturnType(
function, input.getType(), null, false);

return aggregate(function, accumulatorType, resultType);
}


Kindly, check above snapshot of flink;s aggregate() method, that got
applied on windowed stream.

Thanks & Regards
Gaurav Luthra
Mob:- +91-9901945206


On Fri, Sep 28, 2018 at 1:40 PM vino yang <yanghua1...@gmail.com> wrote:

> Hi Gaurav,
>
> This is very strange, can you share your code and specific exceptions?
> Under normal circumstances, it should not throw an exception.
>
> Thanks, vino.
>
> Gaurav Luthra <gauravluthra6...@gmail.com> 于2018年9月28日周五 下午3:27写道:
>
>> Hi Vino,
>>
>> RichAggregateFunction can surely access the state. But the problem is, In
>> aggregate() method we can not use RichAggregateFunction.
>> If we use then it throws exception.
>>
>> So, the option is to use AggregateFunction (not Rich) with aggregate()
>> method on windowed stream. Now, In AggregateFunction, we cannot access
>> RuntimeContext. Hence we can not use state.
>>
>> Thanks & Regards
>> Gaurav
>>
>>
>>
>> On Fri, 28 Sep, 2018, 12:40 PM vino yang, <yanghua1...@gmail.com> wrote:
>>
>>> Hi Gaurav,
>>>
>>> Why do you think the RichAggregateFunction cannot access the State API?
>>> RichAggregateFunction inherits from AbstractRichFunction (it provides a
>>> RuntimeContext that allows you to access the state API).
>>>
>>> Thanks, vino.
>>>
>>> Gaurav Luthra <gauravluthra6...@gmail.com> 于2018年9月28日周五 下午1:38写道:
>>>
>>>> Hi,
>>>>
>>>> As we are aware, Currently we cannot use RichAggregateFunction in
>>>> aggregate() method upon windowed stream. So, To access the state in your
>>>> customAggregateFunction, you can implement it using a ProcessFuntion.
>>>> This issue is faced by many developers.
>>>> So, someone must have implemented or tried to implement it. So, kindly
>>>> share
>>>> your feedback on this.
>>>> As I need to implement this.
>>>>
>>>> Thanks & Regards
>>>> Gaurav Luthra
>>>>
>>>>
>>>>
>>>> --
>>>> Sent from:
>>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>>>
>>>

Reply via email to