Yingjie Cao created FLINK-19582:
-----------------------------------

             Summary: Introduce sort-merge based blocking shuffle to Flink
                 Key: FLINK-19582
                 URL: https://issues.apache.org/jira/browse/FLINK-19582
             Project: Flink
          Issue Type: Improvement
          Components: Runtime / Network
    Affects Versions: 1.12.0
            Reporter: Yingjie Cao
             Fix For: 1.12.0


*Motivation*

Hash-based blocking shuffle and sort-merge based blocking shuffle are the two 
main blocking shuffle implementations widely adopted by existing distributed 
data processing frameworks. The hash-based implementation writes data destined 
for different reducer tasks into separate files concurrently, while the 
sort-merge based approach writes the data of all subpartitions together into a 
single file and merges small files into bigger ones. Compared to the 
sort-merge based approach, the hash-based approach has several weak points 
when it comes to running large-scale batch jobs:

*1. Stability*

For high-parallelism (tens of thousands) batch jobs, the current hash-based 
blocking shuffle implementation writes too many files concurrently, which puts 
high pressure on the file system, for example, maintenance of too much file 
metadata, high system CPU consumption and exhaustion of inodes or file 
descriptors. All of these are potential stability issues which we encountered 
in our production environment before we switched to the sort-merge based 
blocking shuffle.

The sort-merge based blocking shuffle does not have this problem because for 
one result partition, only one file is written at a time.

*2. Performance*

Large numbers of small shuffle files and random IO can hurt shuffle 
performance a lot, especially on HDD (for SSD, sequential read is also 
important because of read-ahead and caching).

For batch jobs processing massive data, a small amount of data per 
subpartition is common, because to reduce the job completion time we usually 
increase the job parallelism to reduce the amount of data processed per task, 
and the average amount of data per subpartition is given by:

(amount of data per task) / (parallelism) = (total amount of data) / 
(parallelism^2)

which means increasing the parallelism decreases the amount of data per 
subpartition rapidly.
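
As a purely illustrative calculation (the numbers below are hypothetical, not 
measured): for a job shuffling 10 TB in total at a parallelism of 10000, each 
task produces about 10 TB / 10000 = 1 GB on average, but each subpartition 
holds only about 10 TB / 10000^2 = 100 KB, which is far too small for 
efficient sequential disk IO.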

Besides, data skew is another cause of small subpartition files. By merging 
the data of all subpartitions together into one file, more sequential reads 
can be achieved.

*3. Resource*

For the current hash-based implementation, each subpartition needs at least 
one buffer. For large-scale batch shuffles, the memory consumption can be 
huge. For example, we need at least 320M of network memory per result 
partition if the parallelism is set to 10000. Because of this huge network 
memory consumption, it is hard to configure the network memory for large-scale 
batch jobs, and sometimes the parallelism cannot be increased just because of 
insufficient network memory, which leads to a bad user experience.
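
To make the 320M figure concrete (assuming the default 32 KB network buffer, 
i.e. memory segment, size): one buffer per subpartition at a parallelism of 
10000 means 10000 * 32 KB = 320000 KB, roughly 320M of network memory for a 
single result partition.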

By introducing the sort-merge based approach to Flink, we can improve Flink’s 
capability of running large-scale batch jobs.



