rmcyang commented on pull request #34461: URL: https://github.com/apache/spark/pull/34461#issuecomment-963475559
> I'm thinking that this condition may be wrong:
>
> https://github.com/apache/spark/blob/31b6f614d3173c8a5852243bf7d0b6200788432d/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala#L134-L136
>
> For a retried map stage, we don't push & merge shuffle blocks for tasks due to `dep.shuffleMergeEnabled=false`. However, `dep.shuffleMergeEnabled=false` should be the reason for the reduce stage not to fetch from the merged shuffle data produced by the previous attempt of the map stage. Right?

IIUC, for a retried map stage, it would take the code path below because `dep.shuffleMergeEnabled=false`:

https://github.com/apache/spark/blob/31b6f614d3173c8a5852243bf7d0b6200788432d/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala#L138-L140

Prior to this PR, `getMapSizesByExecutorId` would call `getPushBasedShuffleMapSizesByExecutorId`, which would fetch `mergeOutputStatuses` in `getStatuses` even for a retried map stage where push-based shuffle is disabled. That in turn sets `enableBatchFetch=false` in `convertMapStatuses`, which results in the assertion failure. The proposed change avoids this behavior: when `dep.shuffleMergeEnabled=false`, `getMapSizesByExecutorId` passes `canFetchMergeResult=false` to `getStatuses` so that `mergeOutputStatuses` is not fetched in this case.

@Ngone51
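The gating described above can be sketched as follows. This is a simplified, self-contained sketch: the names mirror those in the comment (`getStatuses`, `canFetchMergeResult`, `getMapSizesByExecutorId`), but the signatures and `Statuses` type are hypothetical placeholders, not Spark's actual `MapOutputTracker` API.

```scala
// Hypothetical sketch of the proposed fix, not Spark's real API.
object MapOutputTrackerSketch {
  // Stand-in for the pair of (map statuses, merge output statuses).
  case class Statuses(mapStatuses: Seq[String], mergeStatuses: Seq[String])

  // Only fetch merge output statuses when the caller indicates that merged
  // shuffle results are usable for this stage attempt.
  def getStatuses(shuffleId: Int, canFetchMergeResult: Boolean): Statuses = {
    val maps = Seq(s"mapStatus-$shuffleId")
    val merges =
      if (canFetchMergeResult) Seq(s"mergeStatus-$shuffleId") else Seq.empty
    Statuses(maps, merges)
  }

  // Before the PR, merge statuses were effectively fetched even when
  // push-based shuffle was disabled for a retried map stage. After the PR,
  // the dependency's shuffleMergeEnabled flag is threaded through.
  def getMapSizesByExecutorId(
      shuffleId: Int,
      shuffleMergeEnabled: Boolean): Statuses =
    getStatuses(shuffleId, canFetchMergeResult = shuffleMergeEnabled)
}
```

With this gating, a retried map stage (where `shuffleMergeEnabled` is false) never sees stale merge output statuses, so the `enableBatchFetch` computation in the reduce path is not perturbed.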
