Re: Regarding implementation of aggregate function using a ProcessFunction

2018-10-02 Thread Gaurav Luthra
Hi Fabian, Thanks for explaining in detail. But we know and you also mentioned the issues in 1) and 2). So, I am continuing with point 3). Thanks & Regards Gaurav Luthra Mob:- +91-9901945206 On Mon, Oct 1, 2018 at 3:11 PM Fabian Hueske wrote: > Hi, > > There are basically three options: > 1)

Re: Regarding implementation of aggregate function using a ProcessFunction

2018-10-01 Thread Fabian Hueske
Hi, There are basically three options: 1) Use an AggregateFunction and store everything that you would put into state into the Accumulator. This can become quite expensive because the Accumulator is de/serialized for every function call if you use RocksDB. The advantage is that you don't have to

Re: Regarding implementation of aggregate function using a ProcessFunction

2018-09-30 Thread Gaurav Luthra
Hi ken, Mine is very generic use case. Means I am building an aggregation function using flink, which can be configured according to any use case. Actually, It will not be for a specific use case and every user can enter their business logic and use this aggregator to get result. And about

Re: Regarding implementation of aggregate function using a ProcessFunction

2018-09-29 Thread Ken Krugler
Hi Gaurav, I’m curious - for your use case, what are the windowing & aggregation requirements? E.g. is it a 10 second sliding window? And what’s the aggregation you’re trying to do? Thanks, — Ken > On Sep 28, 2018, at 4:00 AM, Gaurav Luthra wrote: > > Hi Chesnay, > > I know it is an

Re: Regarding implementation of aggregate function using a ProcessFunction

2018-09-28 Thread Gaurav Luthra
Hi Chesnay, I know it is an issue, And won't be fixed because of window merging feature in case of session window. But I am looking if someone has implemented aggregation function using ProcessFunction and process() method instead of AggregationFunction and aggregate() method. I hope you got my

Re: Regarding implementation of aggregate function using a ProcessFunction

2018-09-28 Thread Chesnay Schepler
Please see: https://issues.apache.org/jira/browse/FLINK-10250 On 28.09.2018 11:27, vino yang wrote: Hi Gaurav, Yes, you are right. It is really not allowed to use RichFunction. I will Ping Timo, he may give you a more professional answer. Thanks, vino. Gaurav Luthra

Re: Regarding implementation of aggregate function using a ProcessFunction

2018-09-28 Thread vino yang
Hi Gaurav, Yes, you are right. It is really not allowed to use RichFunction. I will Ping Timo, he may give you a more professional answer. Thanks, vino. Gaurav Luthra 于2018年9月28日周五 下午4:27写道: > Hi Vino, > > Kindly check below flink code. > > package

Re: Regarding implementation of aggregate function using a ProcessFunction

2018-09-28 Thread Gaurav Luthra
Hi Vino, Kindly check below flink code. package org.apache.flink.streaming.api.datastream.WindowedStream @PublicEvolving public SingleOutputStreamOperator aggregate(AggregateFunction function) { checkNotNull(function, "function"); if (*function instanceof RichFunction*) { throw new

Re: Regarding implementation of aggregate function using a ProcessFunction

2018-09-28 Thread vino yang
Hi Gaurav, This is very strange, can you share your code and specific exceptions? Under normal circumstances, it should not throw an exception. Thanks, vino. Gaurav Luthra 于2018年9月28日周五 下午3:27写道: > Hi Vino, > > RichAggregateFunction can surely access the state. But the problem is, In >

Re: Regarding implementation of aggregate function using a ProcessFunction

2018-09-28 Thread vino yang
Hi Gaurav, Why do you think the RichAggregateFunction cannot access the State API? RichAggregateFunction inherits from AbstractRichFunction (it provides a RuntimeContext that allows you to access the state API). Thanks, vino. Gaurav Luthra 于2018年9月28日周五 下午1:38写道: > Hi, > > As we are aware,

Regarding implementation of aggregate function using a ProcessFunction

2018-09-27 Thread Gaurav Luthra
Hi, As we are aware, Currently we cannot use RichAggregateFunction in aggregate() method upon windowed stream. So, To access the state in your customAggregateFunction, you can implement it using a ProcessFuntion. This issue is faced by many developers. So, someone must have implemented or tried