Hi ken,

Mine is very generic use case. Means I am building an aggregation function
using flink, which can be configured according to any use case.
Actually, It will not be for a specific use case and every user can enter
their business logic and use this aggregator to get result.
And about windowing also, user can configure the type of window and my
aggregator will ask about the required properties for that window.

I hope you got some idea.

But for make it generic I need to use processfunction and process() method
to implement it. Instead of more specific AggregateFunction and aggregate()
method.

So, I am looking for inputs if anyone has tried implementing aggregation
using ProcessFunction and process() function. As it very much needed thing
with flink.

Thanks and Regards,
Gaurav Luthra
Mob:- +91-9901945206


On Sun, Sep 30, 2018 at 5:12 AM Ken Krugler <kkrugler_li...@transpac.com>
wrote:

> Hi Gaurav,
>
> I’m curious - for your use case, what are the windowing & aggregation
> requirements?
>
> E.g. is it a 10 second sliding window?
>
> And what’s the aggregation you’re trying to do?
>
> Thanks,
>
> — Ken
>
>
> On Sep 28, 2018, at 4:00 AM, Gaurav Luthra <gauravluthra6...@gmail.com>
> wrote:
>
> Hi Chesnay,
>
> I know it is an issue, And won't be fixed because of window merging
> feature in case of session window.
> But I am looking if someone has implemented aggregation function using
> ProcessFunction and process() method instead of AggregationFunction and
> aggregate() method.
> I hope you got my point.
>
> Thanks & Regards
> Gaurav Luthra
>
>
>
> On Fri, Sep 28, 2018 at 4:22 PM Chesnay Schepler <ches...@apache.org>
> wrote:
>
>> Please see: https://issues.apache.org/jira/browse/FLINK-10250
>>
>> On 28.09.2018 11:27, vino yang wrote:
>>
>> Hi Gaurav,
>>
>> Yes, you are right. It is really not allowed to use RichFunction. I will
>> Ping Timo, he may give you a more professional answer.
>>
>> Thanks, vino.
>>
>> Gaurav Luthra <gauravluthra6...@gmail.com> 于2018年9月28日周五 下午4:27写道:
>>
>>> Hi Vino,
>>>
>>> Kindly check below flink code.
>>>
>>> package org.apache.flink.streaming.api.datastream.WindowedStream
>>>
>>> @PublicEvolving
>>> public <ACC, R> SingleOutputStreamOperator<R>
>>> aggregate(AggregateFunction<T, ACC, R> function) {
>>> checkNotNull(function, "function");
>>>
>>> if (*function instanceof RichFunction*) {
>>> throw new *UnsupportedOperationException("This aggregation function
>>> cannot be a RichFunction.")*;
>>> }
>>>
>>> TypeInformation<ACC> accumulatorType =
>>> TypeExtractor.getAggregateFunctionAccumulatorType(
>>> function, input.getType(), null, false);
>>>
>>> TypeInformation<R> resultType =
>>> TypeExtractor.getAggregateFunctionReturnType(
>>> function, input.getType(), null, false);
>>>
>>> return aggregate(function, accumulatorType, resultType);
>>> }
>>>
>>>
>>> Kindly, check above snapshot of flink;s aggregate() method, that got
>>> applied on windowed stream.
>>>
>>> Thanks & Regards
>>> Gaurav Luthra
>>> Mob:- +91-9901945206
>>>
>>>
>>> On Fri, Sep 28, 2018 at 1:40 PM vino yang <yanghua1...@gmail.com> wrote:
>>>
>>>> Hi Gaurav,
>>>>
>>>> This is very strange, can you share your code and specific exceptions?
>>>> Under normal circumstances, it should not throw an exception.
>>>>
>>>> Thanks, vino.
>>>>
>>>> Gaurav Luthra <gauravluthra6...@gmail.com> 于2018年9月28日周五 下午3:27写道:
>>>>
>>>>> Hi Vino,
>>>>>
>>>>> RichAggregateFunction can surely access the state. But the problem is,
>>>>> In aggregate() method we can not use RichAggregateFunction.
>>>>> If we use then it throws exception.
>>>>>
>>>>> So, the option is to use AggregateFunction (not Rich) with aggregate()
>>>>> method on windowed stream. Now, In AggregateFunction, we cannot access
>>>>> RuntimeContext. Hence we can not use state.
>>>>>
>>>>> Thanks & Regards
>>>>> Gaurav
>>>>>
>>>>>
>>>>>
>>>>> On Fri, 28 Sep, 2018, 12:40 PM vino yang, <yanghua1...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Gaurav,
>>>>>>
>>>>>> Why do you think the RichAggregateFunction cannot access the State
>>>>>> API?
>>>>>> RichAggregateFunction inherits from AbstractRichFunction (it provides
>>>>>> a RuntimeContext that allows you to access the state API).
>>>>>>
>>>>>> Thanks, vino.
>>>>>>
>>>>>> Gaurav Luthra <gauravluthra6...@gmail.com> 于2018年9月28日周五 下午1:38写道:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> As we are aware, Currently we cannot use RichAggregateFunction in
>>>>>>> aggregate() method upon windowed stream. So, To access the state in
>>>>>>> your
>>>>>>> customAggregateFunction, you can implement it using a ProcessFuntion.
>>>>>>> This issue is faced by many developers.
>>>>>>> So, someone must have implemented or tried to implement it. So,
>>>>>>> kindly share
>>>>>>> your feedback on this.
>>>>>>> As I need to implement this.
>>>>>>>
>>>>>>> Thanks & Regards
>>>>>>> Gaurav Luthra
>>>>>>>
>>>>>>
> --------------------------
> Ken Krugler
> +1 530-210-6378
> http://www.scaleunlimited.com
> Custom big data solutions & training
> Flink, Solr, Hadoop, Cascading & Cassandra
>
>

Reply via email to