zjf2012 commented on a change in pull request #23560: [SPARK-26632][Spark Core] Separate Thread Configurations of Driver and Executor URL: https://github.com/apache/spark/pull/23560#discussion_r249670978
########## File path: core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala ########## @@ -193,13 +193,24 @@ private[netty] class Dispatcher(nettyEnv: NettyRpcEnv, numUsableCores: Int) exte endpoints.containsKey(name) } + def getNumOfThreads(conf: SparkConf, defaultNumThreads: Int): Int = { + val executorId = conf.get("spark.executor.id", "") + val isDriver = executorId == SparkContext.DRIVER_IDENTIFIER || + executorId == SparkContext.LEGACY_DRIVER_IDENTIFIER + val side = if (isDriver) "driver" else "executor" + val num = conf.getInt(s"spark.$side.rpc.netty.dispatcher.numThreads", defaultNumThreads) + if (num > 0) num else defaultNumThreads + } + /** Thread pool used for dispatching messages. */ private val threadpool: ThreadPoolExecutor = { val availableCores = if (numUsableCores > 0) numUsableCores else Runtime.getRuntime.availableProcessors() val numThreads = nettyEnv.conf.getInt("spark.rpc.netty.dispatcher.numThreads", math.max(2, availableCores)) - val pool = ThreadUtils.newDaemonFixedThreadPool(numThreads, "dispatcher-event-loop") + val finalNumThreads = getNumOfThreads(nettyEnv.conf, numThreads) Review comment: thanks chenzhao. I think we still need the fallback because the value set by user might be invalid, like negative value. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org