vanzin commented on a change in pull request #25299: [SPARK-27651][Core] Avoid the network when shuffle blocks are fetched from the same host
URL: https://github.com/apache/spark/pull/25299#discussion_r346953940
 
 

 ##########
 File path: core/src/main/scala/org/apache/spark/storage/BlockManager.scala
 ##########
 @@ -113,6 +114,41 @@ private[spark] class ByteBufferBlockData(
 
 }
 
+private[spark] class HostLocalDirManager(
+    futureExecutionContext: ExecutionContext,
+    cacheSize: Int,
+    externalBlockStoreClient: ExternalBlockStoreClient,
+    host: String,
+    externalShuffleServicePort: Int) extends Logging {
+
+  private val executorIdToLocalDirsCache =
+    CacheBuilder
+      .newBuilder()
+      .maximumSize(cacheSize)
+      .build[String, Array[String]]()
+
+  private[spark] def getCachedHostLocalDirs()
+    : scala.collection.Map[String, Array[String]] = executorIdToLocalDirsCache.synchronized {
+    import scala.collection.JavaConverters._
+    return executorIdToLocalDirsCache.asMap().asScala
+  }
+
+  private[spark] def getHostLocalDirs(
+      executorIds: Array[String],
+      hostLocalDirsCompletable: CompletableFuture[java.util.Map[String, Array[String]]]): Unit = {
+    Future {
 
 Review comment:
  You're mixing the Java API with the Scala API here, and it becomes a little weird to follow what's going on where.
   
  For example, the `thenAccept` callback here will always run in the global execution context (because that's what `futureExecutionContext` is in your code). It also means the RPC will block one thread in that context, since `get` is blocking. That's not good.
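A minimal sketch of the blocking cost, assuming the code under review boils down to a Scala `Future` whose body calls `get()` on the Java `CompletableFuture` (the diff hunk is truncated, so this is an assumption). `BlockingBridgeDemo`, `bridgeByBlocking` and `demo` are hypothetical names, and a one-thread pool replaces the global execution context so the occupied thread becomes observable: while the "RPC" is pending, a second task on the same pool cannot run.

```scala
import java.util.concurrent.{CompletableFuture, Executors}
import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutorService, Future}
import scala.concurrent.duration._

object BlockingBridgeDemo {
  // Anti-pattern under discussion: a Scala Future whose body just waits on a
  // Java CompletableFuture. The pool thread running the body is parked inside
  // get() until the future completes, i.e. for the whole RPC round trip.
  def bridgeByBlocking[T](cf: CompletableFuture[T])(implicit ec: ExecutionContext): Future[T] =
    Future(cf.get())

  // Runs the anti-pattern on a one-thread pool to make the cost visible.
  // Returns (otherTaskCompletedWhileBlocked, bridgedResult, otherTaskResult).
  def demo(): (Boolean, String, Int) = {
    val pool = Executors.newSingleThreadExecutor()
    implicit val ec: ExecutionContextExecutorService = ExecutionContext.fromExecutorService(pool)
    try {
      val rpcReply = new CompletableFuture[String]() // stands in for the pending RPC
      val bridged = bridgeByBlocking(rpcReply)       // occupies the only pool thread
      val other = Future(42)                         // queued behind the blocked thread
      Thread.sleep(200)
      val completedWhileBlocked = other.isCompleted  // false: no free thread to run it
      rpcReply.complete("local dirs")                // the "RPC reply" frees the thread
      (completedWhileBlocked, Await.result(bridged, 5.seconds), Await.result(other, 5.seconds))
    } finally {
      pool.shutdown()
    }
  }
}
```

With the global execution context the effect is diluted across more threads, but each in-flight call still pins one of them for its full duration.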
   
  At the call site, though, it depends. If the RPC is ultra fast, the `whenComplete` callback will run in the same thread as the caller; otherwise it will run in the RPC thread completing the future, which will be different from the thread processing the `thenAccept`.
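The "it depends" behavior follows from how non-async `CompletableFuture` callbacks are dispatched: if the future is already complete when `whenComplete` is called, the action runs immediately in the calling thread; otherwise it runs in whichever thread later completes the future. The hypothetical `WhenCompleteThreadDemo` below shows both cases, with a plain thread named `rpc-completer` standing in for the RPC thread.

```scala
import java.util.concurrent.{CompletableFuture, CountDownLatch}
import java.util.concurrent.atomic.AtomicReference
import java.util.function.BiConsumer

object WhenCompleteThreadDemo {
  // Returns the name of the thread that ran a (non-async) whenComplete callback.
  def callbackThread(completeBeforeRegistering: Boolean): String = {
    val cf = new CompletableFuture[String]()
    val ranIn = new AtomicReference[String]()
    val done = new CountDownLatch(1)
    val completeIt: Runnable = () => { cf.complete("local dirs"); () }
    val completer = new Thread(completeIt, "rpc-completer")

    // "Ultra fast RPC": the reply is in before the caller registers its callback.
    if (completeBeforeRegistering) { completer.start(); completer.join() }

    cf.whenComplete(new BiConsumer[String, Throwable] {
      override def accept(result: String, err: Throwable): Unit = {
        ranIn.set(Thread.currentThread().getName)
        done.countDown()
      }
    })

    // "Slow RPC": the reply arrives after the callback is registered.
    if (!completeBeforeRegistering) { completer.start(); completer.join() }

    done.await()
    ranIn.get()
  }
}
```

Calling `callbackThread(true)` reports the caller's own thread name, while `callbackThread(false)` reports `rpc-completer`.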
   
  I think instead you should be hiding these details from the caller. Have the caller provide a callback that will receive a `Try[java.util.Map[String, Array[String]]]` when done, and have it always be executed asynchronously in the RPC thread completing the call. (That means setting up the `CompletableFuture` internally in this method before sending the RPC.)
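A rough sketch of the suggested API shape, under stated assumptions: `HostLocalDirFetcherSketch` and its `sendRpc` constructor parameter are hypothetical stand-ins for `HostLocalDirManager` and the `ExternalBlockStoreClient` call, not Spark's actual API, and the sketch assumes the RPC layer completes the future from its own thread. The `CompletableFuture` is created inside the method and the callback is attached before the RPC is sent, so the caller only ever sees a `Try`.

```scala
import java.util.{Map => JMap}
import java.util.concurrent.CompletableFuture
import java.util.function.BiConsumer
import scala.util.{Failure, Success, Try}

// Hypothetical sketch of the callback-style API; names are illustrative only.
class HostLocalDirFetcherSketch(
    // Stand-in for the RPC: it is assumed to complete the future (normally or
    // exceptionally) from the thread that handles the reply.
    sendRpc: (Array[String], CompletableFuture[JMap[String, Array[String]]]) => Unit) {

  def getHostLocalDirs(executorIds: Array[String])(
      callback: Try[JMap[String, Array[String]]] => Unit): Unit = {
    // The future is created here and handed to sendRpc only after the callback
    // is attached, so the callback is registered while the future is still
    // incomplete and therefore runs in the thread that completes it.
    val completable = new CompletableFuture[JMap[String, Array[String]]]()
    completable.whenComplete(new BiConsumer[JMap[String, Array[String]], Throwable] {
      override def accept(dirs: JMap[String, Array[String]], err: Throwable): Unit =
        callback(if (err == null) Success(dirs) else Failure(err))
    })
    sendRpc(executorIds, completable)
  }
}
```

If `sendRpc` ever completed the future synchronously (for example on an immediate local error), the callback would run in the caller's thread again, so that path would need the same care.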
   
  This avoids mixing the Java and Scala futures, and makes it easier to understand what's going on in what thread (e.g., the caller always gets its result asynchronously).
