mridulm commented on code in PR #41746:
URL: https://github.com/apache/spark/pull/41746#discussion_r1246921541


##########
core/src/main/scala/org/apache/spark/SparkContext.scala:
##########
@@ -383,20 +383,22 @@ class SparkContext(config: SparkConf) extends Logging {
   def setLogLevel(logLevel: String): Unit = {
     // let's allow lowercase or mixed case too
     val upperCased = logLevel.toUpperCase(Locale.ROOT)
-    require(SparkContext.VALID_LOG_LEVELS.contains(upperCased),
+    require(
+      SparkContext.VALID_LOG_LEVELS.contains(upperCased),

Review Comment:
   Revert ?



##########
core/src/main/scala/org/apache/spark/util/Utils.scala:
##########
@@ -2464,16 +2464,27 @@ private[spark] object Utils extends Logging with 
SparkClassUtils {
    * configure a new log4j level
    */
   def setLogLevel(l: Level): Unit = {
-    val ctx = LogManager.getContext(false).asInstanceOf[LoggerContext]
-    val config = ctx.getConfiguration()
-    val loggerConfig = config.getLoggerConfig(LogManager.ROOT_LOGGER_NAME)
+    val (ctx, loggerConfig) = getLogContext
     loggerConfig.setLevel(l)
     ctx.updateLoggers()
 
     // Setting threshold to null as rootLevel will define log level for 
spark-shell
     Logging.sparkShellThresholdLevel = null
   }
 
+  private def getLogContext = {

Review Comment:
   Specify the return types.
   
   `lazy val` instead ?



##########
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala:
##########
@@ -638,6 +649,17 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, val rpcEnv: Rp
     }
   }
 
+  def refershExecutors(msg: Map[String, String] ): Unit = {
+    try {
+      if (driverEndpoint != null) {
+        logInfo("Refreshing executors")
+        driverEndpoint.askSync[Boolean](RefreshExecutors(msg))

Review Comment:
   `ask` should be sufficient, we dont need to block here.



##########
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala:
##########
@@ -311,13 +311,24 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, val rpcEnv: Rp
               decommissionExecutors(Array((executorId, v._1)), v._2, v._3)
               unknownExecutorsPendingDecommission.invalidate(executorId)
             })
+          // propagate current log level to new executor only if flag is true
+          if (conf.get(EXECUTOR_ALLOW_SYNC_LOG_LEVEL)) {
+            data.executorEndpoint.send(RefreshExecutor(Map("logLevel" -> 
Utils.getLogLevel)))
+          }

Review Comment:
   Instead, why not set `spark.log.level` in sc.conf when `setLogLevel` changes 
the log level ? Executors can initialize based on this when starting, and not 
require this rpc call as soon as they start.



##########
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala:
##########
@@ -49,6 +49,10 @@ private[spark] object CoarseGrainedClusterMessages {
   case class KillExecutorsOnHost(host: String)
     extends CoarseGrainedClusterMessage
 
+  case class RefreshExecutors(msg: Map[String, String]) extends 
CoarseGrainedClusterMessage
+
+  case class RefreshExecutor(msg: Map[String, String]) extends 
CoarseGrainedClusterMessage

Review Comment:
   This is a very generic message - we can be more opinionated about what we 
are trying to do here.



##########
core/src/main/scala/org/apache/spark/SparkContext.scala:
##########
@@ -383,20 +383,22 @@ class SparkContext(config: SparkConf) extends Logging {
   def setLogLevel(logLevel: String): Unit = {
     // let's allow lowercase or mixed case too
     val upperCased = logLevel.toUpperCase(Locale.ROOT)
-    require(SparkContext.VALID_LOG_LEVELS.contains(upperCased),
+    require(
+      SparkContext.VALID_LOG_LEVELS.contains(upperCased),
       s"Supplied level $logLevel did not match one of:" +
         s" ${SparkContext.VALID_LOG_LEVELS.mkString(",")}")
-    Utils.setLogLevel(Level.toLevel(upperCased))
+    // Update only if new log level is not same as current log level
+    if (upperCased != Utils.getLogLevel) {
+      Utils.setLogLevel(Level.toLevel(upperCased))
+    // Inform all executors about the change
+      if (config.get(EXECUTOR_ALLOW_SYNC_LOG_LEVEL)) {
+        _schedulerBackend.refresh(immutable.Map("logLevel" -> upperCased))
+      }
+    }
   }
 
   try {
     _conf = config.clone()
-    _conf.get(SPARK_LOG_LEVEL).foreach { level =>
-      if (Logging.setLogLevelPrinted) {
-        System.err.printf("Setting Spark log level to \"%s\".\n", level)
-      }
-      setLogLevel(level)
-    }

Review Comment:
   Moving this out means we loose out on initialization messages - we need to 
fix the PR such that this does not happen.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to