Github user jinxing64 commented on a diff in the pull request:
https://github.com/apache/spark/pull/21212#discussion_r186293064
--- Diff:
core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
---
@@ -267,28 +269,30 @@ final class ShuffleBlockFetcherIterator(
// at most maxBytesInFlight in order to limit the amount of data in flight.
val remoteRequests = new ArrayBuffer[FetchRequest]
- // Tracks total number of blocks (including zero sized blocks)
- var totalBlocks = 0
for ((address, blockInfos) <- blocksByAddress) {
- totalBlocks += blockInfos.size
if (address.executorId == blockManager.blockManagerId.executorId) {
- // Filter out zero-sized blocks
- localBlocks ++= blockInfos.filter(_._2 != 0).map(_._1)
+ blockInfos.find(_._2 <= 0) match {
+ case Some((blockId, size)) if size < 0 =>
+ throw new BlockException(blockId, "Negative block size " + size)
+ case Some((blockId, size)) if size == 0 =>
+ throw new BlockException(blockId, "Zero-sized blocks should be excluded.")
--- End diff --
Added another check for remote blocks.
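
For reference, a minimal sketch of how the same size check could be shared between the local and remote paths. This is not the code in the PR: `BlockSizeCheck` and `assertPositiveBlockSizes` are hypothetical names, and `BlockId` / `BlockException` below are simplified stand-ins for the Spark classes so the snippet compiles on its own.

```scala
// Simplified stand-ins for Spark's BlockId and BlockException (assumptions,
// defined here only so the sketch is self-contained).
final case class BlockId(name: String)
final class BlockException(val blockId: BlockId, message: String)
  extends Exception(message)

object BlockSizeCheck {
  // Hypothetical helper: throws on the first block whose size is not
  // strictly positive, mirroring the match in the diff above.
  def assertPositiveBlockSizes(blockInfos: Seq[(BlockId, Long)]): Unit = {
    blockInfos.find(_._2 <= 0) match {
      case Some((blockId, size)) if size < 0 =>
        throw new BlockException(blockId, "Negative block size " + size)
      case Some((blockId, _)) =>
        // Only size == 0 can reach this case.
        throw new BlockException(blockId, "Zero-sized blocks should be excluded.")
      case None => // every block has a positive size
    }
  }
}
```

Calling one helper from both branches of the `for ((address, blockInfos) <- blocksByAddress)` loop would keep the local and remote checks from drifting apart.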
---