Re: Issue with counter metrics for large number of keys

2019-01-16 Thread Gaurav Luthra
Hi Ken,

Thanks again for your input.
I will wait for the Flink developers to get back to me with a suggestion
for implementing 100K unique counters.
For the time being, I will make the number of counter metrics a
configurable parameter in my application, so the user knows what they are
trying to do.
I will also cap the maximum at 1000 so that we do not run into memory
problems, and tune this maximum against the memory of the JobManager and
my application.

I will also explore other solutions in Flink.

Thanks & Regards
Gaurav Luthra
Mob:- +91-9901945206


On Thu, Jan 17, 2019 at 9:40 AM Ken Krugler 
wrote:

> Hi Gaurav,
>
> I’ve used a few hundred counters before without problems. My concern about
> more than 100K unique counters is that you wind up generating load (and maybe
> memory issues) for the JobManager.
>
> E.g. with Hadoop’s metric system, trying to go much beyond 1000 counters
> could cause significant problems. IIRC it was due to the JobTracker getting
> bogged down processing too many counter updates, and/or running out of
> memory. It’s possible more recent versions of Hadoop no longer have that
> problem.
>
> But someone on the Flink dev team should weigh in here…
>
> — Ken
>
>
> On Jan 16, 2019, at 7:45 PM, Gaurav Luthra 
> wrote:
>
> Thanks a lot for your input, Ken.
>
> I will look into your suggested solution and post an update.
> I would also like to know: roughly how many counter metrics is it
> reasonable to keep references to?
> Or what is the largest number of counter metrics you have heard of anyone
> using?
>
> Thanks & Regards
> Gaurav Luthra
> Mob:- +91-9901945206
>
>
> On Thu, Jan 17, 2019 at 9:04 AM Ken Krugler 
> wrote:
>
>> I think trying to use counters to track counts for 100K+ keys is not
>> going to be a good idea.
>>
>> An alternative is to have a small function with managed MapState
>> <https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html#using-managed-keyed-state>,
>> and make that state queryable
>> <https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/queryable_state.html#queryable-state-beta>
>>
>> Though maybe under the hood that’s what metrics is doing anyway :)
>>
>> — Ken
>>
>>
>> On Jan 16, 2019, at 7:25 PM, Gaurav Luthra 
>> wrote:
>>
>> I want a new counter for every key of my windowed stream, and I want the
>> same counter to be incremented whenever the same key appears again in an
>> incoming event.
>>
>> So I run the code below for every incoming event:
>> getRuntimeContext().getMetricGroup().counter(myKey).inc();
>>
>> But the code above fails when the same value of myKey arrives again,
>> because of a limitation of Flink metrics. It throws an exception: "Name
>> collision: Adding a metric with the same name as a metric subgroup: "
>>
>> Note: myKey is a String and can have a different value for every
>> incoming event.
>> In my case I expect about 100 thousand distinct values of myKey.
>>
>> To solve this, I have to keep references for all 100 thousand values of
>> myKey in some data structure, e.g.
>>  Map<String, Counter> myMetricMap = new HashMap<>();
>> and for every myKey I have to do the following:
>>
>> // Look up the counter already registered for this key, if any.
>> Counter counter = myMetricMap.get(myKey);
>> if (counter == null)
>> {
>>     // First time this key is seen: create and register the counter once.
>>     counter = new SimpleCounter();
>>     myMetricMap.put(myKey, counter);
>>     getRuntimeContext().getMetricGroup().counter(myKey, counter);
>> }
>> counter.inc();
>> The code above serves my purpose, but I do not want to maintain a map of
>> 100 thousand myKey values.
>>
>> Is there any alternative? I am looking for a way to maintain roughly
>> 100 thousand counter metrics, as above, without keeping references to
>> them in a map (or any other data structure).
>>
>>
>> Thanks & Regards
>> Gaurav Luthra
>> Mob:- +91-9901945206
>>
>>
>> --
>> Ken Krugler
>> +1 530-210-6378
>> http://www.scaleunlimited.com
>> Custom big data solutions & training
>> Flink, Solr, Hadoop, Cascading & Cassandra
>>
>>
> --
> Ken Krugler
> +1 530-210-6378
> http://www.scaleunlimited.com
> Custom big data solutions & training
> Flink, Solr, Hadoop, Cascading & Cassandra
>
>
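The pattern discussed in this thread (create each key's counter lazily, register it once, keep the reference), together with the configurable cap proposed in the reply above, can be sketched in plain Java. This is a hypothetical, Flink-free model: `BoundedCounterRegistry`, `maxCounters` and the shared overflow counter are illustrative assumptions, and `LongAdder` stands in for Flink's `Counter`. In an actual job, the `register` method is where `getRuntimeContext().getMetricGroup().counter(key)` would be called, exactly once per key.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.LongAdder;

/**
 * Hypothetical sketch: one counter per key, created lazily and registered
 * only once, with a configurable cap on the number of distinct counters.
 * Keys first seen after the cap is reached share one overflow counter.
 * Not synchronized; intended for single-threaded use.
 */
class BoundedCounterRegistry {
    private final int maxCounters;
    private final Map<String, LongAdder> counters = new HashMap<>();
    private final LongAdder overflow = new LongAdder();

    BoundedCounterRegistry(int maxCounters) {
        if (maxCounters <= 0) {
            throw new IllegalArgumentException("maxCounters must be positive");
        }
        this.maxCounters = maxCounters;
    }

    /** Increments the counter for key, creating it on first sight if under the cap. */
    void inc(String key) {
        LongAdder counter = counters.get(key);
        if (counter == null) {
            if (counters.size() >= maxCounters) {
                overflow.increment(); // cap reached: do not create another counter
                return;
            }
            counter = register(key);
        }
        counter.increment();
    }

    /** Single creation point; in Flink, the metric-group registration would go here. */
    private LongAdder register(String key) {
        LongAdder counter = new LongAdder();
        counters.put(key, counter);
        return counter;
    }

    long count(String key) {
        LongAdder counter = counters.get(key);
        return counter == null ? 0L : counter.sum();
    }

    long overflowCount() {
        return overflow.sum();
    }

    int size() {
        return counters.size();
    }
}
```

With a cap of 2, calling `inc` for "a", "a", "b", "c" leaves `count("a") == 2`, `count("b") == 1` and `overflowCount() == 1`, because "c" arrives after the cap is reached. The map of references is still there; the sketch only bounds how many distinct metrics exist, which is what Ken's concern about JobManager load is about.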


Re: Counter Metrics not getting removed from Flink GUI after close()

2019-01-08 Thread Gaurav Luthra
Hi Chesnay,

I do not want to store the metric counter in a reference variable, because I
want to create a counter for every key of a keyed stream.

There can be n keys, and I do not want to keep n references.

On Tue, 8 Jan, 2019, 11:01 PM Chesnay Schepler wrote:

> What you're trying to do is not possible. Even if you close the group *it
> still exists*, and is returned by subsequent calls to addGroup("mygroup").
> However, since it is closed, all registration calls will be ignored, which
> is why the value isn't updating.
>
> You can only update a metric by storing a reference to it in your function.
> Why do you want to avoid the member variable?


Re: Counter Metrics not getting removed from Flink GUI after close()

2019-01-08 Thread Gaurav Luthra
Hi Chesnay,

If it is not possible to remove metrics from the Flink GUI while the job is
running, then kindly tell me how to update a metric counter.

Explanation:
Suppose I create a Counter metric named "chesnay" and increment it to 20
with the code below:
getRuntimeContext().getMetricGroup().counter("chesnay").inc(20);

/Note: I am not assigning this counter to any local or member variable, as I
do not want to keep state in my job./

Now, some time later, I want to update the value of the "chesnay" counter to
60, but I do not know its old value (which is 20).

So I call:
getRuntimeContext().getMetricGroup().counter("chesnay").inc(60);

Even then, the Flink GUI shows the value 20 for the "chesnay" metric and logs
a WARN message along the lines of "same name can be used, and behavior is
undefined".

So how can I update the "chesnay" metric if I do not want to keep state in
my job?

That's why I thought of creating a user-scoped metric group, closing that
group to remove its counters, and creating new metrics every time I want to
update them.

I hope this explains my problem.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Counter Metrics not getting removed from Flink GUI after close()

2019-01-08 Thread Gaurav Luthra
Hi,

I am using a ProcessWindowFunction, and in its process() method I am adding a
user-scoped group as shown below:
MetricGroup myMetricGroup = getRuntimeContext().getMetricGroup().addGroup("myGroup");

I then create counter metrics on myMetricGroup, and I can see these counters
in the Flink GUI.
But when I call close() as shown below:
((AbstractMetricGroup) myMetricGroup).close();

my counter metrics are still not removed from the Flink GUI.

Kindly guide me on how to close a user-scoped metric group (myMetricGroup in
my case) so that all the counter metrics created with it are removed from
the Flink GUI.


Thanks & Regards
Gaurav Luthra
Mob:- +91-9901945206
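Chesnay's advice in this thread (register a metric once, keep a reference, update through it) also covers the "set it to 60" case if the metric reports a value that is overwritten rather than a count that is incremented. A minimal Flink-free sketch, assuming a hypothetical `LatestValueRegistry` in which a `Supplier<Long>` plays the role of Flink's `Gauge<Long>` (a single-method interface whose `getValue()` a reporter polls); the holder class and method names are illustrative only.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

/**
 * Hypothetical sketch of "register once, update through a reference".
 * Each name is registered a single time with a Supplier that reads a
 * mutable holder; later updates overwrite the holder instead of
 * registering the same name again.
 */
class LatestValueRegistry {
    /** Mutable cell read by the registered supplier. */
    private static final class Holder {
        long value;
    }

    private final Map<String, Holder> holders = new HashMap<>();
    private final Map<String, Supplier<Long>> registered = new HashMap<>();

    /** Sets the current value for name, registering a supplier on first use only. */
    void set(String name, long value) {
        Holder holder = holders.get(name);
        if (holder == null) {
            Holder created = new Holder();
            holders.put(name, created);
            // Registered exactly once; registering the same name a second
            // time is what produces the collision warning described above.
            registered.put(name, () -> created.value);
            holder = created;
        }
        holder.value = value;
    }

    /** What a metrics reporter would see when it polls the supplier. */
    long report(String name) {
        Supplier<Long> supplier = registered.get(name);
        if (supplier == null) {
            throw new IllegalStateException("not registered: " + name);
        }
        return supplier.get();
    }

    int registrations() {
        return registered.size();
    }
}
```

After `set("chesnay", 20)` and later `set("chesnay", 60)`, `report("chesnay")` returns 60 and only one registration exists. If a true `Counter` is required instead, the same held reference allows `counter.inc(target - counter.getCount())`, since Flink's `Counter` exposes `getCount()`.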


Getting "ProducerFenced" exception while using flink kafka producer

2018-12-20 Thread Gaurav Luthra
Hi,

I have two Flink jobs; both use the Flink Kafka producer and Flink Kafka
consumer in exactly-once mode.
Both jobs run with parallelism one.
Both jobs have the same number and types of operators.

When we start one job, it runs fine. But as soon as we start the second job,
both jobs start failing at runtime with a "ProducerFenced" exception.

Our understanding is that both jobs get the same transactional ID (assigned
by Flink), which is required to run a job in exactly-once mode.

I think Flink calculates the transactional ID by concatenating the operator
names and the subtask ID. In our case both jobs have the same operators and
both run with parallelism one, so both jobs also get the same subtask ID.

Questions:
Kindly suggest a solution for this exception, and shed some light on whether
our understanding of its cause is correct. Also, how can we get different
transactional IDs for the two jobs?

Thanks & Regards
Gaurav Luthra
Mob:- +91-9901945206
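The collision described in this message can be modelled in a few lines. This is a hypothetical sketch of the author's understanding, not Flink's actual code: `transactionalId` simply joins operator names and a subtask index, which is enough to show why two identical jobs with parallelism one collide, and why any job-specific component (the `jobPrefix` parameter here, an assumption) separates them. In the Kafka producer of that era the prefix is believed to be derived from the task name and the operator's unique ID, so giving the sinks in the two jobs different `uid(...)` values is a commonly suggested remedy; verify this against the connector version in use.

```java
import java.util.List;

/**
 * Hypothetical model of transactional-ID derivation. It only illustrates
 * why identical jobs collide; Flink's real scheme may differ by version.
 */
final class TransactionalIds {
    private TransactionalIds() {}

    /** Joins an optional job prefix, the operator names and the subtask index. */
    static String transactionalId(String jobPrefix, List<String> operatorNames, int subtaskIndex) {
        StringBuilder id = new StringBuilder();
        if (jobPrefix != null && !jobPrefix.isEmpty()) {
            id.append(jobPrefix).append('-');
        }
        id.append(String.join("-", operatorNames)).append('-').append(subtaskIndex);
        return id.toString();
    }
}
```

Without a prefix, two jobs with the same operator names and subtask index 0 get the same ID, so the second producer's initialization would fence the first; with distinct prefixes the IDs differ.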


Need the way to create custom metrics

2018-12-17 Thread Gaurav Luthra
Hi,

I need to know how to implement custom metrics in my Flink program.
As far as I know, custom metrics are created with the help of the
RuntimeContext.
But in my aggregate() I do not have a RuntimeContext: I am using a window
operator, calling aggregate() on it, and passing an AggregateFunction to
aggregate().

So kindly guide me: how can I create custom metrics in my code?

Note: as we know, we cannot use a RichAggregateFunction with the aggregate()
method.

Thanks & Regards
Gaurav Luthra
Mob:- +91-9901945206
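One way around the missing RuntimeContext is to pair the non-rich AggregateFunction with a rich ProcessWindowFunction through the two-argument `aggregate(aggregateFunction, windowFunction)` overload on `WindowedStream`; the window function is a rich function, so it can register metrics in `open()`. The sketch below models that split without Flink, under stated assumptions: `Aggregator` is a hypothetical interface mirroring the four methods of Flink's `AggregateFunction`, and `WindowWithMetrics` is a hypothetical driver in which a `LongAdder` stands in for a registered `Counter`.

```java
import java.util.List;
import java.util.concurrent.atomic.LongAdder;

/** Mirrors the shape of Flink's AggregateFunction<IN, ACC, OUT> (illustrative only). */
interface Aggregator<IN, ACC, OUT> {
    ACC createAccumulator();
    ACC add(IN value, ACC accumulator);
    OUT getResult(ACC accumulator);
    ACC merge(ACC a, ACC b);
}

/** Sum of longs: the incremental part, which needs no metrics. */
final class SumAggregator implements Aggregator<Long, long[], Long> {
    public long[] createAccumulator() { return new long[] {0L}; }
    public long[] add(Long value, long[] acc) { acc[0] += value; return acc; }
    public Long getResult(long[] acc) { return acc[0]; }
    public long[] merge(long[] a, long[] b) { a[0] += b[0]; return a; }
}

/** Hypothetical driver: pre-aggregates one window, then runs a "rich" finishing step. */
final class WindowWithMetrics {
    // Stand-in for a Counter a ProcessWindowFunction would register in open().
    final LongAdder windowsFired = new LongAdder();

    <IN, ACC, OUT> OUT fireWindow(List<IN> elements, Aggregator<IN, ACC, OUT> agg) {
        ACC acc = agg.createAccumulator();
        for (IN element : elements) {
            acc = agg.add(element, acc);
        }
        OUT result = agg.getResult(acc);
        // Finishing step: in Flink this would be ProcessWindowFunction.process(),
        // which receives only the pre-aggregated result but may update metrics.
        windowsFired.increment();
        return result;
    }
}
```

The design keeps per-element work in the cheap incremental function and confines metric updates to once per window firing, which is where the rich function runs.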


Re: Partitioning by composite key, But type and number of keys are dynamic

2018-11-15 Thread Gaurav Luthra
Hi Chesnay,

My end user will know the fields of the "input records" (GenericRecord). In
the configuration, the end user will only tell me the names and number of
the fields; based on these fields of the GenericRecord I have to partition
the DataStream and create a KeyedStream.

Currently, I have implemented my solution with a KeySelector. I convert all
the fields named by the user (the fields the user wants to keyBy(), i.e.
partition on) into strings, concatenate these strings, and return a single
string from the KeySelector. Partitioning then happens on this concatenated
string.
See the example below.

dataStream.keyBy(record -> {
    // Concatenate the configured fields into one String key.
    return record.get("area").toString() + record.get("age").toString();
});


But I am looking for a better solution, as I do not want to convert every
field to a string.
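Besides the conversion cost, plain concatenation can map different records to the same key: area "north1" with age 5 and area "north" with age 15 both become "north15". A minimal sketch of a collision-free alternative, assuming the record is modelled as a `Map<String, Object>` (with Avro's GenericRecord the loop would call `record.get(field)` the same way) and the configured field names arrive as a list; `CompositeKeys` is a hypothetical helper. It still produces a String, a key type keyBy() accepts, but prefixes each value with its length so field boundaries cannot blur.

```java
import java.util.List;
import java.util.Map;

/** Hypothetical helper for building keys from a configurable list of fields. */
final class CompositeKeys {
    private CompositeKeys() {}

    /** Unambiguous key: each field value is written as <length>:<text>. */
    static String keyOf(Map<String, Object> record, List<String> keyFields) {
        StringBuilder key = new StringBuilder();
        for (String field : keyFields) {
            Object value = record.get(field);
            if (value == null) {
                key.append("-1:"); // marks a missing or null field
            } else {
                String text = value.toString();
                key.append(text.length()).append(':').append(text);
            }
        }
        return key.toString();
    }

    /** The plain concatenation from the message above, for comparison. */
    static String naiveKeyOf(Map<String, Object> record, List<String> keyFields) {
        StringBuilder key = new StringBuilder();
        for (String field : keyFields) {
            key.append(record.get(field));
        }
        return key.toString();
    }
}
```

For the two example records, `naiveKeyOf` returns "north15" twice, while `keyOf` returns "6:north11:5" and "5:north2:15". Values of different types with the same `toString()` (for example the Integer 5 and the String "5") still share a key; prepending a type tag would separate them if that matters.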






Partitioning by composite key, But type and number of keys are dynamic

2018-11-14 Thread Gaurav Luthra
There is a data stream of records; let's call them "input records".
I want to partition this data stream using keyBy(), based on one or more
fields of the "input records", but the number and types of those fields are
not fixed.
So kindly tell me: how can I achieve this partitioning of the "input
records" described above?

Note: technically, I am using Avro's GenericRecord as the "input records",
i.e. I have a DataStream<GenericRecord> that needs to be partitioned. Its
schema can differ between jobs, so I do not know in advance the field names
and types to pass to keyBy().





Re: Regarding implementation of aggregate function using a ProcessFunction

2018-10-02 Thread Gaurav Luthra
Hi Fabian,

Thanks for the detailed explanation. As you also mentioned, options 1) and
2) have their issues, so I am going ahead with option 3).

Thanks & Regards
Gaurav Luthra
Mob:- +91-9901945206


On Mon, Oct 1, 2018 at 3:11 PM Fabian Hueske  wrote:

> Hi,
>
> There are basically three options:
> 1) Use an AggregateFunction and store everything that you would put into
> state into the Accumulator. This can become quite expensive because the
> Accumulator is de/serialized for every function call if you use RocksDB.
> The advantage is that you don't have to store all records in state but only
> the data you need. Simple aggregations like COUNT or SUM are quite cheap.
> 2) Use Flink's window primitives and a ProcessWindowFunction. In this
> case, all records of a Window are stored in a ListState. Adding a record to
> the List is cheap, but the state might grow quite large for longer windows.
> When the window is evaluated, all records are loaded into memory and
> iterated by the ProcessWindowFunction.
> 3) Implement the windowing logic in a ProcessFunction. This requires a lot
> of additional logic, depending on what types of windows you want to support.
>
> Flink's SQL / Table API implements the first approach.
>
> Best, Fabian
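Option 3) means reimplementing the window bookkeeping by hand. As a rough, Flink-free sketch under simplifying assumptions (event-time tumbling windows only, a long sum as the aggregate, no allowed lateness, and a window firing once the watermark reaches its end), the hypothetical `TumblingSumProcessor` keeps one accumulator per (key, window start) and emits windows as the watermark advances. In a real KeyedProcessFunction the per-key map would live in keyed state (for example a MapState keyed by window start), and `advanceWatermark` would correspond to an event-time timer firing in `onTimer`; Flink's own window firing rules differ in detail.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical sketch: hand-rolled event-time tumbling windows with a sum per key. */
final class TumblingSumProcessor {
    /** One emitted result: key, window [start, end) and the aggregate. */
    static final class WindowResult {
        final String key;
        final long start;
        final long end;
        final long sum;

        WindowResult(String key, long start, long end, long sum) {
            this.key = key;
            this.start = start;
            this.end = end;
            this.sum = sum;
        }
    }

    private final long size;
    // key -> (window start -> running sum); TreeMap keeps windows in time order.
    private final Map<String, TreeMap<Long, Long>> state = new HashMap<>();
    private long watermark = Long.MIN_VALUE;

    TumblingSumProcessor(long size) {
        if (size <= 0) {
            throw new IllegalArgumentException("size must be positive");
        }
        this.size = size;
    }

    /** Like processElement(): assign the event to its window and update the sum. */
    void process(String key, long timestamp, long value) {
        long start = timestamp - Math.floorMod(timestamp, size);
        if (start + size <= watermark) {
            return; // window already emitted: treat the event as late and drop it
        }
        state.computeIfAbsent(key, k -> new TreeMap<>()).merge(start, value, Long::sum);
    }

    /** Like onTimer(): emit and clear every window whose end is <= the watermark. */
    List<WindowResult> advanceWatermark(long newWatermark) {
        watermark = Math.max(watermark, newWatermark);
        List<WindowResult> out = new ArrayList<>();
        for (Map.Entry<String, TreeMap<Long, Long>> perKey : state.entrySet()) {
            Iterator<Map.Entry<Long, Long>> it = perKey.getValue().entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry<Long, Long> window = it.next();
                long end = window.getKey() + size;
                if (end > watermark) {
                    break; // later windows of this key are not complete either
                }
                out.add(new WindowResult(perKey.getKey(), window.getKey(), end, window.getValue()));
                it.remove();
            }
        }
        state.values().removeIf(windows -> windows.isEmpty()); // drop keys with no open windows
        return out;
    }
}
```

Every other window type the aggregator should support (sliding, session with merging, count windows) would need its own assignment and firing logic on top of this, which is the "lot of additional logic" Fabian mentions.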

Re: Regarding implementation of aggregate function using a ProcessFunction

2018-09-30 Thread Gaurav Luthra
Hi Ken,

Mine is a very generic use case: I am building an aggregation function with
Flink that can be configured for any use case.
It will not be tied to a specific use case; every user can enter their own
business logic and use this aggregator to get results.
The same goes for windowing: the user can configure the type of window, and
my aggregator will ask for the properties required by that window.

I hope that gives you some idea.

But to make it generic, I need to implement it with a ProcessFunction and
its process() method, instead of the more specific AggregateFunction and
aggregate() method.

So I am looking for input from anyone who has tried implementing
aggregation with a ProcessFunction and process(), as this is a much-needed
capability in Flink.

Thanks and Regards,
Gaurav Luthra
Mob:- +91-9901945206


On Sun, Sep 30, 2018 at 5:12 AM Ken Krugler 
wrote:

> Hi Gaurav,
>
> I’m curious - for your use case, what are the windowing & aggregation
> requirements?
>
> E.g. is it a 10 second sliding window?
>
> And what’s the aggregation you’re trying to do?
>
> Thanks,
>
> — Ken
>
> --
> Ken Krugler
> +1 530-210-6378
> http://www.scaleunlimited.com
> Custom big data solutions & training
> Flink, Solr, Hadoop, Cascading & Cassandra
>
>


Re: Regarding implementation of aggregate function using a ProcessFunction

2018-09-28 Thread Gaurav Luthra
Hi Chesnay,

I know it is a known issue, and that it won't be fixed because of window
merging in the case of session windows.
But I am looking to see whether someone has implemented an aggregation
function using ProcessFunction and process() instead of AggregateFunction
and aggregate().
I hope you see my point.

Thanks & Regards
Gaurav Luthra



On Fri, Sep 28, 2018 at 4:22 PM Chesnay Schepler  wrote:

> Please see: https://issues.apache.org/jira/browse/FLINK-10250
>
> On 28.09.2018 11:27, vino yang wrote:
>
> Hi Gaurav,
>
> Yes, you are right. It really is not allowed to use a RichFunction. I will
> ping Timo; he may be able to give you a more authoritative answer.
>
> Thanks, vino.
>


Re: Regarding implementation of aggregate function using a ProcessFunction

2018-09-28 Thread Gaurav Luthra
Hi Vino,

Kindly check the Flink code below.

// Source: org.apache.flink.streaming.api.datastream.WindowedStream

@PublicEvolving
public <ACC, R> SingleOutputStreamOperator<R> aggregate(AggregateFunction<T, ACC, R> function) {
    checkNotNull(function, "function");

    // This check is why a RichAggregateFunction is rejected here.
    if (function instanceof RichFunction) {
        throw new UnsupportedOperationException("This aggregation function cannot be a RichFunction.");
    }

    TypeInformation<ACC> accumulatorType = TypeExtractor.getAggregateFunctionAccumulatorType(
            function, input.getType(), null, false);

    TypeInformation<R> resultType = TypeExtractor.getAggregateFunctionReturnType(
            function, input.getType(), null, false);

    return aggregate(function, accumulatorType, resultType);
}


Kindly check the above snapshot of Flink's aggregate() method, which is
applied on a windowed stream.

Thanks & Regards
Gaurav Luthra
Mob:- +91-9901945206


On Fri, Sep 28, 2018 at 1:40 PM vino yang  wrote:

> Hi Gaurav,
>
> This is very strange. Can you share your code and the specific exception?
> Under normal circumstances, it should not throw an exception.
>
> Thanks, vino.
>
> On Fri, Sep 28, 2018 at 3:27 PM Gaurav Luthra wrote:
>
>> Hi Vino,
>>
>> RichAggregateFunction can certainly access state. But the problem is that
>> we cannot use a RichAggregateFunction in the aggregate() method; if we do,
>> it throws an exception.
>>
>> So the option is to use a (non-rich) AggregateFunction with the aggregate()
>> method on the windowed stream. But in an AggregateFunction we cannot access
>> the RuntimeContext, hence we cannot use state.
>>
>> Thanks & Regards
>> Gaurav
>>
>>
>>
>> On Fri, 28 Sep, 2018, 12:40 PM vino yang,  wrote:
>>
>>> Hi Gaurav,
>>>
>>> Why do you think the RichAggregateFunction cannot access the State API?
>>> RichAggregateFunction inherits from AbstractRichFunction (it provides a
>>> RuntimeContext that allows you to access the state API).
>>>
>>> Thanks, vino.


Regarding implementation of aggregate function using a ProcessFunction

2018-09-27 Thread Gaurav Luthra
Hi,

As we know, we currently cannot use a RichAggregateFunction in the
aggregate() method on a windowed stream. So, to access state in a custom
aggregate function, one can implement it using a ProcessFunction instead.
Many developers face this issue, so someone must have implemented, or tried
to implement, this approach. Kindly share your feedback on it, as I need to
implement this.

Thanks & Regards
Gaurav Luthra
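Before reaching for a ProcessFunction, note Fabian's option 1 from the replies to this question: anything the aggregation needs to remember can travel inside the accumulator, which Flink keeps as state for a plain AggregateFunction. A minimal Flink-free sketch, assuming a hypothetical `StatsAggregation` whose four methods mirror Flink's `AggregateFunction` and whose accumulator carries count, sum, min and max. `merge` matters because merging windows, such as session windows, combine accumulators.

```java
/**
 * Hypothetical accumulator-as-state sketch. StatsAggregation mirrors the
 * four methods of Flink's AggregateFunction<IN, ACC, OUT> without Flink.
 */
final class StatsAggregation {
    /** Everything the aggregation must remember lives here (and thus in state). */
    static final class Stats {
        long count;
        double sum;
        double min = Double.POSITIVE_INFINITY;
        double max = Double.NEGATIVE_INFINITY;
    }

    Stats createAccumulator() {
        return new Stats();
    }

    Stats add(double value, Stats acc) {
        acc.count++;
        acc.sum += value;
        acc.min = Math.min(acc.min, value);
        acc.max = Math.max(acc.max, value);
        return acc;
    }

    /** Result as {count, mean, min, max}; the mean is NaN for an empty window. */
    double[] getResult(Stats acc) {
        double mean = acc.count == 0 ? Double.NaN : acc.sum / acc.count;
        return new double[] {acc.count, mean, acc.min, acc.max};
    }

    /** Used when windows merge (e.g. session windows): combine two partial states. */
    Stats merge(Stats a, Stats b) {
        a.count += b.count;
        a.sum += b.sum;
        a.min = Math.min(a.min, b.min);
        a.max = Math.max(a.max, b.max);
        return a;
    }
}
```

Merging a partial state built from 2.0 and 4.0 with one built from 9.0 yields count 3, mean 5.0, min 2.0 and max 9.0. As Fabian notes, with RocksDB the accumulator is de/serialized on every call, so it pays to keep it small.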


