[GitHub] [spark] Ngone51 commented on a change in pull request #28911: [SPARK-32077][CORE] Support host-local shuffle data reading when external shuffle service is disabled
Ngone51 commented on a change in pull request #28911:
URL: https://github.com/apache/spark/pull/28911#discussion_r480847421

## File path: core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala
@@ -113,6 +113,21 @@ class NettyBlockRpcServer(
             s"when there is not sufficient space available to store the block.")
           responseContext.onFailure(exception)
         }
+
+      case getLocalDirs: GetLocalDirsForExecutors =>
+        val isIncorrectAppId = getLocalDirs.appId != appId
+        val execNum = getLocalDirs.execIds.length
+        if (isIncorrectAppId || execNum != 1) {
+          val errorMsg = "Invalid GetLocalDirsForExecutors request: " +
+            s"${if (isIncorrectAppId) s"incorrect application id: ${getLocalDirs.appId};"}" +
+            s"${if (execNum != 1) s"incorrect executor number: $execNum (expected 1);"}"
+          responseContext.onFailure(new IllegalStateException(errorMsg))
+        } else {
+          val execId = getLocalDirs.execIds.head

Review comment:
Makes sense to me.

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
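The validation in this hunk can be sketched in Java as a small pure function that collects every problem with the request and then either returns the single executor id or fails with one combined message. `LocalDirsRequestValidator` and `validate` are hypothetical names, and throwing `IllegalStateException` here stands in for passing it to `responseContext.onFailure`; this is an illustration, not the PR's code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: validates a "get local dirs" request the way the hunk above does.
final class LocalDirsRequestValidator {
  private LocalDirsRequestValidator() {}

  /**
   * Returns the only requested executor id, or throws IllegalStateException describing
   * every problem found (wrong application id and/or wrong number of executor ids).
   */
  static String validate(String serverAppId, String requestAppId, String[] execIds) {
    List<String> problems = new ArrayList<>();
    if (!serverAppId.equals(requestAppId)) {
      problems.add("incorrect application id: " + requestAppId);
    }
    if (execIds.length != 1) {
      problems.add("incorrect executor number: " + execIds.length + " (expected 1)");
    }
    if (!problems.isEmpty()) {
      // Only the checks that actually failed contribute to the message.
      throw new IllegalStateException(
          "Invalid GetLocalDirsForExecutors request: " + String.join("; ", problems));
    }
    return execIds[0];
  }
}
```

Collecting the problems into a list also avoids a Scala pitfall in the hunk as quoted: an `if` without an `else` inside `s"${...}"` evaluates to `()` when its condition is false, so the check that passed would contribute a literal `()` to `errorMsg`.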
Ngone51 commented on a change in pull request #28911:
URL: https://github.com/apache/spark/pull/28911#discussion_r480847229

## File path: core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -212,7 +211,7 @@ private[spark] class BlockManager(
   private val maxOnHeapMemory = memoryManager.maxOnHeapStorageMemory
   private val maxOffHeapMemory = memoryManager.maxOffHeapStorageMemory
-  private val externalShuffleServicePort = StorageUtils.externalShuffleServicePort(conf)
+  private[spark] val externalShuffleServicePort = StorageUtils.externalShuffleServicePort(conf)

Review comment:
It's required by `val port = blockManager.externalShuffleServicePort` within `ShuffleBlockFetcherIterator`.
Ngone51 commented on a change in pull request #28911:
URL: https://github.com/apache/spark/pull/28911#discussion_r480178710

## File path: core/src/test/scala/org/apache/spark/shuffle/HostLocalShuffleFetchSuite.scala
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle
+
+import org.scalatest.matchers.must.Matchers
+import org.scalatest.matchers.should.Matchers._
+
+import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkEnv, SparkFunSuite, TestUtils}
+import org.apache.spark.internal.config._
+import org.apache.spark.network.netty.NettyBlockTransferService
+
+/**
+ * This test suite is used to test host local shuffle reading with external shuffle service disabled
+ */
+class HostLocalShuffleFetchSuite extends SparkFunSuite with Matchers with LocalSparkContext {

Review comment:
Thanks. I've already added the metrics check. Let me try to add other assertions that capture the expected behavior.
Ngone51 commented on a change in pull request #28911:
URL: https://github.com/apache/spark/pull/28911#discussion_r480177199

## File path: core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -1415,10 +1415,11 @@ package object config {
   private[spark] val SHUFFLE_HOST_LOCAL_DISK_READING_ENABLED =
     ConfigBuilder("spark.shuffle.readHostLocalDisk")
-      .doc(s"If enabled (and `${SHUFFLE_USE_OLD_FETCH_PROTOCOL.key}` is disabled and external " +
-        s"shuffle `${SHUFFLE_SERVICE_ENABLED.key}` is enabled), shuffle " +
-        "blocks requested from those block managers which are running on the same host are read " +
-        "from the disk directly instead of being fetched as remote blocks over the network.")
+      .doc(s"If enabled (and `${SHUFFLE_USE_OLD_FETCH_PROTOCOL.key}` is disabled, shuffle " +
+        "blocks requested from those block managers which are running on the same host are " +
+        "read from the disk directly instead of being fetched as remote blocks over the " +
+        "network. Note that for k8s workloads, this only works when nodes are using " +
+        "non-isolated container storage.")

Review comment:
Thank you for your clarification. If that's the case, I think we should point it out in the doc. cc @dongjoon-hyun @holdenk
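The condition stated in the updated doc string can be expressed as a small predicate. This Java sketch is hypothetical: `HostLocalReadConfig` is not a Spark class, it reads settings from a plain `Map`, and the defaults it assumes for missing keys (`true` for `spark.shuffle.readHostLocalDisk`, `false` for `spark.shuffle.useOldFetchProtocol`) should be checked against `config/package.scala`. The k8s storage caveat cannot be detected from configuration, so it is not modeled.

```java
import java.util.Map;

// Hypothetical helper: decides whether host-local disk reading would be attempted,
// following the condition in the updated doc string (external shuffle service no
// longer required).
final class HostLocalReadConfig {
  static final String READ_HOST_LOCAL_DISK = "spark.shuffle.readHostLocalDisk";
  static final String USE_OLD_FETCH_PROTOCOL = "spark.shuffle.useOldFetchProtocol";

  private HostLocalReadConfig() {}

  static boolean hostLocalReadingEnabled(Map<String, String> conf) {
    // Assumed defaults for missing keys: readHostLocalDisk=true, useOldFetchProtocol=false.
    boolean readHostLocal =
        Boolean.parseBoolean(conf.getOrDefault(READ_HOST_LOCAL_DISK, "true"));
    boolean oldProtocol =
        Boolean.parseBoolean(conf.getOrDefault(USE_OLD_FETCH_PROTOCOL, "false"));
    // spark.shuffle.service.enabled is deliberately not consulted here.
    return readHostLocal && !oldProtocol;
  }
}
```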
Ngone51 commented on a change in pull request #28911:
URL: https://github.com/apache/spark/pull/28911#discussion_r480173840

## File path: core/src/test/scala/org/apache/spark/shuffle/HostLocalShuffleFetchSuite.scala
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle
+
+import org.scalatest.matchers.must.Matchers
+import org.scalatest.matchers.should.Matchers._
+
+import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkEnv, SparkFunSuite, TestUtils}
+import org.apache.spark.internal.config._
+import org.apache.spark.network.netty.NettyBlockTransferService
+
+/**
+ * This test suite is used to test host local shuffle reading with external shuffle service disabled
+ */
+class HostLocalShuffleFetchSuite extends SparkFunSuite with Matchers with LocalSparkContext {

Review comment:
Hmm... I'm not sure whether @dongjoon-hyun was referring to those two asserts, but I can try to make them separate unit tests.
Ngone51 commented on a change in pull request #28911:
URL: https://github.com/apache/spark/pull/28911#discussion_r480157384

## File path: core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -1415,10 +1415,11 @@ package object config {
   private[spark] val SHUFFLE_HOST_LOCAL_DISK_READING_ENABLED =
     ConfigBuilder("spark.shuffle.readHostLocalDisk")
-      .doc(s"If enabled (and `${SHUFFLE_USE_OLD_FETCH_PROTOCOL.key}` is disabled and external " +
-        s"shuffle `${SHUFFLE_SERVICE_ENABLED.key}` is enabled), shuffle " +
-        "blocks requested from those block managers which are running on the same host are read " +
-        "from the disk directly instead of being fetched as remote blocks over the network.")
+      .doc(s"If enabled (and `${SHUFFLE_USE_OLD_FETCH_PROTOCOL.key}` is disabled, shuffle " +
+        "blocks requested from those block managers which are running on the same host are " +
+        "read from the disk directly instead of being fetched as remote blocks over the " +
+        "network. Note that for k8s workloads, this only works when nodes are using " +
+        "non-isolated container storage.")

Review comment:
> Currently this is done by using the some host in the blockmanager ID which works only for YARN and standalone mode, is not it?

IIUC, based on @holdenk's previous comment and @dongjoon-hyun's comment, it should also work for Mesos/K8s when they're using non-isolated containers.

> A question for the future: do you have a plan to introduce block manager grouping based on shared storage?

I don't. To be honest, I'm not familiar with containerized resource managers, and I'm not sure which plan you mean here. Is it only needed for containerized resource managers?

## File path: core/src/test/scala/org/apache/spark/shuffle/HostLocalShuffleFetchSuite.scala
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle
+
+import org.scalatest.matchers.must.Matchers
+import org.scalatest.matchers.should.Matchers._
+
+import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkEnv, SparkFunSuite, TestUtils}
+import org.apache.spark.internal.config._
+import org.apache.spark.network.netty.NettyBlockTransferService
+
+/**
+ * This test suite is used to test host local shuffle reading with external shuffle service disabled
+ */
+class HostLocalShuffleFetchSuite extends SparkFunSuite with Matchers with LocalSparkContext {

Review comment:
Sorry, which two asserts are you referring to? Are they these two asserts in the new test:
```scala
// Spark should read the shuffle data locally from the cached directories on the same host,
// so there's no remote fetching at all.
assert(localBytesRead.sum > 0)
assert(remoteBytesRead.sum === 0)
```
If so, I think checking `localBytesRead`/`remoteBytesRead` here is equivalent to checking `localBlocksFetched`/`remoteBlocksFetched`.
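The two asserts quoted above boil down to summing per-task shuffle read metrics and requiring that no bytes came over the network. A minimal Java sketch of that logic, using a hypothetical `TaskReadMetrics` holder whose field names mirror the metric names in the test (it is not Spark's `ShuffleReadMetrics` class), with the block-count variant alongside for comparison:

```java
import java.util.List;

// Hypothetical illustration of the test's assertion logic; not Spark code.
final class ShuffleReadCheck {
  /** Per-task shuffle read numbers; field names mirror the metrics used in the test. */
  static final class TaskReadMetrics {
    final long localBytesRead;
    final long remoteBytesRead;
    final long localBlocksFetched;
    final long remoteBlocksFetched;

    TaskReadMetrics(long localBytesRead, long remoteBytesRead,
                    long localBlocksFetched, long remoteBlocksFetched) {
      this.localBytesRead = localBytesRead;
      this.remoteBytesRead = remoteBytesRead;
      this.localBlocksFetched = localBlocksFetched;
      this.remoteBlocksFetched = remoteBlocksFetched;
    }
  }

  private ShuffleReadCheck() {}

  /** Mirrors assert(localBytesRead.sum > 0) and assert(remoteBytesRead.sum === 0). */
  static boolean readAllBytesLocally(List<TaskReadMetrics> tasks) {
    long local = 0;
    long remote = 0;
    for (TaskReadMetrics t : tasks) {
      local += t.localBytesRead;
      remote += t.remoteBytesRead;
    }
    return local > 0 && remote == 0;
  }

  /** The same check expressed with block counts instead of bytes. */
  static boolean fetchedAllBlocksLocally(List<TaskReadMetrics> tasks) {
    long local = 0;
    long remote = 0;
    for (TaskReadMetrics t : tasks) {
      local += t.localBlocksFetched;
      remote += t.remoteBlocksFetched;
    }
    return local > 0 && remote == 0;
  }
}
```

The sketch does not model empty or zero-byte blocks, so it does not settle whether the byte-based and block-based checks can ever disagree in Spark.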
Ngone51 commented on a change in pull request #28911:
URL: https://github.com/apache/spark/pull/28911#discussion_r480095723

## File path: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockStoreClient.java
@@ -61,4 +78,56 @@ public MetricSet shuffleMetrics() {
     // Return an empty MetricSet by default.
     return () -> Collections.emptyMap();
   }
+
+  /**
+   * Request the local disk directories for executors which are located at the same host with
+   * the current BlockStoreClient(it can be ExternalBlockStoreClient or NettyBlockTransferService).
+   *
+   * @param host the host of BlockManager or ExternalShuffleService. It's the same with current
+   *             BlockStoreClient.

Review comment:
It's a precondition that should be guaranteed by the caller, so I've changed it to `should be`. Thanks!
Ngone51 commented on a change in pull request #28911:
URL: https://github.com/apache/spark/pull/28911#discussion_r480095860

## File path: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockStoreClient.java
@@ -18,15 +18,32 @@
 package org.apache.spark.network.shuffle;
 
 import java.io.Closeable;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
 import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
 
 import com.codahale.metrics.MetricSet;
+import org.apache.spark.network.client.RpcResponseCallback;
+import org.apache.spark.network.client.TransportClient;
+import org.apache.spark.network.client.TransportClientFactory;
+import org.apache.spark.network.shuffle.protocol.BlockTransferMessage;
+import org.apache.spark.network.shuffle.protocol.GetLocalDirsForExecutors;
+import org.apache.spark.network.shuffle.protocol.LocalDirsForExecutors;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;

Review comment:
Thank you for catching this!
Ngone51 commented on a change in pull request #28911:
URL: https://github.com/apache/spark/pull/28911#discussion_r480095206

## File path: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockStoreClient.java
@@ -61,4 +78,56 @@ public MetricSet shuffleMetrics() {
     // Return an empty MetricSet by default.
     return () -> Collections.emptyMap();
   }
+
+  /**
+   * Request the local disk directories for executors which are located at the same host with
+   * the current BlockStoreClient(it can be ExternalBlockStoreClient or NettyBlockTransferService).
+   *
+   * @param host the host of BlockManager or ExternalShuffleService. It's the same with current
+   *             BlockStoreClient.
+   * @param port the port of BlockManager or ExternalShuffleService.
+   * @param execIds a collection of executor Ids, which specifies the target executors that we
+   *                want to get their local directories. There could be multiple executor Ids if
+   *                BlockStoreClient is implemented by ExternalBlockStoreClient since the request
+   *                handler, ExternalShuffleService, can serve multiple executors on the same node.
+   *                Or, only one executor Id if BlockStoreClient is implemented by
+   *                NettyBlockTransferService.
+   * @param hostLocalDirsCompletable a CompletableFuture which contains a map from executor Id
+   *                                 to its local directories if the request handler replies
+   *                                 successfully. Otherwise, it contains a specific error.
+   */
+  public void getHostLocalDirs(
+      String host,
+      int port,
+      String[] execIds,
+      CompletableFuture<Map<String, String[]>> hostLocalDirsCompletable) {
+    assert appId != null : "Called before init()";

Review comment:
Sounds reasonable!
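The Javadoc describes a callback-style contract: the caller passes in a `CompletableFuture` that is later completed with an executor-id-to-directories map, or completed exceptionally with the error. A minimal Java sketch of that contract, assuming a hypothetical synchronous `rpc` function in place of the real network round trip (so `host` and `port` are omitted); `LocalDirsClientSketch` is not a Spark class:

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Hypothetical sketch of the getHostLocalDirs contract; the rpc function simulates
// the remote handler (ExternalShuffleService or NettyBlockRpcServer).
final class LocalDirsClientSketch {
  private final String appId;

  LocalDirsClientSketch(String appId) {
    this.appId = appId;
  }

  void getHostLocalDirs(
      String[] execIds,
      Function<String[], Map<String, String[]>> rpc,
      CompletableFuture<Map<String, String[]>> hostLocalDirsCompletable) {
    if (appId == null) {
      // Unlike a Java `assert` statement, this check runs even without -ea.
      throw new IllegalStateException("Called before init()");
    }
    try {
      // Success: executor id -> local directories.
      hostLocalDirsCompletable.complete(rpc.apply(execIds));
    } catch (RuntimeException e) {
      // Failure: the caller sees the specific error through the future.
      hostLocalDirsCompletable.completeExceptionally(e);
    }
  }
}
```

A caller would typically attach `whenComplete((dirs, error) -> ...)` to the future and turn the two outcomes into a success or failure result.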
Ngone51 commented on a change in pull request #28911:
URL: https://github.com/apache/spark/pull/28911#discussion_r480094857

## File path: core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -120,34 +120,33 @@ private[spark] class ByteBufferBlockData(
 private[spark] class HostLocalDirManager(
     futureExecutionContext: ExecutionContext,
     cacheSize: Int,
-    externalBlockStoreClient: ExternalBlockStoreClient,
-    host: String,
-    externalShuffleServicePort: Int) extends Logging {
+    blockStoreClient: BlockStoreClient) extends Logging {
 
   private val executorIdToLocalDirsCache =
     CacheBuilder
       .newBuilder()
       .maximumSize(cacheSize)
       .build[String, Array[String]]()
 
-  private[spark] def getCachedHostLocalDirs()
-      : scala.collection.Map[String, Array[String]] = executorIdToLocalDirsCache.synchronized {
-    import scala.collection.JavaConverters._
-    return executorIdToLocalDirsCache.asMap().asScala
-  }
+  private[spark] def getCachedHostLocalDirs: Map[String, Array[String]] =
+    executorIdToLocalDirsCache.synchronized {
+      executorIdToLocalDirsCache.asMap().asScala.toMap
+    }
 
   private[spark] def getHostLocalDirs(
+      host: String,
+      port: Int,
       executorIds: Array[String])(
-      callback: Try[java.util.Map[String, Array[String]]] => Unit): Unit = {
+      callback: Try[Map[String, Array[String]]] => Unit): Unit = {

Review comment:
It's required by `fetchMultipleHostLocalBlocks`. We could instead do the Java-to-Scala map conversion before calling `fetchMultipleHostLocalBlocks` and leave `Try[java.util.Map[String, Array[String]]] => Unit` unchanged, but I decided to do the conversion here simply because this class already imports `scala.collection.JavaConverters._`. It makes little difference whether the conversion happens here or there.
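The cache change can be illustrated with a bounded map that hands out an immutable snapshot, which is what `asMap().asScala.toMap` produces in the new `getCachedHostLocalDirs` (the old `asMap().asScala` returned a live wrapper over the cache). In this hypothetical Java sketch, an access-ordered `LinkedHashMap` stands in for Guava's `CacheBuilder.maximumSize(cacheSize)`; Guava only approximates LRU eviction, so the eviction order here is an assumption rather than Guava's exact behavior.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch of a size-bounded executorId -> local dirs cache.
final class HostLocalDirCacheSketch {
  private final Map<String, String[]> cache;

  HostLocalDirCacheSketch(final int maxSize) {
    // accessOrder = true plus removeEldestEntry gives simple LRU eviction.
    this.cache = new LinkedHashMap<String, String[]>(16, 0.75f, true) {
      @Override
      protected boolean removeEldestEntry(Map.Entry<String, String[]> eldest) {
        return size() > maxSize;
      }
    };
  }

  synchronized void put(String executorId, String[] dirs) {
    cache.put(executorId, dirs);
  }

  /** Returns an immutable copy, so later cache updates do not leak into callers. */
  synchronized Map<String, String[]> getCachedHostLocalDirs() {
    return Collections.unmodifiableMap(new HashMap<>(cache));
  }
}
```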
Ngone51 commented on a change in pull request #28911:
URL: https://github.com/apache/spark/pull/28911#discussion_r480090760

## File path: core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala
@@ -113,6 +113,15 @@ class NettyBlockRpcServer(
             s"when there is not sufficient space available to store the block.")
           responseContext.onFailure(exception)
         }
+
+      case getLocalDirs: GetLocalDirsForExecutors =>
+        assert(getLocalDirs.appId == appId)
+        assert(getLocalDirs.execIds.length == 1)

Review comment:
This is a good point. I've changed it to reply to the sender with an error when the request fails validation, so the sender can handle the error as a fetch failure.
Ngone51 commented on a change in pull request #28911:
URL: https://github.com/apache/spark/pull/28911#discussion_r480089706

## File path: core/src/test/scala/org/apache/spark/shuffle/HostLocalShuffleFetchSuite.scala
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle
+
+import org.scalatest.matchers.must.Matchers
+import org.scalatest.matchers.should.Matchers._
+
+import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkEnv, SparkFunSuite, TestUtils}
+import org.apache.spark.internal.config._
+import org.apache.spark.network.netty.NettyBlockTransferService
+
+/**
+ * This test suite is used to test host local shuffle reading with external shuffle service disabled
+ */
+class HostLocalShuffleFetchSuite extends SparkFunSuite with Matchers with LocalSparkContext {

Review comment:
I've combined the two host-local shuffle reading tests (with the external shuffle service enabled and disabled) into [HostLocalShuffleReadingSuite](https://github.com/apache/spark/blob/6b97be552b1d5a78f476c4a97d794c567222d9bf/core/src/test/scala/org/apache/spark/shuffle/HostLocalShuffleReadingSuite.scala), so the host-local shuffle reading feature is now tested in one place.
Does it look okay to you? @dongjoon-hyun @attilapiros
Ngone51 commented on a change in pull request #28911:
URL: https://github.com/apache/spark/pull/28911#discussion_r480086501

## File path: core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
@@ -66,16 +66,24 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT
     transfer
   }
 
+  private def createMockBlockManager(): BlockManager = {
+    val blockManager = mock(classOf[BlockManager])
+    val localBmId = BlockManagerId("test-client", "test-local-host", 1)
+    doReturn(localBmId).when(blockManager).blockManagerId
+    // By default, the mock BlockManager returns None for hostLocalDirManager. One could
+    // still use initHostLocalDirManager() to specify a custom hostLocalDirManager.

Review comment:
Tests like:
`successful 3 local + 4 host local + 2 remote reads`
`fetch continuous blocks in batch successful 3 local + 4 host local + 2 remote reads`
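Test names such as `3 local + 4 host local + 2 remote reads` refer to a three-way split of blocks by where they live relative to the reading block manager. A simplified Java sketch of that split, assuming only executor id and host matter (it ignores the `spark.shuffle.readHostLocalDisk` switch and ports); `BlockLocationSketch` and `BmId` are hypothetical stand-ins for `BlockManagerId`:

```java
import java.util.EnumMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified classification of shuffle blocks by owner location.
final class BlockLocationSketch {
  enum Location { LOCAL, HOST_LOCAL, REMOTE }

  /** Minimal stand-in for BlockManagerId: executor id plus host. */
  static final class BmId {
    final String executorId;
    final String host;

    BmId(String executorId, String host) {
      this.executorId = executorId;
      this.host = host;
    }
  }

  private BlockLocationSketch() {}

  static Location classify(BmId self, BmId owner) {
    if (owner.executorId.equals(self.executorId)) {
      return Location.LOCAL;       // same executor: read from its own disk
    }
    if (owner.host.equals(self.host)) {
      return Location.HOST_LOCAL;  // other executor, same host: read its dirs directly
    }
    return Location.REMOTE;        // different host: fetch over the network
  }

  static Map<Location, Integer> count(BmId self, List<BmId> owners) {
    Map<Location, Integer> counts = new EnumMap<>(Location.class);
    for (Location l : Location.values()) {
      counts.put(l, 0);
    }
    for (BmId owner : owners) {
      counts.merge(classify(self, owner), 1, Integer::sum);
    }
    return counts;
  }
}
```

With a reader id like the mock's `BlockManagerId("test-client", "test-local-host", 1)`, blocks owned by `test-client` count as local, blocks from other executors on `test-local-host` as host-local, and everything else as remote.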
Ngone51 commented on a change in pull request #28911:
URL: https://github.com/apache/spark/pull/28911#discussion_r478179838

## File path: core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
@@ -463,54 +466,73 @@ final class ShuffleBlockFetcherIterator(
    * track in-memory are the ManagedBuffer references themselves.
    */
   private[this] def fetchHostLocalBlocks(hostLocalDirManager: HostLocalDirManager): Unit = {
-    val cachedDirsByExec = hostLocalDirManager.getCachedHostLocalDirs()
-    val (hostLocalBlocksWithCachedDirs, hostLocalBlocksWithMissingDirs) =
-      hostLocalBlocksByExecutor
-        .map { case (hostLocalBmId, bmInfos) =>
-          (hostLocalBmId, bmInfos, cachedDirsByExec.get(hostLocalBmId.executorId))
-        }.partition(_._3.isDefined)
-    val bmId = blockManager.blockManagerId
-    val immutableHostLocalBlocksWithoutDirs =
-      hostLocalBlocksWithMissingDirs.map { case (hostLocalBmId, bmInfos, _) =>
-        hostLocalBmId -> bmInfos
-      }.toMap
-    if (immutableHostLocalBlocksWithoutDirs.nonEmpty) {
+    val cachedDirsByExec = hostLocalDirManager.getCachedHostLocalDirs
+    val (hostLocalBlocksWithCachedDirs, hostLocalBlocksWithMissingDirs) = {
+      val (hasCache, noCache) = hostLocalBlocksByExecutor.partition { case (hostLocalBmId, _) =>
+        cachedDirsByExec.contains(hostLocalBmId.executorId)
+      }
+      (hasCache.toMap, noCache.toMap)
+    }
+
+    if (hostLocalBlocksWithMissingDirs.nonEmpty) {
       logDebug(s"Asynchronous fetching host-local blocks without cached executors' dir: " +
-        s"${immutableHostLocalBlocksWithoutDirs.mkString(", ")}")
-      val execIdsWithoutDirs = immutableHostLocalBlocksWithoutDirs.keys.map(_.executorId).toArray
-      hostLocalDirManager.getHostLocalDirs(execIdsWithoutDirs) {
-        case Success(dirs) =>
-          immutableHostLocalBlocksWithoutDirs.foreach { case (hostLocalBmId, blockInfos) =>
-            blockInfos.takeWhile { case (blockId, _, mapIndex) =>
-              fetchHostLocalBlock(
-                blockId,
-                mapIndex,
-                dirs.get(hostLocalBmId.executorId),
-                hostLocalBmId)
-            }
-          }
-          logDebug(s"Got host-local blocks (without cached executors' dir) in " +
-            s"${Utils.getUsedTimeNs(startTimeNs)}")
-
-        case Failure(throwable) =>
-          logError(s"Error occurred while fetching host local blocks", throwable)
-          val (hostLocalBmId, blockInfoSeq) = immutableHostLocalBlocksWithoutDirs.head
-          val (blockId, _, mapIndex) = blockInfoSeq.head
-          results.put(FailureFetchResult(blockId, mapIndex, hostLocalBmId, throwable))
+        s"${hostLocalBlocksWithMissingDirs.mkString(", ")}")
+
+      // If the external shuffle service is enabled, we'll fetch the local directories for
+      // multiple executors from the external shuffle service, which located at the same host
+      // with the executors, in once. Otherwise, we'll fetch the local directories from those
+      // executors directly one by one. The fetch requests won't be too much since one host is
+      // almost impossible to have many executors at the same time practically.
+      val dirFetchRequests = if (blockManager.externalShuffleServiceEnabled) {
+        val host = blockManager.blockManagerId.host
+        val port = blockManager.externalShuffleServicePort
+        Seq((host, port, hostLocalBlocksWithMissingDirs.keys.toArray))
+      } else {
+        hostLocalBlocksWithMissingDirs.keys.map(bmId => (bmId.host, bmId.port, Array(bmId))).toSeq
+      }
+
+      dirFetchRequests.foreach { case (host, port, bmIds) =>
+        hostLocalDirManager.getHostLocalDirs(host, port, bmIds.map(_.executorId)) {
+          case Success(dirsByExecId) =>
+            fetchMultipleHostLocalBlocks(
+              hostLocalBlocksWithMissingDirs.filterKeys(bmIds.contains),
+              dirsByExecId,
+              cached = false)
+
+          case Failure(throwable) =>
+            logError("Error occurred while fetching host local blocks", throwable)
+            val bmId = bmIds.head
+            val blockInfoSeq = hostLocalBlocksWithMissingDirs(bmId)
+            val (blockId, _, mapIndex) = blockInfoSeq.head
+            results.put(FailureFetchResult(blockId, mapIndex, bmId, throwable))
+        }
+      }
     }
+
     if (hostLocalBlocksWithCachedDirs.nonEmpty) {
       logDebug(s"Synchronous fetching host-local blocks with cached executors' dir: " +
           s"${hostLocalBlocksWithCachedDirs.mkString(", ")}")
-      hostLocalBlocksWithCachedDirs.foreach { case (_, blockInfos, localDirs) =>
-        blockInfos.foreach { case (blockId, _, mapIndex) =>
-          if (!fetchHostLocalBlock(blockId, mapIndex, localDirs.get, bmId)) {

Review comment:
@attilapiros This looks like a pre-existing bug. The `bmId` is for the current executor, but the blocks can belong to other executors on the
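The request planning in the new `fetchHostLocalBlocks` can be sketched independently of Spark: with the external shuffle service enabled, one request goes to the shuffle service on the local host and names every executor whose directories are missing; otherwise each executor's own block manager is asked for its own directories. `DirFetchPlanSketch`, `BmId`, and `DirFetchRequest` are hypothetical Java stand-ins for the Scala tuples in the hunk.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch of how directory lookups are grouped into requests.
final class DirFetchPlanSketch {
  /** Stand-in for BlockManagerId. */
  static final class BmId {
    final String executorId;
    final String host;
    final int port;

    BmId(String executorId, String host, int port) {
      this.executorId = executorId;
      this.host = host;
      this.port = port;
    }
  }

  /** One "get local dirs" request: where to send it and which executors it asks about. */
  static final class DirFetchRequest {
    final String host;
    final int port;
    final List<String> executorIds;

    DirFetchRequest(String host, int port, List<String> executorIds) {
      this.host = host;
      this.port = port;
      this.executorIds = executorIds;
    }
  }

  private DirFetchPlanSketch() {}

  static List<DirFetchRequest> plan(
      boolean externalShuffleServiceEnabled, String localHost, int essPort, List<BmId> missing) {
    List<DirFetchRequest> requests = new ArrayList<>();
    if (missing.isEmpty()) {
      return requests;
    }
    if (externalShuffleServiceEnabled) {
      // A single request to the shuffle service on this host covers every executor.
      List<String> ids = new ArrayList<>();
      for (BmId b : missing) {
        ids.add(b.executorId);
      }
      requests.add(new DirFetchRequest(localHost, essPort, ids));
    } else {
      // One request per executor, sent to that executor's own block manager.
      for (BmId b : missing) {
        requests.add(new DirFetchRequest(b.host, b.port, Collections.singletonList(b.executorId)));
      }
    }
    return requests;
  }
}
```

Keeping each request's executor ids alongside it is what lets a failure be attributed to the block manager that owns the blocks, rather than to the reading executor's own `bmId`.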
Ngone51 commented on a change in pull request #28911: URL: https://github.com/apache/spark/pull/28911#discussion_r478175273 ## File path: core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala ## @@ -466,42 +464,51 @@ final class ShuffleBlockFetcherIterator( val cachedDirsByExec = hostLocalDirManager.getCachedHostLocalDirs() val (hostLocalBlocksWithCachedDirs, hostLocalBlocksWithMissingDirs) = hostLocalBlocksByExecutor -.map { case (hostLocalBmId, bmInfos) => - (hostLocalBmId, bmInfos, cachedDirsByExec.get(hostLocalBmId.executorId)) +.map { case (hostLocalBmId, blockInfos) => + (hostLocalBmId, blockInfos, cachedDirsByExec.get(hostLocalBmId.executorId)) }.partition(_._3.isDefined) -val bmId = blockManager.blockManagerId val immutableHostLocalBlocksWithoutDirs = - hostLocalBlocksWithMissingDirs.map { case (hostLocalBmId, bmInfos, _) => -hostLocalBmId -> bmInfos + hostLocalBlocksWithMissingDirs.map { case (hostLocalBmId, blockInfos, _) => +hostLocalBmId -> blockInfos }.toMap if (immutableHostLocalBlocksWithoutDirs.nonEmpty) { logDebug(s"Asynchronous fetching host-local blocks without cached executors' dir: " + s"${immutableHostLocalBlocksWithoutDirs.mkString(", ")}") - val execIdsWithoutDirs = immutableHostLocalBlocksWithoutDirs.keys.map(_.executorId).toArray - hostLocalDirManager.getHostLocalDirs(execIdsWithoutDirs) { -case Success(dirs) => - immutableHostLocalBlocksWithoutDirs.foreach { case (hostLocalBmId, blockInfos) => -blockInfos.takeWhile { case (blockId, _, mapIndex) => - fetchHostLocalBlock( -blockId, -mapIndex, -dirs.get(hostLocalBmId.executorId), -hostLocalBmId) + val dirFetchRequests = if (blockManager.externalShuffleServiceEnabled) { +val host = blockManager.blockManagerId.host +val port = blockManager.externalShuffleServicePort +Seq((host, port, immutableHostLocalBlocksWithoutDirs.keys.toArray)) + } else { +immutableHostLocalBlocksWithoutDirs.keys + .map(bmId => (bmId.host, bmId.port, Array(bmId))).toSeq + } + + 
dirFetchRequests.foreach { case (host, port, bmIds) => +hostLocalDirManager.getHostLocalDirs(host, port, bmIds.map(_.executorId)) { + case Success(dirsByExecId) => Review comment: yes
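The request grouping in this hunk boils down to one rule. With the external shuffle service enabled, a single request to the local shuffle service covers every executor whose directories are not cached yet. Without it, each executor is asked separately at its own BlockManager host and port. Below is a minimal Java sketch of that rule under simplified types: `BmId`, `DirFetchRequest`, and `DirFetchPlanner` are hypothetical names, not Spark classes, and the per-block bookkeeping is omitted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical stand-in for Spark's BlockManagerId (executor ID plus BlockManager host/port).
record BmId(String executorId, String host, int port) {}

// Hypothetical: one "get local dirs" request, i.e. where to send it and which executors it asks about.
record DirFetchRequest(String host, int port, List<String> execIds) {}

final class DirFetchPlanner {
  private DirFetchPlanner() {}

  // Keep only the host-local block managers whose dirs are not in the cache yet.
  static List<BmId> missingDirs(List<BmId> hostLocalBmIds, Map<String, String[]> cachedDirsByExec) {
    return hostLocalBmIds.stream()
        .filter(bm -> !cachedDirsByExec.containsKey(bm.executorId()))
        .collect(Collectors.toList());
  }

  static List<DirFetchRequest> plan(
      List<BmId> bmIdsWithoutDirs,
      boolean externalShuffleServiceEnabled,
      String localHost,
      int externalShuffleServicePort) {
    List<DirFetchRequest> requests = new ArrayList<>();
    if (bmIdsWithoutDirs.isEmpty()) {
      return requests; // nothing to fetch
    }
    if (externalShuffleServiceEnabled) {
      // The local external shuffle service knows every executor on this host,
      // so a single request can carry all executor IDs.
      List<String> execIds = bmIdsWithoutDirs.stream()
          .map(BmId::executorId)
          .collect(Collectors.toList());
      requests.add(new DirFetchRequest(localHost, externalShuffleServicePort, execIds));
    } else {
      // Without the shuffle service, each executor's own BlockManager answers
      // only for itself, so send one single-executor request per block manager.
      for (BmId bm : bmIdsWithoutDirs) {
        requests.add(new DirFetchRequest(bm.host(), bm.port(), List.of(bm.executorId())));
      }
    }
    return requests;
  }
}
```

The per-executor branch trades more round trips for not depending on a shuffle service process being present on the host.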
[GitHub] [spark] Ngone51 commented on a change in pull request #28911: [SPARK-32077][CORE] Support host-local shuffle data reading when external shuffle service is disabled
Ngone51 commented on a change in pull request #28911: URL: https://github.com/apache/spark/pull/28911#discussion_r459902032 ## File path: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockStoreClient.java ## @@ -61,4 +78,62 @@ public MetricSet shuffleMetrics() { // Return an empty MetricSet by default. return () -> Collections.emptyMap(); } + + /** + * Request the local disk directories for executors which are located at the same host with + * the current BlockStoreClient(it can be ExternalBlockStoreClient or NettyBlockTransferService). + * + * + * @param host the host of BlockManager or ExternalShuffleService. It's the same with current + * BlockStoreClient. + * @param port the port of BlockManager or ExternalShuffleService. + * @param execIds a collection of executor Ids, which specifies the target executors that we + *want to get their local directories. There could be multiple executor Ids if + *BlockStoreClient is implemented by ExternalBlockStoreClient since the request + *handler, ExternalShuffleService, can serve multiple executors on the same node. + *Or, only one executor Id if BlockStoreClient is implemented by + *NettyBlockTransferService. + * @param hostLocalDirsCompletable a CompletableFuture which contains a map from executor Id to its + * local directories if the request handler replies successfully. + * Otherwise, it contains a specific error. 
+ */ + public void getHostLocalDirs( + String host, + int port, + String[] execIds, + CompletableFuture<Map<String, String[]>> hostLocalDirsCompletable) { +assert appId != null : "Called before init()"; +GetLocalDirsForExecutors getLocalDirsMessage = new GetLocalDirsForExecutors(appId, execIds); +try { + TransportClient client = clientFactory.createClient(host, port); + client.sendRpc(getLocalDirsMessage.toByteBuffer(), new RpcResponseCallback() { +@Override +public void onSuccess(ByteBuffer response) { + try { +BlockTransferMessage msgObj = BlockTransferMessage.Decoder.fromByteBuffer(response); +hostLocalDirsCompletable.complete( +((LocalDirsForExecutors) msgObj).getLocalDirsByExec()); + } catch (Throwable t) { +logger.warn("Error trying to get the host local dirs for " + +Arrays.toString(getLocalDirsMessage.execIds) + " via external shuffle service", Review comment: Thanks for catching this. I just removed "via external shuffle service", since the `logger` already prints the current class name. Is that OK with you?
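The Javadoc above describes a callback-to-future contract: complete `hostLocalDirsCompletable` with the executor-to-dirs map on success, complete it exceptionally on any failure (including a decoding error), and close the client in both paths. The following is a minimal sketch of that contract without Spark's networking layer. `Transport` and `Callback` are hypothetical stand-ins for `TransportClient` and `RpcResponseCallback`, and decoding is reduced to a plain `Function`.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Hypothetical stand-in for RpcResponseCallback.
interface Callback<R> {
  void onSuccess(R response);
  void onFailure(Throwable t);
}

// Hypothetical stand-in for a TransportClient: sends one RPC and must be closed afterwards.
interface Transport<R> extends AutoCloseable {
  void sendRpc(String request, Callback<R> callback);

  @Override
  void close(); // narrowed: no checked exception
}

final class HostLocalDirsFetcher {
  private HostLocalDirsFetcher() {}

  static <R> void getHostLocalDirs(
      Transport<R> client,
      String request,
      Function<R, Map<String, String[]>> decode,
      CompletableFuture<Map<String, String[]>> hostLocalDirsCompletable) {
    client.sendRpc(request, new Callback<R>() {
      @Override
      public void onSuccess(R response) {
        try {
          // Decoding can fail (e.g. an unexpected message type); surface that on the future.
          hostLocalDirsCompletable.complete(decode.apply(response));
        } catch (Throwable t) {
          hostLocalDirsCompletable.completeExceptionally(t);
        } finally {
          client.close(); // close on the success path, even if decoding failed
        }
      }

      @Override
      public void onFailure(Throwable t) {
        hostLocalDirsCompletable.completeExceptionally(t);
        client.close(); // close on the failure path too
      }
    });
  }
}
```

Because both callbacks settle the future and close the client, a caller only needs to react to the future, for example with `whenComplete`.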
[GitHub] [spark] Ngone51 commented on a change in pull request #28911: [SPARK-32077][CORE] Support host-local shuffle data reading when external shuffle service is disabled
Ngone51 commented on a change in pull request #28911: URL: https://github.com/apache/spark/pull/28911#discussion_r459897266 ## File path: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ## @@ -492,20 +494,26 @@ private[spark] class BlockManager( registerWithExternalShuffleServer() } -hostLocalDirManager = - if (conf.get(config.SHUFFLE_HOST_LOCAL_DISK_READING_ENABLED) && - !conf.get(config.SHUFFLE_USE_OLD_FETCH_PROTOCOL)) { -externalBlockStoreClient.map { blockStoreClient => - new HostLocalDirManager( -futureExecutionContext, -conf.get(config.STORAGE_LOCAL_DISK_BY_EXECUTORS_CACHE_SIZE), -blockStoreClient, -blockManagerId.host, -externalShuffleServicePort) -} +hostLocalDirManager = { + val canUseHostLocalReading = conf.get(config.SHUFFLE_HOST_LOCAL_DISK_READING_ENABLED) && +!conf.get(config.SHUFFLE_USE_OLD_FETCH_PROTOCOL) + val externalShuffleServiceEnabled = externalBlockStoreClient.isDefined + val dynamicAllocationDisabled = !conf.get(config.DYN_ALLOCATION_ENABLED) + val dynamicAllocationEnabledWithShuffleTacking = conf.get(config.DYN_ALLOCATION_ENABLED) && +conf.get(config.DYN_ALLOCATION_SHUFFLE_TRACKING_ENABLED) Review comment: Thanks for catching it!
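The enablement check in this revision of `BlockManager` combines several settings. The sketch below renders that boolean logic in plain Java. It assumes that the part of the condition cut off in the quoted hunk ORs the three cases listed in the config documentation of the same revision: external shuffle service enabled, dynamic allocation disabled, or dynamic allocation with shuffle tracking. A later comment in this thread says those requirements were dropped, so read this as a snapshot of the intermediate revision, not the merged behavior. `HostLocalReadingGate` is a hypothetical name.

```java
final class HostLocalReadingGate {
  private HostLocalReadingGate() {}

  // Mirrors the variables in the quoted hunk; the final OR of the three cases is an assumption.
  static boolean hostLocalDirManagerEnabled(
      boolean readHostLocalDisk,              // spark.shuffle.readHostLocalDisk
      boolean useOldFetchProtocol,            // spark.shuffle.useOldFetchProtocol
      boolean externalShuffleServiceEnabled,  // spark.shuffle.service.enabled
      boolean dynamicAllocationEnabled,       // spark.dynamicAllocation.enabled
      boolean shuffleTrackingEnabled) {       // spark.dynamicAllocation.shuffleTracking.enabled
    boolean canUseHostLocalReading = readHostLocalDisk && !useOldFetchProtocol;
    boolean dynamicAllocationDisabled = !dynamicAllocationEnabled;
    boolean dynamicAllocationWithShuffleTracking =
        dynamicAllocationEnabled && shuffleTrackingEnabled;
    return canUseHostLocalReading
        && (externalShuffleServiceEnabled
            || dynamicAllocationDisabled
            || dynamicAllocationWithShuffleTracking);
  }
}
```

The only combination this rejects (apart from the feature flag or old fetch protocol) is dynamic allocation without both the shuffle service and shuffle tracking. That is the case where executors holding shuffle files can be removed.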
[GitHub] [spark] Ngone51 commented on a change in pull request #28911: [SPARK-32077][CORE] Support host-local shuffle data reading when external shuffle service is disabled
Ngone51 commented on a change in pull request #28911: URL: https://github.com/apache/spark/pull/28911#discussion_r459897035 ## File path: core/src/main/scala/org/apache/spark/internal/config/package.scala ## @@ -1415,10 +1415,16 @@ package object config { private[spark] val SHUFFLE_HOST_LOCAL_DISK_READING_ENABLED = ConfigBuilder("spark.shuffle.readHostLocalDisk") - .doc(s"If enabled (and `${SHUFFLE_USE_OLD_FETCH_PROTOCOL.key}` is disabled and external " + -s"shuffle `${SHUFFLE_SERVICE_ENABLED.key}` is enabled), shuffle " + -"blocks requested from those block managers which are running on the same host are read " + -"from the disk directly instead of being fetched as remote blocks over the network.") + .doc("When enabled, shuffle blocks requested from those block managers which are running " + +"on the same host are read from the disk directly instead of being fetched as remote " + +"blocks over the network. Note that for k8s workloads, this only works when nodes are " + +"using non-isolated container storage." + +s"To enable the feature, one should disable ${SHUFFLE_USE_OLD_FETCH_PROTOCOL.key} first." + +" And make sure that one of the following requirements are satisfied:\n" + +s"1. external shuffle service is enabled (${SHUFFLE_SERVICE_ENABLED.key});" + +s"2. dynamic allocation is disabled (${DYN_ALLOCATION_ENABLED.key});" + +s"3. dynamic allocation is enabled with shuffle tracking " + +s"(${DYN_ALLOCATION_SHUFFLE_TRACKING_ENABLED});") Review comment: Oh yeah... we'd cover all the cases after this PR. I'll remove the requirements.
[GitHub] [spark] Ngone51 commented on a change in pull request #28911: [SPARK-32077][CORE] Support host-local shuffle data reading when external shuffle service is disabled
Ngone51 commented on a change in pull request #28911: URL: https://github.com/apache/spark/pull/28911#discussion_r459264839 ## File path: core/src/main/scala/org/apache/spark/internal/config/package.scala ## @@ -1415,10 +1415,16 @@ package object config { private[spark] val SHUFFLE_HOST_LOCAL_DISK_READING_ENABLED = ConfigBuilder("spark.shuffle.readHostLocalDisk") - .doc(s"If enabled (and `${SHUFFLE_USE_OLD_FETCH_PROTOCOL.key}` is disabled and external " + -s"shuffle `${SHUFFLE_SERVICE_ENABLED.key}` is enabled), shuffle " + -"blocks requested from those block managers which are running on the same host are read " + -"from the disk directly instead of being fetched as remote blocks over the network.") + .doc("When enabled, shuffle blocks requested from those block managers which are running " + +"on the same host are read from the disk directly instead of being fetched as remote " + +"blocks over the network. Note that for k8s workloads, this only works when nodes are " + +"using non-isolated container storage." + +s"To enable the feature, one should disable ${SHUFFLE_USE_OLD_FETCH_PROTOCOL.key} first." + +" And make sure that one of the following requirements are satisfied:\n" + +s"1. external shuffle service is enabled (${SHUFFLE_SERVICE_ENABLED.key});" + +s"2. dynamic allocation is disabled (${DYN_ALLOCATION_ENABLED.key});" + +s"3. dynamic allocation is enabled with shuffle tracking " + +s"(${DYN_ALLOCATION_SHUFFLE_TRACKING_ENABLED});") Review comment: Please take a look at the updated documentation regarding dynamic allocation. cc: @holdenk @attilapiros
[GitHub] [spark] Ngone51 commented on a change in pull request #28911: [SPARK-32077][CORE] Support host-local shuffle data reading when external shuffle service is disabled
Ngone51 commented on a change in pull request #28911: URL: https://github.com/apache/spark/pull/28911#discussion_r458103470 ## File path: core/src/main/scala/org/apache/spark/internal/config/package.scala ## @@ -1391,10 +1391,12 @@ package object config { private[spark] val SHUFFLE_HOST_LOCAL_DISK_READING_ENABLED = ConfigBuilder("spark.shuffle.readHostLocalDisk") - .doc(s"If enabled (and `${SHUFFLE_USE_OLD_FETCH_PROTOCOL.key}` is disabled and external " + -s"shuffle `${SHUFFLE_SERVICE_ENABLED.key}` is enabled), shuffle " + -"blocks requested from those block managers which are running on the same host are read " + -"from the disk directly instead of being fetched as remote blocks over the network.") + .doc(s"If enabled (and `${SHUFFLE_USE_OLD_FETCH_PROTOCOL.key}` is disabled and 1) external " + +s"shuffle `${SHUFFLE_SERVICE_ENABLED.key}` is enabled or 2) ${DYN_ALLOCATION_ENABLED.key}" + +s" is disabled), shuffle blocks requested from those block managers which are running on " + Review comment: I mean, it *can* work with this feature enabled (please see the P.S. in my comment above). I will update the documentation about dynamic allocation to: 1. disallow the feature when `spark.dynamicAllocation.enabled=true` && `spark.dynamicAllocation.shuffleTracking.enabled=false`; 2. allow it when `spark.dynamicAllocation.enabled=true` && `spark.dynamicAllocation.shuffleTracking.enabled=true`. Also cc @attilapiros. Does that make sense to you?
[GitHub] [spark] Ngone51 commented on a change in pull request #28911: [SPARK-32077][CORE] Support host-local shuffle data reading when external shuffle service is disabled
Ngone51 commented on a change in pull request #28911: URL: https://github.com/apache/spark/pull/28911#discussion_r458097873 ## File path: core/src/main/scala/org/apache/spark/internal/config/package.scala ## @@ -1391,10 +1391,12 @@ package object config { private[spark] val SHUFFLE_HOST_LOCAL_DISK_READING_ENABLED = ConfigBuilder("spark.shuffle.readHostLocalDisk") - .doc(s"If enabled (and `${SHUFFLE_USE_OLD_FETCH_PROTOCOL.key}` is disabled and external " + -s"shuffle `${SHUFFLE_SERVICE_ENABLED.key}` is enabled), shuffle " + -"blocks requested from those block managers which are running on the same host are read " + -"from the disk directly instead of being fetched as remote blocks over the network.") + .doc(s"If enabled (and `${SHUFFLE_USE_OLD_FETCH_PROTOCOL.key}` is disabled and 1) external " + +s"shuffle `${SHUFFLE_SERVICE_ENABLED.key}` is enabled or 2) ${DYN_ALLOCATION_ENABLED.key}" + +s" is disabled), shuffle blocks requested from those block managers which are running on " + Review comment: > As of Spark 3 we no longer require dynamic allocation to have a shuffle service. This is exactly what I mentioned in the P.S. I can update the documentation to reflect this. But please also note that dynamic allocation without the external shuffle service is still an experimental feature, disabled by default. Its main problem is that users need to configure when shuffle files are deleted, and most users have no idea about this. By default, shuffle files are not removed until GC happens on the driver side, which also means executors won't come and go more frequently than they do with dynamic allocation backed by the shuffle service. Therefore, I think what we were discussing above is a more general problem of using dynamic allocation with the shuffle service.
[GitHub] [spark] Ngone51 commented on a change in pull request #28911: [SPARK-32077][CORE] Support host-local shuffle data reading when external shuffle service is disabled
Ngone51 commented on a change in pull request #28911: URL: https://github.com/apache/spark/pull/28911#discussion_r457896569 ## File path: core/src/main/scala/org/apache/spark/internal/config/package.scala ## @@ -1391,10 +1391,12 @@ package object config { private[spark] val SHUFFLE_HOST_LOCAL_DISK_READING_ENABLED = ConfigBuilder("spark.shuffle.readHostLocalDisk") - .doc(s"If enabled (and `${SHUFFLE_USE_OLD_FETCH_PROTOCOL.key}` is disabled and external " + -s"shuffle `${SHUFFLE_SERVICE_ENABLED.key}` is enabled), shuffle " + -"blocks requested from those block managers which are running on the same host are read " + -"from the disk directly instead of being fetched as remote blocks over the network.") + .doc(s"If enabled (and `${SHUFFLE_USE_OLD_FETCH_PROTOCOL.key}` is disabled and 1) external " + +s"shuffle `${SHUFFLE_SERVICE_ENABLED.key}` is enabled or 2) ${DYN_ALLOCATION_ENABLED.key}" + +s" is disabled), shuffle blocks requested from those block managers which are running on " + Review comment: Yes, it prevents the case where executors come and go under dynamic allocation. Also, I think this is still different from an executor loss error: executor loss is an abnormal case outside Spark's control, while dynamic allocation is under Spark's control, and executor shutdown under dynamic allocation happens more frequently than executor loss. I think we should try our best to avoid shuffle fetch failures, since their penalty is not trivial, especially when they can be avoided. Besides, when dynamic allocation is enabled, users can already use the external shuffle service. Therefore, I can't think of a strong reason to mix these two branches. P.S. We could probably allow dynamic allocation here if `spark.dynamicAllocation.shuffleTracking.enabled` is also enabled.
[GitHub] [spark] Ngone51 commented on a change in pull request #28911: [SPARK-32077][CORE] Support host-local shuffle data reading when external shuffle service is disabled
Ngone51 commented on a change in pull request #28911: URL: https://github.com/apache/spark/pull/28911#discussion_r457876717 ## File path: core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala ## @@ -194,6 +196,45 @@ private[spark] class NettyBlockTransferService( result.future } + override def getHostLocalDirs( + host: String, + port: Int, + execIds: Array[String], + hostLocalDirsCompletable: CompletableFuture[util.Map[String, Array[String]]]): Unit = { +val getLocalDirsMessage = new GetLocalDirsForExecutors(appId, execIds) +try { + val client = clientFactory.createClient(host, port) + client.sendRpc(getLocalDirsMessage.toByteBuffer, new RpcResponseCallback() { +override def onSuccess(response: ByteBuffer): Unit = { + try { +val msgObj = BlockTransferMessage.Decoder.fromByteBuffer(response) +hostLocalDirsCompletable.complete( + msgObj.asInstanceOf[LocalDirsForExecutors].getLocalDirsByExec) + } catch { +case t: Throwable => + logWarning(s"Error trying to get the host local dirs for executor ${execIds.head}", +t.getCause) + hostLocalDirsCompletable.completeExceptionally(t) + } finally { +client.close() + } +} + +override def onFailure(t: Throwable): Unit = { + logWarning(s"Error trying to get the host local dirs for executor ${execIds.head}", +t.getCause) + hostLocalDirsCompletable.completeExceptionally(t) + client.close() +} + }) +} catch { + case e: IOException => Review comment: Yes, good idea. I've done the refactor. Please take a look.
[GitHub] [spark] Ngone51 commented on a change in pull request #28911: [SPARK-32077][CORE] Support host-local shuffle data reading when external shuffle service is disabled
Ngone51 commented on a change in pull request #28911: URL: https://github.com/apache/spark/pull/28911#discussion_r457876162 ## File path: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockStoreClient.java ## @@ -61,4 +63,17 @@ public MetricSet shuffleMetrics() { // Return an empty MetricSet by default. return () -> Collections.emptyMap(); } + + /** + * Request the local disk directories, which are specified by DiskBlockManager, for the executors + * from the external shuffle service (when this is a ExternalBlockStoreClient) or BlockManager + * (when this is a NettyBlockTransferService). Note there's only one executor when this is a + * NettyBlockTransferService because we ask one specific executor at a time. Review comment: I added a check to ensure there is only one executor ID, but I didn't check that it equals the BlockManager's executor ID, because `NettyBlockRpcServer` only has a `BlockDataManager`, which does not expose the executor ID. I am still wondering whether it's worthwhile to expose it just for this sanity check.
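The check described here serves a `GetLocalDirsForExecutors` request only when its application ID matches and it names exactly one executor. It can be sketched as a standalone validator. In this sketch, `LocalDirsRequestValidator` and `validate` are hypothetical names, and returning the error message as an `Optional<String>` is an illustrative choice. In this PR, `NettyBlockRpcServer` reports the error via `responseContext.onFailure(new IllegalStateException(errorMsg))` instead.

```java
import java.util.Optional;

final class LocalDirsRequestValidator {
  private LocalDirsRequestValidator() {}

  // Returns an error message if the request is invalid, or empty if it can be served.
  static Optional<String> validate(String serverAppId, String requestAppId, String[] execIds) {
    boolean incorrectAppId = !serverAppId.equals(requestAppId);
    int execNum = execIds.length;
    if (!incorrectAppId && execNum == 1) {
      return Optional.empty();
    }
    StringBuilder msg = new StringBuilder("Invalid GetLocalDirsForExecutors request: ");
    // Append each part only when its condition holds.
    if (incorrectAppId) {
      msg.append("incorrect application id: ").append(requestAppId).append(';');
    }
    if (execNum != 1) {
      msg.append("incorrect executor number: ").append(execNum).append(" (expected 1);");
    }
    return Optional.of(msg.toString());
  }
}
```

Building the message conditionally also matters in Scala: interpolating an `if` without an `else` evaluates to `()` when the condition is false, which would leave a stray `()` in the error text.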
[GitHub] [spark] Ngone51 commented on a change in pull request #28911: [SPARK-32077][CORE] Support host-local shuffle data reading when external shuffle service is disabled
Ngone51 commented on a change in pull request #28911: URL: https://github.com/apache/spark/pull/28911#discussion_r452705081 ## File path: core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala ## @@ -194,6 +196,45 @@ private[spark] class NettyBlockTransferService( result.future } + override def getHostLocalDirs( + host: String, + port: Int, + execIds: Array[String], + hostLocalDirsCompletable: CompletableFuture[util.Map[String, Array[String]]]): Unit = { +val getLocalDirsMessage = new GetLocalDirsForExecutors(appId, execIds) +try { + val client = clientFactory.createClient(host, port) + client.sendRpc(getLocalDirsMessage.toByteBuffer, new RpcResponseCallback() { +override def onSuccess(response: ByteBuffer): Unit = { + try { +val msgObj = BlockTransferMessage.Decoder.fromByteBuffer(response) +hostLocalDirsCompletable.complete( + msgObj.asInstanceOf[LocalDirsForExecutors].getLocalDirsByExec) + } catch { +case t: Throwable => + logWarning(s"Error trying to get the host local dirs for executor ${execIds.head}", +t.getCause) + hostLocalDirsCompletable.completeExceptionally(t) + } finally { +client.close() + } +} + +override def onFailure(t: Throwable): Unit = { + logWarning(s"Error trying to get the host local dirs for executor ${execIds.head}", +t.getCause) + hostLocalDirsCompletable.completeExceptionally(t) + client.close() +} + }) +} catch { + case e: IOException => Review comment: This is migrated from `ExternalBlockStoreClient.getHostLocalDirs`. Maybe, @attilapiros has more context?
[GitHub] [spark] Ngone51 commented on a change in pull request #28911: [SPARK-32077][CORE] Support host-local shuffle data reading when external shuffle service is disabled
Ngone51 commented on a change in pull request #28911: URL: https://github.com/apache/spark/pull/28911#discussion_r452704155 ## File path: core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala ## @@ -478,30 +475,43 @@ final class ShuffleBlockFetcherIterator( logDebug(s"Asynchronous fetching host-local blocks without cached executors' dir: " + s"${immutableHostLocalBlocksWithoutDirs.mkString(", ")}") val execIdsWithoutDirs = immutableHostLocalBlocksWithoutDirs.keys.map(_.executorId).toArray - hostLocalDirManager.getHostLocalDirs(execIdsWithoutDirs) { -case Success(dirs) => - immutableHostLocalBlocksWithoutDirs.foreach { case (hostLocalBmId, blockInfos) => -blockInfos.takeWhile { case (blockId, _, mapIndex) => - fetchHostLocalBlock( -blockId, -mapIndex, -dirs.get(hostLocalBmId.executorId), -hostLocalBmId) + + val dirFetchRequests = if (blockManager.externalShuffleServiceEnabled) { +val host = blockManager.blockManagerId.host +val port = blockManager.externalShuffleServicePort +Seq((host, port, immutableHostLocalBlocksWithoutDirs.keys.toArray)) + } else { +hostLocalBlocksByExecutor.keysIterator + .filter(exec => execIdsWithoutDirs.contains(exec.executorId)) Review comment: corrected to `bmId`, thanks!
[GitHub] [spark] Ngone51 commented on a change in pull request #28911: [SPARK-32077][CORE] Support host-local shuffle data reading when external shuffle service is disabled
Ngone51 commented on a change in pull request #28911: URL: https://github.com/apache/spark/pull/28911#discussion_r452703876 ## File path: core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala ## @@ -478,30 +475,43 @@ final class ShuffleBlockFetcherIterator( logDebug(s"Asynchronous fetching host-local blocks without cached executors' dir: " + s"${immutableHostLocalBlocksWithoutDirs.mkString(", ")}") val execIdsWithoutDirs = immutableHostLocalBlocksWithoutDirs.keys.map(_.executorId).toArray - hostLocalDirManager.getHostLocalDirs(execIdsWithoutDirs) { -case Success(dirs) => - immutableHostLocalBlocksWithoutDirs.foreach { case (hostLocalBmId, blockInfos) => -blockInfos.takeWhile { case (blockId, _, mapIndex) => - fetchHostLocalBlock( -blockId, -mapIndex, -dirs.get(hostLocalBmId.executorId), -hostLocalBmId) + + val dirFetchRequests = if (blockManager.externalShuffleServiceEnabled) { +val host = blockManager.blockManagerId.host +val port = blockManager.externalShuffleServicePort +Seq((host, port, immutableHostLocalBlocksWithoutDirs.keys.toArray)) + } else { +hostLocalBlocksByExecutor.keysIterator Review comment: Yeah, correct! Updated to reuse `immutableHostLocalBlocksWithoutDirs`.
[GitHub] [spark] Ngone51 commented on a change in pull request #28911: [SPARK-32077][CORE] Support host-local shuffle data reading when external shuffle service is disabled
Ngone51 commented on a change in pull request #28911: URL: https://github.com/apache/spark/pull/28911#discussion_r448971785 ## File path: core/src/main/scala/org/apache/spark/internal/config/package.scala ## @@ -1391,10 +1391,11 @@ package object config { private[spark] val SHUFFLE_HOST_LOCAL_DISK_READING_ENABLED = ConfigBuilder("spark.shuffle.readHostLocalDisk") - .doc(s"If enabled (and `${SHUFFLE_USE_OLD_FETCH_PROTOCOL.key}` is disabled and external " + -s"shuffle `${SHUFFLE_SERVICE_ENABLED.key}` is enabled), shuffle " + -"blocks requested from those block managers which are running on the same host are read " + -"from the disk directly instead of being fetched as remote blocks over the network.") + .doc(s"If enabled (and `${SHUFFLE_USE_OLD_FETCH_PROTOCOL.key}` is disabled and 1) external " + +s"shuffle `${SHUFFLE_SERVICE_ENABLED.key}` is enabled or 2) ${DYN_ALLOCATION_ENABLED.key}" + +s" is disabled), shuffle blocks requested from those block managers which are running on " + +s"the same host are read from the disk directly instead of being fetched as remote blocks" + +s" over the network.") Review comment: I'm not familiar with cloud environments, so I'm asking this question to verify my understanding.
[GitHub] [spark] Ngone51 commented on a change in pull request #28911: [SPARK-32077][CORE] Support host-local shuffle data reading when external shuffle service is disabled
Ngone51 commented on a change in pull request #28911: URL: https://github.com/apache/spark/pull/28911#discussion_r448971785 ## File path: core/src/main/scala/org/apache/spark/internal/config/package.scala ## @@ -1391,10 +1391,11 @@ package object config { private[spark] val SHUFFLE_HOST_LOCAL_DISK_READING_ENABLED = ConfigBuilder("spark.shuffle.readHostLocalDisk") - .doc(s"If enabled (and `${SHUFFLE_USE_OLD_FETCH_PROTOCOL.key}` is disabled and external " + -s"shuffle `${SHUFFLE_SERVICE_ENABLED.key}` is enabled), shuffle " + -"blocks requested from those block managers which are running on the same host are read " + -"from the disk directly instead of being fetched as remote blocks over the network.") + .doc(s"If enabled (and `${SHUFFLE_USE_OLD_FETCH_PROTOCOL.key}` is disabled and 1) external " + +s"shuffle `${SHUFFLE_SERVICE_ENABLED.key}` is enabled or 2) ${DYN_ALLOCATION_ENABLED.key}" + +s" is disabled), shuffle blocks requested from those block managers which are running on " + +s"the same host are read from the disk directly instead of being fetched as remote blocks" + +s" over the network.") Review comment: I'm not familiar with cloud environments, so I'm just asking for your confirmation.
[GitHub] [spark] Ngone51 commented on a change in pull request #28911: [SPARK-32077][CORE] Support host-local shuffle data reading when external shuffle service is disabled
Ngone51 commented on a change in pull request #28911: URL: https://github.com/apache/spark/pull/28911#discussion_r447554793 ## File path: core/src/main/scala/org/apache/spark/internal/config/package.scala ## @@ -1391,10 +1391,11 @@ package object config { private[spark] val SHUFFLE_HOST_LOCAL_DISK_READING_ENABLED = ConfigBuilder("spark.shuffle.readHostLocalDisk") - .doc(s"If enabled (and `${SHUFFLE_USE_OLD_FETCH_PROTOCOL.key}` is disabled and external " + -s"shuffle `${SHUFFLE_SERVICE_ENABLED.key}` is enabled), shuffle " + -"blocks requested from those block managers which are running on the same host are read " + -"from the disk directly instead of being fetched as remote blocks over the network.") + .doc(s"If enabled (and `${SHUFFLE_USE_OLD_FETCH_PROTOCOL.key}` is disabled and 1) external " + +s"shuffle `${SHUFFLE_SERVICE_ENABLED.key}` is enabled or 2) ${DYN_ALLOCATION_ENABLED.key}" + +s" is disabled), shuffle blocks requested from those block managers which are running on " + +s"the same host are read from the disk directly instead of being fetched as remote blocks" + +s" over the network.") Review comment: Do you mean, in k8s, this feature should only work when executors are using non-isolated container storage?
[GitHub] [spark] Ngone51 commented on a change in pull request #28911: [SPARK-32077][CORE] Support host-local shuffle data reading when external shuffle service is disabled
Ngone51 commented on a change in pull request #28911: URL: https://github.com/apache/spark/pull/28911#discussion_r447553884 ## File path: core/src/test/scala/org/apache/spark/shuffle/HostLocalShuffleFetchSuite.scala ## @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
 */ + +package org.apache.spark.shuffle + +import org.scalatest.Matchers + +import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkEnv, SparkFunSuite, TestUtils} +import org.apache.spark.internal.config._ +import org.apache.spark.network.netty.NettyBlockTransferService + +/** + * This test suite is used to test host local shuffle reading with external shuffle service disabled + */ +class HostLocalShuffleFetchSuite extends SparkFunSuite with Matchers with LocalSparkContext { + test("read host local shuffle from disk with external shuffle service disabled") { +val conf = new SparkConf() + .set(SHUFFLE_HOST_LOCAL_DISK_READING_ENABLED, true) + .set(SHUFFLE_SERVICE_ENABLED, false) + .set(DYN_ALLOCATION_ENABLED, false) +sc = new SparkContext("local-cluster[2,1,1024]", "test", conf) +sc.getConf.get(SHUFFLE_HOST_LOCAL_DISK_READING_ENABLED) should equal(true) +sc.env.blockManager.externalShuffleServiceEnabled should equal(false) +sc.env.blockManager.hostLocalDirManager.isDefined should equal(true) +sc.env.blockManager.blockStoreClient.getClass should equal(classOf[NettyBlockTransferService]) +TestUtils.waitUntilExecutorsUp(sc, 2, 6) + +val rdd = sc.parallelize(0 until 1000, 10) + .map { i => (i, 1) } + .reduceByKey(_ + _) + +rdd.count() +rdd.count() + +val cachedExecutors = rdd.mapPartitions { _ => + SparkEnv.get.blockManager.hostLocalDirManager.map { localDirManager => +localDirManager.getCachedHostLocalDirs().keySet.iterator + }.getOrElse(Iterator.empty) +}.collect().toSet + +// both executors are caching the dirs of the other one +cachedExecutors should equal(sc.getExecutorIds().toSet) + +// Now Spark will not receive FetchFailed as host local blocks are read from the cached local +// disk directly Review comment: I see, thanks.
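The test relies on each executor caching the other executor's local directories after the first shuffle read. In the `BlockManager` hunk of this thread, `HostLocalDirManager` is constructed with a size taken from `STORAGE_LOCAL_DISK_BY_EXECUTORS_CACHE_SIZE`, so the cache is bounded. A least-recently-used map is one way to picture such a cache. The following is a hypothetical sketch built on `LinkedHashMap` in access order, not Spark's actual cache implementation. `HostLocalDirsCache` and its methods are illustrative names.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical bounded cache of executorId -> local dirs that evicts the least recently used entry.
final class HostLocalDirsCache {
  private final Map<String, String[]> cache;

  HostLocalDirsCache(int maxEntries) {
    // accessOrder = true: iteration goes from least to most recently accessed,
    // so the "eldest" entry is the least recently used one.
    this.cache = new LinkedHashMap<String, String[]>(16, 0.75f, true) {
      @Override
      protected boolean removeEldestEntry(Map.Entry<String, String[]> eldest) {
        return size() > maxEntries; // evict once the bound is exceeded
      }
    };
  }

  // Stores the dirs returned by a GetLocalDirsForExecutors-style response.
  synchronized void putAll(Map<String, String[]> dirsByExecId) {
    cache.putAll(dirsByExecId);
  }

  // Returns the cached dirs, or null if absent; a hit refreshes the entry's recency.
  synchronized String[] get(String execId) {
    return cache.get(execId);
  }

  // Copy of the current entries, comparable to what getCachedHostLocalDirs() exposes in the test.
  synchronized Map<String, String[]> snapshot() {
    return new LinkedHashMap<>(cache);
  }
}
```

The methods are `synchronized` because, as the `ShuffleBlockFetcherIterator` hunks show, directories are fetched asynchronously and may be stored from a callback thread while a task thread reads them.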