Here is some sample data that may help visualize how the aggregation is
changed dynamically.
We start by aggregating by session and by session+account, placing values into
aggregationKey based on the fields in groupByFields.
Then we delete the session+account aggregation and add an aggregation by
account.
The aggregation is changed dynamically by keying on an indirect field,
aggregationKey, which we populate based on the current broadcast state.
Note: this is for streaming jobs, and each aggregation starts fresh from the
point at which a new groupByType is received.
aggregateInstruction
{groupByFields:[session],groupByType:bySession,action:add}
{groupByFields:[session,account],groupByType:bySessionAndAccount,action:add}
dataToAggregate
{session:1,account:1,value:100}
{session:2,account:1,value:200}
{session:1,account:2,value:400}
{session:1,account:1,value:800}
streamReadyToAggregate
{session:1,account:1,value:100,groupByFields:[session],groupByType:bySession,aggregationKey:'1'}
{session:1,account:1,value:100,groupByFields:[session,account],groupByType:bySessionAndAccount,aggregationKey:'1|1'}
{session:2,account:1,value:200,groupByFields:[session],groupByType:bySession,aggregationKey:'2'}
{session:2,account:1,value:200,groupByFields:[session,account],groupByType:bySessionAndAccount,aggregationKey:'2|1'}
{session:1,account:2,value:400,groupByFields:[session],groupByType:bySession,aggregationKey:'1'}
{session:1,account:2,value:400,groupByFields:[session,account],groupByType:bySessionAndAccount,aggregationKey:'1|2'}
{session:1,account:1,value:800,groupByFields:[session],groupByType:bySession,aggregationKey:'1'}
{session:1,account:1,value:800,groupByFields:[session,account],groupByType:bySessionAndAccount,aggregationKey:'1|1'}
aggregateInstruction
{groupByFields:[session,account],groupByType:bySessionAndAccount,action:delete}
{groupByFields:[account],groupByType:byAccount,action:add}
dataToAggregate
{session:3,account:1,value:1600}
{session:3,account:2,value:3200}
streamReadyToAggregate
{session:3,account:1,value:1600,groupByFields:[session],groupByType:bySession,aggregationKey:'3'}
{session:3,account:1,value:1600,groupByFields:[account],groupByType:byAccount,aggregationKey:'1'}
{session:3,account:2,value:3200,groupByFields:[session],groupByType:bySession,aggregationKey:'3'}
{session:3,account:2,value:3200,groupByFields:[account],groupByType:byAccount,aggregationKey:'2'}
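The aggregationKey values above can be derived by extracting each field named in groupByFields from the record and joining the values with '|'. A minimal sketch of that key-building step in plain Java (class and method names are hypothetical, not from the thread):

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class AggregationKeyBuilder {
    // Build the indirect key by looking up each groupBy field's value in the
    // record and joining with '|', e.g. [session, account] -> "1|1".
    public static String buildKey(Map<String, String> record, List<String> groupByFields) {
        return groupByFields.stream()
                .map(record::get)
                .collect(Collectors.joining("|"));
    }
}
```

With record {session:1,account:2}, groupByFields [session] yields '1' and [session,account] yields '1|2', matching the sample data.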
From: Colletta, Edward <[email protected]>
Sent: Tuesday, January 25, 2022 1:29 PM
To: M Singh <[email protected]>; Caizhi Weng <[email protected]>;
User-Flink <[email protected]>
Subject: RE: Apache Flink - Adding/Removing KeyedStreams at run time without
stopping the application
You don’t have to add keyBy’s at runtime. You change what is in the value of
aggregationKey at run time.
Some records may appear several times with different fields extracted into
aggregationKey. The dynamic building of the grouping is really done by the
flatMap.
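The add/delete instructions can be applied to a simple map of active groupings; in the real job this map would live in Flink broadcast state and be updated in processBroadcastElement. A minimal sketch in plain Java, with hypothetical names, omitting the Flink API:

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class GroupingState {
    // Active groupings keyed by groupByType; stand-in for broadcast state.
    private final Map<String, List<String>> active = new LinkedHashMap<>();

    // Apply one aggregateInstruction to the current state.
    public void apply(String groupByType, List<String> groupByFields, String action) {
        if ("add".equals(action)) {
            active.put(groupByType, groupByFields);
        } else if ("delete".equals(action)) {
            active.remove(groupByType);
        }
    }

    public Map<String, List<String>> active() {
        return active;
    }
}
```

Replaying the instructions from the sample data (add bySession, add bySessionAndAccount, delete bySessionAndAccount, add byAccount) leaves bySession and byAccount active, which is why later records fan out to those two keys.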
From: M Singh <[email protected]>
Sent: Tuesday, January 25, 2022 1:12 PM
To: Caizhi Weng <[email protected]>; User-Flink <[email protected]>;
Colletta, Edward <[email protected]>
Subject: Re: Apache Flink - Adding/Removing KeyedStreams at run time without
stopping the application
NOTICE: This email is from an external sender - do not click on links or
attachments unless you recognize the sender and know the content is safe.
Thanks Edward for your response.
The problem I have is that I am not sure how to add or remove keyBy's at run
time since the flink topology is based on that (as Caizhi mentioned).
I believe we can change the single keyBy in your example, but not add/remove
them.
Please let me know if I have missed anything.
On Tuesday, January 25, 2022, 12:53:46 PM EST, Colletta, Edward
<[email protected]> wrote:
A general pattern for dynamically adding new aggregations could be something
like this:

BroadcastStream<AggregationInstructions> broadcastStream = aggregationInstructions
    .broadcast(broadcastStateDescriptor);

DataStream<DataToAggregateEnrichedWithAggregationInstructions> streamReadyToAggregate = dataToAggregate
    .connect(broadcastStream)
    .process(new JoinFunction())
    .flatMap(new AddAggregationKeyAndDescriptor())
    .keyBy(record -> record.aggregationKey);
Where
· aggregationInstructions is a stream describing which fields to
aggregate by. It might contain a List<String> of the field names and another
field which can be used to describe what the aggregation is doing. Example
groupByFields=[‘sourceIp’,’sourcePort’,’destIp’,’destPort’], groupingType =
‘bySession’, action = ‘Add’ or ‘Delete’
· JoinFunction is a KeyedBroadcastProcessFunction which adds the
groupByFields and groupingType to each message in the dataToAggregate stream
and possibly deletes groupings from state.
· AddAggregationKeyAndDescriptor is a FlatMapFunction which adds
aggregationKey to the stream based on the value of groupByFields
The flatMap means one message may be emitted several times with different
values of aggregationKey so it may belong to multiple aggregations.
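The fan-out that the flatMap performs can be sketched in plain Java without the Flink API; the record and grouping types below are hypothetical stand-ins for the enriched stream elements, assuming one active-groupings list from broadcast state:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class FanOut {
    // One active grouping: the fields to key by plus a descriptive label.
    public record Grouping(List<String> groupByFields, String groupByType) {}

    // A copy of the input record enriched with its grouping label and the
    // derived aggregationKey (groupByFields omitted here for brevity).
    public record Enriched(Map<String, String> fields, String groupByType, String aggregationKey) {}

    // Emit one enriched copy of the record per active grouping, mirroring
    // what a flatMap like AddAggregationKeyAndDescriptor would collect.
    public static List<Enriched> fanOut(Map<String, String> record, List<Grouping> active) {
        List<Enriched> out = new ArrayList<>();
        for (Grouping g : active) {
            String key = g.groupByFields().stream()
                    .map(record::get)
                    .collect(Collectors.joining("|"));
            out.add(new Enriched(record, g.groupByType(), key));
        }
        return out;
    }
}
```

With two active groupings, each input record produces two enriched records with different aggregationKey values, so it participates in both aggregations after the keyBy.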
From: M Singh <[email protected]>
Sent: Monday, January 24, 2022 9:52 PM
To: Caizhi Weng <[email protected]>; User-Flink <[email protected]>
Subject: Re: Apache Flink - Adding/Removing KeyedStreams at run time without
stopping the application
Hi Caizhi:
Thanks for your reply.
I need to aggregate streams based on dynamic groupings. Not all of the
groupings (keyBy) are known upfront; they can be added or removed after the
streaming application is started, and I don't want to restart the application
or change the code. So, I wanted to find out what the options are to achieve
this functionality. Please let me know if you have any advice or recommendations.
Thanks
On Monday, January 24, 2022, 04:28:32 AM EST, Caizhi Weng
<[email protected]> wrote:
Hi!
Adding/removing keyed streams will change the topology graph of the job.
Currently it is not possible to do so without restarting the job, and as far
as I know there is no existing framework/pattern to achieve this.
By the way, why do you need this functionality? Could you elaborate more on
your use case?
On Saturday, January 22, 2022 at 21:32, M Singh <[email protected]>
wrote:
Hi Folks:
I am working on an exploratory project in which I would like to add/remove
KeyedStreams
(https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/operators/overview/#keyby)
without restarting the Flink streaming application.
Is it possible natively in Apache Flink? If not, is there any
framework/pattern which can be used to implement this without restarting the
application or changing the code?
Thanks