otterc commented on a change in pull request #32140: URL: https://github.com/apache/spark/pull/32140#discussion_r655034874
########## File path: core/src/main/scala/org/apache/spark/storage/PushBasedFetchHelper.scala ########## @@ -0,0 +1,336 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.storage + +import java.util.concurrent.{ConcurrentHashMap, TimeUnit} + +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer +import scala.util.{Failure, Success} + +import org.roaringbitmap.RoaringBitmap + +import org.apache.spark.MapOutputTracker +import org.apache.spark.MapOutputTracker.SHUFFLE_PUSH_MAP_ID +import org.apache.spark.internal.Logging +import org.apache.spark.network.buffer.ManagedBuffer +import org.apache.spark.network.shuffle.{BlockStoreClient, MergedBlockMeta, MergedBlocksMetaListener} +import org.apache.spark.storage.BlockManagerId.SHUFFLE_MERGER_IDENTIFIER +import org.apache.spark.storage.ShuffleBlockFetcherIterator._ + +/** + * Helper class for [[ShuffleBlockFetcherIterator]] that encapsulates all the push-based + * functionality to fetch merged block meta and merged shuffle block chunks. 
+ */ +private class PushBasedFetchHelper( + private val iterator: ShuffleBlockFetcherIterator, + private val shuffleClient: BlockStoreClient, + private val blockManager: BlockManager, + private val mapOutputTracker: MapOutputTracker) extends Logging { + + private[this] val startTimeNs = System.nanoTime() + + private[this] val localShuffleMergerBlockMgrId = BlockManagerId( + SHUFFLE_MERGER_IDENTIFIER, blockManager.blockManagerId.host, + blockManager.blockManagerId.port, blockManager.blockManagerId.topologyInfo) + + /** + * A map for storing merged block shuffle chunk bitmap. This is a concurrent hashmap because it + * can be modified by both the task thread and the netty thread. + */ + private[this] val chunksMetaMap = new ConcurrentHashMap[ShuffleBlockChunkId, RoaringBitmap]() + + /** + * Returns true if the address is for a push-merged block. + */ + def isMergedShuffleBlockAddress(address: BlockManagerId): Boolean = { + SHUFFLE_MERGER_IDENTIFIER == address.executorId + } + + /** + * Returns true if the address is of a remote merged block. + */ + def isMergedBlockAddressRemote(address: BlockManagerId): Boolean = { + assert(isMergedShuffleBlockAddress(address)) + address.host != blockManager.blockManagerId.host + } + + /** + * Returns true if the address if of merged local block. false otherwise. + */ + def isMergedLocal(address: BlockManagerId): Boolean = { + isMergedShuffleBlockAddress(address) && address.host == blockManager.blockManagerId.host + } + + /** + * This is executed by the task thread when the `iterator.next()` is invoked and the iterator + * processes a response of type [[ShuffleBlockFetcherIterator.SuccessFetchResult]]. + * + * @param blockId shuffle block chunk id. 
+ */ + def getNumberOfBlocksInChunk(blockId : ShuffleBlockChunkId): Int = { + chunksMetaMap.get(blockId).getCardinality + } + + /** + * This is executed by the task thread when the `iterator.next()` is invoked and the iterator + * processes a response of type [[ShuffleBlockFetcherIterator.SuccessFetchResult]]. + * + * @param blockId shuffle block chunk id. + */ + def removeChunk(blockId: ShuffleBlockChunkId): Unit = { + chunksMetaMap.remove(blockId) + } + + /** + * This is executed by the task thread when the `iterator.next()` is invoked and the iterator + * processes a response of type [[ShuffleBlockFetcherIterator.MergedMetaFetchResult]]. + * + * @param shuffleId shuffle id. + * @param reduceId reduce id. + * @param blockSize size of the merged block. + * @param numChunks number of chunks in the merged block. + * @param bitmaps per chunk bitmap, where each bitmap contains all the mapIds that are merged + * to that chunk. + * @return shuffle chunks to fetch. + */ + def createChunkBlockInfosFromMetaResponse( + shuffleId: Int, + reduceId: Int, + blockSize: Long, + numChunks: Int, + bitmaps: Array[RoaringBitmap]): ArrayBuffer[(BlockId, Long, Int)] = { + val approxChunkSize = blockSize / numChunks + val blocksToFetch = new ArrayBuffer[(BlockId, Long, Int)]() + for (i <- 0 until numChunks) { + val blockChunkId = ShuffleBlockChunkId(shuffleId, reduceId, i) + chunksMetaMap.put(blockChunkId, bitmaps(i)) + logDebug(s"adding block chunk $blockChunkId of size $approxChunkSize") + blocksToFetch += ((blockChunkId, approxChunkSize, SHUFFLE_PUSH_MAP_ID)) + } + blocksToFetch + } + + /** + * This is executed by the task thread when the iterator is initialized and only if it has + * push-merged blocks for which it needs to fetch the metadata. + * + * @param req [[ShuffleBlockFetcherIterator.FetchRequest]] that only contains requests to fetch + * metadata of merged blocks. 
+ */ + def sendFetchMergedStatusRequest(req: FetchRequest): Unit = { + val sizeMap = req.blocks.map { + case FetchBlockInfo(blockId, size, _) => + val shuffleBlockId = blockId.asInstanceOf[ShuffleBlockId] + ((shuffleBlockId.shuffleId, shuffleBlockId.reduceId), size) + }.toMap + val address = req.address + val mergedBlocksMetaListener = new MergedBlocksMetaListener { + override def onSuccess(shuffleId: Int, reduceId: Int, meta: MergedBlockMeta): Unit = { + logInfo(s"Received the meta of merged block for ($shuffleId, $reduceId) " + + s"from ${req.address.host}:${req.address.port}") + try { + iterator.addToResultsQueue(MergedMetaFetchResult(shuffleId, reduceId, + sizeMap((shuffleId, reduceId)), meta.getNumChunks, meta.readChunkBitmaps(), address)) + } catch { + case exception: Exception => + logError(s"Failed to parse the meta of merged block for ($shuffleId, $reduceId) " + + s"from ${req.address.host}:${req.address.port}", exception) + iterator.addToResultsQueue( + MergedMetaFailedFetchResult(shuffleId, reduceId, address)) + } + } + + override def onFailure(shuffleId: Int, reduceId: Int, exception: Throwable): Unit = { + logError(s"Failed to get the meta of merged block for ($shuffleId, $reduceId) " + + s"from ${req.address.host}:${req.address.port}", exception) + iterator.addToResultsQueue(MergedMetaFailedFetchResult(shuffleId, reduceId, address)) + } + } + req.blocks.foreach { block => + val shuffleBlockId = block.blockId.asInstanceOf[ShuffleBlockId] + shuffleClient.getMergedBlockMeta(address.host, address.port, shuffleBlockId.shuffleId, + shuffleBlockId.reduceId, mergedBlocksMetaListener) + } + } + + /** + * This is executed by the task thread when the iterator is initialized. It fetches all the + * outstanding merged local blocks. + * @param mergedLocalBlocks set of identified merged local blocks. 
+ */ + def fetchAllMergedLocalBlocks( + mergedLocalBlocks: mutable.LinkedHashSet[BlockId]): Unit = { + if (mergedLocalBlocks.nonEmpty) { + blockManager.hostLocalDirManager.foreach(fetchMergedLocalBlocks(_, mergedLocalBlocks)) + } + } + + /** + * Fetch the merged blocks dirs if they are not in the cache and eventually fetch merged local + * blocks. + */ + private def fetchMergedLocalBlocks( + hostLocalDirManager: HostLocalDirManager, + mergedLocalBlocks: mutable.LinkedHashSet[BlockId]): Unit = { + val cachedMergerDirs = hostLocalDirManager.getCachedHostLocalDirs.get( + SHUFFLE_MERGER_IDENTIFIER) + if (cachedMergerDirs.isDefined) { + logDebug(s"Fetching local merged blocks with cached executors dir: " + + s"${cachedMergerDirs.get.mkString(", ")}") + mergedLocalBlocks.foreach(blockId => + fetchMergedLocalBlock(blockId, cachedMergerDirs.get, localShuffleMergerBlockMgrId)) + } else { + logDebug(s"Asynchronous fetching local merged blocks without cached executors dir") + hostLocalDirManager.getHostLocalDirs(localShuffleMergerBlockMgrId.host, + localShuffleMergerBlockMgrId.port, Array(SHUFFLE_MERGER_IDENTIFIER)) { + case Success(dirs) => + mergedLocalBlocks.takeWhile { + blockId => + logDebug(s"Successfully fetched local dirs: " + + s"${dirs.get(SHUFFLE_MERGER_IDENTIFIER).mkString(", ")}") + fetchMergedLocalBlock(blockId, dirs(SHUFFLE_MERGER_IDENTIFIER), + localShuffleMergerBlockMgrId) + } + logDebug(s"Got local merged blocks (without cached executors' dir) in " + + s"${TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeNs)} ms") + case Failure(throwable) => + // If we see an exception with getting the local dirs for local merged blocks, + // we fallback to fetch the original unmerged blocks. We do not report block fetch + // failure. + logWarning(s"Error occurred while getting the local dirs for local merged " + + s"blocks: ${mergedLocalBlocks.mkString(", ")}. 
Fetch the original blocks instead", + throwable) + mergedLocalBlocks.foreach( + blockId => iterator.addToResultsQueue(FallbackOnMergedFailureFetchResult( + blockId, localShuffleMergerBlockMgrId, 0, isNetworkReqDone = false)) + ) + } + } + } + + /** + * Fetch a single local merged block generated. This can also be executed by the task thread as + * well as the netty thread. + * @param blockId ShuffleBlockId to be fetched + * @param localDirs Local directories where the merged shuffle files are stored + * @param blockManagerId BlockManagerId + * @return Boolean represents successful or failed fetch + */ + private[this] def fetchMergedLocalBlock( + blockId: BlockId, + localDirs: Array[String], + blockManagerId: BlockManagerId): Boolean = { + try { + val shuffleBlockId = blockId.asInstanceOf[ShuffleBlockId] + val chunksMeta = blockManager.getLocalMergedBlockMeta(shuffleBlockId, localDirs) + .readChunkBitmaps() + // Fetch local merged shuffle block data as multiple chunks + val bufs: Seq[ManagedBuffer] = blockManager.getLocalMergedBlockData(shuffleBlockId, localDirs) + // Update total number of blocks to fetch, reflecting the multiple local chunks + iterator.incrementNumBlocksToFetch(bufs.size - 1) Review comment: I have made this change, but now this happens when the iterator processes `PushMergedLocalMetaFetchResult`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
