[GitHub] [spark] Ngone51 commented on a change in pull request #28911: [SPARK-32077][CORE] Support host-local shuffle data reading when external shuffle service is disabled

2020-08-31 Thread GitBox


Ngone51 commented on a change in pull request #28911:
URL: https://github.com/apache/spark/pull/28911#discussion_r480847421



##
File path: core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala
##
@@ -113,6 +113,21 @@ class NettyBlockRpcServer(
             s"when there is not sufficient space available to store the block.")
           responseContext.onFailure(exception)
         }
+
+      case getLocalDirs: GetLocalDirsForExecutors =>
+        val isIncorrectAppId = getLocalDirs.appId != appId
+        val execNum = getLocalDirs.execIds.length
+        if (isIncorrectAppId || execNum != 1) {
+          val errorMsg = "Invalid GetLocalDirsForExecutors request: " +
+            s"${if (isIncorrectAppId) s"incorrect application id: ${getLocalDirs.appId};"}" +
+            s"${if (execNum != 1) s"incorrect executor number: $execNum (expected 1);"}"
+          responseContext.onFailure(new IllegalStateException(errorMsg))
+        } else {
+          val execId = getLocalDirs.execIds.head

Review comment:
   Makes sense to me.
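
   For context, a minimal sketch (hypothetical wiring, not the PR's actual client code) of how the `onFailure` reply above surfaces on the fetching side: the requester's future is completed exceptionally, which the fetcher can then treat as a fetch failure.

   ```scala
   import java.util.concurrent.CompletableFuture

   // The server-side onFailure reaches the client as an exceptionally
   // completed future; the fetcher maps that to a Failure(...) result.
   val dirs = new CompletableFuture[Map[String, Array[String]]]()
   dirs.completeExceptionally(new IllegalStateException(
     "Invalid GetLocalDirsForExecutors request: incorrect executor number: 2 (expected 1);"))
   dirs.whenComplete { (result, err) =>
     if (err != null) println(s"handled as fetch failure: ${err.getMessage}")
     else println(s"got dirs for executors: ${result.keys.mkString(", ")}")
   }
   ```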








[GitHub] [spark] Ngone51 commented on a change in pull request #28911: [SPARK-32077][CORE] Support host-local shuffle data reading when external shuffle service is disabled

2020-08-31 Thread GitBox


Ngone51 commented on a change in pull request #28911:
URL: https://github.com/apache/spark/pull/28911#discussion_r480847229



##
File path: core/src/main/scala/org/apache/spark/storage/BlockManager.scala
##
@@ -212,7 +211,7 @@ private[spark] class BlockManager(
   private val maxOnHeapMemory = memoryManager.maxOnHeapStorageMemory
   private val maxOffHeapMemory = memoryManager.maxOffHeapStorageMemory
 
-  private val externalShuffleServicePort = StorageUtils.externalShuffleServicePort(conf)
+  private[spark] val externalShuffleServicePort = StorageUtils.externalShuffleServicePort(conf)

Review comment:
   It's required by `val port = blockManager.externalShuffleServicePort` within `ShuffleBlockFetcherIterator`.
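
   As a side note, `private[spark]` widens visibility to everything under the `org.apache.spark` package, which is what lets `ShuffleBlockFetcherIterator` (in `org.apache.spark.storage`) read the port. An illustrative, self-contained sketch (hypothetical class names, not Spark's real ones):

   ```scala
   package org.apache.spark

   package storage {
     class Holder {
       // Visible to any code under the org.apache.spark package.
       private[spark] val externalShuffleServicePort: Int = 7337
     }
   }

   package shuffle {
     object Reader {
       // Compiles because Reader also lives under org.apache.spark.
       def port(h: storage.Holder): Int = h.externalShuffleServicePort
     }
   }
   ```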








[GitHub] [spark] Ngone51 commented on a change in pull request #28911: [SPARK-32077][CORE] Support host-local shuffle data reading when external shuffle service is disabled

2020-08-31 Thread GitBox


Ngone51 commented on a change in pull request #28911:
URL: https://github.com/apache/spark/pull/28911#discussion_r480178710



##
File path: core/src/test/scala/org/apache/spark/shuffle/HostLocalShuffleFetchSuite.scala
##
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle
+
+import org.scalatest.matchers.must.Matchers
+import org.scalatest.matchers.should.Matchers._
+
+import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkEnv, SparkFunSuite, TestUtils}
+import org.apache.spark.internal.config._
+import org.apache.spark.network.netty.NettyBlockTransferService
+
+/**
+ * This test suite is used to test host local shuffle reading with external shuffle service disabled
+ */
+class HostLocalShuffleFetchSuite extends SparkFunSuite with Matchers with LocalSparkContext {

Review comment:
   Thanks. I already added the metrics checks. Let me try to add other assertions to catch the expected behavior.
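
   For reference, one way such metrics can be collected in a test is with a task-end listener; a sketch, assuming a running `SparkContext` named `sc` (variable names are illustrative, not necessarily the suite's):

   ```scala
   import scala.collection.mutable.ArrayBuffer
   import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

   // Record per-task shuffle read metrics so the suite can assert afterwards
   // that all shuffle bytes were read host-locally (no remote fetches).
   val localBytesRead = ArrayBuffer.empty[Long]
   val remoteBytesRead = ArrayBuffer.empty[Long]
   sc.addSparkListener(new SparkListener {
     override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
       val metrics = taskEnd.taskMetrics.shuffleReadMetrics
       localBytesRead += metrics.localBytesRead
       remoteBytesRead += metrics.remoteBytesRead
     }
   })
   ```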








[GitHub] [spark] Ngone51 commented on a change in pull request #28911: [SPARK-32077][CORE] Support host-local shuffle data reading when external shuffle service is disabled

2020-08-31 Thread GitBox


Ngone51 commented on a change in pull request #28911:
URL: https://github.com/apache/spark/pull/28911#discussion_r480177199



##
File path: core/src/main/scala/org/apache/spark/internal/config/package.scala
##
@@ -1415,10 +1415,11 @@ package object config {
 
   private[spark] val SHUFFLE_HOST_LOCAL_DISK_READING_ENABLED =
     ConfigBuilder("spark.shuffle.readHostLocalDisk")
-      .doc(s"If enabled (and `${SHUFFLE_USE_OLD_FETCH_PROTOCOL.key}` is disabled and external " +
-        s"shuffle `${SHUFFLE_SERVICE_ENABLED.key}` is enabled), shuffle " +
-        "blocks requested from those block managers which are running on the same host are read " +
-        "from the disk directly instead of being fetched as remote blocks over the network.")
+      .doc(s"If enabled (and `${SHUFFLE_USE_OLD_FETCH_PROTOCOL.key}` is disabled, shuffle " +
+        "blocks requested from those block managers which are running on the same host are " +
+        "read from the disk directly instead of being fetched as remote blocks over the " +
+        "network. Note that for k8s workloads, this only works when nodes are using " +
+        "non-isolated container storage.")

Review comment:
   Thank you for your clarification. If that's the case, I think we should point it out in the doc. cc @dongjoon-hyun @holdenk 








[GitHub] [spark] Ngone51 commented on a change in pull request #28911: [SPARK-32077][CORE] Support host-local shuffle data reading when external shuffle service is disabled

2020-08-31 Thread GitBox


Ngone51 commented on a change in pull request #28911:
URL: https://github.com/apache/spark/pull/28911#discussion_r480173840



##
File path: core/src/test/scala/org/apache/spark/shuffle/HostLocalShuffleFetchSuite.scala
##
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle
+
+import org.scalatest.matchers.must.Matchers
+import org.scalatest.matchers.should.Matchers._
+
+import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkEnv, SparkFunSuite, TestUtils}
+import org.apache.spark.internal.config._
+import org.apache.spark.network.netty.NettyBlockTransferService
+
+/**
+ * This test suite is used to test host local shuffle reading with external shuffle service disabled
+ */
+class HostLocalShuffleFetchSuite extends SparkFunSuite with Matchers with LocalSparkContext {

Review comment:
   Hmm... I'm not sure whether @dongjoon-hyun was referring to those two asserts. But I can try to make them separate unit tests.








[GitHub] [spark] Ngone51 commented on a change in pull request #28911: [SPARK-32077][CORE] Support host-local shuffle data reading when external shuffle service is disabled

2020-08-31 Thread GitBox


Ngone51 commented on a change in pull request #28911:
URL: https://github.com/apache/spark/pull/28911#discussion_r480157384



##
File path: core/src/main/scala/org/apache/spark/internal/config/package.scala
##
@@ -1415,10 +1415,11 @@ package object config {
 
   private[spark] val SHUFFLE_HOST_LOCAL_DISK_READING_ENABLED =
     ConfigBuilder("spark.shuffle.readHostLocalDisk")
-      .doc(s"If enabled (and `${SHUFFLE_USE_OLD_FETCH_PROTOCOL.key}` is disabled and external " +
-        s"shuffle `${SHUFFLE_SERVICE_ENABLED.key}` is enabled), shuffle " +
-        "blocks requested from those block managers which are running on the same host are read " +
-        "from the disk directly instead of being fetched as remote blocks over the network.")
+      .doc(s"If enabled (and `${SHUFFLE_USE_OLD_FETCH_PROTOCOL.key}` is disabled, shuffle " +
+        "blocks requested from those block managers which are running on the same host are " +
+        "read from the disk directly instead of being fetched as remote blocks over the " +
+        "network. Note that for k8s workloads, this only works when nodes are using " +
+        "non-isolated container storage.")

Review comment:
   > Currently this is done by using the same host in the BlockManagerId, which works only for YARN and standalone mode, isn't it?
   
   IIUC, from @holdenk 's previous comment and @dongjoon-hyun 's comment, it should also work for Mesos/K8s when they're using non-isolated container storage.
   
   
   > A question for the future: do you have a plan to introduce block manager grouping based on shared storage?
   
   I don't. To be honest, I'm not familiar with the containerized resource managers. I'm also not sure what plan you mean here. Is it only needed for the containerized resource managers?

##
File path: core/src/test/scala/org/apache/spark/shuffle/HostLocalShuffleFetchSuite.scala
##
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle
+
+import org.scalatest.matchers.must.Matchers
+import org.scalatest.matchers.should.Matchers._
+
+import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkEnv, SparkFunSuite, TestUtils}
+import org.apache.spark.internal.config._
+import org.apache.spark.network.netty.NettyBlockTransferService
+
+/**
+ * This test suite is used to test host local shuffle reading with external shuffle service disabled
+ */
+class HostLocalShuffleFetchSuite extends SparkFunSuite with Matchers with LocalSparkContext {

Review comment:
   Sorry, which two asserts are you referring to? Are these two asserts in the new test:
   
   ```scala
   // Spark should read the shuffle data locally from the cached directories on the same host,
   // so there's no remote fetching at all.
   assert(localBytesRead.sum > 0)
   assert(remoteBytesRead.sum === 0)
   ```
   If they are, I actually think that checking `localBytesRead`/`remoteBytesRead` is equivalent to checking `localBlocksFetched`/`remoteBlocksFetched` here.
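
   A sketch of the block-count counterpart to the byte-based asserts quoted above (hypothetical buffers, assumed to be filled by a task-end metrics listener reading `ShuffleReadMetrics.localBlocksFetched`/`remoteBlocksFetched`):

   ```scala
   // Placeholder values stand in for per-task counts gathered during the job.
   val localBlocksFetched: Seq[Long] = Seq(4L, 3L)
   val remoteBlocksFetched: Seq[Long] = Seq(0L, 0L)
   // Every block should have been fetched host-locally, none remotely.
   assert(localBlocksFetched.sum > 0)
   assert(remoteBlocksFetched.sum == 0)
   ```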








[GitHub] [spark] Ngone51 commented on a change in pull request #28911: [SPARK-32077][CORE] Support host-local shuffle data reading when external shuffle service is disabled

2020-08-31 Thread GitBox


Ngone51 commented on a change in pull request #28911:
URL: https://github.com/apache/spark/pull/28911#discussion_r480095723



##
File path: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockStoreClient.java
##
@@ -61,4 +78,56 @@ public MetricSet shuffleMetrics() {
     // Return an empty MetricSet by default.
     return () -> Collections.emptyMap();
   }
+
+  /**
+   * Request the local disk directories for executors which are located at the same host with
+   * the current BlockStoreClient (it can be ExternalBlockStoreClient or NettyBlockTransferService).
+   *
+   * @param host the host of BlockManager or ExternalShuffleService. It's the same with current
+   *             BlockStoreClient.

Review comment:
   It's a pre-condition which should be guaranteed by the caller. I've changed it to `should be`. Thanks!








[GitHub] [spark] Ngone51 commented on a change in pull request #28911: [SPARK-32077][CORE] Support host-local shuffle data reading when external shuffle service is disabled

2020-08-31 Thread GitBox


Ngone51 commented on a change in pull request #28911:
URL: https://github.com/apache/spark/pull/28911#discussion_r480095860



##
File path: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockStoreClient.java
##
@@ -18,15 +18,32 @@
 package org.apache.spark.network.shuffle;
 
 import java.io.Closeable;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
 import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
 
 import com.codahale.metrics.MetricSet;
+import org.apache.spark.network.client.RpcResponseCallback;
+import org.apache.spark.network.client.TransportClient;
+import org.apache.spark.network.client.TransportClientFactory;
+import org.apache.spark.network.shuffle.protocol.BlockTransferMessage;
+import org.apache.spark.network.shuffle.protocol.GetLocalDirsForExecutors;
+import org.apache.spark.network.shuffle.protocol.LocalDirsForExecutors;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;

Review comment:
   Thank you for catching this!








[GitHub] [spark] Ngone51 commented on a change in pull request #28911: [SPARK-32077][CORE] Support host-local shuffle data reading when external shuffle service is disabled

2020-08-31 Thread GitBox


Ngone51 commented on a change in pull request #28911:
URL: https://github.com/apache/spark/pull/28911#discussion_r480095206



##
File path: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockStoreClient.java
##
@@ -61,4 +78,56 @@ public MetricSet shuffleMetrics() {
     // Return an empty MetricSet by default.
     return () -> Collections.emptyMap();
   }
+
+  /**
+   * Request the local disk directories for executors which are located at the same host with
+   * the current BlockStoreClient (it can be ExternalBlockStoreClient or NettyBlockTransferService).
+   *
+   * @param host the host of BlockManager or ExternalShuffleService. It's the same with current
+   *             BlockStoreClient.
+   * @param port the port of BlockManager or ExternalShuffleService.
+   * @param execIds a collection of executor Ids, which specifies the target executors that we
+   *                want to get their local directories. There could be multiple executor Ids if
+   *                BlockStoreClient is implemented by ExternalBlockStoreClient since the request
+   *                handler, ExternalShuffleService, can serve multiple executors on the same node.
+   *                Or, only one executor Id if BlockStoreClient is implemented by
+   *                NettyBlockTransferService.
+   * @param hostLocalDirsCompletable a CompletableFuture which contains a map from executor Id
+   *                                 to its local directories if the request handler replies
+   *                                 successfully. Otherwise, it contains a specific error.
+   */
+  public void getHostLocalDirs(
+      String host,
+      int port,
+      String[] execIds,
+      CompletableFuture<Map<String, String[]>> hostLocalDirsCompletable) {
+    assert appId != null : "Called before init()";

Review comment:
   Sounds reasonable!








[GitHub] [spark] Ngone51 commented on a change in pull request #28911: [SPARK-32077][CORE] Support host-local shuffle data reading when external shuffle service is disabled

2020-08-31 Thread GitBox


Ngone51 commented on a change in pull request #28911:
URL: https://github.com/apache/spark/pull/28911#discussion_r480094857



##
File path: core/src/main/scala/org/apache/spark/storage/BlockManager.scala
##
@@ -120,34 +120,33 @@ private[spark] class ByteBufferBlockData(
 private[spark] class HostLocalDirManager(
     futureExecutionContext: ExecutionContext,
     cacheSize: Int,
-    externalBlockStoreClient: ExternalBlockStoreClient,
-    host: String,
-    externalShuffleServicePort: Int) extends Logging {
+    blockStoreClient: BlockStoreClient) extends Logging {
 
   private val executorIdToLocalDirsCache =
     CacheBuilder
       .newBuilder()
       .maximumSize(cacheSize)
       .build[String, Array[String]]()
 
-  private[spark] def getCachedHostLocalDirs()
-      : scala.collection.Map[String, Array[String]] = executorIdToLocalDirsCache.synchronized {
-    import scala.collection.JavaConverters._
-    return executorIdToLocalDirsCache.asMap().asScala
-  }
+  private[spark] def getCachedHostLocalDirs: Map[String, Array[String]] =
+    executorIdToLocalDirsCache.synchronized {
+      executorIdToLocalDirsCache.asMap().asScala.toMap
+    }
 
   private[spark] def getHostLocalDirs(
+      host: String,
+      port: Int,
       executorIds: Array[String])(
-      callback: Try[java.util.Map[String, Array[String]]] => Unit): Unit = {
+      callback: Try[Map[String, Array[String]]] => Unit): Unit = {

Review comment:
   It's required by `fetchMultipleHostLocalBlocks`. Actually, we could also do the Java-to-Scala map conversion before calling `fetchMultipleHostLocalBlocks` and leave `Try[java.util.Map[String, Array[String]]] => Unit` unchanged.
   
   But I decided to do the conversion here just because this class already imports `scala.collection.JavaConverters._`. It makes no big difference to do the conversion here or there.
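
   For reference, a minimal sketch of the conversion in question (`toScalaMap` is an illustrative name, not the PR's):

   ```scala
   import java.util.{Map => JMap}
   import scala.collection.JavaConverters._

   // Convert the java.util.Map produced by the RPC layer into an immutable
   // Scala Map before handing it to the callback.
   def toScalaMap(dirs: JMap[String, Array[String]]): Map[String, Array[String]] =
     dirs.asScala.toMap
   ```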








[GitHub] [spark] Ngone51 commented on a change in pull request #28911: [SPARK-32077][CORE] Support host-local shuffle data reading when external shuffle service is disabled

2020-08-31 Thread GitBox


Ngone51 commented on a change in pull request #28911:
URL: https://github.com/apache/spark/pull/28911#discussion_r480090760



##
File path: core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala
##
@@ -113,6 +113,15 @@ class NettyBlockRpcServer(
             s"when there is not sufficient space available to store the block.")
           responseContext.onFailure(exception)
         }
+
+      case getLocalDirs: GetLocalDirsForExecutors =>
+        assert(getLocalDirs.appId == appId)
+        assert(getLocalDirs.execIds.length == 1)

Review comment:
   That's a good point. I've changed it to reply to the sender with the error if the request fails the assertion. Thus, the sender can handle the error as a fetch failure.








[GitHub] [spark] Ngone51 commented on a change in pull request #28911: [SPARK-32077][CORE] Support host-local shuffle data reading when external shuffle service is disabled

2020-08-31 Thread GitBox


Ngone51 commented on a change in pull request #28911:
URL: https://github.com/apache/spark/pull/28911#discussion_r480089706



##
File path: core/src/test/scala/org/apache/spark/shuffle/HostLocalShuffleFetchSuite.scala
##
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle
+
+import org.scalatest.matchers.must.Matchers
+import org.scalatest.matchers.should.Matchers._
+
+import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkEnv, SparkFunSuite, TestUtils}
+import org.apache.spark.internal.config._
+import org.apache.spark.network.netty.NettyBlockTransferService
+
+/**
+ * This test suite is used to test host local shuffle reading with external shuffle service disabled
+ */
+class HostLocalShuffleFetchSuite extends SparkFunSuite with Matchers with LocalSparkContext {

Review comment:
   I've combined the two host-local shuffle reading tests (with external shuffle service enabled or disabled) into [HostLocalShuffleReadingSuite](https://github.com/apache/spark/blob/6b97be552b1d5a78f476c4a97d794c567222d9bf/core/src/test/scala/org/apache/spark/shuffle/HostLocalShuffleReadingSuite.scala), so the host-local shuffle reading feature can be tested in one place. Does it look okay to you? @dongjoon-hyun @attilapiros 








[GitHub] [spark] Ngone51 commented on a change in pull request #28911: [SPARK-32077][CORE] Support host-local shuffle data reading when external shuffle service is disabled

2020-08-31 Thread GitBox


Ngone51 commented on a change in pull request #28911:
URL: https://github.com/apache/spark/pull/28911#discussion_r480086501



##
File path: core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
##
@@ -66,16 +66,24 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT
     transfer
   }
 
+  private def createMockBlockManager(): BlockManager = {
+    val blockManager = mock(classOf[BlockManager])
+    val localBmId = BlockManagerId("test-client", "test-local-host", 1)
+    doReturn(localBmId).when(blockManager).blockManagerId
+    // By default, the mock BlockManager returns None for hostLocalDirManager. One could
+    // still use initHostLocalDirManager() to specify a custom hostLocalDirManager.

Review comment:
   Tests like:
   
   `successful 3 local + 4 host local + 2 remote reads`
   `fetch continuous blocks in batch successful 3 local + 4 host local + 2 remote reads`








[GitHub] [spark] Ngone51 commented on a change in pull request #28911: [SPARK-32077][CORE] Support host-local shuffle data reading when external shuffle service is disabled

2020-08-27 Thread GitBox


Ngone51 commented on a change in pull request #28911:
URL: https://github.com/apache/spark/pull/28911#discussion_r478179838



##
File path: core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
##
@@ -463,54 +466,73 @@ final class ShuffleBlockFetcherIterator(
    * track in-memory are the ManagedBuffer references themselves.
    */
   private[this] def fetchHostLocalBlocks(hostLocalDirManager: HostLocalDirManager): Unit = {
-    val cachedDirsByExec = hostLocalDirManager.getCachedHostLocalDirs()
-    val (hostLocalBlocksWithCachedDirs, hostLocalBlocksWithMissingDirs) =
-      hostLocalBlocksByExecutor
-        .map { case (hostLocalBmId, bmInfos) =>
-          (hostLocalBmId, bmInfos, cachedDirsByExec.get(hostLocalBmId.executorId))
-        }.partition(_._3.isDefined)
-    val bmId = blockManager.blockManagerId
-    val immutableHostLocalBlocksWithoutDirs =
-      hostLocalBlocksWithMissingDirs.map { case (hostLocalBmId, bmInfos, _) =>
-        hostLocalBmId -> bmInfos
-      }.toMap
-    if (immutableHostLocalBlocksWithoutDirs.nonEmpty) {
+    val cachedDirsByExec = hostLocalDirManager.getCachedHostLocalDirs
+    val (hostLocalBlocksWithCachedDirs, hostLocalBlocksWithMissingDirs) = {
+      val (hasCache, noCache) = hostLocalBlocksByExecutor.partition { case (hostLocalBmId, _) =>
+        cachedDirsByExec.contains(hostLocalBmId.executorId)
+      }
+      (hasCache.toMap, noCache.toMap)
+    }
+
+    if (hostLocalBlocksWithMissingDirs.nonEmpty) {
       logDebug(s"Asynchronous fetching host-local blocks without cached executors' dir: " +
-        s"${immutableHostLocalBlocksWithoutDirs.mkString(", ")}")
-      val execIdsWithoutDirs = immutableHostLocalBlocksWithoutDirs.keys.map(_.executorId).toArray
-      hostLocalDirManager.getHostLocalDirs(execIdsWithoutDirs) {
-        case Success(dirs) =>
-          immutableHostLocalBlocksWithoutDirs.foreach { case (hostLocalBmId, blockInfos) =>
-            blockInfos.takeWhile { case (blockId, _, mapIndex) =>
-              fetchHostLocalBlock(
-                blockId,
-                mapIndex,
-                dirs.get(hostLocalBmId.executorId),
-                hostLocalBmId)
-            }
-          }
-          logDebug(s"Got host-local blocks (without cached executors' dir) in " +
-            s"${Utils.getUsedTimeNs(startTimeNs)}")
-
-        case Failure(throwable) =>
-          logError(s"Error occurred while fetching host local blocks", throwable)
-          val (hostLocalBmId, blockInfoSeq) = immutableHostLocalBlocksWithoutDirs.head
-          val (blockId, _, mapIndex) = blockInfoSeq.head
-          results.put(FailureFetchResult(blockId, mapIndex, hostLocalBmId, throwable))
+        s"${hostLocalBlocksWithMissingDirs.mkString(", ")}")
+
+      // If the external shuffle service is enabled, we'll fetch the local directories for
+      // multiple executors from the external shuffle service, which is located on the same host
+      // as the executors, in a single request. Otherwise, we'll fetch the local directories from
+      // those executors directly, one by one. There won't be too many such requests, since in
+      // practice a single host rarely runs many executors at the same time.
+      val dirFetchRequests = if (blockManager.externalShuffleServiceEnabled) {
+        val host = blockManager.blockManagerId.host
+        val port = blockManager.externalShuffleServicePort
+        Seq((host, port, hostLocalBlocksWithMissingDirs.keys.toArray))
+      } else {
+        hostLocalBlocksWithMissingDirs.keys.map(bmId => (bmId.host, bmId.port, Array(bmId))).toSeq
+      }
+
+      dirFetchRequests.foreach { case (host, port, bmIds) =>
+        hostLocalDirManager.getHostLocalDirs(host, port, bmIds.map(_.executorId)) {
+          case Success(dirsByExecId) =>
+            fetchMultipleHostLocalBlocks(
+              hostLocalBlocksWithMissingDirs.filterKeys(bmIds.contains),
+              dirsByExecId,
+              cached = false)
+
+          case Failure(throwable) =>
+            logError("Error occurred while fetching host local blocks", throwable)
+            val bmId = bmIds.head
+            val blockInfoSeq = hostLocalBlocksWithMissingDirs(bmId)
+            val (blockId, _, mapIndex) = blockInfoSeq.head
+            results.put(FailureFetchResult(blockId, mapIndex, bmId, throwable))
+        }
       }
     }
+
     if (hostLocalBlocksWithCachedDirs.nonEmpty) {
       logDebug(s"Synchronous fetching host-local blocks with cached executors' dir: " +
         s"${hostLocalBlocksWithCachedDirs.mkString(", ")}")
-      hostLocalBlocksWithCachedDirs.foreach { case (_, blockInfos, localDirs) =>
-        blockInfos.foreach { case (blockId, _, mapIndex) =>
-          if (!fetchHostLocalBlock(blockId, mapIndex, localDirs.get, bmId)) {

Review comment:
   @attilapiros This looks like a pre-existing bug: the `bmId` is for the current executor, but the blocks can belong to other executors on the same host.
[GitHub] [spark] Ngone51 commented on a change in pull request #28911: [SPARK-32077][CORE] Support host-local shuffle data reading when external shuffle service is disabled

2020-08-27 Thread GitBox


Ngone51 commented on a change in pull request #28911:
URL: https://github.com/apache/spark/pull/28911#discussion_r478175273



##
File path: core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
##
@@ -466,42 +464,51 @@ final class ShuffleBlockFetcherIterator(
     val cachedDirsByExec = hostLocalDirManager.getCachedHostLocalDirs()
     val (hostLocalBlocksWithCachedDirs, hostLocalBlocksWithMissingDirs) =
       hostLocalBlocksByExecutor
-        .map { case (hostLocalBmId, bmInfos) =>
-          (hostLocalBmId, bmInfos, cachedDirsByExec.get(hostLocalBmId.executorId))
+        .map { case (hostLocalBmId, blockInfos) =>
+          (hostLocalBmId, blockInfos, cachedDirsByExec.get(hostLocalBmId.executorId))
         }.partition(_._3.isDefined)
-    val bmId = blockManager.blockManagerId
     val immutableHostLocalBlocksWithoutDirs =
-      hostLocalBlocksWithMissingDirs.map { case (hostLocalBmId, bmInfos, _) =>
-        hostLocalBmId -> bmInfos
+      hostLocalBlocksWithMissingDirs.map { case (hostLocalBmId, blockInfos, _) =>
+        hostLocalBmId -> blockInfos
       }.toMap
     if (immutableHostLocalBlocksWithoutDirs.nonEmpty) {
       logDebug(s"Asynchronous fetching host-local blocks without cached executors' dir: " +
         s"${immutableHostLocalBlocksWithoutDirs.mkString(", ")}")
-      val execIdsWithoutDirs = immutableHostLocalBlocksWithoutDirs.keys.map(_.executorId).toArray
-      hostLocalDirManager.getHostLocalDirs(execIdsWithoutDirs) {
-        case Success(dirs) =>
-          immutableHostLocalBlocksWithoutDirs.foreach { case (hostLocalBmId, blockInfos) =>
-            blockInfos.takeWhile { case (blockId, _, mapIndex) =>
-              fetchHostLocalBlock(
-                blockId,
-                mapIndex,
-                dirs.get(hostLocalBmId.executorId),
-                hostLocalBmId)
+      val dirFetchRequests = if (blockManager.externalShuffleServiceEnabled) {
+        val host = blockManager.blockManagerId.host
+        val port = blockManager.externalShuffleServicePort
+        Seq((host, port, immutableHostLocalBlocksWithoutDirs.keys.toArray))
+      } else {
+        immutableHostLocalBlocksWithoutDirs.keys
+          .map(bmId => (bmId.host, bmId.port, Array(bmId))).toSeq
+      }
+
+      dirFetchRequests.foreach { case (host, port, bmIds) =>
+        hostLocalDirManager.getHostLocalDirs(host, port, bmIds.map(_.executorId)) {
+          case Success(dirsByExecId) =>

Review comment:
   yes








[GitHub] [spark] Ngone51 commented on a change in pull request #28911: [SPARK-32077][CORE] Support host-local shuffle data reading when external shuffle service is disabled

2020-07-24 Thread GitBox


Ngone51 commented on a change in pull request #28911:
URL: https://github.com/apache/spark/pull/28911#discussion_r459902032



##
File path: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockStoreClient.java
##
@@ -61,4 +78,62 @@ public MetricSet shuffleMetrics() {
     // Return an empty MetricSet by default.
     return () -> Collections.emptyMap();
   }
+
+  /**
+   * Request the local disk directories for executors which are located at the same host with
+   * the current BlockStoreClient (it can be ExternalBlockStoreClient or NettyBlockTransferService).
+   *
+   *
+   * @param host the host of BlockManager or ExternalShuffleService. It's the same with current
+   *             BlockStoreClient.
+   * @param port the port of BlockManager or ExternalShuffleService.
+   * @param execIds a collection of executor Ids, which specifies the target executors that we
+   *                want to get their local directories. There could be multiple executor Ids if
+   *                BlockStoreClient is implemented by ExternalBlockStoreClient since the request
+   *                handler, ExternalShuffleService, can serve multiple executors on the same node.
+   *                Or, only one executor Id if BlockStoreClient is implemented by
+   *                NettyBlockTransferService.
+   * @param hostLocalDirsCompletable a CompletableFuture which contains a map from executor Id to its
+   *                                 local directories if the request handler replies successfully.
+   *                                 Otherwise, it contains a specific error.
+   */
+  public void getHostLocalDirs(
+      String host,
+      int port,
+      String[] execIds,
+      CompletableFuture<Map<String, String[]>> hostLocalDirsCompletable) {
+    assert appId != null : "Called before init()";
+    GetLocalDirsForExecutors getLocalDirsMessage = new GetLocalDirsForExecutors(appId, execIds);
+    try {
+      TransportClient client = clientFactory.createClient(host, port);
+      client.sendRpc(getLocalDirsMessage.toByteBuffer(), new RpcResponseCallback() {
+        @Override
+        public void onSuccess(ByteBuffer response) {
+          try {
+            BlockTransferMessage msgObj = BlockTransferMessage.Decoder.fromByteBuffer(response);
+            hostLocalDirsCompletable.complete(
+                ((LocalDirsForExecutors) msgObj).getLocalDirsByExec());
+          } catch (Throwable t) {
+            logger.warn("Error trying to get the host local dirs for " +
+                Arrays.toString(getLocalDirsMessage.execIds) + " via external shuffle service",

Review comment:
   Thanks for catching this. I just removed "via external shuffle service" since the `logger` would print the current class name. Do you think it's ok?
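
   A small sketch of why the suffix was redundant (illustrative class standing in for the client; slf4j loggers are conventionally named after the owning class, so the class name already appears in every log line):

   ```scala
   import org.slf4j.LoggerFactory

   class HostLocalDirsClient { // hypothetical stand-in for the block store client
     private val logger = LoggerFactory.getLogger(getClass)

     def demo(): Unit =
       // Emitted as e.g. "WARN HostLocalDirsClient: Error trying to get ...",
       // so appending "via external shuffle service" added no information.
       logger.warn("Error trying to get the host local dirs for [0]")
   }
   ```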








[GitHub] [spark] Ngone51 commented on a change in pull request #28911: [SPARK-32077][CORE] Support host-local shuffle data reading when external shuffle service is disabled

2020-07-24 Thread GitBox


Ngone51 commented on a change in pull request #28911:
URL: https://github.com/apache/spark/pull/28911#discussion_r459897266



##
File path: core/src/main/scala/org/apache/spark/storage/BlockManager.scala
##
@@ -492,20 +494,26 @@ private[spark] class BlockManager(
       registerWithExternalShuffleServer()
     }
 
-    hostLocalDirManager =
-      if (conf.get(config.SHUFFLE_HOST_LOCAL_DISK_READING_ENABLED) &&
-          !conf.get(config.SHUFFLE_USE_OLD_FETCH_PROTOCOL)) {
-        externalBlockStoreClient.map { blockStoreClient =>
-          new HostLocalDirManager(
-            futureExecutionContext,
-            conf.get(config.STORAGE_LOCAL_DISK_BY_EXECUTORS_CACHE_SIZE),
-            blockStoreClient,
-            blockManagerId.host,
-            externalShuffleServicePort)
-        }
+    hostLocalDirManager = {
+      val canUseHostLocalReading = conf.get(config.SHUFFLE_HOST_LOCAL_DISK_READING_ENABLED) &&
+        !conf.get(config.SHUFFLE_USE_OLD_FETCH_PROTOCOL)
+      val externalShuffleServiceEnabled = externalBlockStoreClient.isDefined
+      val dynamicAllocationDisabled = !conf.get(config.DYN_ALLOCATION_ENABLED)
+      val dynamicAllocationEnabledWithShuffleTracking = conf.get(config.DYN_ALLOCATION_ENABLED) &&
+        conf.get(config.DYN_ALLOCATION_SHUFFLE_TRACKING_ENABLED)

Review comment:
   Thanks for catching it!
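
   Spelled out as a standalone predicate, the condition under review reduces to the following sketch (names mirror the diff above, not necessarily the merged code):

   ```scala
   // Host-local disk reading is usable when the feature flags allow it and
   // either (a) the external shuffle service can serve the dirs, (b) executors
   // are static, or (c) dynamic allocation tracks shuffle files.
   def hostLocalReadingAllowed(
       canUseHostLocalReading: Boolean,
       externalShuffleServiceEnabled: Boolean,
       dynamicAllocationEnabled: Boolean,
       shuffleTrackingEnabled: Boolean): Boolean = {
     canUseHostLocalReading &&
       (externalShuffleServiceEnabled || !dynamicAllocationEnabled || shuffleTrackingEnabled)
   }
   ```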








[GitHub] [spark] Ngone51 commented on a change in pull request #28911: [SPARK-32077][CORE] Support host-local shuffle data reading when external shuffle service is disabled

2020-07-24 Thread GitBox


Ngone51 commented on a change in pull request #28911:
URL: https://github.com/apache/spark/pull/28911#discussion_r459897035



##
File path: core/src/main/scala/org/apache/spark/internal/config/package.scala
##
@@ -1415,10 +1415,16 @@ package object config {
 
   private[spark] val SHUFFLE_HOST_LOCAL_DISK_READING_ENABLED =
     ConfigBuilder("spark.shuffle.readHostLocalDisk")
-      .doc(s"If enabled (and `${SHUFFLE_USE_OLD_FETCH_PROTOCOL.key}` is disabled and external " +
-        s"shuffle `${SHUFFLE_SERVICE_ENABLED.key}` is enabled), shuffle " +
-        "blocks requested from those block managers which are running on the same host are read " +
-        "from the disk directly instead of being fetched as remote blocks over the network.")
+      .doc("When enabled, shuffle blocks requested from those block managers which are running " +
+        "on the same host are read from the disk directly instead of being fetched as remote " +
+        "blocks over the network. Note that for k8s workloads, this only works when nodes are " +
+        "using non-isolated container storage." +
+        s"To enable the feature, one should disable ${SHUFFLE_USE_OLD_FETCH_PROTOCOL.key} first." +
+        " And make sure that one of the following requirements are satisfied:\n" +
+        s"1. external shuffle service is enabled (${SHUFFLE_SERVICE_ENABLED.key});" +
+        s"2. dynamic allocation is disabled (${DYN_ALLOCATION_ENABLED.key});" +
+        s"3. dynamic allocation is enabled with shuffle tracking " +
+        s"(${DYN_ALLOCATION_SHUFFLE_TRACKING_ENABLED});")

Review comment:
   Oh yeah... we'd cover all the cases after this PR. I'll remove the requirements.








[GitHub] [spark] Ngone51 commented on a change in pull request #28911: [SPARK-32077][CORE] Support host-local shuffle data reading when external shuffle service is disabled

2020-07-23 Thread GitBox


Ngone51 commented on a change in pull request #28911:
URL: https://github.com/apache/spark/pull/28911#discussion_r459264839



##
File path: core/src/main/scala/org/apache/spark/internal/config/package.scala
##
@@ -1415,10 +1415,16 @@ package object config {
 
   private[spark] val SHUFFLE_HOST_LOCAL_DISK_READING_ENABLED =
     ConfigBuilder("spark.shuffle.readHostLocalDisk")
-      .doc(s"If enabled (and `${SHUFFLE_USE_OLD_FETCH_PROTOCOL.key}` is disabled and external " +
-        s"shuffle `${SHUFFLE_SERVICE_ENABLED.key}` is enabled), shuffle " +
-        "blocks requested from those block managers which are running on the same host are read " +
-        "from the disk directly instead of being fetched as remote blocks over the network.")
+      .doc("When enabled, shuffle blocks requested from those block managers which are running " +
+        "on the same host are read from the disk directly instead of being fetched as remote " +
+        "blocks over the network. Note that for k8s workloads, this only works when nodes are " +
+        "using non-isolated container storage." +
+        s"To enable the feature, one should disable ${SHUFFLE_USE_OLD_FETCH_PROTOCOL.key} first." +
+        " And make sure that one of the following requirements are satisfied:\n" +
+        s"1. external shuffle service is enabled (${SHUFFLE_SERVICE_ENABLED.key});" +
+        s"2. dynamic allocation is disabled (${DYN_ALLOCATION_ENABLED.key});" +
+        s"3. dynamic allocation is enabled with shuffle tracking " +
+        s"(${DYN_ALLOCATION_SHUFFLE_TRACKING_ENABLED});")

Review comment:
   Please take a look at the updated document regarding dynamic allocation. cc: @holdenk @attilapiros 








[GitHub] [spark] Ngone51 commented on a change in pull request #28911: [SPARK-32077][CORE] Support host-local shuffle data reading when external shuffle service is disabled

2020-07-21 Thread GitBox


Ngone51 commented on a change in pull request #28911:
URL: https://github.com/apache/spark/pull/28911#discussion_r458103470



##
File path: core/src/main/scala/org/apache/spark/internal/config/package.scala
##
@@ -1391,10 +1391,12 @@ package object config {
 
   private[spark] val SHUFFLE_HOST_LOCAL_DISK_READING_ENABLED =
     ConfigBuilder("spark.shuffle.readHostLocalDisk")
-      .doc(s"If enabled (and `${SHUFFLE_USE_OLD_FETCH_PROTOCOL.key}` is disabled and external " +
-        s"shuffle `${SHUFFLE_SERVICE_ENABLED.key}` is enabled), shuffle " +
-        "blocks requested from those block managers which are running on the same host are read " +
-        "from the disk directly instead of being fetched as remote blocks over the network.")
+      .doc(s"If enabled (and `${SHUFFLE_USE_OLD_FETCH_PROTOCOL.key}` is disabled and 1) external " +
+        s"shuffle `${SHUFFLE_SERVICE_ENABLED.key}` is enabled or 2) ${DYN_ALLOCATION_ENABLED.key}" +
+        s" is disabled), shuffle blocks requested from those block managers which are running on " +

Review comment:
   I mean, it *can* work with this feature enabled (please see my P.S. comment above). And I will update the document about dynamic allocation to:
   
   1. not allow when `spark.dynamicAllocation.enabled=true` && `spark.dynamicAllocation.shuffleTracking.enabled=false`
   
   2. allow when `spark.dynamicAllocation.enabled=true` && `spark.dynamicAllocation.shuffleTracking.enabled=true` (see the config sketch below)
   
   also cc @attilapiros Does it make sense to you?
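
   For illustration, combination 2 would be enabled with a configuration along these lines (keys taken from the discussion above):

   ```scala
   import org.apache.spark.SparkConf

   // Host-local reading together with dynamic allocation plus shuffle tracking.
   val conf = new SparkConf()
     .set("spark.shuffle.readHostLocalDisk", "true")
     .set("spark.shuffle.useOldFetchProtocol", "false")
     .set("spark.dynamicAllocation.enabled", "true")
     .set("spark.dynamicAllocation.shuffleTracking.enabled", "true")
   ```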








[GitHub] [spark] Ngone51 commented on a change in pull request #28911: [SPARK-32077][CORE] Support host-local shuffle data reading when external shuffle service is disabled

2020-07-21 Thread GitBox


Ngone51 commented on a change in pull request #28911:
URL: https://github.com/apache/spark/pull/28911#discussion_r458097873



##
File path: core/src/main/scala/org/apache/spark/internal/config/package.scala
##
@@ -1391,10 +1391,12 @@ package object config {
 
   private[spark] val SHUFFLE_HOST_LOCAL_DISK_READING_ENABLED =
     ConfigBuilder("spark.shuffle.readHostLocalDisk")
-      .doc(s"If enabled (and `${SHUFFLE_USE_OLD_FETCH_PROTOCOL.key}` is disabled and external " +
-        s"shuffle `${SHUFFLE_SERVICE_ENABLED.key}` is enabled), shuffle " +
-        "blocks requested from those block managers which are running on the same host are read " +
-        "from the disk directly instead of being fetched as remote blocks over the network.")
+      .doc(s"If enabled (and `${SHUFFLE_USE_OLD_FETCH_PROTOCOL.key}` is disabled and 1) external " +
+        s"shuffle `${SHUFFLE_SERVICE_ENABLED.key}` is enabled or 2) ${DYN_ALLOCATION_ENABLED.key}" +
+        s" is disabled), shuffle blocks requested from those block managers which are running on " +

Review comment:
   > As of Spark 3 we no longer require dynamic allocation to have a shuffle service.
   
   This is exactly what I mentioned in the P.S. I can update the documentation according to this feature.
   
   But please also note that dynamic allocation without the external shuffle service is still an experimental feature, disabled by default. And it has a main problem: the user needs to configure when to delete shuffle files, while most common users have no idea about this. By default, shuffle files will not be removed until GC happens at the driver side. It also means executors won't come and go more frequently than under dynamic allocation with a shuffle service. Therefore, I think we were discussing a more general problem above when using dynamic allocation with the shuffle service.








[GitHub] [spark] Ngone51 commented on a change in pull request #28911: [SPARK-32077][CORE] Support host-local shuffle data reading when external shuffle service is disabled

2020-07-21 Thread GitBox


Ngone51 commented on a change in pull request #28911:
URL: https://github.com/apache/spark/pull/28911#discussion_r457896569



##
File path: core/src/main/scala/org/apache/spark/internal/config/package.scala
##
@@ -1391,10 +1391,12 @@ package object config {
 
   private[spark] val SHUFFLE_HOST_LOCAL_DISK_READING_ENABLED =
     ConfigBuilder("spark.shuffle.readHostLocalDisk")
-      .doc(s"If enabled (and `${SHUFFLE_USE_OLD_FETCH_PROTOCOL.key}` is disabled and external " +
-        s"shuffle `${SHUFFLE_SERVICE_ENABLED.key}` is enabled), shuffle " +
-        "blocks requested from those block managers which are running on the same host are read " +
-        "from the disk directly instead of being fetched as remote blocks over the network.")
+      .doc(s"If enabled (and `${SHUFFLE_USE_OLD_FETCH_PROTOCOL.key}` is disabled and 1) external " +
+        s"shuffle `${SHUFFLE_SERVICE_ENABLED.key}` is enabled or 2) ${DYN_ALLOCATION_ENABLED.key}" +
+        s" is disabled), shuffle blocks requested from those block managers which are running on " +

Review comment:
   Yes, it prevents the case where executors could come and go under dynamic allocation. Also, I think it's still different from an executor loss error, because executor loss is an abnormal case that is out of Spark's control, while dynamic allocation is under control. And executor shutdown under dynamic allocation happens more frequently compared to executor loss. I think we should try our best to avoid shuffle fetch failures since their penalty is not trivial, especially when we can avoid them.
   
   Besides, for the case of dynamic allocation enabled, users could already use the external shuffle service. Therefore, I can't think of a strong reason to mix these two branches.
   
   P.S. we could probably allow dynamic allocation here if `spark.dynamicAllocation.shuffleTracking.enabled` is also enabled.








[GitHub] [spark] Ngone51 commented on a change in pull request #28911: [SPARK-32077][CORE] Support host-local shuffle data reading when external shuffle service is disabled

2020-07-21 Thread GitBox


Ngone51 commented on a change in pull request #28911:
URL: https://github.com/apache/spark/pull/28911#discussion_r457876717



##
File path: core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
##
@@ -194,6 +196,45 @@ private[spark] class NettyBlockTransferService(
     result.future
   }
 
+  override def getHostLocalDirs(
+      host: String,
+      port: Int,
+      execIds: Array[String],
+      hostLocalDirsCompletable: CompletableFuture[util.Map[String, Array[String]]]): Unit = {
+    val getLocalDirsMessage = new GetLocalDirsForExecutors(appId, execIds)
+    try {
+      val client = clientFactory.createClient(host, port)
+      client.sendRpc(getLocalDirsMessage.toByteBuffer, new RpcResponseCallback() {
+        override def onSuccess(response: ByteBuffer): Unit = {
+          try {
+            val msgObj = BlockTransferMessage.Decoder.fromByteBuffer(response)
+            hostLocalDirsCompletable.complete(
+              msgObj.asInstanceOf[LocalDirsForExecutors].getLocalDirsByExec)
+          } catch {
+            case t: Throwable =>
+              logWarning(s"Error trying to get the host local dirs for executor ${execIds.head}",
+                t.getCause)
+              hostLocalDirsCompletable.completeExceptionally(t)
+          } finally {
+            client.close()
+          }
+        }
+
+        override def onFailure(t: Throwable): Unit = {
+          logWarning(s"Error trying to get the host local dirs for executor ${execIds.head}",
+            t.getCause)
+          hostLocalDirsCompletable.completeExceptionally(t)
+          client.close()
+        }
+      })
+    } catch {
+      case e: IOException =>

Review comment:
   Yes, good idea. I've done the refactor. Please take a look.








[GitHub] [spark] Ngone51 commented on a change in pull request #28911: [SPARK-32077][CORE] Support host-local shuffle data reading when external shuffle service is disabled

2020-07-21 Thread GitBox


Ngone51 commented on a change in pull request #28911:
URL: https://github.com/apache/spark/pull/28911#discussion_r457876162



##
File path: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockStoreClient.java
##
@@ -61,4 +63,17 @@ public MetricSet shuffleMetrics() {
     // Return an empty MetricSet by default.
     return () -> Collections.emptyMap();
   }
+
+  /**
+   * Request the local disk directories, which are specified by DiskBlockManager, for the executors
+   * from the external shuffle service (when this is a ExternalBlockStoreClient) or BlockManager
+   * (when this is a NettyBlockTransferService). Note there's only one executor when this is a
+   * NettyBlockTransferService because we ask one specific executor at a time.

Review comment:
   I added the check to ensure there's only one executor id, but didn't check its equality with the blockManager's executor id, because we only have `BlockDataManager` in `NettyBlockRpcServer`, which does not expose the executor id.
   
   I am still wondering whether it's worthwhile to expose it for the sanity-check purpose.








[GitHub] [spark] Ngone51 commented on a change in pull request #28911: [SPARK-32077][CORE] Support host-local shuffle data reading when external shuffle service is disabled

2020-07-10 Thread GitBox


Ngone51 commented on a change in pull request #28911:
URL: https://github.com/apache/spark/pull/28911#discussion_r452705081



##
File path: core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
##
@@ -194,6 +196,45 @@ private[spark] class NettyBlockTransferService(
     result.future
   }
 
+  override def getHostLocalDirs(
+      host: String,
+      port: Int,
+      execIds: Array[String],
+      hostLocalDirsCompletable: CompletableFuture[util.Map[String, Array[String]]]): Unit = {
+    val getLocalDirsMessage = new GetLocalDirsForExecutors(appId, execIds)
+    try {
+      val client = clientFactory.createClient(host, port)
+      client.sendRpc(getLocalDirsMessage.toByteBuffer, new RpcResponseCallback() {
+        override def onSuccess(response: ByteBuffer): Unit = {
+          try {
+            val msgObj = BlockTransferMessage.Decoder.fromByteBuffer(response)
+            hostLocalDirsCompletable.complete(
+              msgObj.asInstanceOf[LocalDirsForExecutors].getLocalDirsByExec)
+          } catch {
+            case t: Throwable =>
+              logWarning(s"Error trying to get the host local dirs for executor ${execIds.head}",
+                t.getCause)
+              hostLocalDirsCompletable.completeExceptionally(t)
+          } finally {
+            client.close()
+          }
+        }
+
+        override def onFailure(t: Throwable): Unit = {
+          logWarning(s"Error trying to get the host local dirs for executor ${execIds.head}",
+            t.getCause)
+          hostLocalDirsCompletable.completeExceptionally(t)
+          client.close()
+        }
+      })
+    } catch {
+      case e: IOException =>

Review comment:
   This is migrated from `ExternalBlockStoreClient.getHostLocalDirs`. Maybe @attilapiros has more context?








[GitHub] [spark] Ngone51 commented on a change in pull request #28911: [SPARK-32077][CORE] Support host-local shuffle data reading when external shuffle service is disabled

2020-07-10 Thread GitBox


Ngone51 commented on a change in pull request #28911:
URL: https://github.com/apache/spark/pull/28911#discussion_r452704155



##
File path: core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
##
@@ -478,30 +475,43 @@ final class ShuffleBlockFetcherIterator(
   logDebug(s"Asynchronous fetching host-local blocks without cached 
executors' dir: " +
 s"${immutableHostLocalBlocksWithoutDirs.mkString(", ")}")
   val execIdsWithoutDirs = 
immutableHostLocalBlocksWithoutDirs.keys.map(_.executorId).toArray
-  hostLocalDirManager.getHostLocalDirs(execIdsWithoutDirs) {
-case Success(dirs) =>
-  immutableHostLocalBlocksWithoutDirs.foreach { case (hostLocalBmId, 
blockInfos) =>
-blockInfos.takeWhile { case (blockId, _, mapIndex) =>
-  fetchHostLocalBlock(
-blockId,
-mapIndex,
-dirs.get(hostLocalBmId.executorId),
-hostLocalBmId)
+
+  val dirFetchRequests = if (blockManager.externalShuffleServiceEnabled) {
+val host = blockManager.blockManagerId.host
+val port = blockManager.externalShuffleServicePort
+Seq((host, port, immutableHostLocalBlocksWithoutDirs.keys.toArray))
+  } else {
+hostLocalBlocksByExecutor.keysIterator
+  .filter(exec => execIdsWithoutDirs.contains(exec.executorId))

Review comment:
   corrected to `bmId`, thanks!
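   
   To make the branching above concrete, here is a self-contained sketch (with a simplified stand-in for `BlockManagerId`, illustrative only) of how the dir-fetch requests differ between the two modes:
   
   // Simplified stand-in for BlockManagerId; not the real Spark type.
   case class BmId(executorId: String, host: String, port: Int)
   
   def dirFetchRequests(
       essEnabled: Boolean,
       essHost: String,
       essPort: Int,
       execsWithoutDirs: Seq[BmId]): Seq[(String, Int, Seq[String])] = {
     if (essEnabled) {
       // One RPC to the external shuffle service, asking for all executors at once.
       Seq((essHost, essPort, execsWithoutDirs.map(_.executorId)))
     } else {
       // One RPC per executor, each sent to that executor's NettyBlockTransferService.
       execsWithoutDirs.map(bmId => (bmId.host, bmId.port, Seq(bmId.executorId)))
     }
   }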





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] Ngone51 commented on a change in pull request #28911: [SPARK-32077][CORE] Support host-local shuffle data reading when external shuffle service is disabled

2020-07-10 Thread GitBox


Ngone51 commented on a change in pull request #28911:
URL: https://github.com/apache/spark/pull/28911#discussion_r452703876



##
File path: 
core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
##
@@ -478,30 +475,43 @@ final class ShuffleBlockFetcherIterator(
   logDebug(s"Asynchronous fetching host-local blocks without cached 
executors' dir: " +
 s"${immutableHostLocalBlocksWithoutDirs.mkString(", ")}")
   val execIdsWithoutDirs = 
immutableHostLocalBlocksWithoutDirs.keys.map(_.executorId).toArray
-  hostLocalDirManager.getHostLocalDirs(execIdsWithoutDirs) {
-case Success(dirs) =>
-  immutableHostLocalBlocksWithoutDirs.foreach { case (hostLocalBmId, 
blockInfos) =>
-blockInfos.takeWhile { case (blockId, _, mapIndex) =>
-  fetchHostLocalBlock(
-blockId,
-mapIndex,
-dirs.get(hostLocalBmId.executorId),
-hostLocalBmId)
+
+  val dirFetchRequests = if (blockManager.externalShuffleServiceEnabled) {
+val host = blockManager.blockManagerId.host
+val port = blockManager.externalShuffleServicePort
+Seq((host, port, immutableHostLocalBlocksWithoutDirs.keys.toArray))
+  } else {
+hostLocalBlocksByExecutor.keysIterator

Review comment:
   yea, correct! Updated to reuse `immutableHostLocalBlocksWithoutDirs`.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] Ngone51 commented on a change in pull request #28911: [SPARK-32077][CORE] Support host-local shuffle data reading when external shuffle service is disabled

2020-07-02 Thread GitBox


Ngone51 commented on a change in pull request #28911:
URL: https://github.com/apache/spark/pull/28911#discussion_r448971785



##
File path: core/src/main/scala/org/apache/spark/internal/config/package.scala
##
@@ -1391,10 +1391,11 @@ package object config {
 
   private[spark] val SHUFFLE_HOST_LOCAL_DISK_READING_ENABLED =
     ConfigBuilder("spark.shuffle.readHostLocalDisk")
-      .doc(s"If enabled (and `${SHUFFLE_USE_OLD_FETCH_PROTOCOL.key}` is disabled and external " +
-        s"shuffle `${SHUFFLE_SERVICE_ENABLED.key}` is enabled), shuffle " +
-        "blocks requested from those block managers which are running on the same host are read " +
-        "from the disk directly instead of being fetched as remote blocks over the network.")
+      .doc(s"If enabled (and `${SHUFFLE_USE_OLD_FETCH_PROTOCOL.key}` is disabled and 1) external " +
+        s"shuffle `${SHUFFLE_SERVICE_ENABLED.key}` is enabled or 2) ${DYN_ALLOCATION_ENABLED.key}" +
+        s" is disabled), shuffle blocks requested from those block managers which are running on " +
+        s"the same host are read from the disk directly instead of being fetched as remote blocks" +
+        s" over the network.")

Review comment:
   I'm not familiar with cloud environments, so I asked the question to verify my understanding.
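   
   For reference, a minimal configuration that exercises the newly documented combination (new fetch protocol, no external shuffle service, dynamic allocation off); the key names are the string forms of the config entries above:
   
   import org.apache.spark.SparkConf
   
   // Host-local disk reading with the external shuffle service disabled.
   val conf = new SparkConf()
     .set("spark.shuffle.readHostLocalDisk", "true")
     .set("spark.shuffle.useOldFetchProtocol", "false")
     .set("spark.shuffle.service.enabled", "false")
     .set("spark.dynamicAllocation.enabled", "false")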





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] Ngone51 commented on a change in pull request #28911: [SPARK-32077][CORE] Support host-local shuffle data reading when external shuffle service is disabled

2020-07-02 Thread GitBox


Ngone51 commented on a change in pull request #28911:
URL: https://github.com/apache/spark/pull/28911#discussion_r448971785



##
File path: core/src/main/scala/org/apache/spark/internal/config/package.scala
##
@@ -1391,10 +1391,11 @@ package object config {
 
   private[spark] val SHUFFLE_HOST_LOCAL_DISK_READING_ENABLED =
     ConfigBuilder("spark.shuffle.readHostLocalDisk")
-      .doc(s"If enabled (and `${SHUFFLE_USE_OLD_FETCH_PROTOCOL.key}` is disabled and external " +
-        s"shuffle `${SHUFFLE_SERVICE_ENABLED.key}` is enabled), shuffle " +
-        "blocks requested from those block managers which are running on the same host are read " +
-        "from the disk directly instead of being fetched as remote blocks over the network.")
+      .doc(s"If enabled (and `${SHUFFLE_USE_OLD_FETCH_PROTOCOL.key}` is disabled and 1) external " +
+        s"shuffle `${SHUFFLE_SERVICE_ENABLED.key}` is enabled or 2) ${DYN_ALLOCATION_ENABLED.key}" +
+        s" is disabled), shuffle blocks requested from those block managers which are running on " +
+        s"the same host are read from the disk directly instead of being fetched as remote blocks" +
+        s" over the network.")

Review comment:
   I'm not familiar with cloud environments, so I'm just asking for your confirmation.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] Ngone51 commented on a change in pull request #28911: [SPARK-32077][CORE] Support host-local shuffle data reading when external shuffle service is disabled

2020-06-30 Thread GitBox


Ngone51 commented on a change in pull request #28911:
URL: https://github.com/apache/spark/pull/28911#discussion_r447554793



##
File path: core/src/main/scala/org/apache/spark/internal/config/package.scala
##
@@ -1391,10 +1391,11 @@ package object config {
 
   private[spark] val SHUFFLE_HOST_LOCAL_DISK_READING_ENABLED =
     ConfigBuilder("spark.shuffle.readHostLocalDisk")
-      .doc(s"If enabled (and `${SHUFFLE_USE_OLD_FETCH_PROTOCOL.key}` is disabled and external " +
-        s"shuffle `${SHUFFLE_SERVICE_ENABLED.key}` is enabled), shuffle " +
-        "blocks requested from those block managers which are running on the same host are read " +
-        "from the disk directly instead of being fetched as remote blocks over the network.")
+      .doc(s"If enabled (and `${SHUFFLE_USE_OLD_FETCH_PROTOCOL.key}` is disabled and 1) external " +
+        s"shuffle `${SHUFFLE_SERVICE_ENABLED.key}` is enabled or 2) ${DYN_ALLOCATION_ENABLED.key}" +
+        s" is disabled), shuffle blocks requested from those block managers which are running on " +
+        s"the same host are read from the disk directly instead of being fetched as remote blocks" +
+        s" over the network.")

Review comment:
   Do you mean that, in k8s, this feature should only work when executors use non-isolated container storage?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] Ngone51 commented on a change in pull request #28911: [SPARK-32077][CORE] Support host-local shuffle data reading when external shuffle service is disabled

2020-06-30 Thread GitBox


Ngone51 commented on a change in pull request #28911:
URL: https://github.com/apache/spark/pull/28911#discussion_r447553884



##
File path: 
core/src/test/scala/org/apache/spark/shuffle/HostLocalShuffleFetchSuite.scala
##
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle
+
+import org.scalatest.Matchers
+
+import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkEnv, SparkFunSuite, TestUtils}
+import org.apache.spark.internal.config._
+import org.apache.spark.network.netty.NettyBlockTransferService
+
+/**
+ * This test suite is used to test host local shuffle reading with external shuffle service
+ * disabled.
+ */
+class HostLocalShuffleFetchSuite extends SparkFunSuite with Matchers with LocalSparkContext {
+  test("read host local shuffle from disk with external shuffle service disabled") {
+    val conf = new SparkConf()
+      .set(SHUFFLE_HOST_LOCAL_DISK_READING_ENABLED, true)
+      .set(SHUFFLE_SERVICE_ENABLED, false)
+      .set(DYN_ALLOCATION_ENABLED, false)
+    sc = new SparkContext("local-cluster[2,1,1024]", "test", conf)
+    sc.getConf.get(SHUFFLE_HOST_LOCAL_DISK_READING_ENABLED) should equal(true)
+    sc.env.blockManager.externalShuffleServiceEnabled should equal(false)
+    sc.env.blockManager.hostLocalDirManager.isDefined should equal(true)
+    sc.env.blockManager.blockStoreClient.getClass should equal(classOf[NettyBlockTransferService])
+    TestUtils.waitUntilExecutorsUp(sc, 2, 60000)
+
+    val rdd = sc.parallelize(0 until 1000, 10)
+      .map { i => (i, 1) }
+      .reduceByKey(_ + _)
+
+    rdd.count()
+    rdd.count()
+
+    val cachedExecutors = rdd.mapPartitions { _ =>
+      SparkEnv.get.blockManager.hostLocalDirManager.map { localDirManager =>
+        localDirManager.getCachedHostLocalDirs().keySet.iterator
+      }.getOrElse(Iterator.empty)
+    }.collect().toSet
+
+    // both executors are caching the dirs of the other one
+    cachedExecutors should equal(sc.getExecutorIds().toSet)
+
+    // Now Spark will not receive FetchFailed as host local blocks are read from the cached local
+    // disk directly

Review comment:
   I see, thanks.
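   
   As a side note, one way to assert the "no FetchFailed" expectation explicitly would be a listener along these lines (a sketch assuming `sc` is the suite's SparkContext; not part of the PR):
   
   import java.util.concurrent.atomic.AtomicInteger
   import org.apache.spark.FetchFailed
   import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
   
   // Sketch: count FetchFailed task failures observed while the shuffle runs.
   val fetchFailures = new AtomicInteger(0)
   sc.addSparkListener(new SparkListener {
     override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
       taskEnd.reason match {
         case _: FetchFailed => fetchFailures.incrementAndGet()
         case _ =>
       }
     }
   })
   // ... run the shuffle stages, then:
   assert(fetchFailures.get() == 0)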





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org