Hi, Dominik.
For data skew, you can refer to the tuning and optimization ideas used in 
Flink SQL [1] and implement them manually through the DataStream API. If your 
processing logic and aggregation operations are simple, you could even use the 
Flink SQL API directly. In particular, the way you manually add a rolling 
number now is essentially what the automatic split-distinct optimization 
does [2].
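For example, enabling that rewrite in the Table API is just a few config 
options (a minimal sketch; mini-batch is a prerequisite for the split-distinct 
rewrite, see [1] and [2] for the exact options and defaults):

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
    Configuration conf = tEnv.getConfig().getConfiguration();

    // Mini-batch aggregation must be on for the split-distinct rewrite.
    conf.setString("table.exec.mini-batch.enabled", "true");
    conf.setString("table.exec.mini-batch.allowed-latency", "5 s");
    conf.setString("table.exec.mini-batch.size", "5000");

    // Let the planner rewrite a skewed distinct aggregation into a
    // two-level partial/final aggregation over hidden buckets.
    conf.setString("table.optimizer.distinct-agg.split.enabled", "true");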


> I see that some taskmanagers handle substantially more load than others (a 
> few million records difference)


IIUC, this could be because the recombining operator you apply afterwards 
always shuffles this data to the same task manager. You can examine the 
processing throughput of each vertex in the Flink UI by observing the 
'Records Received' metric, to check whether any nodes other than this 
aggregation node are causing data skew.


[1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/tuning/#performance-tuning
[2] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/tuning/#split-distinct-aggregation

--

    Best!
    Xuyang




At 2024-02-22 15:44:26, <dominik.buen...@swisscom.com> wrote:

Hi all,

 

I am currently facing a problem with a pipeline (DataStream API) where I need 
to split a GenericRecord into its fields and then aggregate all the values of 
a particular field into 30-minute windows. If I were to use only a keyBy on 
the field name, I would send all the values of a field to the same parallel 
instance of the cluster. This is bad as the stream is quite large (15k 
events/second). What I want to achieve is a more even distribution of events 
across the different taskmanagers.

 

Currently, I assign a "rolling" number (between 0 and the maximum parallelism) 
to the field name as a secondary key component and use this combination as the 
keyBy. This leads to "partitioned" events, which I have to recombine in a 
second step by keying on only the field-name part of the composite key.
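
Roughly, the approach looks like this (a simplified, self-contained sketch; 
the field extraction, the salt range of 16, and the processing-time windows 
are placeholders, not the real pipeline):

    import org.apache.flink.api.common.functions.RichMapFunction;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.api.java.tuple.Tuple3;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;

    public class TwoPhaseWindowAgg {
        private static final int SALTS = 16; // placeholder salt range

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Placeholder input: (fieldName, value) pairs extracted from the GenericRecord.
            DataStream<Tuple2<String, Long>> values = env.fromElements(
                Tuple2.of("fieldA", 1L), Tuple2.of("fieldA", 2L), Tuple2.of("fieldB", 3L));

            // Step 1: attach a rolling number per subtask so values of the
            // same field spread across several parallel instances.
            DataStream<Tuple3<String, Integer, Long>> salted = values
                .map(new RichMapFunction<Tuple2<String, Long>, Tuple3<String, Integer, Long>>() {
                    private int next;
                    @Override
                    public Tuple3<String, Integer, Long> map(Tuple2<String, Long> v) {
                        next = (next + 1) % SALTS;          // rolling number 0..SALTS-1
                        return Tuple3.of(v.f0, next, v.f1); // (fieldName, salt, value)
                    }
                });

            // Step 2: pre-aggregate per (fieldName, salt) in a 30-minute window.
            DataStream<Tuple3<String, Integer, Long>> partials = salted
                .keyBy(t -> t.f0 + "#" + t.f1)
                .window(TumblingProcessingTimeWindows.of(Time.minutes(30)))
                .reduce((a, b) -> Tuple3.of(a.f0, a.f1, a.f2 + b.f2));

            // Step 3: recombine the partial results per fieldName only.
            DataStream<Tuple2<String, Long>> result = partials
                .map(t -> Tuple2.of(t.f0, t.f2))
                .returns(Types.TUPLE(Types.STRING, Types.LONG))
                .keyBy(t -> t.f0)
                .window(TumblingProcessingTimeWindows.of(Time.minutes(30)))
                .reduce((a, b) -> Tuple2.of(a.f0, a.f1 + b.f1));

            result.print();
            env.execute("two-phase windowed aggregation");
        }
    }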

I tested this approach and it works, but when looking at the Flink WebUI, I 
see that some taskmanagers handle substantially more load than others (a few 
million records difference). I also had a look at partitionCustom(), but this 
doesn't work for KeyedStreams, right? Has someone else faced a related issue? 
Any suggestions on how I can distribute events with the same key more evenly?

 

Kind Regards

Dominik
