dongjoon-hyun commented on code in PR #707:
URL: 
https://github.com/apache/spark-kubernetes-operator/pull/707#discussion_r3453424912


##########
spark-operator/src/main/java/org/apache/spark/k8s/operator/config/DynamicConfigMonitor.java:
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.k8s.operator.config;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.UncheckedIOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * Periodically reloads dynamic configuration overrides from a properties file 
on disk. The file is
+ * expected to be populated by mounting a ConfigMap as a volume into the 
operator pod, so changes
+ * applied to the ConfigMap propagate to disk without requiring a Kubernetes 
informer. When the
+ * file contents change, {@link SparkOperatorConfManager} is refreshed and the 
caller-supplied
+ * namespace updater is invoked.
+ */
+@Slf4j
+public class DynamicConfigMonitor {
+
+  private final Path configFile;
+  private final Duration reloadInterval;
+  private final Supplier<Set<String>> watchedNamespacesSupplier;
+  private final Consumer<Set<String>> namespaceUpdater;
+  private final ScheduledExecutorService scheduler;
+  private final boolean ownsScheduler;
+
+  private final AtomicReference<Map<String, String>> lastLoaded = new 
AtomicReference<>(Map.of());
+  private final AtomicBoolean initialLoadComplete = new AtomicBoolean();
+
+  public DynamicConfigMonitor(
+      Path configFile,
+      Duration reloadInterval,
+      Supplier<Set<String>> watchedNamespacesSupplier,
+      Consumer<Set<String>> namespaceUpdater) {
+    this(configFile, reloadInterval, watchedNamespacesSupplier, 
namespaceUpdater, null);
+  }
+
+  DynamicConfigMonitor(
+      Path configFile,
+      Duration reloadInterval,
+      Supplier<Set<String>> watchedNamespacesSupplier,
+      Consumer<Set<String>> namespaceUpdater,
+      ScheduledExecutorService scheduler) {
+    this.configFile = configFile;
+    this.reloadInterval = reloadInterval;
+    this.watchedNamespacesSupplier = watchedNamespacesSupplier;
+    this.namespaceUpdater = namespaceUpdater;
+    if (scheduler == null) {
+      this.scheduler =
+          Executors.newSingleThreadScheduledExecutor(
+              r -> {
+                Thread t = new Thread(r, "spark-operator-dynamic-config");
+                t.setDaemon(true);
+                return t;
+              });
+      this.ownsScheduler = true;
+    } else {
+      this.scheduler = scheduler;
+      this.ownsScheduler = false;
+    }
+  }
+
+  /**
+   * Performs an initial synchronous load and schedules periodic reloads at 
the configured
+   * interval.
+   */
+  public void start() {
+    log.info(
+        "Starting dynamic config monitor on {} with reload interval {}",
+        configFile,
+        reloadInterval);
+    reload();
+    initialLoadComplete.set(true);
+    long millis = reloadInterval.toMillis();
+    scheduler.scheduleAtFixedRate(this::reloadSafely, millis, millis, 
TimeUnit.MILLISECONDS);
+  }
+
+  /** Stops the scheduler if it was created internally. */
+  public void stop() {
+    log.info("Stopping dynamic config monitor");
+    if (ownsScheduler) {
+      scheduler.shutdownNow();
+    }
+  }
+
+  /**
+   * Returns true once the initial load has completed and the underlying 
scheduler is still
+   * running.
+   */
+  public boolean isRunning() {
+    return initialLoadComplete.get() && !scheduler.isShutdown();
+  }
+
+  private void reloadSafely() {
+    try {
+      reload();
+    } catch (UncheckedIOException e) {
+      log.error("Failed to reload dynamic config from {}", configFile, e);
+    }
+  }
+
+  private void reload() {
+    Map<String, String> current = readProperties();
+    if (current.equals(lastLoaded.get())) {
+      return;
+    }
+    log.info(
+        "Detected dynamic config change in {}, applying {} overrides", 
configFile, current.size());
+    SparkOperatorConfManager.INSTANCE.refresh(current);
+    lastLoaded.set(current);
+    namespaceUpdater.accept(watchedNamespacesSupplier.get());

Review Comment:
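   This seems to break `reload`'s assumption. Because `lastLoaded` is updated before the 
namespace updater runs, if `namespaceUpdater.accept(watchedNamespacesSupplier.get())` fails, 
the next reload will see unchanged file contents and return early, so the update will never be retried, will it?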



##########
spark-operator/src/main/java/org/apache/spark/k8s/operator/config/DynamicConfigMonitor.java:
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.k8s.operator.config;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.UncheckedIOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * Periodically reloads dynamic configuration overrides from a properties file 
on disk. The file is
+ * expected to be populated by mounting a ConfigMap as a volume into the 
operator pod, so changes
+ * applied to the ConfigMap propagate to disk without requiring a Kubernetes 
informer. When the
+ * file contents change, {@link SparkOperatorConfManager} is refreshed and the 
caller-supplied
+ * namespace updater is invoked.
+ */
+@Slf4j
+public class DynamicConfigMonitor {
+
+  private final Path configFile;
+  private final Duration reloadInterval;
+  private final Supplier<Set<String>> watchedNamespacesSupplier;
+  private final Consumer<Set<String>> namespaceUpdater;
+  private final ScheduledExecutorService scheduler;
+  private final boolean ownsScheduler;
+
+  private final AtomicReference<Map<String, String>> lastLoaded = new 
AtomicReference<>(Map.of());
+  private final AtomicBoolean initialLoadComplete = new AtomicBoolean();
+
+  public DynamicConfigMonitor(
+      Path configFile,
+      Duration reloadInterval,
+      Supplier<Set<String>> watchedNamespacesSupplier,
+      Consumer<Set<String>> namespaceUpdater) {
+    this(configFile, reloadInterval, watchedNamespacesSupplier, 
namespaceUpdater, null);
+  }
+
+  DynamicConfigMonitor(
+      Path configFile,
+      Duration reloadInterval,
+      Supplier<Set<String>> watchedNamespacesSupplier,
+      Consumer<Set<String>> namespaceUpdater,
+      ScheduledExecutorService scheduler) {
+    this.configFile = configFile;
+    this.reloadInterval = reloadInterval;
+    this.watchedNamespacesSupplier = watchedNamespacesSupplier;
+    this.namespaceUpdater = namespaceUpdater;
+    if (scheduler == null) {
+      this.scheduler =
+          Executors.newSingleThreadScheduledExecutor(
+              r -> {
+                Thread t = new Thread(r, "spark-operator-dynamic-config");
+                t.setDaemon(true);
+                return t;
+              });
+      this.ownsScheduler = true;
+    } else {
+      this.scheduler = scheduler;
+      this.ownsScheduler = false;
+    }
+  }
+
+  /**
+   * Performs an initial synchronous load and schedules periodic reloads at 
the configured
+   * interval.
+   */
+  public void start() {
+    log.info(
+        "Starting dynamic config monitor on {} with reload interval {}",
+        configFile,
+        reloadInterval);
+    reload();
+    initialLoadComplete.set(true);
+    long millis = reloadInterval.toMillis();
+    scheduler.scheduleAtFixedRate(this::reloadSafely, millis, millis, 
TimeUnit.MILLISECONDS);
+  }
+
+  /** Stops the scheduler if it was created internally. */
+  public void stop() {
+    log.info("Stopping dynamic config monitor");
+    if (ownsScheduler) {
+      scheduler.shutdownNow();
+    }
+  }
+
+  /**
+   * Returns true once the initial load has completed and the underlying 
scheduler is still
+   * running.
+   */
+  public boolean isRunning() {
+    return initialLoadComplete.get() && !scheduler.isShutdown();
+  }
+
+  private void reloadSafely() {
+    try {
+      reload();
+    } catch (UncheckedIOException e) {

Review Comment:
   This looks unsafe to me. For example,
   - `Properties.load` can throw `IllegalArgumentException` which is not 
`UncheckedIOException`. 
   - `namespaceUpdater.accept(watchedNamespacesSupplier.get())` can throw 
`RuntimeException` which is not `UncheckedIOException`.
   
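   Please double-check the code patch and make this robust. Note that if any exception escapes a 
task scheduled with `scheduleAtFixedRate`, all subsequent executions are silently suppressed, so a 
single unexpected `RuntimeException` would permanently stop dynamic config reloading.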



##########
spark-operator/src/main/java/org/apache/spark/k8s/operator/probe/HealthProbe.java:
##########
@@ -60,7 +62,6 @@ public boolean isHealthy() {
     if (!areOperatorsStarted(operators).orElse(false)) {
       return false;
     }
-

Review Comment:
   nit. Please don't mix style changes into this PR. Simply restore the 
original empty line to reduce the footprint (size) of this PR.



##########
spark-operator/src/main/java/org/apache/spark/k8s/operator/config/SparkOperatorConf.java:
##########
@@ -235,32 +235,71 @@ public final class SparkOperatorConf {
           .build();
 
   /**
-   * When enabled, operator would use config map as source of truth for config 
property override.
-   * The config map need to be created in spark.kubernetes.operator.namespace, 
and labeled with
-   * operator name.
+   * When enabled, operator would load config property overrides dynamically 
at runtime. The source
+   * of the overrides is controlled by 
spark.kubernetes.operator.dynamicConfig.source.
    */
   public static final ConfigOption<Boolean> DYNAMIC_CONFIG_ENABLED =
       ConfigOption.<Boolean>builder()
           .key("spark.kubernetes.operator.dynamicConfig.enabled")
           .enableDynamicOverride(false)
           .description(
-              "When enabled, operator would use config map as source of truth 
for config "
-                  + "property override. The config map need to be created in "
-                  + "spark.kubernetes.operator.namespace, and labeled with 
operator name.")
+              "When enabled, operator would load config property overrides 
dynamically at "
+                  + "runtime. The source of the overrides is controlled by "
+                  + "spark.kubernetes.operator.dynamicConfig.source.")
           .typeParameterClass(Boolean.class)
           .defaultValue(false)
           .build();
 
-  /** The selector str applied to dynamic config map. */
+  /** Source of dynamic config overrides: {@code configMap} (informer) or 
{@code file} (mount). */
+  public static final ConfigOption<String> DYNAMIC_CONFIG_SOURCE =
+      ConfigOption.<String>builder()
+          .key("spark.kubernetes.operator.dynamicConfig.source")
+          .enableDynamicOverride(false)
+          .description(
+              "Source of dynamic config overrides when "
+                  + "spark.kubernetes.operator.dynamicConfig.enabled is true. 
Supported values: "
+                  + "'configMap' (default) watches a ConfigMap via a 
Kubernetes informer and "
+                  + "requires RBAC to read ConfigMaps; 'file' periodically 
reloads a properties "
+                  + "file mounted from a ConfigMap and requires no extra 
RBAC.")
+          .typeParameterClass(String.class)
+          .defaultValue("configMap")
+          .build();
+
+  /** The selector str applied to dynamic config map (used by the {@code 
configMap} source). */
   public static final ConfigOption<String> DYNAMIC_CONFIG_SELECTOR =
       ConfigOption.<String>builder()
           .key("spark.kubernetes.operator.dynamicConfig.selector")
           .enableDynamicOverride(false)
-          .description("The selector str applied to dynamic config map.")
+          .description(
+              "The selector str applied to dynamic config map. Used by the 
'configMap' source.")
           .typeParameterClass(String.class)
           .defaultValue(Utils.labelsAsStr(Utils.defaultOperatorConfigLabels()))
           .build();
 
+  /** Path of the properties file that holds dynamic config overrides. */
+  public static final ConfigOption<String> DYNAMIC_CONFIG_FILE_PATH =
+      ConfigOption.<String>builder()
+          .key("spark.kubernetes.operator.dynamicConfig.filePath")
+          .enableDynamicOverride(false)
+          .description(
+              "Path of the properties file holding dynamic configuration 
overrides. Used by the "
+                  + "'file' source, typically populated by mounting a 
ConfigMap as a volume.")
+          .typeParameterClass(String.class)
+          
.defaultValue("/opt/spark-operator/dynamic-conf/spark-operator-dynamic.properties")
+          .build();
+
+  /** Interval at which the dynamic config file is re-read (used by the {@code 
file} source). */
+  public static final ConfigOption<Long> 
DYNAMIC_CONFIG_RELOAD_INTERVAL_SECONDS =
+      ConfigOption.<Long>builder()
+          .key("spark.kubernetes.operator.dynamicConfig.reloadIntervalSeconds")

Review Comment:
   We need to accept only positive values. For 0 or negative values, the operator will 
fail at start-up with an `IllegalArgumentException`, because `scheduleAtFixedRate` rejects a non-positive period.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to