attilapiros commented on code in PR #41746:
URL: https://github.com/apache/spark/pull/41746#discussion_r1258672480


##########
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala:
##########
@@ -318,6 +318,13 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
         context.reply(true)
         stop()
 
+      case UpdateExecutorsLogLevel(msg) =>
+        logInfo("Asking each executor to refresh")

Review Comment:
   ```suggestion
           logInfo(s"Asking each executor to refresh the log level to $logLevel")
   ```



##########
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala:
##########
@@ -653,6 +660,13 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
     }
   }
 
+  override def update(msg: String): Unit = {
+      if (driverEndpoint != null) {
+        logInfo("Refreshing executors")

Review Comment:
   ```suggestion
   ```



##########
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala:
##########
@@ -49,6 +49,10 @@ private[spark] object CoarseGrainedClusterMessages {
   case class KillExecutorsOnHost(host: String)
     extends CoarseGrainedClusterMessage
 
+  case class UpdateExecutorsLogLevel(msg: String) extends CoarseGrainedClusterMessage

Review Comment:
   ```suggestion
     case class UpdateExecutorsLogLevel(logLevel: String) extends CoarseGrainedClusterMessage
   ```



##########
core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala:
##########
@@ -473,6 +476,12 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
       }
 
       driverConf.set(EXECUTOR_ID, arguments.executorId)
+
+      if (driverConf.get(EXECUTOR_ALLOW_SYNC_LOG_LEVEL)) {
+        // Set the current log level of driver in Executor
+        driverConf.get(SPARK_LOG_LEVEL).foreach(l => updateLogLevel(l))
+

Review Comment:
   ```suggestion
   ```



##########
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala:
##########
@@ -49,6 +49,10 @@ private[spark] object CoarseGrainedClusterMessages {
   case class KillExecutorsOnHost(host: String)
     extends CoarseGrainedClusterMessage
 
+  case class UpdateExecutorsLogLevel(msg: String) extends CoarseGrainedClusterMessage
+
+  case class UpdateExecutorLogLevel(msg: String) extends CoarseGrainedClusterMessage

Review Comment:
   ```suggestion
     case class UpdateExecutorLogLevel(logLevel: String) extends CoarseGrainedClusterMessage
   ```



##########
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala:
##########
@@ -653,6 +660,13 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
     }
   }
 
+  override def update(msg: String): Unit = {

Review Comment:
   ```suggestion
     override def updateLogLevel(logLevel: String): Unit = {
   ```



##########
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala:
##########
@@ -318,6 +318,13 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
         context.reply(true)
         stop()
 
+      case UpdateExecutorsLogLevel(msg) =>

Review Comment:
   ```suggestion
         case UpdateExecutorsLogLevel(logLevel) =>
   ```



##########
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala:
##########
@@ -318,6 +318,13 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
         context.reply(true)
         stop()
 
+      case UpdateExecutorsLogLevel(msg) =>
+        logInfo("Asking each executor to refresh")
+        for ((_, executorData) <- executorDataMap) {
+          executorData.executorEndpoint.send(UpdateExecutorLogLevel(msg))

Review Comment:
   ```suggestion
             executorData.executorEndpoint.send(UpdateExecutorLogLevel(logLevel))
   ```
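   Taken together with the renames suggested above, this arm would read roughly as follows (a sketch only, using the names from the suggestions):
   ```scala
   case UpdateExecutorsLogLevel(logLevel) =>
     logInfo(s"Asking each executor to refresh the log level to $logLevel")
     for ((_, executorData) <- executorDataMap) {
       executorData.executorEndpoint.send(UpdateExecutorLogLevel(logLevel))
     }
   ```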



##########
core/src/main/scala/org/apache/spark/SparkContext.scala:
##########
@@ -387,7 +387,15 @@ class SparkContext(config: SparkConf) extends Logging {
     require(SparkContext.VALID_LOG_LEVELS.contains(upperCased),
       s"Supplied level $logLevel did not match one of:" +
         s" ${SparkContext.VALID_LOG_LEVELS.mkString(",")}")
-    Utils.setLogLevel(Level.toLevel(upperCased))
+    // Update only if new log level is not same as current log level
+    if (upperCased != Utils.getLogLevel) {

Review Comment:
   What if "spark.log.level" was not use but the executor log level is 
different from the driver based on the log4 configs? One might try to call 
`sc.setLogLevel` but because of this check the new log level might not reach 
the executor if he calls it with the current driver log level.
   
   So I would move the following part outside of this outer if:
   ```
         if (conf.get(EXECUTOR_ALLOW_SYNC_LOG_LEVEL) && _schedulerBackend != null) {
           _conf.set(SPARK_LOG_LEVEL.key, upperCased)
           _schedulerBackend.update(upperCased)
         }
   ```
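   For illustration, a rough sketch of `setLogLevel` with that block hoisted out of the equality check (names and messages taken from the diff above; the actual method may differ):
   ```scala
   def setLogLevel(logLevel: String): Unit = {
     val upperCased = logLevel.toUpperCase(Locale.ROOT)
     require(SparkContext.VALID_LOG_LEVELS.contains(upperCased),
       s"Supplied level $logLevel did not match one of:" +
         s" ${SparkContext.VALID_LOG_LEVELS.mkString(",")}")
     // Only touch the driver's own log level when it actually changes
     if (upperCased != Utils.getLogLevel) {
       Utils.setLogLevel(Level.toLevel(upperCased))
     }
     // Hoisted out of the check above so that a call with the current
     // driver log level still propagates to the executors
     if (conf.get(EXECUTOR_ALLOW_SYNC_LOG_LEVEL) && _schedulerBackend != null) {
       _conf.set(SPARK_LOG_LEVEL.key, upperCased)
       _schedulerBackend.update(upperCased)
     }
   }
   ```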
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

