Re: What is the best way to aggregate data over a long window

2024-05-20 Thread gongzhongqiang
Hi Sachin,

`Performing incremental aggregation using stateful processing` is essentially
the same as `windows with agg`, but the former is more flexible. If Flink's
windows cannot satisfy your performance needs, and your business logic has
characteristics that allow custom optimization, you can choose the former.

Best,
Zhongqiang Gong



Re: What is the best way to aggregate data over a long window

2024-05-17 Thread Sachin Mittal
Hi,
I am doing the following:
1. I use the reduce function where the output type after windowing is the
same as the input type.
2. Where the output type after windowing differs from the input type, I use
the aggregate function. For example:

SingleOutputStreamOperator data =
    reducedPlayerStatsData
        .keyBy(new KeySelector())
        .window(TumblingEventTimeWindows.of(Time.seconds(secs)))
        .aggregate(new DataAggregator())
        .name("aggregate");

In this case the aggregated data is of a different type than the input, so
I had to use the aggregate function.
However, where the data is of the same type, the reduce function is very
simple to use.
Is there any fundamental difference between the aggregate and reduce
functions in terms of performance?
3. I have enabled incremental checkpoints at the Flink configuration level
using:
state.backend.type: "rocksdb"
state.backend.incremental: "true"

4. I am really not sure how I can use TTL. I assumed that Flink would
automatically clean up the state of expired windows. Is there any way I can
use TTL in the steps I have mentioned?
5. When you talk about pre-aggregation, is this what you mean: first compute
a minute-level aggregation and use that as input for the hour-level
aggregation? My pipeline would then be something like this:

SingleOutputStreamOperator reducedData =
    data
        .keyBy(new KeySelector())
        .window(TumblingEventTimeWindows.of(Time.seconds(60)))
        .reduce(new DataReducer())
        // key the minute results again; window() needs a keyed stream
        .keyBy(new KeySelector())
        .window(TumblingEventTimeWindows.of(Time.seconds(3600)))
        .reduce(new DataReducer())
        .name("reduce");


I was thinking of performing incremental aggregation using stateful
processing.

Basically, read one record, reduce it, and store it in state; then read the
next record, reduce it with the current state, write the new reduced value
back to the state, and so on.

When the event-time timer I registered fires at the end of the period, emit
the final reduced value from the state, register the timer for the next
period, and clear the state.

This way each state would always hold only one record, no matter how long
the aggregation period is.
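
The steps above can be sketched roughly as follows. This is a plain-Java
model of the idea, not Flink code: the class IncrementalWindowReducer and
its methods are hypothetical names, the maps stand in for keyed state and
registered timers, and advanceWatermark is called by hand where Flink would
fire event-time timers. It assumes non-negative timestamps and that the
records of a key arrive in timestamp order.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.function.BinaryOperator;

/** Hypothetical model: one reduced value and one pending timer per key. */
final class IncrementalWindowReducer<K, V> {
    private final long windowSizeMs;
    private final BinaryOperator<V> reducer;
    private final Map<K, V> state = new HashMap<>();      // stands in for keyed state
    private final Map<K, Long> timers = new HashMap<>();  // window end per key

    IncrementalWindowReducer(long windowSizeMs, BinaryOperator<V> reducer) {
        this.windowSizeMs = windowSizeMs;
        this.reducer = reducer;
    }

    /** Fold one record into the stored value and register a timer if none is pending. */
    void processElement(K key, V value, long timestampMs) {
        // Assumes in-order input: a record of a later window that arrives
        // before the pending timer fires would be folded into the old value.
        state.merge(key, value, reducer);
        long windowEnd = timestampMs - timestampMs % windowSizeMs + windowSizeMs;
        timers.putIfAbsent(key, windowEnd);
    }

    /** Fire every timer at or before the watermark: emit the value, clear the state. */
    Map<K, V> advanceWatermark(long watermarkMs) {
        Map<K, V> fired = new HashMap<>();
        Iterator<Map.Entry<K, Long>> it = timers.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<K, Long> timer = it.next();
            if (timer.getValue() <= watermarkMs) {
                fired.put(timer.getKey(), state.remove(timer.getKey()));
                it.remove();
            }
        }
        return fired;
    }

    /** Number of values currently held: at most one per active key. */
    int stateSize() {
        return state.size();
    }
}
```

With a one-hour window and Integer::sum as the reducer, any number of
records for the same key leaves a single stored value until the timer for
that hour fires.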

Is this a better approach than windowing?


Thanks
Sachin




Re: What is the best way to aggregate data over a long window

2024-05-17 Thread gongzhongqiang
Hi Sachin,

We can optimize this problem in the following ways:
- use
  org.apache.flink.streaming.api.datastream.WindowedStream#aggregate(org.apache.flink.api.common.functions.AggregateFunction)
  to reduce the amount of data
- use TTL to clean up data that is no longer needed
- enable incremental checkpoints
- use multi-level time window granularity for pre-aggregation, which can
  significantly improve performance and reduce computation latency
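
As a rough illustration of the first and last points, here is a plain-Java
model of two-level pre-aggregation for an average. It is not Flink code: the
class TwoLevelAverage and its Acc accumulator are hypothetical names that
only mirror the add/merge/result shape of an AggregateFunction, and the maps
stand in for window state. It assumes non-negative timestamps in
milliseconds.

```java
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical model: minute-level partials merged into hour-level results. */
final class TwoLevelAverage {
    /** Accumulator (sum, count) whose type differs from both input and output. */
    static final class Acc {
        static final Acc EMPTY = new Acc(0.0, 0L);
        final double sum;
        final long count;

        Acc(double sum, long count) {
            this.sum = sum;
            this.count = count;
        }

        Acc add(double value) {
            return new Acc(sum + value, count + 1);
        }

        Acc merge(Acc other) {
            return new Acc(sum + other.sum, count + other.count);
        }

        double result() {
            return count == 0 ? 0.0 : sum / count;
        }
    }

    /** Level 1: fold raw records into one accumulator per minute. */
    static Map<Long, Acc> perMinute(long[] timestampsMs, double[] values) {
        Map<Long, Acc> minutes = new TreeMap<>();
        for (int i = 0; i < timestampsMs.length; i++) {
            long minuteStart = timestampsMs[i] - timestampsMs[i] % 60_000L;
            minutes.put(minuteStart, minutes.getOrDefault(minuteStart, Acc.EMPTY).add(values[i]));
        }
        return minutes;
    }

    /** Level 2: merge minute accumulators into one accumulator per hour. */
    static Map<Long, Acc> perHour(Map<Long, Acc> minutes) {
        Map<Long, Acc> hours = new TreeMap<>();
        for (Map.Entry<Long, Acc> minute : minutes.entrySet()) {
            long hourStart = minute.getKey() - minute.getKey() % 3_600_000L;
            hours.merge(hourStart, minute.getValue(), Acc::merge);
        }
        return hours;
    }
}
```

The hour level merges (sum, count) pairs rather than minute averages, so the
hourly average stays exact, and it sees at most 60 partial results per hour
instead of every raw record.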

Best,
Zhongqiang Gong



What is the best way to aggregate data over a long window

2024-05-16 Thread Sachin Mittal
Hi,
My pipeline step is something like this:

SingleOutputStreamOperator reducedData =
    data
        .keyBy(new KeySelector())
        .window(TumblingEventTimeWindows.of(Time.seconds(secs)))
        .reduce(new DataReducer())
        .name("reduce");


This works fine for secs = 300.
However, once I increase the time window to, say, 1 hour (3600 seconds), the
state size increases, as it now has a lot more records to reduce.

Hence I need to allocate much more memory to the task manager.

However, there is no upper limit to the memory that may be needed. If the
volume of data increases, say, 10-fold, I would have no option but to
increase the memory again.

Is there a better way to perform long-window aggregation so that overall
this step has a small memory footprint?

Thanks
Sachin