tgravescs commented on a change in pull request #28911:
URL: https://github.com/apache/spark/pull/28911#discussion_r452276119



##########
File path: core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
##########
@@ -478,30 +475,43 @@ final class ShuffleBlockFetcherIterator(
       logDebug(s"Asynchronous fetching host-local blocks without cached 
executors' dir: " +
         s"${immutableHostLocalBlocksWithoutDirs.mkString(", ")}")
       val execIdsWithoutDirs = 
immutableHostLocalBlocksWithoutDirs.keys.map(_.executorId).toArray
-      hostLocalDirManager.getHostLocalDirs(execIdsWithoutDirs) {
-        case Success(dirs) =>
-          immutableHostLocalBlocksWithoutDirs.foreach { case (hostLocalBmId, blockInfos) =>
-            blockInfos.takeWhile { case (blockId, _, mapIndex) =>
-              fetchHostLocalBlock(
-                blockId,
-                mapIndex,
-                dirs.get(hostLocalBmId.executorId),
-                hostLocalBmId)
+
+      val dirFetchRequests = if (blockManager.externalShuffleServiceEnabled) {
+        val host = blockManager.blockManagerId.host
+        val port = blockManager.externalShuffleServicePort
+        Seq((host, port, immutableHostLocalBlocksWithoutDirs.keys.toArray))
+      } else {
+        hostLocalBlocksByExecutor.keysIterator

Review comment:
       Isn't this just essentially calculating immutableHostLocalBlocksWithoutDirs again? Its keys already contain the BlockManagerIds of the ones missing from the cache.
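
       For illustration, a self-contained sketch with simplified stand-in types (hypothetical, not the real Spark classes) showing that the filter re-derives exactly the keys we already have:

```scala
case class BlockManagerId(executorId: String, host: String, port: Int)

val hostLocalBlocksByExecutor = Map(
  BlockManagerId("exec-1", "host-a", 7337) -> Seq("block-0"),
  BlockManagerId("exec-2", "host-a", 7337) -> Seq("block-1"))

// Suppose exec-2's local dirs are already cached and exec-1's are not:
val immutableHostLocalBlocksWithoutDirs =
  hostLocalBlocksByExecutor.filter { case (bmId, _) => bmId.executorId == "exec-1" }

val execIdsWithoutDirs = immutableHostLocalBlocksWithoutDirs.keys.map(_.executorId).toArray

// Re-filtering the full key set by those executor ids just reproduces the
// keys of immutableHostLocalBlocksWithoutDirs:
val refiltered = hostLocalBlocksByExecutor.keysIterator
  .filter(bmId => execIdsWithoutDirs.contains(bmId.executorId))
  .toSet
assert(refiltered == immutableHostLocalBlocksWithoutDirs.keySet)
```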

##########
File path: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockStoreClient.java
##########
@@ -61,4 +63,17 @@ public MetricSet shuffleMetrics() {
     // Return an empty MetricSet by default.
     return () -> Collections.emptyMap();
   }
+
+  /**
+   * Request the local disk directories, which are specified by DiskBlockManager, for the executors
+   * from the external shuffle service (when this is a ExternalBlockStoreClient) or BlockManager
+   * (when this is a NettyBlockTransferService). Note there's only one executor when this is a
+   * NettyBlockTransferService.

Review comment:
       Might be nice to extend this to say why: because we ask one specific executor at a time.
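
       Something like (hypothetical phrasing, extending the doc's last sentence):

           * Note there's only one executor when this is a NettyBlockTransferService,
           * because we ask one specific executor at a time.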

##########
File path: core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
##########
@@ -194,6 +196,45 @@ private[spark] class NettyBlockTransferService(
     result.future
   }
 
+  override def getHostLocalDirs(
+      host: String,
+      port: Int,
+      execIds: Array[String],
+      hostLocalDirsCompletable: CompletableFuture[util.Map[String, Array[String]]]): Unit = {
+    val getLocalDirsMessage = new GetLocalDirsForExecutors(appId, execIds)
+    try {
+      val client = clientFactory.createClient(host, port)
+      client.sendRpc(getLocalDirsMessage.toByteBuffer, new RpcResponseCallback() {
+        override def onSuccess(response: ByteBuffer): Unit = {
+          try {
+            val msgObj = BlockTransferMessage.Decoder.fromByteBuffer(response)
+            hostLocalDirsCompletable.complete(
+              msgObj.asInstanceOf[LocalDirsForExecutors].getLocalDirsByExec)
+          } catch {
+            case t: Throwable =>
+              logWarning(s"Error trying to get the host local dirs for 
executor ${execIds.head}",
+                t.getCause)
+              hostLocalDirsCompletable.completeExceptionally(t)
+          } finally {
+            client.close()
+          }
+        }
+
+        override def onFailure(t: Throwable): Unit = {
+          logWarning(s"Error trying to get the host local dirs for executor 
${execIds.head}",
+            t.getCause)
+          hostLocalDirsCompletable.completeExceptionally(t)
+          client.close()
+        }
+      })
+    } catch {
+      case e: IOException =>

Review comment:
       Is there a reason we only catch these two? Do you think any other exceptions are unexpected here? I'm assuming those would cause the executor to exit?
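
       For comparison, a minimal self-contained sketch (sendDirRequest is a hypothetical stand-in for the createClient/sendRpc calls above) of catching all non-fatal exceptions instead of only the two, so anything unexpected still fails the returned future rather than escaping:

```scala
import java.util.concurrent.CompletableFuture
import scala.util.control.NonFatal

def getHostLocalDirsGuarded(
    sendDirRequest: () => Unit,
    result: CompletableFuture[java.util.Map[String, Array[String]]]): Unit = {
  try {
    sendDirRequest()
  } catch {
    // NonFatal deliberately excludes fatal errors such as OutOfMemoryError,
    // which still propagate and would likely take the executor down.
    case NonFatal(e) => result.completeExceptionally(e)
  }
}
```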

##########
File path: core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala
##########
@@ -113,6 +113,14 @@ class NettyBlockRpcServer(
             s"when there is not sufficient space available to store the 
block.")
           responseContext.onFailure(exception)
         }
+
+      case req: GetLocalDirsForExecutors =>

Review comment:
       Might be nice to rename req to getLocalDirs to keep the naming convention of the other cases.
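
       A sketch of the suggested rename with stand-in types (hypothetical; the real message class lives in network-shuffle):

```scala
sealed trait BlockTransferMessage
case class GetLocalDirsForExecutors(appId: String, execIds: Array[String])
  extends BlockTransferMessage

def receive(message: BlockTransferMessage): Unit = message match {
  // Bind the message to a name that mirrors its type, as the other cases do:
  case getLocalDirs: GetLocalDirsForExecutors =>
    println(s"local dirs requested for: ${getLocalDirs.execIds.mkString(", ")}")
}
```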

##########
File path: core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
##########
@@ -478,30 +475,43 @@ final class ShuffleBlockFetcherIterator(
       logDebug(s"Asynchronous fetching host-local blocks without cached 
executors' dir: " +
         s"${immutableHostLocalBlocksWithoutDirs.mkString(", ")}")
       val execIdsWithoutDirs = 
immutableHostLocalBlocksWithoutDirs.keys.map(_.executorId).toArray
-      hostLocalDirManager.getHostLocalDirs(execIdsWithoutDirs) {
-        case Success(dirs) =>
-          immutableHostLocalBlocksWithoutDirs.foreach { case (hostLocalBmId, blockInfos) =>
-            blockInfos.takeWhile { case (blockId, _, mapIndex) =>
-              fetchHostLocalBlock(
-                blockId,
-                mapIndex,
-                dirs.get(hostLocalBmId.executorId),
-                hostLocalBmId)
+
+      val dirFetchRequests = if (blockManager.externalShuffleServiceEnabled) {
+        val host = blockManager.blockManagerId.host
+        val port = blockManager.externalShuffleServicePort
+        Seq((host, port, immutableHostLocalBlocksWithoutDirs.keys.toArray))
+      } else {
+        hostLocalBlocksByExecutor.keysIterator
+          .filter(exec => execIdsWithoutDirs.contains(exec.executorId))

Review comment:
       exec here is really a BlockManagerId (bmId); perhaps rename it to be clear.
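
       i.e. something like (same behavior, names from the diff above):

```scala
hostLocalBlocksByExecutor.keysIterator
  .filter(bmId => execIdsWithoutDirs.contains(bmId.executorId))
```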



