rmcyang commented on pull request #34461:
URL: https://github.com/apache/spark/pull/34461#issuecomment-963475559


   > I'm thinking that this condition may be wrong:
   > 
   > 
https://github.com/apache/spark/blob/31b6f614d3173c8a5852243bf7d0b6200788432d/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala#L134-L136
   > 
   > For a retried map stage, we don't push&merge shuffle blocks for tasks due 
to `dep.shuffleMergeEnabled=false`. However, `dep.shuffleMergeEnabled=false` 
should be the reason for the reduce stage to not fetch from the existing merged 
shuffle data in the previous attempt of the map stage. Right?
   
   IIUC, for a retried map stage, it would take the code path below, since 
`dep.shuffleMergeEnabled=false`:
   
https://github.com/apache/spark/blob/31b6f614d3173c8a5852243bf7d0b6200788432d/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala#L138-L140
   Prior to this PR, `getMapSizesByExecutorId` would call 
`getPushBasedShuffleMapSizesByExecutorId`, which would fetch 
mergeOutputStatuses in `getStatuses` even for a retried map stage where 
push-based shuffle is disabled. This in turn causes `enableBatchFetch=false` 
in `convertMapStatuses`, which results in the assertion failure.
   
   The proposed change avoids this behavior: when 
`dep.shuffleMergeEnabled=false`, `getMapSizesByExecutorId` passes 
`canFetchMergeResult=false` to `getStatuses`, so mergeOutputStatuses are not 
fetched in this case. @Ngone51
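   To make the gating concrete, here is a minimal, self-contained Scala sketch 
of the control flow described above. It is not Spark's actual code: 
`ShuffleDep`, `Statuses`, and the two methods are simplified stand-ins for 
`ShuffleDependency`, the map/merge output statuses, and the real 
`MapOutputTracker` APIs.

```scala
// Sketch only: simplified stand-ins for Spark's shuffle classes.
object MergeFetchSketch extends App {
  case class ShuffleDep(shuffleMergeEnabled: Boolean)
  case class Statuses(mapStatuses: Seq[String], mergeStatuses: Seq[String])

  // Stand-in for MapOutputTracker.getStatuses: the caller decides via
  // canFetchMergeResult whether merge output statuses may be fetched at all.
  def getStatuses(canFetchMergeResult: Boolean): Statuses = {
    val maps = Seq("map-0", "map-1")
    // Merge statuses are fetched only when allowed, so a retried map stage
    // (shuffleMergeEnabled=false) never sees stale merged shuffle data.
    val merges = if (canFetchMergeResult) Seq("merge-0") else Seq.empty[String]
    Statuses(maps, merges)
  }

  // Stand-in for getMapSizesByExecutorId after the proposed change:
  // it forwards dep.shuffleMergeEnabled as canFetchMergeResult.
  def getMapSizesByExecutorId(dep: ShuffleDep): Statuses =
    getStatuses(canFetchMergeResult = dep.shuffleMergeEnabled)

  // A retried map stage has merge disabled, so no merge statuses are fetched
  // and the enableBatchFetch assertion can no longer be tripped this way.
  val retried = getMapSizesByExecutorId(ShuffleDep(shuffleMergeEnabled = false))
  assert(retried.mergeStatuses.isEmpty)
}
```

   The point of the sketch is simply that the decision moves to the caller: 
the reduce side no longer fetches merge results left over from a previous 
attempt once the dependency has merge disabled.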


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


