[GitHub] gczsjdy commented on a change in pull request #23560: [SPARK-26632][Spark Core] Separate Thread Configurations of Driver and Executor

2019-01-22 Thread GitBox
gczsjdy commented on a change in pull request #23560: [SPARK-26632][Spark Core] Separate Thread Configurations of Driver and Executor
URL: https://github.com/apache/spark/pull/23560#discussion_r249727244
 
 

 ##
 File path: core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala
 ##
 @@ -193,13 +193,24 @@ private[netty] class Dispatcher(nettyEnv: NettyRpcEnv, numUsableCores: Int) exte
     endpoints.containsKey(name)
   }
 
+  def getNumOfThreads(conf: SparkConf, defaultNumThreads: Int): Int = {
+    val executorId = conf.get("spark.executor.id", "")
+    val isDriver = executorId == SparkContext.DRIVER_IDENTIFIER ||
+      executorId == SparkContext.LEGACY_DRIVER_IDENTIFIER
+    val side = if (isDriver) "driver" else "executor"
+    val num = conf.getInt(s"spark.$side.rpc.netty.dispatcher.numThreads", defaultNumThreads)
+    if (num > 0) num else defaultNumThreads
+  }
+
   /** Thread pool used for dispatching messages. */
   private val threadpool: ThreadPoolExecutor = {
     val availableCores =
       if (numUsableCores > 0) numUsableCores else Runtime.getRuntime.availableProcessors()
     val numThreads = nettyEnv.conf.getInt("spark.rpc.netty.dispatcher.numThreads",
       math.max(2, availableCores))
-    val pool = ThreadUtils.newDaemonFixedThreadPool(numThreads, "dispatcher-event-loop")
+    val finalNumThreads = getNumOfThreads(nettyEnv.conf, numThreads)
 
 Review comment:
   Yes, but we only need it when the specific config is invalid.
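
   A minimal sketch of the shape I have in mind (hypothetical helper name and structure, not this PR's code): make the fallback lazy, so the generic key is only read when the side-specific value is missing or non-positive.

   import org.apache.spark.{SparkConf, SparkContext}

   def resolveDispatcherThreads(conf: SparkConf, defaultNumThreads: Int): Int = {
     val executorId = conf.get("spark.executor.id", "")
     val isDriver = executorId == SparkContext.DRIVER_IDENTIFIER ||
       executorId == SparkContext.LEGACY_DRIVER_IDENTIFIER
     val side = if (isDriver) "driver" else "executor"
     // Evaluated only when the side-specific key is absent or invalid.
     lazy val fallback =
       conf.getInt("spark.rpc.netty.dispatcher.numThreads", defaultNumThreads)
     conf.getOption(s"spark.$side.rpc.netty.dispatcher.numThreads")
       .map(_.toInt)
       .filter(_ > 0)
       .getOrElse(fallback)
   }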





[GitHub] gczsjdy commented on a change in pull request #23560: [SPARK-26632][Spark Core] Separate Thread Configurations of Driver and Executor

2019-01-22 Thread GitBox
gczsjdy commented on a change in pull request #23560: [SPARK-26632][Spark Core] Separate Thread Configurations of Driver and Executor
URL: https://github.com/apache/spark/pull/23560#discussion_r249726856
 
 

 ##
 File path: core/src/main/scala/org/apache/spark/network/netty/SparkTransportConf.scala
 ##
 @@ -55,4 +58,32 @@ object SparkTransportConf {
       }
     })
   }
+
+  /**
+   * Separate threads configuration of driver and executor
+   * @param conf the [[SparkConf]]
+   * @param module the module name
+   * @param server if true, it's for the serverThreads. Otherwise, it's for the clientThreads.
+   * @param defaultNumThreads default number of threads
+   * @return
+   */
+  def getNumOfThreads(
+      conf: SparkConf,
+      module: String,
+      server: Boolean,
+      defaultNumThreads: Int): String = {
 
 Review comment:
   numThreads is more naturally an Int.
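
   A sketch of the Int-returning shape (my suggestion; the side detection and key scheme below are illustrative, the PR's actual keys may differ):

   import org.apache.spark.{SparkConf, SparkContext}

   def getNumOfThreads(
       conf: SparkConf,
       module: String,
       server: Boolean,
       defaultNumThreads: Int): Int = {
     val executorId = conf.get("spark.executor.id", "")
     val isDriver = executorId == SparkContext.DRIVER_IDENTIFIER ||
       executorId == SparkContext.LEGACY_DRIVER_IDENTIFIER
     val side = if (isDriver) "driver" else "executor"
     val suffix = if (server) "serverThreads" else "clientThreads"
     // Illustrative key name; falls back to the default when unset or invalid.
     val num = conf.getInt(s"spark.$side.$module.io.$suffix", defaultNumThreads)
     if (num > 0) num else defaultNumThreads
   }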





[GitHub] gczsjdy commented on a change in pull request #23560: [SPARK-26632][Spark Core] Separate Thread Configurations of Driver and Executor

2019-01-21 Thread GitBox
gczsjdy commented on a change in pull request #23560: [SPARK-26632][Spark Core] Separate Thread Configurations of Driver and Executor
URL: https://github.com/apache/spark/pull/23560#discussion_r249665643
 
 

 ##
 File path: core/src/main/scala/org/apache/spark/network/netty/SparkTransportConf.scala
 ##
 @@ -55,4 +58,32 @@ object SparkTransportConf {
       }
     })
   }
+
+  /**
+   * Separate threads configuration of driver and executor
+   * @param conf the [[SparkConf]]
+   * @param module the module name
+   * @param server if true, it's for the serverThreads. Otherwise, it's for the clientThreads.
+   * @param defaultNumThreads default number of threads
+   * @return
+   */
+  def getNumOfThreads(
+      conf: SparkConf,
+      module: String,
+      server: Boolean,
+      defaultNumThreads: Int): String = {
 
 Review comment:
   nit: This can be an Int; cast it to a String outside this function.
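
   Something like this at the call site (a sketch assuming a hypothetical Int-returning getNumOfThreads; spark.$module.io.serverThreads/clientThreads are the keys SparkTransportConf already sets):

   conf.setIfMissing(s"spark.$module.io.serverThreads",
     getNumOfThreads(conf, module, server = true, defaultNumThreads).toString)
   conf.setIfMissing(s"spark.$module.io.clientThreads",
     getNumOfThreads(conf, module, server = false, defaultNumThreads).toString)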





[GitHub] gczsjdy commented on a change in pull request #23560: [SPARK-26632][Spark Core] Separate Thread Configurations of Driver and Executor

2019-01-21 Thread GitBox
gczsjdy commented on a change in pull request #23560: [SPARK-26632][Spark Core] Separate Thread Configurations of Driver and Executor
URL: https://github.com/apache/spark/pull/23560#discussion_r249665381
 
 

 ##
 File path: core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala
 ##
 @@ -193,13 +193,24 @@ private[netty] class Dispatcher(nettyEnv: NettyRpcEnv, numUsableCores: Int) exte
     endpoints.containsKey(name)
   }
 
+  def getNumOfThreads(conf: SparkConf, defaultNumThreads: Int): Int = {
+    val executorId = conf.get("spark.executor.id", "")
+    val isDriver = executorId == SparkContext.DRIVER_IDENTIFIER ||
+      executorId == SparkContext.LEGACY_DRIVER_IDENTIFIER
+    val side = if (isDriver) "driver" else "executor"
+    val num = conf.getInt(s"spark.$side.rpc.netty.dispatcher.numThreads", defaultNumThreads)
+    if (num > 0) num else defaultNumThreads
+  }
+
   /** Thread pool used for dispatching messages. */
   private val threadpool: ThreadPoolExecutor = {
     val availableCores =
       if (numUsableCores > 0) numUsableCores else Runtime.getRuntime.availableProcessors()
     val numThreads = nettyEnv.conf.getInt("spark.rpc.netty.dispatcher.numThreads",
       math.max(2, availableCores))
-    val pool = ThreadUtils.newDaemonFixedThreadPool(numThreads, "dispatcher-event-loop")
+    val finalNumThreads = getNumOfThreads(nettyEnv.conf, numThreads)
 
 Review comment:
   When the driver or executor numThreads is set, we probably should not access the fallback option. Maybe refactor lines 209-211?
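
   A rough sketch of the refactor (hypothetical helper name, not this PR's code): resolve the side-specific key to an Option first, so the generic spark.rpc.netty.dispatcher.numThreads fallback is only consulted when nothing valid is set.

   private def sideSpecificNumThreads(conf: SparkConf): Option[Int] = {
     val executorId = conf.get("spark.executor.id", "")
     val side =
       if (executorId == SparkContext.DRIVER_IDENTIFIER) "driver" else "executor"
     conf.getOption(s"spark.$side.rpc.netty.dispatcher.numThreads")
       .map(_.toInt)
       .filter(_ > 0)
   }

   private val threadpool: ThreadPoolExecutor = {
     val availableCores =
       if (numUsableCores > 0) numUsableCores else Runtime.getRuntime.availableProcessors()
     // Read the generic key only when no valid side-specific value exists.
     val numThreads = sideSpecificNumThreads(nettyEnv.conf).getOrElse(
       nettyEnv.conf.getInt("spark.rpc.netty.dispatcher.numThreads", math.max(2, availableCores)))
     val pool = ThreadUtils.newDaemonFixedThreadPool(numThreads, "dispatcher-event-loop")
     // ... start the message loops on the pool as in the existing code
     pool
   }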

