mridulm commented on code in PR #41746:
URL: https://github.com/apache/spark/pull/41746#discussion_r1246921541
##########
core/src/main/scala/org/apache/spark/SparkContext.scala:
##########
@@ -383,20 +383,22 @@ class SparkContext(config: SparkConf) extends Logging {
def setLogLevel(logLevel: String): Unit = {
// let's allow lowercase or mixed case too
val upperCased = logLevel.toUpperCase(Locale.ROOT)
- require(SparkContext.VALID_LOG_LEVELS.contains(upperCased),
+ require(
+ SparkContext.VALID_LOG_LEVELS.contains(upperCased),
Review Comment:
Revert ?
##########
core/src/main/scala/org/apache/spark/util/Utils.scala:
##########
@@ -2464,16 +2464,27 @@ private[spark] object Utils extends Logging with
SparkClassUtils {
* configure a new log4j level
*/
def setLogLevel(l: Level): Unit = {
- val ctx = LogManager.getContext(false).asInstanceOf[LoggerContext]
- val config = ctx.getConfiguration()
- val loggerConfig = config.getLoggerConfig(LogManager.ROOT_LOGGER_NAME)
+ val (ctx, loggerConfig) = getLogContext
loggerConfig.setLevel(l)
ctx.updateLoggers()
// Setting threshold to null as rootLevel will define log level for
spark-shell
Logging.sparkShellThresholdLevel = null
}
+ private def getLogContext = {
Review Comment:
Specify the return types.
`lazy val` instead ?
##########
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala:
##########
@@ -638,6 +649,17 @@ class CoarseGrainedSchedulerBackend(scheduler:
TaskSchedulerImpl, val rpcEnv: Rp
}
}
+ def refershExecutors(msg: Map[String, String] ): Unit = {
+ try {
+ if (driverEndpoint != null) {
+ logInfo("Refreshing executors")
+ driverEndpoint.askSync[Boolean](RefreshExecutors(msg))
Review Comment:
`ask` should be sufficient; we don't need to block here.
##########
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala:
##########
@@ -311,13 +311,24 @@ class CoarseGrainedSchedulerBackend(scheduler:
TaskSchedulerImpl, val rpcEnv: Rp
decommissionExecutors(Array((executorId, v._1)), v._2, v._3)
unknownExecutorsPendingDecommission.invalidate(executorId)
})
+ // propagate current log level to new executor only if flag is true
+ if (conf.get(EXECUTOR_ALLOW_SYNC_LOG_LEVEL)) {
+ data.executorEndpoint.send(RefreshExecutor(Map("logLevel" ->
Utils.getLogLevel)))
+ }
Review Comment:
Instead, why not set `spark.log.level` in sc.conf when `setLogLevel` changes
the log level? Executors can initialize based on this when starting, and not
require this RPC call as soon as they start.
##########
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala:
##########
@@ -49,6 +49,10 @@ private[spark] object CoarseGrainedClusterMessages {
case class KillExecutorsOnHost(host: String)
extends CoarseGrainedClusterMessage
+ case class RefreshExecutors(msg: Map[String, String]) extends
CoarseGrainedClusterMessage
+
+ case class RefreshExecutor(msg: Map[String, String]) extends
CoarseGrainedClusterMessage
Review Comment:
This is a very generic message - we can be more opinionated about what we
are trying to do here.
##########
core/src/main/scala/org/apache/spark/SparkContext.scala:
##########
@@ -383,20 +383,22 @@ class SparkContext(config: SparkConf) extends Logging {
def setLogLevel(logLevel: String): Unit = {
// let's allow lowercase or mixed case too
val upperCased = logLevel.toUpperCase(Locale.ROOT)
- require(SparkContext.VALID_LOG_LEVELS.contains(upperCased),
+ require(
+ SparkContext.VALID_LOG_LEVELS.contains(upperCased),
s"Supplied level $logLevel did not match one of:" +
s" ${SparkContext.VALID_LOG_LEVELS.mkString(",")}")
- Utils.setLogLevel(Level.toLevel(upperCased))
+ // Update only if new log level is not same as current log level
+ if (upperCased != Utils.getLogLevel) {
+ Utils.setLogLevel(Level.toLevel(upperCased))
+ // Inform all executors about the change
+ if (config.get(EXECUTOR_ALLOW_SYNC_LOG_LEVEL)) {
+ _schedulerBackend.refresh(immutable.Map("logLevel" -> upperCased))
+ }
+ }
}
try {
_conf = config.clone()
- _conf.get(SPARK_LOG_LEVEL).foreach { level =>
- if (Logging.setLogLevelPrinted) {
- System.err.printf("Setting Spark log level to \"%s\".\n", level)
- }
- setLogLevel(level)
- }
Review Comment:
Moving this out means we lose out on initialization messages - we need to
fix the PR such that this does not happen.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]