xuanyuanking opened a new pull request #26040: [SPARK-9853][Core] Optimize 
shuffle fetch of continuous partition IDs
URL: https://github.com/apache/spark/pull/26040
 
 
   This PR takes over #19788. After we split the shuffle fetch protocol from 
`OpenBlock` in #24565, this optimization can be extended in the new shuffle 
protocol. Credit to @yucai, closes #19788.
   
   ### What changes were proposed in this pull request?
   This PR adds the support for continuous shuffle block fetching in batch:
   
   - Shuffle client changes:
       - Add new feature tag `spark.shuffle.fetchContinuousBlocksInBatch`, 
implement the decision logic in `BlockStoreShuffleReader`.
       - Merge the continuous shuffle block ids in batch if needed in 
ShuffleBlockFetcherIterator.
   - Shuffle server changes:
       - Add support in `ExternalBlockHandler` for the external shuffle service 
side.
    - Allow `ShuffleBlockResolver.getBlockData` to fetch block data by range.
   - Protocol changes:
        - Add new block id type `ShuffleBlockBatchId` to represent a range of continuous shuffle block ids.
       - Extend `FetchShuffleBlocks` and `OneForOneBlockFetcher`.
        - Since the new shuffle fetch protocol was completed in #24565, backward compatibility with the external shuffle service can be controlled by `spark.shuffle.useOldFetchProtocol`.
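
   The protocol and client changes above can be sketched roughly as follows. This is an illustrative model, not Spark's exact internals: the field names mirror `BlockId.scala`, but the half-open `[startReduceId, endReduceId)` convention and the `mergeContinuousBlocks` helper are assumptions of this sketch.

   ```scala
   // Illustrative model of the two block id types; the real definitions
   // live in BlockId.scala.
   final case class ShuffleBlockId(shuffleId: Int, mapId: Long, reduceId: Int) {
     def name: String = s"shuffle_${shuffleId}_${mapId}_${reduceId}"
   }

   // A batch id stands for the continuous reduce-id range
   // [startReduceId, endReduceId) of one map output (range convention assumed).
   final case class ShuffleBlockBatchId(
       shuffleId: Int, mapId: Long, startReduceId: Int, endReduceId: Int) {
     def name: String =
       s"shuffle_${shuffleId}_${mapId}_${startReduceId}_${endReduceId}"
   }

   // Hypothetical helper: collapse runs of adjacent reduce ids from the same
   // map output into batch ids, in the spirit of what
   // ShuffleBlockFetcherIterator does when batching is enabled.
   def mergeContinuousBlocks(ids: Seq[ShuffleBlockId]): Seq[ShuffleBlockBatchId] = {
     val batches = scala.collection.mutable.ArrayBuffer.empty[ShuffleBlockBatchId]
     for (id <- ids) {
       batches.lastOption match {
         // Extend the last batch when this block continues its reduce range.
         case Some(b) if b.shuffleId == id.shuffleId && b.mapId == id.mapId &&
             b.endReduceId == id.reduceId =>
           batches(batches.size - 1) = b.copy(endReduceId = id.reduceId + 1)
         // Otherwise start a new single-block batch.
         case _ =>
           batches += ShuffleBlockBatchId(
             id.shuffleId, id.mapId, id.reduceId, id.reduceId + 1)
       }
     }
     batches.toSeq
   }
   ```

   Under this sketch, block ids `shuffle_0_0_5` through `shuffle_0_0_14` collapse into a single batch id covering reduce partitions 5 to 14.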
   
   ### Why are the changes needed?
   In adaptive execution, one reducer may fetch multiple continuous shuffle blocks from one map output file. With the original approach, the reducer fetches those blocks one by one, which requires many disk and network IOs and hurts performance. This PR supports fetching continuous shuffle blocks in a single IO (batch fetch). See the example below:
   
   Shuffle blocks are stored as shown below. The shuffle block id format is s"shuffle_$shuffleId_$mapId_$reduceId" (see BlockId.scala).
   
![image](https://user-images.githubusercontent.com/2989575/51654634-c37fbd80-1fd3-11e9-935e-5652863676c3.png)
   
   In adaptive execution, one reducer may want to read the outputs for reduce partitions 5 to 14, whose block ids range from shuffle_0_x_5 to shuffle_0_x_14.
   Before this PR, Spark needed 10 disk IOs plus 10 network IOs for each map output file.
   After this PR, Spark needs only 1 disk IO and 1 network IO, reducing IO dramatically.
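
   The single disk IO follows from the shuffle index file layout: the index stores one byte offset per reduce partition plus a trailing sentinel offset, so any continuous partition range maps to one contiguous byte range of the data file. A minimal sketch of that arithmetic, assuming the standard index layout (the function name and signature are hypothetical, not Spark's actual `ShuffleBlockResolver` API):

   ```scala
   // index(i) is the byte offset where reduce partition i begins in the data
   // file; index(last) is a sentinel marking the end of the file. A continuous
   // partition range [start, end) is therefore one (offset, length) pair,
   // i.e. one disk read.
   def batchOffsetAndLength(
       index: Array[Long], startReduce: Int, endReduceExclusive: Int): (Long, Long) = {
     val offset = index(startReduce)
     (offset, index(endReduceExclusive) - offset)
   }
   ```

   For the example above, partitions 5 to 14 of one map output become a single (offset, length) pair, so the server can answer the whole batch with one read.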
   
   ### Does this PR introduce any user-facing change?
   No.
   
   ### How was this patch tested?
   Added new unit tests.
   Ran integration tests with `spark.sql.adaptive.enabled=true`.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]