otterc commented on a change in pull request #32140:
URL: https://github.com/apache/spark/pull/32140#discussion_r660721217
##########
File path:
core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
##########
@@ -386,40 +415,53 @@ final class ShuffleBlockFetcherIterator(
}
val (remoteBlockBytes, numRemoteBlocks) =
collectedRemoteRequests.foldLeft((0L, 0))((x, y) => (x._1 + y.size, x._2
+ y.blocks.size))
- val totalBytes = localBlockBytes + remoteBlockBytes + hostLocalBlockBytes
- assert(numBlocksToFetch == localBlocks.size + hostLocalBlocks.size +
numRemoteBlocks,
- s"The number of non-empty blocks $numBlocksToFetch doesn't equal to the
number of local " +
- s"blocks ${localBlocks.size} + the number of host-local blocks
${hostLocalBlocks.size} " +
- s"+ the number of remote blocks ${numRemoteBlocks}.")
- logInfo(s"Getting $numBlocksToFetch (${Utils.bytesToString(totalBytes)})
non-empty blocks " +
- s"including ${localBlocks.size}
(${Utils.bytesToString(localBlockBytes)}) local and " +
- s"${hostLocalBlocks.size} (${Utils.bytesToString(hostLocalBlockBytes)})
" +
- s"host-local and $numRemoteBlocks
(${Utils.bytesToString(remoteBlockBytes)}) remote blocks")
+ val totalBytes = localBlockBytes + remoteBlockBytes + hostLocalBlockBytes +
+ pushMergedLocalBlockBytes
+ val blocksToFetchCurrentIteration = numBlocksToFetch - prevNumBlocksToFetch
+ assert(blocksToFetchCurrentIteration == localBlocks.size +
+ hostLocalBlocksCurrentIteration.size + numRemoteBlocks +
pushMergedLocalBlocks.size,
+ s"The number of non-empty blocks $blocksToFetchCurrentIteration doesn't
equal to " +
+ s"the number of local blocks ${localBlocks.size} + " +
+ s"the number of host-local blocks
${hostLocalBlocksCurrentIteration.size} " +
+ s"the number of push-merged-local blocks ${pushMergedLocalBlocks.size}
" +
+ s"+ the number of remote blocks ${numRemoteBlocks} ")
+ logInfo(s"Getting $blocksToFetchCurrentIteration " +
+ s"(${Utils.bytesToString(totalBytes)}) non-empty blocks including " +
+ s"${localBlocks.size} (${Utils.bytesToString(localBlockBytes)}) local
and " +
+ s"${hostLocalBlocksCurrentIteration.size}
(${Utils.bytesToString(hostLocalBlockBytes)}) " +
+ s"host-local and ${pushMergedLocalBlocks.size} " +
+ s"(${Utils.bytesToString(pushMergedLocalBlockBytes)}) " +
+ s"local push-merged and $numRemoteBlocks
(${Utils.bytesToString(remoteBlockBytes)}) " +
+ s"remote blocks")
+ this.hostLocalBlocks ++= hostLocalBlocksCurrentIteration
Review comment:
Would need to do something like that for finding out the number of
hostLocalBlocks in the assertions before as well.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]