mridulm commented on a change in pull request #28911:
URL: https://github.com/apache/spark/pull/28911#discussion_r477013033
##########
File path:
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockStoreClient.java
##########
@@ -61,4 +78,60 @@ public MetricSet shuffleMetrics() {
// Return an empty MetricSet by default.
return () -> Collections.emptyMap();
}
+
+  /**
+   * Request the local disk directories for executors which are located at the same host with
+   * the current BlockStoreClient(it can be ExternalBlockStoreClient or NettyBlockTransferService).
+   *
+   * @param host the host of BlockManager or ExternalShuffleService. It's the same with current
+   *             BlockStoreClient.
+   * @param port the port of BlockManager or ExternalShuffleService.
+   * @param execIds a collection of executor Ids, which specifies the target executors that we
+   *                want to get their local directories. There could be multiple executor Ids if
+   *                BlockStoreClient is implemented by ExternalBlockStoreClient since the request
+   *                handler, ExternalShuffleService, can serve multiple executors on the same node.
+   *                Or, only one executor Id if BlockStoreClient is implemented by
+   *                NettyBlockTransferService.
+   * @param hostLocalDirsCompletable a CompletableFuture which contains a map from executor Id
+   *                                 to its local directories if the request handler replies
+   *                                 successfully. Otherwise, it contains a specific error.
+   */
+  public void getHostLocalDirs(
+      String host,
+      int port,
+      String[] execIds,
+      CompletableFuture<Map<String, String[]>> hostLocalDirsCompletable) {
+    assert appId != null : "Called before init()";
+    GetLocalDirsForExecutors getLocalDirsMessage = new GetLocalDirsForExecutors(appId, execIds);
+    try {
+      TransportClient client = clientFactory.createClient(host, port);
+      client.sendRpc(getLocalDirsMessage.toByteBuffer(), new RpcResponseCallback() {
+        @Override
+        public void onSuccess(ByteBuffer response) {
+          try {
+            BlockTransferMessage msgObj = BlockTransferMessage.Decoder.fromByteBuffer(response);
+            hostLocalDirsCompletable.complete(
+              ((LocalDirsForExecutors) msgObj).getLocalDirsByExec());
+          } catch (Throwable t) {
+            logger.warn("Error trying to get the host local dirs for " +
+              Arrays.toString(getLocalDirsMessage.execIds), t.getCause());
+            hostLocalDirsCompletable.completeExceptionally(t);
+          } finally {
+            client.close();
+          }
+        }
+
+        @Override
+        public void onFailure(Throwable t) {
+          logger.warn("Error trying to get the host local dirs for " +
+            Arrays.toString(getLocalDirsMessage.execIds), t.getCause());
+          hostLocalDirsCompletable.completeExceptionally(t);
+          client.close();
Review comment:
Same as above, don't close `client`.
##########
File path:
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockStoreClient.java
##########
@@ -61,4 +78,60 @@ public MetricSet shuffleMetrics() {
// Return an empty MetricSet by default.
return () -> Collections.emptyMap();
}
+
+  /**
+   * Request the local disk directories for executors which are located at the same host with
+   * the current BlockStoreClient(it can be ExternalBlockStoreClient or NettyBlockTransferService).
+   *
+   * @param host the host of BlockManager or ExternalShuffleService. It's the same with current
+   *             BlockStoreClient.
+   * @param port the port of BlockManager or ExternalShuffleService.
+   * @param execIds a collection of executor Ids, which specifies the target executors that we
+   *                want to get their local directories. There could be multiple executor Ids if
+   *                BlockStoreClient is implemented by ExternalBlockStoreClient since the request
+   *                handler, ExternalShuffleService, can serve multiple executors on the same node.
+   *                Or, only one executor Id if BlockStoreClient is implemented by
+   *                NettyBlockTransferService.
+   * @param hostLocalDirsCompletable a CompletableFuture which contains a map from executor Id
+   *                                 to its local directories if the request handler replies
+   *                                 successfully. Otherwise, it contains a specific error.
+   */
+  public void getHostLocalDirs(
+      String host,
+      int port,
+      String[] execIds,
+      CompletableFuture<Map<String, String[]>> hostLocalDirsCompletable) {
+    assert appId != null : "Called before init()";
+    GetLocalDirsForExecutors getLocalDirsMessage = new GetLocalDirsForExecutors(appId, execIds);
+    try {
+      TransportClient client = clientFactory.createClient(host, port);
+      client.sendRpc(getLocalDirsMessage.toByteBuffer(), new RpcResponseCallback() {
+        @Override
+        public void onSuccess(ByteBuffer response) {
+          try {
+            BlockTransferMessage msgObj = BlockTransferMessage.Decoder.fromByteBuffer(response);
+            hostLocalDirsCompletable.complete(
+              ((LocalDirsForExecutors) msgObj).getLocalDirsByExec());
+          } catch (Throwable t) {
+            logger.warn("Error trying to get the host local dirs for " +
+              Arrays.toString(getLocalDirsMessage.execIds), t.getCause());
+            hostLocalDirsCompletable.completeExceptionally(t);
+          } finally {
+            client.close();
Review comment:
`client` is cached, so it should not be closed here; closing it will fail other in-flight requests on the same connection.
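To illustrate why closing in a callback is harmful, here is a minimal, hypothetical sketch of an endpoint-keyed client pool in the spirit of Spark's `TransportClientFactory` (the names `SharedClientPool` and `Client` are invented for this example, not the real API): the factory caches one connection per host:port, so `close()` issued by one finished request tears down the connection out from under every other caller still using it.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Minimal illustration (NOT Spark's real TransportClientFactory): one shared
// connection is handed out per host:port, so closing it from an RPC callback
// kills a connection that other in-flight requests still depend on.
class SharedClientPool {
  static final class Client {
    private boolean open = true;
    boolean isActive() { return open; }
    void close() { open = false; }  // tears down the shared connection
    String sendRpc(String msg) {
      if (!open) throw new IllegalStateException("connection already closed");
      return "ok:" + msg;
    }
  }

  private final Map<String, Client> cache = new ConcurrentHashMap<>();

  // Returns the cached client for this endpoint, creating it on first use.
  Client createClient(String host, int port) {
    return cache.computeIfAbsent(host + ":" + port, k -> new Client());
  }
}
```

Because both callers receive the same cached instance, the first caller's `close()` immediately invalidates the second caller's in-flight RPC, which is exactly the failure mode the review is pointing at.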
##########
File path:
core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
##########
@@ -466,42 +464,51 @@ final class ShuffleBlockFetcherIterator(
val cachedDirsByExec = hostLocalDirManager.getCachedHostLocalDirs()
val (hostLocalBlocksWithCachedDirs, hostLocalBlocksWithMissingDirs) =
hostLocalBlocksByExecutor
-        .map { case (hostLocalBmId, bmInfos) =>
-          (hostLocalBmId, bmInfos, cachedDirsByExec.get(hostLocalBmId.executorId))
+        .map { case (hostLocalBmId, blockInfos) =>
+          (hostLocalBmId, blockInfos, cachedDirsByExec.get(hostLocalBmId.executorId))
         }.partition(_._3.isDefined)
-    val bmId = blockManager.blockManagerId
     val immutableHostLocalBlocksWithoutDirs =
-      hostLocalBlocksWithMissingDirs.map { case (hostLocalBmId, bmInfos, _) =>
-        hostLocalBmId -> bmInfos
+      hostLocalBlocksWithMissingDirs.map { case (hostLocalBmId, blockInfos, _) =>
+        hostLocalBmId -> blockInfos
       }.toMap
     if (immutableHostLocalBlocksWithoutDirs.nonEmpty) {
       logDebug(s"Asynchronous fetching host-local blocks without cached executors' dir: " +
         s"${immutableHostLocalBlocksWithoutDirs.mkString(", ")}")
-      val execIdsWithoutDirs = immutableHostLocalBlocksWithoutDirs.keys.map(_.executorId).toArray
-      hostLocalDirManager.getHostLocalDirs(execIdsWithoutDirs) {
-        case Success(dirs) =>
-          immutableHostLocalBlocksWithoutDirs.foreach { case (hostLocalBmId, blockInfos) =>
-            blockInfos.takeWhile { case (blockId, _, mapIndex) =>
-              fetchHostLocalBlock(
-                blockId,
-                mapIndex,
-                dirs.get(hostLocalBmId.executorId),
-                hostLocalBmId)
+      val dirFetchRequests = if (blockManager.externalShuffleServiceEnabled) {
+        val host = blockManager.blockManagerId.host
+        val port = blockManager.externalShuffleServicePort
+        Seq((host, port, immutableHostLocalBlocksWithoutDirs.keys.toArray))
+      } else {
+        immutableHostLocalBlocksWithoutDirs.keys
+          .map(bmId => (bmId.host, bmId.port, Array(bmId))).toSeq
+      }
+
+      dirFetchRequests.foreach { case (host, port, bmIds) =>
+        hostLocalDirManager.getHostLocalDirs(host, port, bmIds.map(_.executorId)) {
+          case Success(dirsByExecId) =>
Review comment:
Shouldn't this be exactly the same as the `hostLocalBlocksWithCachedDirs.isDefined` block below?
If yes, we should refactor the common logic out.
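If the two branches are deduplicated as suggested, the shared body might look like this minimal, hypothetical sketch (in Java for illustration; Spark's real code is Scala and calls `fetchHostLocalBlock` with a `BlockManagerId`). Both the cached-dirs path and the freshly-fetched-dirs path would then call a single helper that takes a dirs-by-executor map plus the blocks to fetch:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the refactor suggested in the review: the
// "dirs already cached" branch and the "dirs just fetched over RPC" branch
// run the same per-executor fetch loop, so it can live in one helper.
class HostLocalFetchHelper {
  final List<String> fetched = new ArrayList<>();

  // Shared body: fetch every block of every executor using its local dirs.
  // (Stand-in for the real fetchHostLocalBlock(blockId, mapIndex, dirs, bmId).)
  void fetchBlocksForExecutors(Map<String, String[]> dirsByExecId,
                               Map<String, List<String>> blocksByExecId) {
    blocksByExecId.forEach((execId, blockIds) -> {
      String[] dirs = dirsByExecId.get(execId);
      for (String blockId : blockIds) {
        fetched.add(execId + "/" + blockId + "@" + dirs[0]);
      }
    });
  }
}
```

The `Success(dirsByExecId)` case and the cached-dirs case would then differ only in where the map comes from, not in what they do with it.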
##########
File path:
core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
##########
@@ -113,7 +113,7 @@ private[spark] class NettyBlockTransferService(
blockIds: Array[String],
listener: BlockFetchingListener,
tempFileManager: DownloadFileManager): Unit = {
- logTrace(s"Fetch blocks from $host:$port (executor id $execId)")
+ logger.trace(s"Fetch blocks from $host:$port (executor id $execId)")
Review comment:
Wrap it in `if (logger.isTraceEnabled)` to prevent unnecessary string interpolation.
Please do the same for the other trace/debug calls in this file.
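As a rough sketch of the guard being asked for (using a stub logger here rather than the real SLF4J/Log4j API, so the effect is observable), the point is that the interpolated message string is only built when trace logging is actually enabled:

```java
// Illustration of the review suggestion: guard expensive message construction
// behind isTraceEnabled() so the string is never built when tracing is off.
// (Stub logger, NOT the real SLF4J/Log4j API.)
class GuardedTrace {
  static int messagesBuilt = 0;  // counts how often the message string was built

  static final class Logger {
    private final boolean traceEnabled;
    Logger(boolean traceEnabled) { this.traceEnabled = traceEnabled; }
    boolean isTraceEnabled() { return traceEnabled; }
    void trace(String msg) { /* would write to the log */ }
  }

  static String expensiveMessage(String host, int port, String execId) {
    messagesBuilt++;  // the interpolation cost the guard avoids
    return "Fetch blocks from " + host + ":" + port + " (executor id " + execId + ")";
  }

  static void fetchBlocks(Logger logger, String host, int port, String execId) {
    // Guarded form: no string concatenation unless trace is actually on.
    if (logger.isTraceEnabled()) {
      logger.trace(expensiveMessage(host, port, execId));
    }
  }
}
```

With trace disabled the message is never constructed at all, which is exactly the saving the review is after on hot paths like block fetches.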
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]