dongjoon-hyun commented on a change in pull request #28911:
URL: https://github.com/apache/spark/pull/28911#discussion_r479696711
##########
File path:
core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
##########
@@ -463,54 +466,73 @@ final class ShuffleBlockFetcherIterator(
* track in-memory are the ManagedBuffer references themselves.
*/
private[this] def fetchHostLocalBlocks(hostLocalDirManager:
HostLocalDirManager): Unit = {
- val cachedDirsByExec = hostLocalDirManager.getCachedHostLocalDirs()
- val (hostLocalBlocksWithCachedDirs, hostLocalBlocksWithMissingDirs) =
- hostLocalBlocksByExecutor
- .map { case (hostLocalBmId, bmInfos) =>
- (hostLocalBmId, bmInfos,
cachedDirsByExec.get(hostLocalBmId.executorId))
- }.partition(_._3.isDefined)
- val bmId = blockManager.blockManagerId
- val immutableHostLocalBlocksWithoutDirs =
- hostLocalBlocksWithMissingDirs.map { case (hostLocalBmId, bmInfos, _) =>
- hostLocalBmId -> bmInfos
- }.toMap
- if (immutableHostLocalBlocksWithoutDirs.nonEmpty) {
+ val cachedDirsByExec = hostLocalDirManager.getCachedHostLocalDirs
+ val (hostLocalBlocksWithCachedDirs, hostLocalBlocksWithMissingDirs) = {
+ val (hasCache, noCache) = hostLocalBlocksByExecutor.partition { case
(hostLocalBmId, _) =>
+ cachedDirsByExec.contains(hostLocalBmId.executorId)
+ }
+ (hasCache.toMap, noCache.toMap)
+ }
+
+ if (hostLocalBlocksWithMissingDirs.nonEmpty) {
logDebug(s"Asynchronous fetching host-local blocks without cached
executors' dir: " +
- s"${immutableHostLocalBlocksWithoutDirs.mkString(", ")}")
- val execIdsWithoutDirs =
immutableHostLocalBlocksWithoutDirs.keys.map(_.executorId).toArray
- hostLocalDirManager.getHostLocalDirs(execIdsWithoutDirs) {
- case Success(dirs) =>
- immutableHostLocalBlocksWithoutDirs.foreach { case (hostLocalBmId,
blockInfos) =>
- blockInfos.takeWhile { case (blockId, _, mapIndex) =>
- fetchHostLocalBlock(
- blockId,
- mapIndex,
- dirs.get(hostLocalBmId.executorId),
- hostLocalBmId)
- }
- }
- logDebug(s"Got host-local blocks (without cached executors' dir) in
" +
- s"${Utils.getUsedTimeNs(startTimeNs)}")
-
- case Failure(throwable) =>
- logError(s"Error occurred while fetching host local blocks",
throwable)
- val (hostLocalBmId, blockInfoSeq) =
immutableHostLocalBlocksWithoutDirs.head
- val (blockId, _, mapIndex) = blockInfoSeq.head
- results.put(FailureFetchResult(blockId, mapIndex, hostLocalBmId,
throwable))
+ s"${hostLocalBlocksWithMissingDirs.mkString(", ")}")
+
+ // If the external shuffle service is enabled, we'll fetch the local
directories for
+ // multiple executors from the external shuffle service, which located
at the same host
+ // with the executors, in once. Otherwise, we'll fetch the local
directories from those
+ // executors directly one by one. The fetch requests won't be too much
since one host is
+ // almost impossible to have many executors at the same time practically.
+ val dirFetchRequests = if (blockManager.externalShuffleServiceEnabled) {
+ val host = blockManager.blockManagerId.host
+ val port = blockManager.externalShuffleServicePort
+ Seq((host, port, hostLocalBlocksWithMissingDirs.keys.toArray))
+ } else {
+ hostLocalBlocksWithMissingDirs.keys.map(bmId => (bmId.host, bmId.port,
Array(bmId))).toSeq
+ }
+
+ dirFetchRequests.foreach { case (host, port, bmIds) =>
+ hostLocalDirManager.getHostLocalDirs(host, port,
bmIds.map(_.executorId)) {
+ case Success(dirsByExecId) =>
+ fetchMultipleHostLocalBlocks(
+ hostLocalBlocksWithMissingDirs.filterKeys(bmIds.contains),
+ dirsByExecId,
+ cached = false)
+
+ case Failure(throwable) =>
+ logError("Error occurred while fetching host local blocks",
throwable)
+ val bmId = bmIds.head
+ val blockInfoSeq = hostLocalBlocksWithMissingDirs(bmId)
+ val (blockId, _, mapIndex) = blockInfoSeq.head
+ results.put(FailureFetchResult(blockId, mapIndex, bmId, throwable))
+ }
}
}
+
if (hostLocalBlocksWithCachedDirs.nonEmpty) {
logDebug(s"Synchronous fetching host-local blocks with cached executors'
dir: " +
s"${hostLocalBlocksWithCachedDirs.mkString(", ")}")
- hostLocalBlocksWithCachedDirs.foreach { case (_, blockInfos, localDirs)
=>
- blockInfos.foreach { case (blockId, _, mapIndex) =>
- if (!fetchHostLocalBlock(blockId, mapIndex, localDirs.get, bmId)) {
- return
- }
- }
+ fetchMultipleHostLocalBlocks(hostLocalBlocksWithCachedDirs,
cachedDirsByExec, cached = true)
+ }
+ }
+
+ private def fetchMultipleHostLocalBlocks(
+ bmIdToBlocks: Map[BlockManagerId, Seq[(BlockId, Long, Int)]],
+ localDirsByExecId: Map[String, Array[String]],
+ cached: Boolean)
+ : Unit = {
+ // We use `forall` because once there's a block fetch is failed,
`fetchHostLocalBlock` will put
+ // a `FailureFetchResult` immediately to the `results`. So there's no
reason to fetch the
+ // remaining blocks.
+ val allFetchSucceed = bmIdToBlocks.forall { case (bmId, blockInfos) =>
Review comment:
Please rename `allFetchSucceed` to `allFetchSucceeded`; by convention we name such
flags in the past tense, i.e. `XXXSucceeded`.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]