Github user mridulm commented on a diff in the pull request:
https://github.com/apache/spark/pull/19788#discussion_r158123441
--- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
@@ -812,10 +812,10 @@ private[spark] object MapOutputTracker extends Logging {
         logError(errorMessage)
         throw new MetadataFetchFailedException(shuffleId, startPartition, errorMessage)
       } else {
-        for (part <- startPartition until endPartition) {
-          splitsByAddress.getOrElseUpdate(status.location, ArrayBuffer()) +=
-            ((ShuffleBlockId(shuffleId, mapId, part), status.getSizeForBlock(part)))
-        }
+        val totalSize: Long = (startPartition until endPartition).map(status.getSizeForBlock).sum
+        splitsByAddress.getOrElseUpdate(status.location, ArrayBuffer()) +=
+          ((ShuffleBlockId(shuffleId, mapId, startPartition, endPartition - startPartition), totalSize))
--- End diff --
This is going to create some very heavy shuffle fetches, and it looks incorrect.
The merge should not happen here but in `ShuffleBlockFetcherIterator`.
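
To illustrate the behavior change under discussion: the old loop emitted one `(ShuffleBlockId, size)` entry per partition, while the new code collapses the whole range into a single entry whose size is the sum of the per-partition sizes. A minimal, self-contained sketch of that aggregation (the `sizes` array and `getSizeForBlock` below are stand-ins for `MapStatus.getSizeForBlock`, not the real Spark API):

```scala
object MergeSketch extends App {
  // Hypothetical per-partition block sizes, standing in for MapStatus.getSizeForBlock.
  val sizes = Array(10L, 20L, 30L, 40L)
  def getSizeForBlock(part: Int): Long = sizes(part)

  val startPartition = 1
  val endPartition = 3

  // Old behavior: one (partition, size) entry per partition in [startPartition, endPartition).
  val perPartition = (startPartition until endPartition).map(p => (p, getSizeForBlock(p)))

  // New behavior in the diff: a single merged entry covering the whole range.
  val totalSize: Long = (startPartition until endPartition).map(getSizeForBlock).sum

  println(perPartition) // Vector((1,20), (2,30))
  println(totalSize)    // 50
}
```

The reviewer's point is that a single merged entry produces one large fetch per map output range, which is why merging contiguous blocks is better deferred to `ShuffleBlockFetcherIterator`, where fetch sizes can be controlled.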
---