vanzin commented on a change in pull request #25299: [SPARK-27651][Core] Avoid
the network when shuffle blocks are fetched from the same host
URL: https://github.com/apache/spark/pull/25299#discussion_r346953940
##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManager.scala
##########
@@ -113,6 +114,41 @@ private[spark] class ByteBufferBlockData(
}
+private[spark] class HostLocalDirManager(
+ futureExecutionContext: ExecutionContext,
+ cacheSize: Int,
+ externalBlockStoreClient: ExternalBlockStoreClient,
+ host: String,
+ externalShuffleServicePort: Int) extends Logging {
+
+ private val executorIdToLocalDirsCache =
+ CacheBuilder
+ .newBuilder()
+ .maximumSize(cacheSize)
+ .build[String, Array[String]]()
+
+ private[spark] def getCachedHostLocalDirs()
+ : scala.collection.Map[String, Array[String]] = executorIdToLocalDirsCache.synchronized {
+ import scala.collection.JavaConverters._
+ return executorIdToLocalDirsCache.asMap().asScala
+ }
+
+ private[spark] def getHostLocalDirs(
+ executorIds: Array[String],
+ hostLocalDirsCompletable: CompletableFuture[java.util.Map[String, Array[String]]]): Unit = {
+ Future {
Review comment:
You're mixing the Java API with the Scala API here and it becomes a little
weird to follow what's going on where.
For example, the `thenAccept` callback here will always run in the global
execution context (because that's what `futureExecutionContext` is in your
code). It also means that waiting for the RPC will tie up one thread in that
context, since `get` blocks until the reply arrives. That's not good.
At the call site, though, it depends. If the RPC is ultra fast, the
`whenComplete` callback will run in the same thread as the caller; otherwise it
will run in the RPC thread completing the future, which will be different from
the thread processing the `thenAccept`.
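The two cases above follow from how the non-async `CompletableFuture` callbacks behave: a callback attached to an already-completed future runs immediately in the attaching thread, while one attached earlier runs in whichever thread later calls `complete`. A minimal Scala sketch, assuming a plain thread named `rpc-thread` as a stand-in for the RPC layer (not Spark's actual threading); `WhenCompleteThreads` is a hypothetical name:

```scala
import java.util.concurrent.{CompletableFuture, CountDownLatch}
import java.util.function.BiConsumer

object WhenCompleteThreads {
  // Attaches a whenComplete callback that records the name of the thread running it.
  private def recordThread(cf: CompletableFuture[String]): (Array[String], CountDownLatch) = {
    val seen = new Array[String](1)
    val done = new CountDownLatch(1)
    cf.whenComplete(new BiConsumer[String, Throwable] {
      override def accept(value: String, err: Throwable): Unit = {
        seen(0) = Thread.currentThread().getName
        done.countDown()
      }
    })
    (seen, done)
  }

  /** Returns (caller thread, thread for an already-done future, thread for a pending one). */
  def run(): (String, String, String) = {
    val caller = Thread.currentThread().getName

    // "Ultra fast" RPC: the future is already complete when the callback is
    // attached, so whenComplete runs it right away in the caller's thread.
    val (fastSeen, fastDone) = recordThread(CompletableFuture.completedFuture("dirs"))
    fastDone.await()

    // Slower RPC: the callback is attached first, so it runs in the thread
    // that later calls complete(), here the stand-in "rpc-thread".
    val pending = new CompletableFuture[String]()
    val (slowSeen, slowDone) = recordThread(pending)
    val rpc = new Thread(new Runnable {
      override def run(): Unit = pending.complete("dirs")
    }, "rpc-thread")
    rpc.start()
    slowDone.await() // the latch also makes slowSeen(0) visible to this thread
    rpc.join()

    (caller, fastSeen(0), slowSeen(0))
  }
}
```

The first two elements of `WhenCompleteThreads.run()` are always equal and the third is `"rpc-thread"`, which is why the thread a caller's callback lands on depends on timing.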
I think you should instead hide these details from the caller. Have the
caller provide a callback that will receive a
`Try[java.util.Map[String, Array[String]]]` when done, and have it always be
executed asynchronously in the RPC thread completing the call. (That means
setting up the `CompletableFuture` internally in this method before sending the
RPC.)
This avoids mixing the Java and Scala futures, and makes it easier to
understand what's going on in which thread (e.g., the caller's callback will
always run asynchronously).
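A rough sketch of the suggested shape, under stated assumptions: `LocalDirsRpc`, `HostLocalDirsFetcher`, and `ThreadedFakeRpc` are hypothetical names, and the RPC is reduced to a trait whose implementation completes the future it is handed. Only the callback taking a `Try[java.util.Map[String, Array[String]]]` and the idea of creating the `CompletableFuture` inside the method before sending the RPC come from the comment above; this is not the PR's actual code.

```scala
import java.util.concurrent.CompletableFuture
import java.util.function.BiConsumer
import scala.util.{Failure, Success, Try}
import scala.util.control.NonFatal

// Hypothetical stand-in for the RPC client: an implementation completes
// `result` (normally from its own thread) when the reply arrives.
trait LocalDirsRpc {
  def fetch(
      executorIds: Array[String],
      result: CompletableFuture[java.util.Map[String, Array[String]]]): Unit
}

// Hypothetical class; only the callback-taking signature mirrors the suggestion.
class HostLocalDirsFetcher(rpc: LocalDirsRpc) {
  def getHostLocalDirs(executorIds: Array[String])(
      callback: Try[java.util.Map[String, Array[String]]] => Unit): Unit = {
    // The Java future is created and consumed here; callers only ever see a Try.
    val completable = new CompletableFuture[java.util.Map[String, Array[String]]]()
    // Attach the callback before sending the RPC, so it runs in whichever
    // thread completes the future (the RPC thread, for an asynchronous client).
    completable.whenComplete(new BiConsumer[java.util.Map[String, Array[String]], Throwable] {
      override def accept(dirs: java.util.Map[String, Array[String]], err: Throwable): Unit =
        callback(if (err == null) Success(dirs) else Failure(err))
    })
    try {
      rpc.fetch(executorIds, completable)
    } catch {
      // A failure to even send the request is reported through the same callback.
      case NonFatal(e) => completable.completeExceptionally(e)
    }
  }
}

// Test double: replies from a separate thread named "rpc-thread".
class ThreadedFakeRpc extends LocalDirsRpc {
  override def fetch(
      executorIds: Array[String],
      result: CompletableFuture[java.util.Map[String, Array[String]]]): Unit = {
    val reply = new Thread(new Runnable {
      override def run(): Unit = {
        val dirs = new java.util.HashMap[String, Array[String]]()
        executorIds.foreach(id => dirs.put(id, Array(s"/tmp/local-dirs/$id")))
        result.complete(dirs)
      }
    }, "rpc-thread")
    reply.start()
  }
}
```

With `ThreadedFakeRpc`, the callback runs on `rpc-thread` regardless of how quickly the reply arrives, because it is registered before the request goes out. If a client completed the future synchronously inside `fetch` (or `fetch` threw), the callback would run on the caller's thread instead, so the "always asynchronous" property depends on the RPC client actually being asynchronous.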
----------------------------------------------------------------
This is an automated message from the Apache Git Service.