otterc commented on a change in pull request #32140:
URL: https://github.com/apache/spark/pull/32140#discussion_r660751432
##########
File path:
core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
##########
@@ -386,40 +415,53 @@ final class ShuffleBlockFetcherIterator(
}
val (remoteBlockBytes, numRemoteBlocks) =
collectedRemoteRequests.foldLeft((0L, 0))((x, y) => (x._1 + y.size, x._2
+ y.blocks.size))
- val totalBytes = localBlockBytes + remoteBlockBytes + hostLocalBlockBytes
- assert(numBlocksToFetch == localBlocks.size + hostLocalBlocks.size +
numRemoteBlocks,
- s"The number of non-empty blocks $numBlocksToFetch doesn't equal to the
number of local " +
- s"blocks ${localBlocks.size} + the number of host-local blocks
${hostLocalBlocks.size} " +
- s"+ the number of remote blocks ${numRemoteBlocks}.")
- logInfo(s"Getting $numBlocksToFetch (${Utils.bytesToString(totalBytes)})
non-empty blocks " +
- s"including ${localBlocks.size}
(${Utils.bytesToString(localBlockBytes)}) local and " +
- s"${hostLocalBlocks.size} (${Utils.bytesToString(hostLocalBlockBytes)})
" +
- s"host-local and $numRemoteBlocks
(${Utils.bytesToString(remoteBlockBytes)}) remote blocks")
+ val totalBytes = localBlockBytes + remoteBlockBytes + hostLocalBlockBytes +
+ pushMergedLocalBlockBytes
+ val blocksToFetchCurrentIteration = numBlocksToFetch - prevNumBlocksToFetch
+ assert(blocksToFetchCurrentIteration == localBlocks.size +
+ hostLocalBlocksCurrentIteration.size + numRemoteBlocks +
pushMergedLocalBlocks.size,
+ s"The number of non-empty blocks $blocksToFetchCurrentIteration doesn't
equal to " +
+ s"the number of local blocks ${localBlocks.size} + " +
+ s"the number of host-local blocks
${hostLocalBlocksCurrentIteration.size} " +
+ s"the number of push-merged-local blocks ${pushMergedLocalBlocks.size}
" +
+ s"+ the number of remote blocks ${numRemoteBlocks} ")
+ logInfo(s"Getting $blocksToFetchCurrentIteration " +
+ s"(${Utils.bytesToString(totalBytes)}) non-empty blocks including " +
+ s"${localBlocks.size} (${Utils.bytesToString(localBlockBytes)}) local
and " +
+ s"${hostLocalBlocksCurrentIteration.size}
(${Utils.bytesToString(hostLocalBlockBytes)}) " +
+ s"host-local and ${pushMergedLocalBlocks.size} " +
+ s"(${Utils.bytesToString(pushMergedLocalBlockBytes)}) " +
+ s"local push-merged and $numRemoteBlocks
(${Utils.bytesToString(remoteBlockBytes)}) " +
+ s"remote blocks")
+ this.hostLocalBlocks ++= hostLocalBlocksCurrentIteration
Review comment:
I have made this change, but I also added a var to count the number of
hostLocalBlocks, which is needed for the assertions. PTAL.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]