attilapiros commented on code in PR #41746:
URL: https://github.com/apache/spark/pull/41746#discussion_r1258672480
##########
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala:
##########
@@ -318,6 +318,13 @@ class CoarseGrainedSchedulerBackend(scheduler:
TaskSchedulerImpl, val rpcEnv: Rp
context.reply(true)
stop()
+ case UpdateExecutorsLogLevel(msg) =>
+ logInfo("Asking each executor to refresh")
Review Comment:
```suggestion
logInfo(s"Asking each executor to refresh the log level to $logLevel")
```
##########
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala:
##########
@@ -653,6 +660,13 @@ class CoarseGrainedSchedulerBackend(scheduler:
TaskSchedulerImpl, val rpcEnv: Rp
}
}
+ override def update(msg: String): Unit = {
+ if (driverEndpoint != null) {
+ logInfo("Refreshing executors")
Review Comment:
```suggestion
```
##########
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala:
##########
@@ -49,6 +49,10 @@ private[spark] object CoarseGrainedClusterMessages {
case class KillExecutorsOnHost(host: String)
extends CoarseGrainedClusterMessage
+ case class UpdateExecutorsLogLevel(msg: String) extends
CoarseGrainedClusterMessage
Review Comment:
```suggestion
case class UpdateExecutorsLogLevel(logLevel: String) extends
CoarseGrainedClusterMessage
```
##########
core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala:
##########
@@ -473,6 +476,12 @@ private[spark] object CoarseGrainedExecutorBackend extends
Logging {
}
driverConf.set(EXECUTOR_ID, arguments.executorId)
+
+ if (driverConf.get(EXECUTOR_ALLOW_SYNC_LOG_LEVEL)) {
+ // Set the current log level of driver in Executor
+ driverConf.get(SPARK_LOG_LEVEL).foreach(l => updateLogLevel(l))
+
Review Comment:
```suggestion
```
##########
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala:
##########
@@ -49,6 +49,10 @@ private[spark] object CoarseGrainedClusterMessages {
case class KillExecutorsOnHost(host: String)
extends CoarseGrainedClusterMessage
+ case class UpdateExecutorsLogLevel(msg: String) extends
CoarseGrainedClusterMessage
+
+ case class UpdateExecutorLogLevel(msg: String) extends
CoarseGrainedClusterMessage
Review Comment:
```suggestion
case class UpdateExecutorLogLevel(logLevel: String) extends
CoarseGrainedClusterMessage
```
##########
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala:
##########
@@ -653,6 +660,13 @@ class CoarseGrainedSchedulerBackend(scheduler:
TaskSchedulerImpl, val rpcEnv: Rp
}
}
+ override def update(msg: String): Unit = {
Review Comment:
```suggestion
override def updateLogLevel(logLevel: String): Unit = {
```
##########
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala:
##########
@@ -318,6 +318,13 @@ class CoarseGrainedSchedulerBackend(scheduler:
TaskSchedulerImpl, val rpcEnv: Rp
context.reply(true)
stop()
+ case UpdateExecutorsLogLevel(msg) =>
Review Comment:
```suggestion
case UpdateExecutorsLogLevel(logLevel) =>
```
##########
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala:
##########
@@ -318,6 +318,13 @@ class CoarseGrainedSchedulerBackend(scheduler:
TaskSchedulerImpl, val rpcEnv: Rp
context.reply(true)
stop()
+ case UpdateExecutorsLogLevel(msg) =>
+ logInfo("Asking each executor to refresh")
+ for ((_, executorData) <- executorDataMap) {
+ executorData.executorEndpoint.send(UpdateExecutorLogLevel(msg))
Review Comment:
```suggestion
executorData.executorEndpoint.send(UpdateExecutorLogLevel(logLevel))
```
##########
core/src/main/scala/org/apache/spark/SparkContext.scala:
##########
@@ -387,7 +387,15 @@ class SparkContext(config: SparkConf) extends Logging {
require(SparkContext.VALID_LOG_LEVELS.contains(upperCased),
s"Supplied level $logLevel did not match one of:" +
s" ${SparkContext.VALID_LOG_LEVELS.mkString(",")}")
- Utils.setLogLevel(Level.toLevel(upperCased))
+ // Update only if new log level is not same as current log level
+ if (upperCased != Utils.getLogLevel) {
Review Comment:
What if "spark.log.level" was not used, but the executor log level is
different from the driver's because of the log4j configs? One might try to call
`sc.setLogLevel`, but because of this check the new log level might not reach
the executors if it is called with the current driver log level.
So I would move the following part outside of this outer if:
```
if (conf.get(EXECUTOR_ALLOW_SYNC_LOG_LEVEL) && _schedulerBackend !=
null) {
_conf.set(SPARK_LOG_LEVEL.key, upperCased)
_schedulerBackend.update(upperCased)
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]