[GitHub] [spark] zjf2012 commented on a change in pull request #23560: [SPARK-26632][Core] Separate Thread Configurations of Driver and Executor

2019-05-06 Thread GitBox
zjf2012 commented on a change in pull request #23560: [SPARK-26632][Core] 
Separate Thread Configurations of Driver and Executor
URL: https://github.com/apache/spark/pull/23560#discussion_r281433202
 
 

 ##
 File path: core/src/test/scala/org/apache/spark/network/netty/SparkTransportConfSuite.scala
 ##
 @@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.netty
+
+import org.scalatest.Matchers
+import org.scalatest.mockito.MockitoSugar
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.network.util.NettyUtils
+
+class SparkTransportConfSuite extends SparkFunSuite with MockitoSugar {
+  val module = "rpc"
+  val serThreads = "serverThreads"
+  val cliThreads = "clientThreads"
+
+  test("default value is get when neither role nor module is set") {
+    val numUsableCores = 4
+    val conf = new SparkConf()
+    val sparkTransportConf = SparkTransportConf.fromSparkConf(conf, module, numUsableCores, None)
+    val expected = NettyUtils.defaultNumThreads(numUsableCores)
+    val serActual = sparkTransportConf.get(s"spark.$module.io.$serThreads", "")
+    val cliActual = sparkTransportConf.get(s"spark.$module.io.$cliThreads", "")
+    assert(serActual == expected.toString)
+    assert(cliActual == expected.toString)
+  }
+
+  test("module value is get when role is not set") {
+    val numUsableCores = 3
+    val serExpected = "7"
+    val cliExpected = "5"
+    val conf = new SparkConf()
+      .set(s"spark.$module.io.$serThreads", serExpected)
+      .set(s"spark.$module.io.$cliThreads", cliExpected)
+    val sparkTransportConf = SparkTransportConf.fromSparkConf(conf, module, numUsableCores, None)
+    val serActual = sparkTransportConf.get(s"spark.$module.io.$serThreads", "")
+    val cliActual = sparkTransportConf.get(s"spark.$module.io.$cliThreads", "")
+    assert(serActual == serExpected)
+    assert(cliActual == cliExpected)
+  }
+
+  test("role value is get when role is set. " +
 
 Review comment:
   @vanzin, updated. Please check.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] zjf2012 commented on a change in pull request #23560: [SPARK-26632][Core] Separate Thread Configurations of Driver and Executor

2019-05-06 Thread GitBox
zjf2012 commented on a change in pull request #23560: [SPARK-26632][Core] 
Separate Thread Configurations of Driver and Executor
URL: https://github.com/apache/spark/pull/23560#discussion_r281433180
 
 

 ##
 File path: core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
 ##
 @@ -47,11 +48,15 @@ private[netty] class NettyRpcEnv(
     host: String,
     securityManager: SecurityManager,
     numUsableCores: Int) extends RpcEnv(conf) with Logging {
+  val role = conf.get(EXECUTOR_ID).flatMap { id =>
 
 Review comment:
   @vanzin, updated. Please check.





[GitHub] [spark] zjf2012 commented on a change in pull request #23560: [SPARK-26632][Core] Separate Thread Configurations of Driver and Executor

2019-05-06 Thread GitBox
zjf2012 commented on a change in pull request #23560: [SPARK-26632][Core] 
Separate Thread Configurations of Driver and Executor
URL: https://github.com/apache/spark/pull/23560#discussion_r281433257
 
 

 ##
 File path: docs/configuration.md
 ##
 @@ -1954,6 +1954,46 @@ Apart from these, the following properties are also available, and may be useful
 
 
 
+### Thread Configurations
+
+Depending on jobs and cluster configurations, we can set number of threads in several places in Spark to utilize
+available resources efficiently to get better performance. Prior to Spark 3.0, these thread configurations apply
+to all roles of Spark, such as driver, executor, worker and master. From Spark 3.0, we can configure threads in
+finer granularity starting from driver and executor. Take RPC module as example in below table. For other modules,
+like shuffle, just replace "rpc" with "shuffle" in the property names except
+spark.{driver|executor}.rpc.netty.dispatcher.numThreads, which is only for RPC module.
+
+<table class="table">
+<tr><th>Property Name</th><th>Default</th><th>Meaning</th></tr>
+<tr>
+  <td>spark.{driver|executor}.rpc.io.serverThreads</td>
+  <td>
+    Fall back on spark.rpc.io.serverThreads
+  </td>
+  <td>Number of threads used in the server thread pool</td>
+</tr>
+<tr>
+  <td>spark.{driver|executor}.rpc.io.clientThreads</td>
+  <td>
+    Fall back on spark.rpc.io.clientThreads
+  </td>
+  <td>Number of threads used in the client thread pool</td>
+</tr>
+<tr>
+  <td>spark.{driver|executor}.rpc.netty.dispatcher.numThreads</td>
+  <td>
+    Fall back on spark.rpc.netty.dispatcher.numThreads
+  </td>
+  <td>Number of threads used in RPC message dispatcher thread pool</td>
+</tr>
+</table>
+
+The default values of spark.rpc.io.serverThreads, spark.rpc.io.clientThreads and spark.rpc.netty.dispatcher.numThreads
+are same. It's
+number of CPU cores if specified. Otherwise, the available processors to the JVM. In either cases, the default value
 
 Review comment:
   @vanzin, updated. Please check.
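
The naming scheme in the quoted doc text (a role segment inserted after `spark.`, and `rpc` swapped for another module such as `shuffle`) can be sketched as below. This is only an illustration: `ThreadPropertyNames`, `roleKey` and `moduleKey` are hypothetical names, not part of Spark or of this PR.

```scala
// Hypothetical helper (not Spark API): builds the property names the doc text describes.
object ThreadPropertyNames {
  /** Role-specific key, e.g. spark.driver.rpc.io.serverThreads. */
  def roleKey(role: String, module: String, suffix: String): String =
    s"spark.$role.$module.$suffix"

  /** Module-wide key that the role-specific key falls back on. */
  def moduleKey(module: String, suffix: String): String =
    s"spark.$module.$suffix"
}

object ThreadPropertyNamesDemo extends App {
  import ThreadPropertyNames._
  // RPC module, driver role.
  println(roleKey("driver", "rpc", "io.serverThreads"))
  // Same suffix for the shuffle module: only the module segment changes.
  println(roleKey("executor", "shuffle", "io.clientThreads"))
  // The dispatcher setting exists only for the RPC module, per the doc text.
  println(moduleKey("rpc", "netty.dispatcher.numThreads"))
}
```

Per the doc text, the dispatcher key should not be built for modules other than `rpc`.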





[GitHub] [spark] zjf2012 commented on a change in pull request #23560: [SPARK-26632][Core] Separate Thread Configurations of Driver and Executor

2019-05-06 Thread GitBox
zjf2012 commented on a change in pull request #23560: [SPARK-26632][Core] 
Separate Thread Configurations of Driver and Executor
URL: https://github.com/apache/spark/pull/23560#discussion_r281433235
 
 

 ##
 File path: docs/configuration.md
 ##
 @@ -1954,6 +1954,46 @@ Apart from these, the following properties are also available, and may be useful
 
 
 
+### Thread Configurations
+
+Depending on jobs and cluster configurations, we can set number of threads in several places in Spark to utilize
+available resources efficiently to get better performance. Prior to Spark 3.0, these thread configurations apply
+to all roles of Spark, such as driver, executor, worker and master. From Spark 3.0, we can configure threads in
+finer granularity starting from driver and executor. Take RPC module as example in below table. For other modules,
+like shuffle, just replace "rpc" with "shuffle" in the property names except
+spark.{driver|executor}.rpc.netty.dispatcher.numThreads, which is only for RPC module.
+
+<table class="table">
+<tr><th>Property Name</th><th>Default</th><th>Meaning</th></tr>
+<tr>
+  <td>spark.{driver|executor}.rpc.io.serverThreads</td>
+  <td>
+    Fall back on spark.rpc.io.serverThreads
+  </td>
+  <td>Number of threads used in the server thread pool</td>
+</tr>
+<tr>
+  <td>spark.{driver|executor}.rpc.io.clientThreads</td>
+  <td>
+    Fall back on spark.rpc.io.clientThreads
+  </td>
+  <td>Number of threads used in the client thread pool</td>
+</tr>
+<tr>
+  <td>spark.{driver|executor}.rpc.netty.dispatcher.numThreads</td>
+  <td>
+    Fall back on spark.rpc.netty.dispatcher.numThreads
+  </td>
+  <td>Number of threads used in RPC message dispatcher thread pool</td>
+</tr>
+</table>
+
+The default values of spark.rpc.io.serverThreads, spark.rpc.io.clientThreads and spark.rpc.netty.dispatcher.numThreads
 
 Review comment:
   @vanzin, updated. Please check.





[GitHub] [spark] zjf2012 commented on a change in pull request #23560: [SPARK-26632][Core] Separate Thread Configurations of Driver and Executor

2019-05-06 Thread GitBox
zjf2012 commented on a change in pull request #23560: [SPARK-26632][Core] 
Separate Thread Configurations of Driver and Executor
URL: https://github.com/apache/spark/pull/23560#discussion_r281431572
 
 

 ##
 File path: core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
 ##
 @@ -47,11 +48,15 @@ private[netty] class NettyRpcEnv(
     host: String,
     securityManager: SecurityManager,
     numUsableCores: Int) extends RpcEnv(conf) with Logging {
+  val role = conf.get(EXECUTOR_ID).flatMap { id =>
+    if (id == SparkContext.DRIVER_IDENTIFIER) Some("driver") else Some("executor")
+  }
 
   private[netty] val transportConf = SparkTransportConf.fromSparkConf(
     conf.clone.set(RPC_IO_NUM_CONNECTIONS_PER_PEER, 1),
     "rpc",
-    conf.get(RPC_IO_THREADS).getOrElse(numUsableCores))
+    conf.get(RPC_IO_THREADS).getOrElse(numUsableCores),
+    role)
 
 Review comment:
   @jerryshao, the style guide says:
   
   > For method declarations, use 4 space indentation for their parameters and put each in each line when the parameters don't fit in two lines.
   
   So I'll keep this code as it is.





[GitHub] [spark] zjf2012 commented on a change in pull request #23560: [SPARK-26632][Core] Separate Thread Configurations of Driver and Executor

2019-05-05 Thread GitBox
zjf2012 commented on a change in pull request #23560: [SPARK-26632][Core] 
Separate Thread Configurations of Driver and Executor
URL: https://github.com/apache/spark/pull/23560#discussion_r281054320
 
 

 ##
 File path: core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
 ##
 @@ -47,11 +48,15 @@ private[netty] class NettyRpcEnv(
     host: String,
     securityManager: SecurityManager,
     numUsableCores: Int) extends RpcEnv(conf) with Logging {
+  val role = conf.get(EXECUTOR_ID).map { id =>
+    if (id == SparkContext.DRIVER_IDENTIFIER) Some("driver") else Some("executor")
+  }.getOrElse(None)
 
 Review comment:
   @vanzin, updated. Please check.
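
For reference, the `map { ... }.getOrElse(None)` chain in this hunk gives the same result as a single `flatMap`, which is the shape the 2019-05-06 revision of the same hunk uses. Below is a minimal self-contained sketch of that equivalence. `DriverIdentifier` stands in for `SparkContext.DRIVER_IDENTIFIER`, and its literal value is an assumption; `RoleFromExecutorId` and both method names are made up for illustration.

```scala
object RoleFromExecutorId {
  // Stand-in for SparkContext.DRIVER_IDENTIFIER; the literal value is an assumption.
  val DriverIdentifier = "driver"

  // Shape quoted in the hunk: map to an inner Option, then unwrap the outer one.
  def roleViaMap(executorId: Option[String]): Option[String] =
    executorId.map { id =>
      if (id == DriverIdentifier) Some("driver") else Some("executor")
    }.getOrElse(None)

  // Equivalent single step: flatMap flattens the nested Option directly.
  def roleViaFlatMap(executorId: Option[String]): Option[String] =
    executorId.flatMap { id =>
      if (id == DriverIdentifier) Some("driver") else Some("executor")
    }
}
```

Both variants return `None` when no executor ID is set, so a process that is neither driver nor executor gets no role-specific configuration.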





[GitHub] [spark] zjf2012 commented on a change in pull request #23560: [SPARK-26632][Core] Separate Thread Configurations of Driver and Executor

2019-05-05 Thread GitBox
zjf2012 commented on a change in pull request #23560: [SPARK-26632][Core] 
Separate Thread Configurations of Driver and Executor
URL: https://github.com/apache/spark/pull/23560#discussion_r281054293
 
 

 ##
 File path: core/src/test/scala/org/apache/spark/network/netty/SparkTransportConfSuite.scala
 ##
 @@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.netty
+
+import org.scalatest.Matchers
+import org.scalatest.mockito.MockitoSugar
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.network.util.NettyUtils
+
+class SparkTransportConfSuite extends SparkFunSuite with MockitoSugar with Matchers{
+  val module = "rpc"
+  val serThreads = "serverThreads"
+  val cliThreads = "clientThreads"
+
+  test("default value is get when neither role nor module is set") {
+    val numUsableCores = 4
+    val conf = new SparkConf()
+    val sparkTransportConf = SparkTransportConf.fromSparkConf(conf, module, numUsableCores, None)
+    val expected = NettyUtils.defaultNumThreads(numUsableCores)
+    val serActual = sparkTransportConf.get(s"spark.$module.io.$serThreads", "")
+    val cliActual = sparkTransportConf.get(s"spark.$module.io.$cliThreads", "")
+    serActual should equal(expected.toString)
+    cliActual should equal(expected.toString)
+  }
+
+  test("module value is get when role is not set") {
+    val numUsableCores = 3
+    val serExpected = "7"
+    val cliExpected = "5"
+    val conf = new SparkConf()
+      .set(s"spark.$module.io.$serThreads", serExpected)
+      .set(s"spark.$module.io.$cliThreads", cliExpected)
+    val sparkTransportConf = SparkTransportConf.fromSparkConf(conf, module, numUsableCores, None)
+    val serActual = sparkTransportConf.get(s"spark.$module.io.$serThreads", "")
+    val cliActual = sparkTransportConf.get(s"spark.$module.io.$cliThreads", "")
+    serActual should equal(serExpected)
+    cliActual should equal(cliExpected)
+  }
+
+  test("role value is get when role is set") {
+    val role = Some("driver")
+    val numUsableCores = 10
+    val serModule = "7"
+    val cliModule = "5"
+    val serExpected = "8"
+    val cliExpected = "6"
+    val conf = new SparkConf()
+      .set(s"spark.$module.io.$serThreads", serModule)
+      .set(s"spark.$module.io.$cliThreads", cliModule)
+      .set(s"spark.${role.get}.$module.io.$serThreads", serExpected)
+      .set(s"spark.${role.get}.$module.io.$cliThreads", cliExpected)
+    val sparkTransportConf = SparkTransportConf.fromSparkConf(conf, module, numUsableCores, role)
+    val serActual = sparkTransportConf.get(s"spark.$module.io.$serThreads", "")
+    val cliActual = sparkTransportConf.get(s"spark.$module.io.$cliThreads", "")
+    serActual should equal(serExpected)
+    cliActual should equal(cliExpected)
+  }
+
+  test("module value is get when role other than mine is set") {
 
 Review comment:
   @vanzin, updated. Please check.





[GitHub] [spark] zjf2012 commented on a change in pull request #23560: [SPARK-26632][Core] Separate Thread Configurations of Driver and Executor

2019-05-05 Thread GitBox
zjf2012 commented on a change in pull request #23560: [SPARK-26632][Core] 
Separate Thread Configurations of Driver and Executor
URL: https://github.com/apache/spark/pull/23560#discussion_r281054309
 
 

 ##
 File path: core/src/test/scala/org/apache/spark/network/netty/SparkTransportConfSuite.scala
 ##
 @@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.netty
+
+import org.scalatest.Matchers
+import org.scalatest.mockito.MockitoSugar
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.network.util.NettyUtils
+
+class SparkTransportConfSuite extends SparkFunSuite with MockitoSugar with Matchers{
 
 Review comment:
   @vanzin, updated. Please check.





[GitHub] [spark] zjf2012 commented on a change in pull request #23560: [SPARK-26632][Core] Separate Thread Configurations of Driver and Executor

2019-05-05 Thread GitBox
zjf2012 commented on a change in pull request #23560: [SPARK-26632][Core] 
Separate Thread Configurations of Driver and Executor
URL: https://github.com/apache/spark/pull/23560#discussion_r281054282
 
 

 ##
 File path: docs/configuration.md
 ##
 @@ -1954,6 +1954,46 @@ Apart from these, the following properties are also available, and may be useful
 
 
 
+### Thread Configurations
+
+Depending on jobs and cluster configurations, we can set number of threads in several places in Spark to utilize
+available resources efficiently to get better performance. Prior to Spark 3.0, these thread configurations apply
+to all roles of Spark, such as driver, executor, worker and master. From Spark 3.0, we can configure threads in
+finer granularity starting from driver and executor. Take RPC module as example in below table. For other modules,
+like shuffle, just replace "rpc" with "shuffle" in the property names except
+spark.{driver|executor}.rpc.netty.dispatcher.numThreads, which is only for RPC module.
 
 Review comment:
   @vanzin, updated. Please check.





[GitHub] [spark] zjf2012 commented on a change in pull request #23560: [SPARK-26632][Core] Separate Thread Configurations of Driver and Executor

2019-04-29 Thread GitBox
zjf2012 commented on a change in pull request #23560: [SPARK-26632][Core] 
Separate Thread Configurations of Driver and Executor
URL: https://github.com/apache/spark/pull/23560#discussion_r279620060
 
 

 ##
 File path: core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala
 ##
 @@ -194,12 +195,29 @@ private[netty] class Dispatcher(nettyEnv: NettyRpcEnv, numUsableCores: Int) exte
     endpoints.containsKey(name)
   }
 
-  /** Thread pool used for dispatching messages. */
-  private val threadpool: ThreadPoolExecutor = {
+  private def getNumOfThreads(conf: SparkConf): Int = {
     val availableCores =
       if (numUsableCores > 0) numUsableCores else Runtime.getRuntime.availableProcessors()
-    val numThreads = nettyEnv.conf.get(RPC_NETTY_DISPATCHER_NUM_THREADS)
+    // module configuration
 
 Review comment:
   The comment has been removed.





[GitHub] [spark] zjf2012 commented on a change in pull request #23560: [SPARK-26632][Core] Separate Thread Configurations of Driver and Executor

2019-04-29 Thread GitBox
zjf2012 commented on a change in pull request #23560: [SPARK-26632][Core] 
Separate Thread Configurations of Driver and Executor
URL: https://github.com/apache/spark/pull/23560#discussion_r279620007
 
 

 ##
 File path: core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala
 ##
 @@ -194,12 +195,29 @@ private[netty] class Dispatcher(nettyEnv: NettyRpcEnv, numUsableCores: Int) exte
     endpoints.containsKey(name)
   }
 
-  /** Thread pool used for dispatching messages. */
-  private val threadpool: ThreadPoolExecutor = {
+  private def getNumOfThreads(conf: SparkConf): Int = {
     val availableCores =
       if (numUsableCores > 0) numUsableCores else Runtime.getRuntime.availableProcessors()
-    val numThreads = nettyEnv.conf.get(RPC_NETTY_DISPATCHER_NUM_THREADS)
+    // module configuration
+    val modNumThreads = conf.get(RPC_NETTY_DISPATCHER_NUM_THREADS)
       .getOrElse(math.max(2, availableCores))
+    // get right role
 
 Review comment:
   The comment has been removed.





[GitHub] [spark] zjf2012 commented on a change in pull request #23560: [SPARK-26632][Core] Separate Thread Configurations of Driver and Executor

2019-04-29 Thread GitBox
zjf2012 commented on a change in pull request #23560: [SPARK-26632][Core] 
Separate Thread Configurations of Driver and Executor
URL: https://github.com/apache/spark/pull/23560#discussion_r279619960
 
 

 ##
 File path: core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala
 ##
 @@ -194,12 +195,29 @@ private[netty] class Dispatcher(nettyEnv: NettyRpcEnv, numUsableCores: Int) exte
     endpoints.containsKey(name)
   }
 
-  /** Thread pool used for dispatching messages. */
-  private val threadpool: ThreadPoolExecutor = {
+  private def getNumOfThreads(conf: SparkConf): Int = {
     val availableCores =
       if (numUsableCores > 0) numUsableCores else Runtime.getRuntime.availableProcessors()
-    val numThreads = nettyEnv.conf.get(RPC_NETTY_DISPATCHER_NUM_THREADS)
+    // module configuration
+    val modNumThreads = conf.get(RPC_NETTY_DISPATCHER_NUM_THREADS)
       .getOrElse(math.max(2, availableCores))
+    // get right role
+    val executorId = conf.get(EXECUTOR_ID).getOrElse("")
 
 Review comment:
   @vanzin, thanks for the clean code. The code has been rewritten; please help review.





[GitHub] [spark] zjf2012 commented on a change in pull request #23560: [SPARK-26632][Core] Separate Thread Configurations of Driver and Executor

2019-04-29 Thread GitBox
zjf2012 commented on a change in pull request #23560: [SPARK-26632][Core] 
Separate Thread Configurations of Driver and Executor
URL: https://github.com/apache/spark/pull/23560#discussion_r279619817
 
 

 ##
 File path: core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
 ##
 @@ -47,11 +48,21 @@ private[netty] class NettyRpcEnv(
     host: String,
     securityManager: SecurityManager,
     numUsableCores: Int) extends RpcEnv(conf) with Logging {
+  // try to get specific threads configurations of driver and executor
+  val executorId = conf.get(EXECUTOR_ID).getOrElse("")
+  // neither driver nor executor if executor id is not set
 
 Review comment:
   The comment has been removed.





[GitHub] [spark] zjf2012 commented on a change in pull request #23560: [SPARK-26632][Core] Separate Thread Configurations of Driver and Executor

2019-04-29 Thread GitBox
zjf2012 commented on a change in pull request #23560: [SPARK-26632][Core] 
Separate Thread Configurations of Driver and Executor
URL: https://github.com/apache/spark/pull/23560#discussion_r279619771
 
 

 ##
 File path: core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
 ##
 @@ -47,11 +48,21 @@ private[netty] class NettyRpcEnv(
     host: String,
     securityManager: SecurityManager,
     numUsableCores: Int) extends RpcEnv(conf) with Logging {
+  // try to get specific threads configurations of driver and executor
+  val executorId = conf.get(EXECUTOR_ID).getOrElse("")
+  // neither driver nor executor if executor id is not set
+  val role = executorId match {
+    case "" => None
+    case SparkContext.DRIVER_IDENTIFIER => Some("driver")
+    // any other non-empty values since executor must has "spark.executor.id" set
 
 Review comment:
   The comment has been removed.





[GitHub] [spark] zjf2012 commented on a change in pull request #23560: [SPARK-26632][Core] Separate Thread Configurations of Driver and Executor

2019-03-14 Thread GitBox
zjf2012 commented on a change in pull request #23560: [SPARK-26632][Core] 
Separate Thread Configurations of Driver and Executor
URL: https://github.com/apache/spark/pull/23560#discussion_r265469014
 
 

 ##
 File path: core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala
 ##
 @@ -194,12 +194,32 @@ private[netty] class Dispatcher(nettyEnv: NettyRpcEnv, numUsableCores: Int) exte
     endpoints.containsKey(name)
   }
 
-  /** Thread pool used for dispatching messages. */
-  private val threadpool: ThreadPoolExecutor = {
+  def getNumOfThreads(conf: SparkConf): Int = {
     val availableCores =
       if (numUsableCores > 0) numUsableCores else Runtime.getRuntime.availableProcessors()
-    val numThreads = nettyEnv.conf.get(RPC_NETTY_DISPATCHER_NUM_THREADS)
+    // module configuration
+    val modNumThreads = nettyEnv.conf.get(RPC_NETTY_DISPATCHER_NUM_THREADS)
       .getOrElse(math.max(2, availableCores))
+    // try to get specific threads configurations of driver and executor
+    // override module configurations if specified
+    val executorId = conf.get("spark.executor.id", "")
 
 Review comment:
   @vanzin, I now set spark.executor.id explicitly in the two executor backends, CoarseGrainedExecutorBackend and MesosExecutorBackend. YARN uses CoarseGrainedExecutorBackend, so it is covered as well. For the driver, nothing needs to change since spark.executor.id is already set there.
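
To make the dependency on `spark.executor.id` concrete, here is a rough sketch of how the dispatcher thread count could be resolved once the ID is available. It is not the PR's code: a plain `Map[String, String]` stands in for `SparkConf`, `DriverIdentifier` stands in for `SparkContext.DRIVER_IDENTIFIER` (its value is an assumption), and `DispatcherThreads` is a made-up name. The property names and the `math.max(2, availableCores)` default are taken from the quoted hunk and doc text.

```scala
object DispatcherThreads {
  // Stand-in for SparkContext.DRIVER_IDENTIFIER; the literal value is an assumption.
  val DriverIdentifier = "driver"

  def numThreads(conf: Map[String, String], numUsableCores: Int): Int = {
    val availableCores =
      if (numUsableCores > 0) numUsableCores else Runtime.getRuntime.availableProcessors()

    // Module-wide setting, defaulting to at least two threads.
    val moduleThreads = conf.get("spark.rpc.netty.dispatcher.numThreads")
      .map(_.toInt)
      .getOrElse(math.max(2, availableCores))

    // No executor ID means the process is neither driver nor executor.
    val role = conf.get("spark.executor.id").map { id =>
      if (id == DriverIdentifier) "driver" else "executor"
    }

    // A role-specific value, when present, overrides the module-wide one.
    role
      .flatMap(r => conf.get(s"spark.$r.rpc.netty.dispatcher.numThreads"))
      .map(_.toInt)
      .getOrElse(moduleThreads)
  }
}
```

If the executor backends did not set `spark.executor.id`, the role would resolve to `None` in this sketch and the executor-specific value would never be read.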





[GitHub] [spark] zjf2012 commented on a change in pull request #23560: [SPARK-26632][Core] Separate Thread Configurations of Driver and Executor

2019-03-14 Thread GitBox
zjf2012 commented on a change in pull request #23560: [SPARK-26632][Core] 
Separate Thread Configurations of Driver and Executor
URL: https://github.com/apache/spark/pull/23560#discussion_r265467893
 
 

 ##
 File path: core/src/main/scala/org/apache/spark/network/netty/SparkTransportConf.scala
 ##
 @@ -36,16 +36,30 @@ object SparkTransportConf {
    * @param numUsableCores if nonzero, this will restrict the server and client threads to only
    *                       use the given number of cores, rather than all of the machine's cores.
    *                       This restriction will only occur if these properties are not already set.
+   * @param role           optional role, could be driver, executor, worker and master. Default is
+   *                       [[None]], means no role specific configurations.
    */
-  def fromSparkConf(_conf: SparkConf, module: String, numUsableCores: Int = 0): TransportConf = {
+  def fromSparkConf(
+      _conf: SparkConf,
+      module: String,
+      numUsableCores: Int = 0,
+      role: Option[String] = None): TransportConf = {
     val conf = _conf.clone
-
-    // Specify thread configuration based on our JVM's allocation of cores (rather than necessarily
-    // assuming we have all the machine's cores).
-    // NB: Only set if serverThreads/clientThreads not already set.
+    // specify default thread configuration based on our JVM's allocation of cores (rather than
+    // necessarily assuming we have all the machine's cores).
     val numThreads = NettyUtils.defaultNumThreads(numUsableCores)
-    conf.setIfMissing(s"spark.$module.io.serverThreads", numThreads.toString)
-    conf.setIfMissing(s"spark.$module.io.clientThreads", numThreads.toString)
+    // module threads configuration
+    val (modServerThreads, modClientThreads) =
 
 Review comment:
   @vanzin, I changed the code to follow your style. It still differs from yours in one respect: I need to override the module's config with the role's config, if any. The order is role > module > default, and the unit test reflects it.
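
The precedence described in this comment (role over module over default) can be sketched in a few lines. This is an illustration under assumptions, not the PR's implementation: a plain `Map[String, String]` stands in for `SparkConf`, `ThreadConfPrecedence.resolve` is a hypothetical helper, and `default` plays the part of the value the PR obtains from `NettyUtils.defaultNumThreads(numUsableCores)`.

```scala
object ThreadConfPrecedence {
  /**
   * Resolves spark.<module>.io.<suffix> with the order role > module > default.
   * Hypothetical helper; `conf` is a plain Map standing in for SparkConf.
   */
  def resolve(
      conf: Map[String, String],
      module: String,
      role: Option[String],
      suffix: String,
      default: Int): String = {
    // Looked up only when a role is known, e.g. spark.driver.rpc.io.serverThreads.
    val roleValue = role.flatMap(r => conf.get(s"spark.$r.$module.io.$suffix"))
    // Module-wide value, e.g. spark.rpc.io.serverThreads.
    val moduleValue = conf.get(s"spark.$module.io.$suffix")
    roleValue.orElse(moduleValue).getOrElse(default.toString)
  }
}

object ThreadConfPrecedenceDemo extends App {
  // Same values as the "role value is get when role is set" test in the PR.
  val conf = Map(
    "spark.rpc.io.serverThreads" -> "7",
    "spark.driver.rpc.io.serverThreads" -> "8")
  println(ThreadConfPrecedence.resolve(conf, "rpc", Some("driver"), "serverThreads", 10))
  println(ThreadConfPrecedence.resolve(conf, "rpc", Some("executor"), "serverThreads", 10))
  println(ThreadConfPrecedence.resolve(Map.empty, "rpc", None, "serverThreads", 10))
}
```

A value set for a different role is ignored, which corresponds to the "module value is get when role other than mine is set" case in the test suite.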





[GitHub] [spark] zjf2012 commented on a change in pull request #23560: [SPARK-26632][Core] Separate Thread Configurations of Driver and Executor

2019-03-13 Thread GitBox
zjf2012 commented on a change in pull request #23560: [SPARK-26632][Core] 
Separate Thread Configurations of Driver and Executor
URL: https://github.com/apache/spark/pull/23560#discussion_r265398892
 
 

 ##
 File path: core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala
 ##
 @@ -194,12 +194,32 @@ private[netty] class Dispatcher(nettyEnv: NettyRpcEnv, 
numUsableCores: Int) exte
 endpoints.containsKey(name)
   }
 
-  /** Thread pool used for dispatching messages. */
-  private val threadpool: ThreadPoolExecutor = {
+  def getNumOfThreads(conf: SparkConf): Int = {
 
 Review comment:
   let me make it private





[GitHub] [spark] zjf2012 commented on a change in pull request #23560: [SPARK-26632][Core] Separate Thread Configurations of Driver and Executor

2019-03-13 Thread GitBox
zjf2012 commented on a change in pull request #23560: [SPARK-26632][Core] 
Separate Thread Configurations of Driver and Executor
URL: https://github.com/apache/spark/pull/23560#discussion_r265398764
 
 

 ##
 File path: core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala
 ##
 @@ -194,12 +194,32 @@ private[netty] class Dispatcher(nettyEnv: NettyRpcEnv, 
numUsableCores: Int) exte
 endpoints.containsKey(name)
   }
 
-  /** Thread pool used for dispatching messages. */
-  private val threadpool: ThreadPoolExecutor = {
+  def getNumOfThreads(conf: SparkConf): Int = {
 val availableCores =
   if (numUsableCores > 0) numUsableCores else 
Runtime.getRuntime.availableProcessors()
-val numThreads = nettyEnv.conf.get(RPC_NETTY_DISPATCHER_NUM_THREADS)
+// module configuration
+val modNumThreads = nettyEnv.conf.get(RPC_NETTY_DISPATCHER_NUM_THREADS)
   .getOrElse(math.max(2, availableCores))
+// try to get specific threads configurations of driver and executor
+// override module configurations if specified
+val executorId = conf.get("spark.executor.id", "")
+// neither driver nor executor if executor id is not set
+val role = executorId match {
+  case "" => ""
+  case SparkContext.DRIVER_IDENTIFIER => "driver"
+  // any other non-empty values since executor must has 
"spark.executor.id" set
 
 Review comment:
   let me remove duplicate comments





[GitHub] [spark] zjf2012 commented on a change in pull request #23560: [SPARK-26632][Core] Separate Thread Configurations of Driver and Executor

2019-03-13 Thread GitBox
zjf2012 commented on a change in pull request #23560: [SPARK-26632][Core] 
Separate Thread Configurations of Driver and Executor
URL: https://github.com/apache/spark/pull/23560#discussion_r265398794
 
 

 ##
 File path: core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala
 ##
 @@ -194,12 +194,32 @@ private[netty] class Dispatcher(nettyEnv: NettyRpcEnv, 
numUsableCores: Int) exte
 endpoints.containsKey(name)
   }
 
-  /** Thread pool used for dispatching messages. */
-  private val threadpool: ThreadPoolExecutor = {
+  def getNumOfThreads(conf: SparkConf): Int = {
 val availableCores =
   if (numUsableCores > 0) numUsableCores else 
Runtime.getRuntime.availableProcessors()
-val numThreads = nettyEnv.conf.get(RPC_NETTY_DISPATCHER_NUM_THREADS)
+// module configuration
+val modNumThreads = nettyEnv.conf.get(RPC_NETTY_DISPATCHER_NUM_THREADS)
   .getOrElse(math.max(2, availableCores))
+// try to get specific threads configurations of driver and executor
+// override module configurations if specified
+val executorId = conf.get("spark.executor.id", "")
+// neither driver nor executor if executor id is not set
 
 Review comment:
   let me remove duplicate comments





[GitHub] [spark] zjf2012 commented on a change in pull request #23560: [SPARK-26632][Core] Separate Thread Configurations of Driver and Executor

2019-03-13 Thread GitBox
zjf2012 commented on a change in pull request #23560: [SPARK-26632][Core] 
Separate Thread Configurations of Driver and Executor
URL: https://github.com/apache/spark/pull/23560#discussion_r265398720
 
 

 ##
 File path: core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala
 ##
 @@ -194,12 +194,32 @@ private[netty] class Dispatcher(nettyEnv: NettyRpcEnv, 
numUsableCores: Int) exte
 endpoints.containsKey(name)
   }
 
-  /** Thread pool used for dispatching messages. */
-  private val threadpool: ThreadPoolExecutor = {
+  def getNumOfThreads(conf: SparkConf): Int = {
 val availableCores =
   if (numUsableCores > 0) numUsableCores else 
Runtime.getRuntime.availableProcessors()
-val numThreads = nettyEnv.conf.get(RPC_NETTY_DISPATCHER_NUM_THREADS)
+// module configuration
+val modNumThreads = nettyEnv.conf.get(RPC_NETTY_DISPATCHER_NUM_THREADS)
   .getOrElse(math.max(2, availableCores))
+// try to get specific threads configurations of driver and executor
+// override module configurations if specified
+val executorId = conf.get("spark.executor.id", "")
 
 Review comment:
   On the driver, we can get spark.executor.id with the value "driver", but 
that is not the case for the executor. Let me fix it.





[GitHub] [spark] zjf2012 commented on a change in pull request #23560: [SPARK-26632][Core] Separate Thread Configurations of Driver and Executor

2019-03-13 Thread GitBox
zjf2012 commented on a change in pull request #23560: [SPARK-26632][Core] 
Separate Thread Configurations of Driver and Executor
URL: https://github.com/apache/spark/pull/23560#discussion_r265392921
 
 

 ##
 File path: core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala
 ##
 @@ -194,12 +194,32 @@ private[netty] class Dispatcher(nettyEnv: NettyRpcEnv, 
numUsableCores: Int) exte
 endpoints.containsKey(name)
   }
 
-  /** Thread pool used for dispatching messages. */
-  private val threadpool: ThreadPoolExecutor = {
+  def getNumOfThreads(conf: SparkConf): Int = {
 val availableCores =
   if (numUsableCores > 0) numUsableCores else 
Runtime.getRuntime.availableProcessors()
-val numThreads = nettyEnv.conf.get(RPC_NETTY_DISPATCHER_NUM_THREADS)
+// module configuration
+val modNumThreads = nettyEnv.conf.get(RPC_NETTY_DISPATCHER_NUM_THREADS)
   .getOrElse(math.max(2, availableCores))
+// try to get specific threads configurations of driver and executor
+// override module configurations if specified
+val executorId = conf.get("spark.executor.id", "")
 
 Review comment:
   It seems you are right. Let me double-check. Thanks.





[GitHub] [spark] zjf2012 commented on a change in pull request #23560: [SPARK-26632][Core] Separate Thread Configurations of Driver and Executor

2019-03-13 Thread GitBox
zjf2012 commented on a change in pull request #23560: [SPARK-26632][Core] 
Separate Thread Configurations of Driver and Executor
URL: https://github.com/apache/spark/pull/23560#discussion_r265388998
 
 

 ##
 File path: core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala
 ##
 @@ -194,12 +194,32 @@ private[netty] class Dispatcher(nettyEnv: NettyRpcEnv, 
numUsableCores: Int) exte
 endpoints.containsKey(name)
   }
 
-  /** Thread pool used for dispatching messages. */
-  private val threadpool: ThreadPoolExecutor = {
+  def getNumOfThreads(conf: SparkConf): Int = {
 val availableCores =
   if (numUsableCores > 0) numUsableCores else 
Runtime.getRuntime.availableProcessors()
-val numThreads = nettyEnv.conf.get(RPC_NETTY_DISPATCHER_NUM_THREADS)
+// module configuration
+val modNumThreads = nettyEnv.conf.get(RPC_NETTY_DISPATCHER_NUM_THREADS)
   .getOrElse(math.max(2, availableCores))
+// try to get specific threads configurations of driver and executor
+// override module configurations if specified
+val executorId = conf.get("spark.executor.id", "")
+// neither driver nor executor if executor id is not set
 
 Review comment:
   Do you mean I should wrap the logic in one method so that it can be invoked 
from multiple places?





[GitHub] [spark] zjf2012 commented on a change in pull request #23560: [SPARK-26632][Core] Separate Thread Configurations of Driver and Executor

2019-03-13 Thread GitBox
zjf2012 commented on a change in pull request #23560: [SPARK-26632][Core] 
Separate Thread Configurations of Driver and Executor
URL: https://github.com/apache/spark/pull/23560#discussion_r265388630
 
 

 ##
 File path: 
core/src/main/scala/org/apache/spark/network/netty/SparkTransportConf.scala
 ##
 @@ -36,16 +36,30 @@ object SparkTransportConf {
* @param numUsableCores if nonzero, this will restrict the server and 
client threads to only
*   use the given number of cores, rather than all of 
the machine's cores.
*   This restriction will only occur if these 
properties are not already set.
+   * @param role   optional role, could be driver, executor, worker 
and master. Default is
+   *  [[None]], means no role specific configurations.
*/
-  def fromSparkConf(_conf: SparkConf, module: String, numUsableCores: Int = 
0): TransportConf = {
+  def fromSparkConf(
+  _conf: SparkConf,
+  module: String,
+  numUsableCores: Int = 0,
+  role: Option[String] = None): TransportConf = {
 val conf = _conf.clone
-
-// Specify thread configuration based on our JVM's allocation of cores 
(rather than necessarily
-// assuming we have all the machine's cores).
-// NB: Only set if serverThreads/clientThreads not already set.
+// specify default thread configuration based on our JVM's allocation of 
cores (rather than
+// necessarily assuming we have all the machine's cores).
 val numThreads = NettyUtils.defaultNumThreads(numUsableCores)
-conf.setIfMissing(s"spark.$module.io.serverThreads", numThreads.toString)
-conf.setIfMissing(s"spark.$module.io.clientThreads", numThreads.toString)
+// module threads configuration
+val (modServerThreads, modClientThreads) =
 
 Review comment:
   ok, let me try





[GitHub] [spark] zjf2012 commented on a change in pull request #23560: [SPARK-26632][Core] Separate Thread Configurations of Driver and Executor

2019-03-13 Thread GitBox
zjf2012 commented on a change in pull request #23560: [SPARK-26632][Core] 
Separate Thread Configurations of Driver and Executor
URL: https://github.com/apache/spark/pull/23560#discussion_r264988473
 
 

 ##
 File path: 
core/src/main/scala/org/apache/spark/network/netty/SparkTransportConf.scala
 ##
 @@ -31,21 +31,27 @@ object SparkTransportConf {
 
   /**
* Utility for creating a [[TransportConf]] from a [[SparkConf]].
-   * @param _conf the [[SparkConf]]
-   * @param module the module name
+   * @param _conf the [[SparkConf]]
+   * @param module the module name
* @param numUsableCores if nonzero, this will restrict the server and 
client threads to only
*   use the given number of cores, rather than all of 
the machine's cores.
*   This restriction will only occur if these 
properties are not already set.
+   * @param role   optional role, could be driver, executor, worker 
and master. Default is
+   *  [[None]], means no role specific configurations.
*/
-  def fromSparkConf(_conf: SparkConf, module: String, numUsableCores: Int = 
0): TransportConf = {
+  def fromSparkConf(
+  _conf: SparkConf,
+  module: String,
+  numUsableCores: Int = 0,
+  role: Option[String] = None): TransportConf = {
 val conf = _conf.clone
-
-// Specify thread configuration based on our JVM's allocation of cores 
(rather than necessarily
-// assuming we have all the machine's cores).
-// NB: Only set if serverThreads/clientThreads not already set.
+val (serverThreads, clientThreads) = getThreadsConfByRole(conf, module, 
role)
 
 Review comment:
   re-structured code as discussed. thanks.





[GitHub] [spark] zjf2012 commented on a change in pull request #23560: [SPARK-26632][Core] Separate Thread Configurations of Driver and Executor

2019-03-13 Thread GitBox
zjf2012 commented on a change in pull request #23560: [SPARK-26632][Core] 
Separate Thread Configurations of Driver and Executor
URL: https://github.com/apache/spark/pull/23560#discussion_r264988596
 
 

 ##
 File path: core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala
 ##
 @@ -194,12 +194,38 @@ private[netty] class Dispatcher(nettyEnv: NettyRpcEnv, 
numUsableCores: Int) exte
 endpoints.containsKey(name)
   }
 
+  def getNumOfThreads(conf: SparkConf): Int = {
+// try to get specific threads configurations of driver and executor
+val executorId = conf.get("spark.executor.id", "")
+// neither driver nor executor if executor id is not set
+var role = ""
 
 Review comment:
   re-structured code as discussed. thanks





[GitHub] [spark] zjf2012 commented on a change in pull request #23560: [SPARK-26632][Core] Separate Thread Configurations of Driver and Executor

2019-03-13 Thread GitBox
zjf2012 commented on a change in pull request #23560: [SPARK-26632][Core] 
Separate Thread Configurations of Driver and Executor
URL: https://github.com/apache/spark/pull/23560#discussion_r264988623
 
 

 ##
 File path: core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
 ##
 @@ -47,11 +47,24 @@ private[netty] class NettyRpcEnv(
 host: String,
 securityManager: SecurityManager,
 numUsableCores: Int) extends RpcEnv(conf) with Logging {
+  // try to get specific threads configurations of driver and executor
+  val executorId = conf.get("spark.executor.id", "")
+  // neither driver nor executor if executor id is not set
+  var role: Option[String] = None
 
 Review comment:
   re-structured code as discussed. thanks





[GitHub] [spark] zjf2012 commented on a change in pull request #23560: [SPARK-26632][Core] Separate Thread Configurations of Driver and Executor

2019-03-13 Thread GitBox
zjf2012 commented on a change in pull request #23560: [SPARK-26632][Core] 
Separate Thread Configurations of Driver and Executor
URL: https://github.com/apache/spark/pull/23560#discussion_r264988371
 
 

 ##
 File path: 
core/src/main/scala/org/apache/spark/network/netty/SparkTransportConf.scala
 ##
 @@ -31,21 +31,27 @@ object SparkTransportConf {
 
   /**
* Utility for creating a [[TransportConf]] from a [[SparkConf]].
-   * @param _conf the [[SparkConf]]
-   * @param module the module name
+   * @param _conf the [[SparkConf]]
+   * @param module the module name
 
 Review comment:
   corrected. thanks.





[GitHub] [spark] zjf2012 commented on a change in pull request #23560: [SPARK-26632][Core] Separate Thread Configurations of Driver and Executor

2019-03-12 Thread GitBox
zjf2012 commented on a change in pull request #23560: [SPARK-26632][Core] 
Separate Thread Configurations of Driver and Executor
URL: https://github.com/apache/spark/pull/23560#discussion_r264951407
 
 

 ##
 File path: 
core/src/main/scala/org/apache/spark/network/netty/SparkTransportConf.scala
 ##
 @@ -39,13 +39,18 @@ object SparkTransportConf {
*/
   def fromSparkConf(_conf: SparkConf, module: String, numUsableCores: Int = 
0): TransportConf = {
 val conf = _conf.clone
-
-// Specify thread configuration based on our JVM's allocation of cores 
(rather than necessarily
-// assuming we have all the machine's cores).
-// NB: Only set if serverThreads/clientThreads not already set.
+val executorId = conf.get("spark.executor.id", "")
+val isDriver = executorId == SparkContext.DRIVER_IDENTIFIER ||
+  executorId == SparkContext.LEGACY_DRIVER_IDENTIFIER
+val role = if (isDriver) "driver" else "executor"
 
 Review comment:
   @vanzin, I just changed the code to fix the potential issue of daemons 
misusing the executor's configuration. Each executor has its executor id set in 
spark.executor.id, which lets me tell whether the process is an executor or a 
daemon. The default role is None, and only RpcEnv tries to set the role, in 
case there are driver- or executor-specific configurations. Thanks.
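
   A minimal sketch of the role check described above, under stated 
assumptions: a plain Map stands in for SparkConf, and the local constant 
DriverIdentifier is assumed to match SparkContext.DRIVER_IDENTIFIER. The object 
and method names are hypothetical.

```scala
object RoleResolver {
  // Assumed to match SparkContext.DRIVER_IDENTIFIER.
  val DriverIdentifier = "driver"

  // Derive the role from spark.executor.id:
  //   unset or empty -> None (e.g. a daemon such as master or worker)
  //   "driver"       -> Some("driver")
  //   anything else  -> Some("executor")
  def roleOf(conf: Map[String, String]): Option[String] =
    conf.get("spark.executor.id") match {
      case None | Some("") => None
      case Some(DriverIdentifier) => Some("driver")
      case Some(_) => Some("executor")
    }
}
```

   Returning an Option keeps "no role" distinct from the two real roles, so a 
caller without an executor id never picks up executor-specific settings.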





[GitHub] [spark] zjf2012 commented on a change in pull request #23560: [SPARK-26632][Core] Separate Thread Configurations of Driver and Executor

2019-03-11 Thread GitBox
zjf2012 commented on a change in pull request #23560: [SPARK-26632][Core] 
Separate Thread Configurations of Driver and Executor
URL: https://github.com/apache/spark/pull/23560#discussion_r264496248
 
 

 ##
 File path: 
core/src/main/scala/org/apache/spark/network/netty/SparkTransportConf.scala
 ##
 @@ -39,13 +39,18 @@ object SparkTransportConf {
*/
   def fromSparkConf(_conf: SparkConf, module: String, numUsableCores: Int = 
0): TransportConf = {
 val conf = _conf.clone
-
-// Specify thread configuration based on our JVM's allocation of cores 
(rather than necessarily
-// assuming we have all the machine's cores).
-// NB: Only set if serverThreads/clientThreads not already set.
+val executorId = conf.get("spark.executor.id", "")
+val isDriver = executorId == SparkContext.DRIVER_IDENTIFIER ||
+  executorId == SparkContext.LEGACY_DRIVER_IDENTIFIER
+val role = if (isDriver) "driver" else "executor"
 
 Review comment:
   @vanzin , ok, let me try. thanks.





[GitHub] [spark] zjf2012 commented on a change in pull request #23560: [SPARK-26632][Core] Separate Thread Configurations of Driver and Executor

2019-03-11 Thread GitBox
zjf2012 commented on a change in pull request #23560: [SPARK-26632][Core] 
Separate Thread Configurations of Driver and Executor
URL: https://github.com/apache/spark/pull/23560#discussion_r264488007
 
 

 ##
 File path: 
core/src/main/scala/org/apache/spark/network/netty/SparkTransportConf.scala
 ##
 @@ -39,13 +39,18 @@ object SparkTransportConf {
*/
   def fromSparkConf(_conf: SparkConf, module: String, numUsableCores: Int = 
0): TransportConf = {
 val conf = _conf.clone
-
-// Specify thread configuration based on our JVM's allocation of cores 
(rather than necessarily
-// assuming we have all the machine's cores).
-// NB: Only set if serverThreads/clientThreads not already set.
+val executorId = conf.get("spark.executor.id", "")
+val isDriver = executorId == SparkContext.DRIVER_IDENTIFIER ||
+  executorId == SparkContext.LEGACY_DRIVER_IDENTIFIER
+val role = if (isDriver) "driver" else "executor"
 
 Review comment:
   @vanzin, I get your point. Actually, @jerryshao also pointed out a similar 
problem and told me to introduce "role" to make the code clearer. At the time, 
I tried to make "role" an explicit argument to this method. However, getting 
the right "role" involves a lot of code changes in many areas, so it is not a 
simple task to cover in this PR. It seems that they don't want this PR to be 
that complex. Maybe we can do it once "role" is clearly defined for worker, 
master, driver, etc.





[GitHub] [spark] zjf2012 commented on a change in pull request #23560: [SPARK-26632][Core] Separate Thread Configurations of Driver and Executor

2019-03-10 Thread GitBox
zjf2012 commented on a change in pull request #23560: [SPARK-26632][Core] 
Separate Thread Configurations of Driver and Executor
URL: https://github.com/apache/spark/pull/23560#discussion_r264070140
 
 

 ##
 File path: core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala
 ##
 @@ -194,12 +194,26 @@ private[netty] class Dispatcher(nettyEnv: NettyRpcEnv, 
numUsableCores: Int) exte
 endpoints.containsKey(name)
   }
 
+  def getNumOfThreads(conf: SparkConf): Int = {
+val executorId = conf.get("spark.executor.id", "")
+val isDriver = executorId == SparkContext.DRIVER_IDENTIFIER ||
+  executorId == SparkContext.LEGACY_DRIVER_IDENTIFIER
+val side = if (isDriver) "driver" else "executor"
 
 Review comment:
   @vanzin, see my last comment to you. Besides, master and worker are 
relatively more decoupled than driver and executor, and we have alternatives 
for giving them different configurations. For driver and executor, we cannot 
do that easily. For example, for spark.rpc.netty.dispatcher.numThreads, you 
may have an optimized value, A, for the driver, but it is usually not an 
optimal value for executors. 
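
   To illustrate the point, here is a rough sketch of how a role-specific 
dispatcher thread count could override the shared one. It is not the PR's code. 
A plain Map stands in for SparkConf, and the role-specific key 
spark.<role>.rpc.netty.dispatcher.numThreads is a hypothetical layout assumed 
for illustration. The max(2, availableCores) default follows the quoted diff.

```scala
object DispatcherThreads {
  // Shared (module-level) key named in the comment above.
  val ModuleKey = "spark.rpc.netty.dispatcher.numThreads"

  def numThreads(
      conf: Map[String, String],
      role: Option[String],
      numUsableCores: Int): Int = {
    val availableCores =
      if (numUsableCores > 0) numUsableCores
      else Runtime.getRuntime.availableProcessors()
    // Hypothetical role-specific key, consulted before the shared key.
    val roleValue = role.flatMap { r =>
      conf.get(s"spark.$r.rpc.netty.dispatcher.numThreads")
    }
    roleValue
      .orElse(conf.get(ModuleKey))
      .map(_.toInt)
      .getOrElse(math.max(2, availableCores))
  }
}
```

   With this shape, a large value tuned for the driver stays on the driver, 
while executors keep the shared value or the core-based default.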





[GitHub] [spark] zjf2012 commented on a change in pull request #23560: [SPARK-26632][Core] Separate Thread Configurations of Driver and Executor

2019-03-10 Thread GitBox
zjf2012 commented on a change in pull request #23560: [SPARK-26632][Core] 
Separate Thread Configurations of Driver and Executor
URL: https://github.com/apache/spark/pull/23560#discussion_r264070140
 
 

 ##
 File path: core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala
 ##
 @@ -194,12 +194,26 @@ private[netty] class Dispatcher(nettyEnv: NettyRpcEnv, 
numUsableCores: Int) exte
 endpoints.containsKey(name)
   }
 
+  def getNumOfThreads(conf: SparkConf): Int = {
+val executorId = conf.get("spark.executor.id", "")
+val isDriver = executorId == SparkContext.DRIVER_IDENTIFIER ||
+  executorId == SparkContext.LEGACY_DRIVER_IDENTIFIER
+val side = if (isDriver) "driver" else "executor"
 
 Review comment:
   @vanzin, see my last comment to you. Besides, master and worker are 
relatively more decoupled than driver and executor, and we have alternatives 
for giving them different configurations.





[GitHub] [spark] zjf2012 commented on a change in pull request #23560: [SPARK-26632][Core] Separate Thread Configurations of Driver and Executor

2019-03-10 Thread GitBox
zjf2012 commented on a change in pull request #23560: [SPARK-26632][Core] 
Separate Thread Configurations of Driver and Executor
URL: https://github.com/apache/spark/pull/23560#discussion_r264069852
 
 

 ##
 File path: 
core/src/main/scala/org/apache/spark/network/netty/SparkTransportConf.scala
 ##
 @@ -39,13 +39,18 @@ object SparkTransportConf {
*/
   def fromSparkConf(_conf: SparkConf, module: String, numUsableCores: Int = 
0): TransportConf = {
 val conf = _conf.clone
-
-// Specify thread configuration based on our JVM's allocation of cores 
(rather than necessarily
-// assuming we have all the machine's cores).
-// NB: Only set if serverThreads/clientThreads not already set.
+val executorId = conf.get("spark.executor.id", "")
+val isDriver = executorId == SparkContext.DRIVER_IDENTIFIER ||
+  executorId == SparkContext.LEGACY_DRIVER_IDENTIFIER
+val role = if (isDriver) "driver" else "executor"
 
 Review comment:
   @vanzin, in this enhancement the configuration is read not only from 
"role" but also from "module". If there is no configuration for "role", it 
falls back to "module". For ExternalShuffleService, we can just ignore "role" 
and use the "shuffle" module, i.e. spark.shuffle.*. This enhancement targets 
the driver bottleneck that appears when there are too many RPC messages due to 
a large number of small tasks. 
   Of course, we can add more "roles" if there are other areas where 
performance can be improved.
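
   A small sketch of the fallback for the shuffle case, under stated 
assumptions: a plain Map stands in for SparkConf, the role-specific key layout 
spark.<role>.<module>.io.<suffix> is hypothetical, and the helper names are 
made up for illustration.

```scala
object ConfFallback {
  // Keys to try, in order: the role-specific key (only if a role is given),
  // then the module-level key. The role-specific layout is hypothetical.
  def candidateKeys(module: String, role: Option[String], suffix: String): Seq[String] =
    role.map(r => s"spark.$r.$module.io.$suffix").toSeq :+ s"spark.$module.io.$suffix"

  // Return the value of the first candidate key that is present, if any.
  def lookup(
      conf: Map[String, String],
      module: String,
      role: Option[String],
      suffix: String): Option[String] =
    candidateKeys(module, role, suffix).collectFirst {
      case key if conf.contains(key) => conf(key)
    }
}
```

   A caller such as the external shuffle service would pass role = None and 
module = "shuffle", so only spark.shuffle.io.* keys are consulted.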

