mridulm commented on a change in pull request #34156:
URL: https://github.com/apache/spark/pull/34156#discussion_r719669365
##########
File path: core/src/main/scala/org/apache/spark/MapOutputTracker.scala
##########
@@ -1448,7 +1448,7 @@ private[spark] object MapOutputTracker extends Logging {
// TODO: improve push based shuffle to read partial merged blocks satisfying the start/end
// TODO: map indexes
if (mergeStatuses.exists(_.nonEmpty) && startMapIndex == 0
- && endMapIndex == mapStatuses.length) {
+ && endMapIndex == mapStatuses.length && endPartition - startPartition == 1) {
Review comment:
To elaborate:
* If push based shuffle is enabled for a stage, we want to disable batch fetch.
  * The check in `BlockStoreShuffleReader` does this.
* If batch fetch is enabled, we want to make sure it always goes through the `else` condition here.
  * Batch fetch assumes an ordering of block ids, which gets violated in the `if` block here.
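To see why ordering matters, here is a minimal sketch of an order-trusting merge that collapses consecutive blocks of one map into a single reduce-id range. It is loosely modeled on the idea behind batch fetch and is not Spark's code: `BlockRef`, `BatchRef` and `mergeByMap` are hypothetical simplified stand-ins, and the merge deliberately trusts that blocks arrive grouped by map and sorted by reduce id.

```scala
// Hypothetical, simplified stand-ins for shuffle block ids (not Spark's classes).
final case class BlockRef(mapId: Long, reduceId: Int)
// A batch covers reduce ids [startReduceId, endReduceId) of one map.
final case class BatchRef(mapId: Long, startReduceId: Int, endReduceId: Int)

object BatchMergeSketch {
  // Collapses adjacent blocks of the same map into one batch. It only compares
  // mapIds and takes the last reduceId as the end, so it silently assumes the
  // input is grouped by mapId and ascending by reduceId within each map.
  def mergeByMap(blocks: Seq[BlockRef]): Vector[BatchRef] =
    blocks.foldLeft(Vector.empty[BatchRef]) { (acc, b) =>
      acc.lastOption match {
        // Same map as the previous block: trust the ordering and extend the range.
        case Some(last) if last.mapId == b.mapId =>
          acc.init :+ last.copy(endReduceId = b.reduceId + 1)
        // New map: start a fresh single-block batch.
        case _ =>
          acc :+ BatchRef(b.mapId, b.reduceId, b.reduceId + 1)
      }
    }
}
```

With grouped, ascending input, blocks (0,3), (0,4), (0,5), (1,3), (1,4) collapse to `BatchRef(0, 3, 6)` and `BatchRef(1, 3, 5)`. Feeding one map's blocks out of order (reduce ids 5, 3, 4) instead yields a single `BatchRef(0, 5, 5)`, an empty range that silently drops all three blocks.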
For background, `mergeStatuses != null` and `mergeStatuses.length == numReducers` when `Utils.isPushBasedShuffleEnabled == true`, and `mergeStatuses = Array.empty` (or `null`, depending on the codepath) when it is disabled.
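The representation matters because `exists(_.nonEmpty)` only checks that the per-reducer array has slots, not that any slot holds a status. A minimal sketch, assuming `mergeStatuses` reaches the check as an `Option` wrapping a per-reducer array (an assumption about the call site); `MergeStatusStub` and both helpers are hypothetical, and `hasAnyMergedStatus` is included only for contrast.

```scala
// Hypothetical stand-in for a merge status; only presence vs. null matters here.
final case class MergeStatusStub(reduceId: Int)

object MergeStatusesSketch {
  // Same shape as the test in the diff: true whenever the array has any slots,
  // even if every slot is null.
  def passesExistsNonEmpty(mergeStatuses: Option[Array[MergeStatusStub]]): Boolean =
    mergeStatuses.exists(_.nonEmpty)

  // For contrast only: true only if at least one slot holds an actual status.
  def hasAnyMergedStatus(mergeStatuses: Option[Array[MergeStatusStub]]): Boolean =
    mergeStatuses.exists(_.exists(_ != null))
}
```

An array of `numReducers` nulls, the shape described for a stage with `shuffleMergeEnabled == false`, passes `passesExistsNonEmpty` while holding no merged status at all. An empty array or `None` fails both checks.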
For the case of `Utils.isPushBasedShuffleEnabled == true` and `BlockStoreShuffleReader.shouldBatchFetch == true`, we have the following cases:
* `shuffleStage.shuffleMergeEnabled == false`
  * `mergeStatuses` has only nulls.
  * With the current PR, `doBatchFetch == true`, but we take the `if` condition in `convertMapStatuses`, which results in incorrect ordering of blocks.
    * This is the root cause of the hang.
  * With the proposed change above, we will take the `else` block.
    * Push based shuffle is not relevant for the task anyway, and batch fetch will now work fine.
  * Additional change: we always take the `else` path, even when `Utils.isPushBasedShuffleEnabled == true`.
    * This should be functionally equivalent.
    * We can now potentially benefit from batch fetch when push based shuffle does not kick in (a stage retry, for example), which is better behavior; more importantly, it should not cause issues.
* `shuffleStage.shuffleMergeEnabled == true`
  * With this PR, batch fetch is disabled.
  * When push based shuffle has merged blocks, it takes the `if` block; otherwise it takes the `else` block (for direct fetch of unmerged blocks).
  * No behavior change other than disabling batch fetch.
    * This prevents the duplicate data fetch issue.
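The `shuffleMergeEnabled == false` case comes down to the branch predicate changed in the diff. A minimal sketch comparing the old and new conditions, with hypothetical helper names and a `Boolean` standing in for the result of `mergeStatuses.exists(_.nonEmpty)`. It assumes a batch-fetch read spans more than one reduce partition, which is what the added `endPartition - startPartition == 1` check relies on to route such reads to `else`.

```scala
object BranchChoiceSketch {
  // Condition before the change (the removed line in the diff).
  def takesMergedPathOld(mergeStatusesNonEmpty: Boolean, startMapIndex: Int,
                         endMapIndex: Int, numMaps: Int): Boolean =
    mergeStatusesNonEmpty && startMapIndex == 0 && endMapIndex == numMaps

  // Condition after the change: additionally requires a single-partition read,
  // so any multi-partition (batch-fetch) read falls through to the `else` branch.
  def takesMergedPathNew(mergeStatusesNonEmpty: Boolean, startMapIndex: Int,
                         endMapIndex: Int, numMaps: Int,
                         startPartition: Int, endPartition: Int): Boolean =
    takesMergedPathOld(mergeStatusesNonEmpty, startMapIndex, endMapIndex, numMaps) &&
      endPartition - startPartition == 1
}
```

With all-null statuses (which still pass the non-empty check) and the full map range `0 until 10`, a read of partitions `2 until 5` took the `if` branch under the old condition but takes `else` under the new one. A single-partition read such as `2 until 3` still takes the `if` branch.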
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.