otterc commented on a change in pull request #32140:
URL: https://github.com/apache/spark/pull/32140#discussion_r648705513



##########
File path: core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
##########
@@ -436,24 +487,51 @@ final class ShuffleBlockFetcherIterator(
     val iterator = blockInfos.iterator
     var curRequestSize = 0L
     var curBlocks = Seq.empty[FetchBlockInfo]
-
     while (iterator.hasNext) {
       val (blockId, size, mapIndex) = iterator.next()
       assertPositiveBlockSize(blockId, size)
       curBlocks = curBlocks ++ Seq(FetchBlockInfo(blockId, size, mapIndex))
       curRequestSize += size
-      // For batch fetch, the actual block in flight should count for merged block.
-      val mayExceedsMaxBlocks = !doBatchFetch && curBlocks.size >= maxBlocksInFlightPerAddress
-      if (curRequestSize >= targetRemoteRequestSize || mayExceedsMaxBlocks) {
-        curBlocks = createFetchRequests(curBlocks, address, isLast = false,
-          collectedRemoteRequests)
-        curRequestSize = curBlocks.map(_.size).sum
+      blockId match {
+        // Either all blocks are merged blocks, merged block chunks, or original non-merged blocks.
+        // Based on these types, we decide to do batch fetch and create FetchRequests with
+        // forMergedMetas set.
+        case ShuffleBlockChunkId(_, _, _) =>
+          if (curRequestSize >= targetRemoteRequestSize ||
+            curBlocks.size >= maxBlocksInFlightPerAddress) {
+            curBlocks = createFetchRequests(curBlocks, address, isLast = false,
+              collectedRemoteRequests, enableBatchFetch = false)
+            curRequestSize = curBlocks.map(_.size).sum
+          }
+        case ShuffleBlockId(_, SHUFFLE_PUSH_MAP_ID, _) =>
+          if (curRequestSize >= targetRemoteRequestSize ||
+              curBlocks.size >= maxBlocksInFlightPerAddress) {
+            curBlocks = createFetchRequests(curBlocks, address, isLast = false,
+              collectedRemoteRequests, enableBatchFetch = false, forMergedMetas = true)
+            curRequestSize = curBlocks.map(_.size).sum

Review comment:
       Offline review comment from @mridulm
   > The fetch request is not fetching the actual data, but just the list of chunks to be fetched for that merged block, right? Given that the size of that response is so low, why are we considering the size of the block when updating curRequestSize there?
   
   This is a good point, so I will remove this check. While looking into it, I also found some other discrepancies.
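
   To make the reviewer's point concrete, here is a minimal, hypothetical Scala sketch (not the actual `ShuffleBlockFetcherIterator` code; `FlushSketch`, `shouldFlush`, and the threshold values are simplified stand-ins) of how the flush condition could skip the size-based trigger for meta-only requests, since their responses carry only a small list of chunk metadata:

   ```scala
   // Hypothetical, simplified sketch -- not the actual Spark implementation.
   // A forMergedMetas request only fetches a small list of chunk metadata,
   // so the accumulated block size need not trigger a flush for it.
   object FlushSketch {
     // Stand-in thresholds; the real values come from Spark configuration.
     val targetRemoteRequestSize = 100L
     val maxBlocksInFlightPerAddress = 3

     // Returns true when the accumulated blocks should be turned into a request.
     // For meta-only requests, only the block-count limit applies.
     def shouldFlush(curRequestSize: Long, numBlocks: Int,
         forMergedMetas: Boolean): Boolean =
       if (forMergedMetas) {
         numBlocks >= maxBlocksInFlightPerAddress
       } else {
         curRequestSize >= targetRemoteRequestSize ||
           numBlocks >= maxBlocksInFlightPerAddress
       }

     def main(args: Array[String]): Unit = {
       // A large accumulated size flushes a data request but not a meta-only one.
       println(shouldFlush(200L, 1, forMergedMetas = false)) // true
       println(shouldFlush(200L, 1, forMergedMetas = true))  // false
       // The block-count limit still applies to both.
       println(shouldFlush(10L, 3, forMergedMetas = true))   // true
     }
   }
   ```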




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


