Re: Issue with counter metrics for large number of keys
Hi Ken,

Thanks again for your input.

I will wait for the Flink developers to suggest how to implement 100K unique counters. For the time being, I will make the number of counter metrics a configurable parameter in my application, so the user knows what he is asking for. I will cap the maximum at 1000 so that memory does not become a problem, tune that cap against the memory of the JobManager and of my application, and keep exploring other solutions in Flink.

Thanks & Regards
Gaurav Luthra
Mob:- +91-9901945206

On Thu, Jan 17, 2019 at 9:40 AM Ken Krugler wrote:

> Hi Gaurav,
>
> I've used a few hundred counters before without problems. My concern about
> 100K unique counters is that you wind up generating load (and maybe
> memory issues) for the JobManager.
>
> E.g. with Hadoop's metric system, trying to go much beyond 1000 counters
> could cause significant problems. IIRC it was due to the JobTracker getting
> bogged down processing too many counter updates, and/or running out of
> memory. It's possible more recent versions of Hadoop no longer have that
> problem.
>
> But someone on the Flink dev team should weigh in here...
>
> -- Ken
>
> On Jan 16, 2019, at 7:45 PM, Gaurav Luthra wrote:
>
> Thanks a lot Ken for your inputs.
>
> I will look into your suggested solution and report back.
> I would also like to know: roughly how many counter metrics is it
> reasonable to keep references to? Or what is the largest number of
> counter references you have heard of anyone using?
>
> Thanks & Regards
> Gaurav Luthra
>
> On Thu, Jan 17, 2019 at 9:04 AM Ken Krugler wrote:
>
>> I think trying to use counters to track counts for 100K+ keys is not
>> going to be a good idea.
>>
>> An alternative is to have a small function with managed MapState
>> <https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html#using-managed-keyed-state>,
>> and make that state queryable
>> <https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/queryable_state.html#queryable-state-beta>
>>
>> Though maybe under the hood that's what metrics is doing anyway :)
>>
>> -- Ken
>>
>> On Jan 16, 2019, at 7:25 PM, Gaurav Luthra wrote:
>>
>> I want a new counter for every key of my windowed stream, and I want the
>> same counter to be incremented when the same key appears again in an
>> incoming event.
>>
>> So, I would run the code below for every incoming event.
>> getRuntimeContext().getMetricGroup().counter(myKey).inc();
>>
>> But this code fails when the same value of myKey arrives again, which is
>> a limitation of Flink metrics. It throws an exception: "Name collision:
>> Adding a metric with the same name as a metric subgroup: "
>>
>> Note: myKey is a String and can have a different value for every
>> incoming event.
>> In my case I expect 100 thousand different values of myKey.
>>
>> To solve the issue, I have to keep references to the counters for all
>> 100 thousand values of myKey in some data structure, e.g.
>> Map<String, Counter> myMetricMap;
>> and for every myKey I have to do the following.
>>
>> // Look up the counter registered earlier for this key, if any.
>> Counter counter = myMetricMap.get(myKey);
>> if (null == counter)
>> {
>>     // First event for this key: create the counter and register it once.
>>     counter = new SimpleCounter();
>>     counter.inc();
>>     myMetricMap.put(myKey, counter);
>>     getRuntimeContext().getMetricGroup().counter(myKey, counter);
>> }
>> else
>> {
>>     counter.inc();
>> }
>>
>> The code above serves my purpose, but I do not want to maintain a map of
>> 100 thousand values of myKey.
>>
>> Is there an alternative? I am looking for a solution that maintains
>> approx. 100 thousand counter metrics without keeping references to them
>> in a map (or any other data structure).
>>
>> Thanks & Regards
>> Gaurav Luthra
>>
>> --
>> Ken Krugler
>> +1 530-210-6378
>> http://www.scaleunlimited.com
>> Custom big data solutions & training
>> Flink, Solr, Hadoop, Cascading & Cassandra
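The configurable cap described at the top of this thread could look roughly like the sketch below. It is a Flink-free illustration under stated assumptions: `CappedCounters` and its `OVERFLOW_KEY` bucket are hypothetical names, and `LongAdder` stands in for a Flink `Counter`. In a real job the map value would be the counter obtained from a single `getMetricGroup().counter(...)` registration per key.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.LongAdder;

/** Hypothetical helper: one counter per key, created once, up to a configurable cap. */
class CappedCounters {
    /** Hypothetical shared bucket for keys that arrive after the cap is reached. */
    static final String OVERFLOW_KEY = "_other";

    private final int maxKeys;
    // LongAdder is a stand-in for a registered metric counter (assumption).
    private final Map<String, LongAdder> counters = new HashMap<>();

    CappedCounters(int maxKeys) {
        this.maxKeys = maxKeys;
    }

    /** Increments the counter for the key, registering it on first use if the cap allows. */
    void inc(String key) {
        LongAdder counter = counters.get(key);
        if (counter == null) {
            // Each name is registered exactly once; later events reuse the stored reference.
            String name = counters.size() < maxKeys ? key : OVERFLOW_KEY;
            counter = counters.computeIfAbsent(name, k -> new LongAdder());
        }
        counter.increment();
    }

    /** Current value for the key, or 0 if the key has no counter of its own. */
    long get(String key) {
        LongAdder counter = counters.get(key);
        return counter == null ? 0L : counter.sum();
    }
}
```

With `new CappedCounters(1000)`, the first 1000 distinct keys each get their own counter and every later key is counted in the shared overflow bucket, so the number of registered metrics stays bounded whatever the key cardinality is.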
Re: Counter Metrics not getting removed from Flink GUI after close()
Hi Chesnay,

I do not want to store the metric counter in a reference variable because I want to create a metric counter for every key of a keyed stream. There can be any number of keys, and I do not want to hold one reference per key.

On Tue, 8 Jan, 2019, 11:01 PM Chesnay Schepler wrote:

> What you're trying to do is not possible. Even if you close the group *it
> still exists*, and is returned by subsequent calls to addGroup("mygroup").
> However since it is closed all registration calls will be ignored, hence
> why the value isn't updating.
>
> You can only update a metric by storing a reference to it in your function.
> Why do you want to avoid the member variable?
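Chesnay's point, that a metric can only be updated through a stored reference, can be illustrated with a small Flink-free sketch. `DemoCounter` and `CounterRefs` are hypothetical stand-ins; `DemoCounter` only mirrors the `inc(long)` / `getCount()` shape of a counter. Once the reference is kept, moving a counter from an unknown old value to a target value is a delta increment.

```java
import java.util.HashMap;
import java.util.Map;

/** Stand-in for a metric counter; it only mirrors the inc(long)/getCount() shape. */
class DemoCounter {
    private long count;

    void inc(long n) {
        count += n;
    }

    long getCount() {
        return count;
    }
}

/** Hypothetical holder that creates each counter once and reuses the reference afterwards. */
class CounterRefs {
    private final Map<String, DemoCounter> refs = new HashMap<>();

    /** Returns the stored counter for the name, creating it on first use only. */
    DemoCounter counter(String name) {
        return refs.computeIfAbsent(name, n -> new DemoCounter());
    }

    /** Moves the named counter to an absolute target value via a delta increment. */
    void setTo(String name, long target) {
        DemoCounter c = counter(name);
        c.inc(target - c.getCount());
    }
}
```

After `refs.counter("chesnay").inc(20)`, a call to `refs.setTo("chesnay", 60)` leaves the counter at 60 without the caller having to remember the old value, because the stored counter itself holds it.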
Re: Counter Metrics not getting removed from Flink GUI after close()
Hi Chesnay,

If metrics cannot be removed from the Flink GUI while the job is running, then kindly tell me how to update a metric counter.

Explanation:
Suppose I create a metric counter with the key "chesnay" and increment it to 20 with the code below.

getRuntimeContext().getMetricGroup().counter("chesnay").inc(20);

Note: I am not assigning this counter to any local/member variable because I do not want to keep state in my job.

Now, after some time, I want to update the value of the "chesnay" metric counter to 60, and I do not know the old value (which is 20).

So I call

getRuntimeContext().getMetricGroup().counter("chesnay").inc(60);

Even then the Flink GUI shows the value 20 for the "chesnay" metric, and a WARN log appears, something like "same name can be used, and behavior is undefined".

Now, how do I update the "chesnay" metric if I do not want to keep state in my job?

That is why I thought of creating a user-scoped metric group and closing it to remove the metric counters, creating new metrics every time I want to update them.

I hope this explains my problem.

--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
Counter Metrics not getting removed from Flink GUI after close()
Hi,

I am using a ProcessWindowFunction, and in its process() function I add a user-scoped group as shown below.

MetricGroup myMetricGroup = getRuntimeContext().getMetricGroup().addGroup("myGroup");

I then create counter metrics using myMetricGroup, and I can see these counters in the Flink GUI. But when I call close() as shown below,

((AbstractMetricGroup) myMetricGroup).close();

the counter metrics are still not removed from the Flink GUI.

Kindly advise how to close a user-scoped metric group (myMetricGroup in my case) so that all the counter metrics created through it are removed from the Flink GUI.

Thanks & Regards
Gaurav Luthra
Getting "ProducerFenced" exception while using flink kafka producer
Hi,

I have two Flink jobs. Both use the Flink Kafka producer and the Flink Kafka consumer in exactly-once mode. The parallelism of both jobs is one, and both jobs have the same number and types of operators.

When we start one job, it runs fine. But as soon as we start the second job, both jobs start failing with a "ProducerFenced" exception at runtime.

Our understanding is that both jobs get the same transactional ID (assigned by Flink), which is required to run a job in exactly-once mode. I think Flink computes the transactional ID by concatenating the operator names and the subtask ID. In our case both jobs have the same operators and both run with a parallelism of one, so both jobs get the same subtask ID.

Question: Kindly suggest a solution for this exception, shed some light on whether our understanding of its cause is correct, and tell us how we can get different transactional IDs for the two jobs.

Thanks & Regards
Gaurav Luthra
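The collision suspected in this message can be made concrete with a small sketch. It takes the message's own hypothesis as its assumption (an ID derived only from operator name and subtask index) and does not reproduce Flink's actual generator; `TxIds` and both method names are hypothetical. How a job-specific component is actually supplied depends on the connector version and is not shown here.

```java
/** Hypothetical ID scheme used only to illustrate the suspected collision. */
class TxIds {
    /** Joins operator name and subtask index, as the message assumes Flink does. */
    static String fromOperator(String operatorName, int subtaskIndex) {
        return operatorName + "-" + subtaskIndex;
    }

    /** Same scheme with a job-specific prefix, so identical topologies no longer collide. */
    static String withJobPrefix(String jobName, String operatorName, int subtaskIndex) {
        return jobName + "-" + fromOperator(operatorName, subtaskIndex);
    }
}
```

Under this assumption, two jobs with the same operator names and parallelism one both produce `fromOperator("Sink: kafka", 0)`, so the second producer fences the first. Any component that differs per job, such as a distinct operator name or a job-level prefix, makes the two IDs distinct.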
Need the way to create custom metrics
Hi,

I need to know how to implement custom metrics in my Flink program. I know that custom metrics can be created with the help of the RuntimeContext, but in my aggregate() I do not have a RuntimeContext. I am using a window operator and applying the aggregate() method to it, passing an AggregateFunction to aggregate().

So, kindly guide me: how can I create custom metrics in my code?

Note: As we know, we cannot use RichAggregateFunction with the aggregate() method.

Thanks & Regards
Gaurav Luthra
Re: Partitioning by composite key, But type and number of keys are dynamic
Hi Chesnay,

My end user will know the fields of the "input records" (GenericRecord). In the configuration, the end user only tells me the names and number of the fields; based on these fields of the GenericRecord I have to partition the DataStream and create a keyed stream.

Currently, I have implemented my solution using a KeySelector function. I convert every field named by the user (every field the user wants to keyBy(), i.e. partition on) to a string, concatenate these strings, and return the single resulting string from the KeySelector. Partitioning then happens on this concatenated string. See the example below.

dataStream.keyBy(record -> {
    return record.get("area").toString() + record.get("age").toString();
});

But I am looking for a better solution, as I do not want to convert every field to a string.
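One possible alternative to string concatenation is to collect the configured field values into a list and use that list as the key. The sketch below is Flink-free and makes several assumptions: a `Map<String, Object>` stands in for Avro's GenericRecord, `CompositeKeys` is a hypothetical helper, and whether a `List<Object>` is an efficient key type inside Flink is not verified here. It also shows why plain concatenation is fragile: different field values can produce the same string.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

/** Hypothetical helper that builds keys from a configurable list of field names. */
class CompositeKeys {
    /** Composite key: the configured field values, in order, with their original types. */
    static List<Object> keyOf(Map<String, Object> record, List<String> keyFields) {
        List<Object> key = new ArrayList<>(keyFields.size());
        for (String field : keyFields) {
            key.add(record.get(field));
        }
        // equals() and hashCode() of a List depend only on its elements, in order.
        return Collections.unmodifiableList(key);
    }

    /** The string-concatenation approach from the message, kept for comparison. */
    static String concatKey(Map<String, Object> record, List<String> keyFields) {
        StringBuilder sb = new StringBuilder();
        for (String field : keyFields) {
            sb.append(record.get(field));
        }
        return sb.toString();
    }
}
```

For a record with area "1" and age 23 and another with area "12" and age 3, `concatKey` returns "123" for both, while `keyOf` returns two different lists. The list key also avoids converting each field to a string, provided the field values have stable `equals()` and `hashCode()` implementations.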
Partitioning by composite key, But type and number of keys are dynamic
There is a data stream of records; let's call them "input records". I want to partition this data stream using keyBy(), based on one or more fields of the "input record", but the number and types of these fields are not fixed.

So, kindly tell me how I should achieve this partitioning of the "input records" described above.

Note: Technically, I am using Avro's GenericRecord as the "input record". That is, I have a DataStream<GenericRecord> that needs to be partitioned, and its schema can be different for different jobs. So I do not know in advance the field names and types to pass to keyBy().
Re: Regarding implementation of aggregate function using a ProcessFunction
Hi Fabian,

Thanks for explaining in detail. As you mentioned, options 1) and 2) have known issues, so I am continuing with option 3).

Thanks & Regards
Gaurav Luthra

On Mon, Oct 1, 2018 at 3:11 PM Fabian Hueske wrote:

> Hi,
>
> There are basically three options:
> 1) Use an AggregateFunction and store everything that you would put into
> state into the Accumulator. This can become quite expensive because the
> Accumulator is de/serialized for every function call if you use RocksDB.
> The advantage is that you don't have to store all records in state but only
> the data you need. Simple aggregations like COUNT or SUM are quite cheap.
> 2) Use Flink's window primitives and a ProcessWindowFunction. In this
> case, all records of a window are stored in a ListState. Adding a record to
> the list is cheap, but the state might grow quite large for longer windows.
> When the window is evaluated, all records are loaded into memory and
> iterated by the ProcessWindowFunction.
> 3) Implement the windowing logic in a ProcessFunction. This requires a lot
> of additional logic, depending on what types of windows you want to support.
>
> Flink's SQL / Table API implements the first approach.
>
> Best, Fabian
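Option 1 in Fabian's list keeps only the data the aggregation needs in an accumulator. A minimal Flink-free sketch of that shape follows; the `Aggregator` interface is a hypothetical stand-in that mirrors the four methods of an accumulator-based aggregate function (`createAccumulator`, `add`, `getResult`, `merge`), and `Average` is an illustrative example rather than code from this thread.

```java
/** Stand-in mirroring the four-method shape of an accumulator-based aggregate function. */
interface Aggregator<IN, ACC, OUT> {
    ACC createAccumulator();

    ACC add(IN value, ACC accumulator);

    OUT getResult(ACC accumulator);

    ACC merge(ACC a, ACC b);
}

/** Average that keeps only a running sum and count, never the raw records. */
class Average implements Aggregator<Long, long[], Double> {
    @Override
    public long[] createAccumulator() {
        return new long[] {0L, 0L}; // {sum, count}
    }

    @Override
    public long[] add(Long value, long[] acc) {
        acc[0] += value;
        acc[1]++;
        return acc;
    }

    @Override
    public Double getResult(long[] acc) {
        // An empty accumulator yields 0.0 here; that choice is an assumption of this sketch.
        return acc[1] == 0 ? 0.0 : (double) acc[0] / acc[1];
    }

    @Override
    public long[] merge(long[] a, long[] b) {
        // Needed when two partial windows are combined, e.g. for merging session windows.
        return new long[] {a[0] + b[0], a[1] + b[1]};
    }
}
```

The accumulator here is two numbers regardless of how many records the window sees, which is why simple aggregations are cheap under option 1, while option 2 stores every record until the window fires.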
Re: Regarding implementation of aggregate function using a ProcessFunction
Hi Ken,

Mine is a very generic use case. I am building an aggregation function with Flink that can be configured for any use case. It is not meant for one specific use case; every user can enter their own business logic and use this aggregator to get a result. The same goes for windowing: the user can configure the type of window, and my aggregator will ask for the properties that window requires.

I hope that gives you some idea.

But to make it generic I need to implement it with a ProcessFunction and its process() method instead of the more specific AggregateFunction and aggregate() method.

So I am looking for input from anyone who has tried implementing aggregation using a ProcessFunction and its process() method, as it is a much-needed thing in Flink.

Thanks and Regards,
Gaurav Luthra

On Sun, Sep 30, 2018 at 5:12 AM Ken Krugler wrote:

> Hi Gaurav,
>
> I'm curious - for your use case, what are the windowing & aggregation
> requirements?
>
> E.g. is it a 10 second sliding window?
>
> And what's the aggregation you're trying to do?
>
> Thanks,
>
> -- Ken
Re: Regarding implementation of aggregate function using a ProcessFunction
Hi Chesnay,

I know it is a known issue, and that it won't be fixed because of the window-merging feature used by session windows. But I am looking for someone who has implemented an aggregation function using a ProcessFunction and its process() method instead of an AggregateFunction and the aggregate() method.

I hope you got my point.

Thanks & Regards
Gaurav Luthra

On Fri, Sep 28, 2018 at 4:22 PM Chesnay Schepler wrote:

> Please see: https://issues.apache.org/jira/browse/FLINK-10250
>
> On 28.09.2018 11:27, vino yang wrote:
>
> Hi Gaurav,
>
> Yes, you are right. It is really not allowed to use RichFunction. I will
> ping Timo, he may give you a more professional answer.
>
> Thanks, vino.
Re: Regarding implementation of aggregate function using a ProcessFunction
Hi Vino,

Kindly check the Flink code below.

Class: org.apache.flink.streaming.api.datastream.WindowedStream

@PublicEvolving
public <ACC, R> SingleOutputStreamOperator<R> aggregate(AggregateFunction<T, ACC, R> function) {
    checkNotNull(function, "function");

    // A RichFunction is rejected here.
    if (function instanceof RichFunction) {
        throw new UnsupportedOperationException("This aggregation function cannot be a RichFunction.");
    }

    TypeInformation<ACC> accumulatorType = TypeExtractor.getAggregateFunctionAccumulatorType(
            function, input.getType(), null, false);

    TypeInformation<R> resultType = TypeExtractor.getAggregateFunctionReturnType(
            function, input.getType(), null, false);

    return aggregate(function, accumulatorType, resultType);
}

Kindly check the above snippet of Flink's aggregate() method, which is applied on a windowed stream.

Thanks & Regards
Gaurav Luthra

On Fri, Sep 28, 2018 at 1:40 PM vino yang wrote:

> Hi Gaurav,
>
> This is very strange, can you share your code and specific exceptions?
> Under normal circumstances, it should not throw an exception.
>
> Thanks, vino.
>
> On Fri, Sep 28, 2018 at 3:27 PM Gaurav Luthra wrote:
>
>> Hi Vino,
>>
>> RichAggregateFunction can surely access the state. But the problem is
>> that we cannot use a RichAggregateFunction in the aggregate() method.
>> If we do, it throws an exception.
>>
>> So the option is to use an AggregateFunction (not Rich) with the
>> aggregate() method on the windowed stream. But in an AggregateFunction
>> we cannot access the RuntimeContext, hence we cannot use state.
>>
>> Thanks & Regards
>> Gaurav
>>
>> On Fri, 28 Sep, 2018, 12:40 PM vino yang wrote:
>>
>>> Hi Gaurav,
>>>
>>> Why do you think the RichAggregateFunction cannot access the State API?
>>> RichAggregateFunction inherits from AbstractRichFunction (it provides a
>>> RuntimeContext that allows you to access the state API).
>>>
>>> Thanks, vino.
Regarding implementation of aggregate function using a ProcessFunction
Hi,

As we are aware, we currently cannot use a RichAggregateFunction in the aggregate() method on a windowed stream. So, to access state in a custom aggregate function, one can implement it using a ProcessFunction instead.

Many developers face this issue, so someone must have implemented it or tried to. Kindly share your feedback on this, as I need to implement it.

Thanks & Regards
Gaurav Luthra
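Implementing aggregation outside the built-in window operator means writing the window bookkeeping by hand. The sketch below shows the core of that bookkeeping for a tumbling window in plain Java, as an illustration only: `TumblingSum` is a hypothetical class, the `TreeMap` stands in for per-key state, and the firing rule (emit when the window end is at or before the watermark) is a simplification rather than Flink's exact trigger semantics.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical tumbling-window sum that keeps one running value per open window. */
class TumblingSum {
    private final long sizeMs;
    // window start -> running sum; in a real job this would live in keyed state (assumption)
    private final TreeMap<Long, Long> sums = new TreeMap<>();

    TumblingSum(long sizeMs) {
        this.sizeMs = sizeMs;
    }

    /** Start of the window containing the timestamp (non-negative timestamps assumed). */
    long windowStart(long timestampMs) {
        return timestampMs - (timestampMs % sizeMs);
    }

    /** Adds a value to the running sum of the window its timestamp falls into. */
    void add(long timestampMs, long value) {
        sums.merge(windowStart(timestampMs), value, Long::sum);
    }

    /** Emits {windowStart, sum} and clears every window whose end is at or before the watermark. */
    List<long[]> fire(long watermarkMs) {
        List<long[]> out = new ArrayList<>();
        Iterator<Map.Entry<Long, Long>> it = sums.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, Long> e = it.next();
            if (e.getKey() + sizeMs <= watermarkMs) {
                out.add(new long[] {e.getKey(), e.getValue()});
                it.remove();
            }
        }
        return out;
    }
}
```

In a ProcessFunction-based design, `add` would correspond to the per-element callback and `fire` to a timer callback registered for the window end. Sliding and session windows need more state per element, which is the extra logic the hand-rolled approach has to carry.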