otterc commented on a change in pull request #32140:
URL: https://github.com/apache/spark/pull/32140#discussion_r648705513
##########
File path: core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
##########
@@ -436,24 +487,51 @@ final class ShuffleBlockFetcherIterator(
val iterator = blockInfos.iterator
var curRequestSize = 0L
var curBlocks = Seq.empty[FetchBlockInfo]
-
while (iterator.hasNext) {
val (blockId, size, mapIndex) = iterator.next()
assertPositiveBlockSize(blockId, size)
curBlocks = curBlocks ++ Seq(FetchBlockInfo(blockId, size, mapIndex))
curRequestSize += size
- // For batch fetch, the actual block in flight should count for merged block.
- val mayExceedsMaxBlocks = !doBatchFetch && curBlocks.size >= maxBlocksInFlightPerAddress
- if (curRequestSize >= targetRemoteRequestSize || mayExceedsMaxBlocks) {
- curBlocks = createFetchRequests(curBlocks, address, isLast = false,
- collectedRemoteRequests)
- curRequestSize = curBlocks.map(_.size).sum
+ blockId match {
+ // Either all blocks are merged blocks, merged block chunks, or original non-merged blocks.
+ // Based on these types, we decide to do batch fetch and create FetchRequests with
+ // forMergedMetas set.
+ case ShuffleBlockChunkId(_, _, _) =>
+ if (curRequestSize >= targetRemoteRequestSize ||
+ curBlocks.size >= maxBlocksInFlightPerAddress) {
+ curBlocks = createFetchRequests(curBlocks, address, isLast = false,
+ collectedRemoteRequests, enableBatchFetch = false)
+ curRequestSize = curBlocks.map(_.size).sum
+ }
+ case ShuffleBlockId(_, SHUFFLE_PUSH_MAP_ID, _) =>
+ if (curRequestSize >= targetRemoteRequestSize ||
+ curBlocks.size >= maxBlocksInFlightPerAddress) {
+ curBlocks = createFetchRequests(curBlocks, address, isLast = false,
+ collectedRemoteRequests, enableBatchFetch = false, forMergedMetas = true)
+ curRequestSize = curBlocks.map(_.size).sum
Review comment:
Offline review comment from @mridulm
> the fetch request is not fetching the actual data, but just the list of
> chunks to be fetched for that merged block, right ? Given the size of that
> response is so low, why are we considering size of the block for updating
> curRequestSize there then
This is a good point, so I will remove this check. I also found some other
discrepancies.
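For illustration, here is a simplified, hypothetical Scala script modelling the grouping loop in the diff, with the size check dropped for the merged-meta case as suggested above. `BlockKind`, `Chunk`, `MergedMeta`, `Block`, and `groupRequests` are made-up names, not Spark APIs: `Chunk` stands in for `ShuffleBlockChunkId` and `MergedMeta` for `ShuffleBlockId(_, SHUFFLE_PUSH_MAP_ID, _)`. In the diff, `createFetchRequests` returns blocks not yet placed in a request (they are reassigned to `curBlocks`); this sketch simplifies that by flushing the whole current group whenever a limit is hit.

```scala
// Hypothetical model of the per-address grouping loop; not Spark code.
sealed trait BlockKind
case object Chunk extends BlockKind      // stands in for ShuffleBlockChunkId
case object MergedMeta extends BlockKind // stands in for ShuffleBlockId(_, SHUFFLE_PUSH_MAP_ID, _)

final case class Block(id: String, size: Long, kind: BlockKind)

/** Splits the blocks for one address into request groups.
  * Chunk groups are cut at the size target or the count limit.
  * Merged-meta groups are cut at the count limit only, because the
  * response carries just the chunk list, not the block data.
  * Assumes all blocks passed in share the same kind.
  */
def groupRequests(
    blocks: Seq[Block],
    targetRequestSize: Long,
    maxBlocksPerRequest: Int): Seq[Seq[Block]] = {
  val requests = Seq.newBuilder[Seq[Block]]
  var cur = Vector.empty[Block]
  var curSize = 0L
  blocks.foreach { b =>
    cur :+= b
    curSize += b.size
    val full = b.kind match {
      case Chunk      => curSize >= targetRequestSize || cur.size >= maxBlocksPerRequest
      case MergedMeta => cur.size >= maxBlocksPerRequest // size deliberately ignored
    }
    if (full) {
      // Simplification: emit the whole group instead of returning leftovers.
      requests += cur
      cur = Vector.empty
      curSize = 0L
    }
  }
  if (cur.nonEmpty) requests += cur // final, possibly partial, group
  requests.result()
}
```

With a count limit of 2, three merged-meta entries of 1000 bytes each form two requests (2 + 1) even though each entry exceeds a 100-byte size target, because only the count limit applies to them.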
--
This is an automated message from the Apache Git Service.