mridulm commented on a change in pull request #34156:
URL: https://github.com/apache/spark/pull/34156#discussion_r719669365
##########
File path: core/src/main/scala/org/apache/spark/MapOutputTracker.scala
##########
@@ -1448,7 +1448,7 @@ private[spark] object MapOutputTracker extends Logging {
// TODO: improve push based shuffle to read partial merged blocks satisfying the start/end
// TODO: map indexes
if (mergeStatuses.exists(_.nonEmpty) && startMapIndex == 0
- && endMapIndex == mapStatuses.length) {
+ && endMapIndex == mapStatuses.length && endPartition - startPartition == 1) {
Review comment:
To elaborate:
* If push based shuffle is enabled for a stage, we want to disable batch fetch.
  * The check in `BlockStoreShuffleReader` does this.
* If batch fetch is enabled, we want to make sure it always goes through the `else` branch here.
  * Batch fetch assumes an ordering of block ids, which is violated in the `if` block here.
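The ordering assumption can be illustrated with a minimal sketch. It is not Spark's code: `Block`, `Batch`, `mergeAdjacent`, `mapMajor` and `partitionMajor` are hypothetical names, and the sketch assumes that batching only merges adjacent blocks that share a map index and have consecutive reduce ids, that the `else` branch emits blocks map by map, and that the `if` branch emits them partition by partition.

```scala
// Hypothetical, simplified model for illustration; these are not Spark classes.
final case class Block(mapIndex: Int, reduceId: Int)
// A contiguous run of reduce ids for one map output; endReduceId is exclusive.
final case class Batch(mapIndex: Int, startReduceId: Int, endReduceId: Int)

object BatchOrderingSketch {
  // Merge runs of adjacent blocks that share a map index and have consecutive reduce ids.
  def mergeAdjacent(blocks: Seq[Block]): Seq[Batch] =
    blocks.foldLeft(Vector.empty[Batch]) { (acc, b) =>
      acc.lastOption match {
        case Some(last) if last.mapIndex == b.mapIndex && last.endReduceId == b.reduceId =>
          // Extend the current batch by one reduce id.
          acc.init :+ last.copy(endReduceId = b.reduceId + 1)
        case _ =>
          // Start a new batch.
          acc :+ Batch(b.mapIndex, b.reduceId, b.reduceId + 1)
      }
    }

  // Map-major order: all partitions of map 0, then all partitions of map 1, and so on
  // (assumed here to correspond to the `else` branch).
  def mapMajor(numMaps: Int, startPartition: Int, endPartition: Int): Seq[Block] =
    for (m <- 0 until numMaps; p <- startPartition until endPartition) yield Block(m, p)

  // Partition-major order: all maps for partition 0, then all maps for partition 1, and so on
  // (assumed here to correspond to the `if` branch).
  def partitionMajor(numMaps: Int, startPartition: Int, endPartition: Int): Seq[Block] =
    for (p <- startPartition until endPartition; m <- 0 until numMaps) yield Block(m, p)
}
```

In this model, with 2 maps and partitions 0 until 3, the map-major order merges into 2 batches, while the partition-major order never places two blocks of the same map next to each other, so nothing merges and 6 single-block batches remain.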
For the case of `Utils.isPushBasedShuffleEnabled == true` and
`BlockStoreShuffleReader.shouldBatchFetch == true`, we have the following cases:
* `shuffleStage.shuffleMergeEnabled == false`
  * `mergeStatuses != null` and `mergeStatuses.length == numReducers`, though `mergeStatuses` contains only nulls.
  * With the current PR, `doBatchFetch == true`, but we take the `if` branch in `convertMapStatuses`, which results in incorrect ordering of blocks.
    * This is the root cause of the hang.
    * With the proposed change above, we will take the `else` block.
  * Additional change: we always take the `else` path in this case, even for push based shuffle.
    * This should be functionally equivalent. Since we can now rely on batch fetch when push based shuffle does not kick in (stage retry, for example), this is better behavior; more importantly, it should not cause issues.
* `shuffleStage.shuffleMergeEnabled == true`
  * With this PR, batch fetch is disabled.
  * If push based shuffle has merged blocks, it takes the `if` block; otherwise it takes the `else` block (for direct fetch of unmerged blocks).
  * No behavior change other than disabling batch fetch.
    * This prevents the duplicate data fetch issue.
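
For the `shuffleMergeEnabled == false` case above, where `mergeStatuses` is non-null but holds only nulls, the sketch below shows why a plain non-empty check on the array still selects the `if` branch. It is an illustration only: `FakeMergeStatus`, `nonEmptyCheck` and `hasRealEntry` are hypothetical names, and the null-aware check is one possible way to tell the two situations apart, not necessarily the exact condition proposed in this review.

```scala
object MergeStatusCheckSketch {
  // Stand-in for a merge status entry; hypothetical type for illustration only.
  final case class FakeMergeStatus(reduceId: Int)

  // True as soon as the array has any slots, even if every slot is null.
  def nonEmptyCheck(statuses: Option[Array[FakeMergeStatus]]): Boolean =
    statuses.exists(_.nonEmpty)

  // True only if at least one slot holds a real (non-null) entry.
  def hasRealEntry(statuses: Option[Array[FakeMergeStatus]]): Boolean =
    statuses.exists(_.exists(_ != null))
}
```

An array created with `new Array[FakeMergeStatus](3)` contains three nulls: `nonEmptyCheck` returns true for it, while `hasRealEntry` returns false.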
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.