otterc commented on a change in pull request #30691: URL: https://github.com/apache/spark/pull/30691#discussion_r643429058
########## File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ##########
@@ -1743,6 +1762,10 @@ private[spark] class DAGScheduler(
       } else if (mapIndex != -1) {
         // Mark the map whose fetch failed as broken in the map stage
         mapOutputTracker.unregisterMapOutput(shuffleId, mapIndex, bmAddress)
+        if (mapStage.shuffleDep.shuffleMergeEnabled) {
+          mapOutputTracker.
+            unregisterMergeResult(shuffleId, reduceId, bmAddress, Option(mapIndex))

Review comment:
We need to pass `mapId` here, not `mapIndex`. The intention is to unregister the mergeStatus if the block <shuffleId, mapId, reduceId> was merged into the reduce partition <shuffleId, reduceId>. mergeStatus tracks `mapIds`, not map indices.

We can also add a comment here explaining why we are doing this: when there is a fetch failure for a block <shuffleId, mapId, reduceId> and that block was merged into partition <shuffleId, reduceId>, it indicates that the iterator also failed to fetch the merged block for <shuffleId, reduceId>. So we unregister the mergeResult for partition <shuffleId, reduceId>, because there is no guarantee that fetching the merged block for <shuffleId, reduceId> from the same blockManager will succeed the next time this stage is resubmitted.

On a side note, this was one of the reasons I thought this change should go in together with the fetch-side change: it makes the explanation easier.
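To make the `mapId` vs. `mapIndex` point concrete, here is a minimal sketch under stated assumptions. It is a hypothetical, simplified tracker, not Spark's `MapOutputTracker` or `MergeStatus` API: `SimpleMergeTracker`, `SimpleMergeStatus` and `unregisterIfMerged` are illustrative names. It assumes each reduce partition's merge status records the set of merged mapIds, and that a mapId is an arbitrary `Long` unrelated to the 0-based map index. It also ignores block manager location. Under those assumptions, checking membership with a map index can miss a block that actually was merged.

```scala
import scala.collection.mutable

// Hypothetical, simplified model of the check described in the review comment.
// NOT Spark's MapOutputTracker / MergeStatus API; all names are illustrative.
object MergeResultSketch {

  // Merge status for one reduce partition <shuffleId, reduceId>: the set of
  // mapIds (not map indices) whose blocks were merged into that partition.
  final case class SimpleMergeStatus(mergedMapIds: Set[Long])

  final class SimpleMergeTracker {
    // (shuffleId, reduceId) -> merge status of that reduce partition
    private val statuses = mutable.Map.empty[(Int, Int), SimpleMergeStatus]

    def register(shuffleId: Int, reduceId: Int, status: SimpleMergeStatus): Unit =
      statuses((shuffleId, reduceId)) = status

    def isRegistered(shuffleId: Int, reduceId: Int): Boolean =
      statuses.contains((shuffleId, reduceId))

    // Drop the merge result for <shuffleId, reduceId> only if the block
    // <shuffleId, mapId, reduceId> that failed to fetch had been merged into it.
    // Returns true if the merge result was unregistered.
    def unregisterIfMerged(shuffleId: Int, reduceId: Int, mapId: Long): Boolean =
      statuses.get((shuffleId, reduceId)) match {
        case Some(s) if s.mergedMapIds.contains(mapId) =>
          statuses.remove((shuffleId, reduceId))
          true
        case _ => false
      }
  }

  def main(args: Array[String]): Unit = {
    val tracker = new SimpleMergeTracker
    // Made-up values: the map task at index 1 ran with mapId 42,
    // and blocks from mapIds 41 and 42 were merged into reduce partition <0, 3>.
    tracker.register(0, 3, SimpleMergeStatus(Set(41L, 42L)))
    println(tracker.unregisterIfMerged(0, 3, 1L))  // false: 1 is a map index, not a merged mapId
    println(tracker.unregisterIfMerged(0, 3, 42L)) // true: block <0, 42, 3> was merged
  }
}
```

Passing the map index (`1L`) leaves a stale merge result registered even though the failed block was merged, whereas passing the mapId (`42L`) correctly unregisters it.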