csviri commented on code in PR #2: URL: https://github.com/apache/spark-kubernetes-operator/pull/2#discussion_r1562687820
########## spark-operator/src/main/java/org/apache/spark/kubernetes/operator/health/SentinelManager.java: ########## @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.kubernetes.operator.health; + +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import com.google.common.annotations.VisibleForTesting; +import io.fabric8.kubernetes.api.model.HasMetadata; +import io.fabric8.kubernetes.client.KubernetesClient; +import io.javaoperatorsdk.operator.processing.event.ResourceID; +import lombok.Getter; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.builder.ToStringBuilder; + +import org.apache.spark.kubernetes.operator.BaseResource; +import org.apache.spark.kubernetes.operator.config.SparkOperatorConf; +import org.apache.spark.kubernetes.operator.reconciler.SparkReconcilerUtils; + +import static org.apache.spark.kubernetes.operator.Constants.SENTINEL_LABEL; +import static 
org.apache.spark.kubernetes.operator.Constants.SPARK_CONF_SENTINEL_DUMMY_FIELD; + +/** + * Sentinel manager monitors dedicated sentinel resources to make sure the operator is healthy + * + * @param <CR> custom resource type + */ +@RequiredArgsConstructor +@Slf4j +public class SentinelManager<CR extends BaseResource<?, ?, ?, ?, ?>> { + + private final ConcurrentHashMap<ResourceID, SentinelResourceState> sentinelResources = + new ConcurrentHashMap<>(); + + private final ScheduledExecutorService executorService = Executors.newScheduledThreadPool( + SparkOperatorConf.SentinelExecutorServicePoolSize.getValue()); + + public static boolean isSentinelResource(HasMetadata resource) { + var labels = resource.getMetadata().getLabels(); + if (labels == null) { Review Comment: Labels are never `null` on `metadata` ########## spark-operator/src/main/java/org/apache/spark/kubernetes/operator/reconciler/SparkApplicationReconciler.java: ########## @@ -0,0 +1,227 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + +package org.apache.spark.kubernetes.operator.reconciler; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import io.fabric8.kubernetes.api.model.Pod; +import io.javaoperatorsdk.operator.api.config.informer.InformerConfiguration; +import io.javaoperatorsdk.operator.api.reconciler.Cleaner; +import io.javaoperatorsdk.operator.api.reconciler.Context; +import io.javaoperatorsdk.operator.api.reconciler.ControllerConfiguration; +import io.javaoperatorsdk.operator.api.reconciler.DeleteControl; +import io.javaoperatorsdk.operator.api.reconciler.ErrorStatusHandler; +import io.javaoperatorsdk.operator.api.reconciler.ErrorStatusUpdateControl; +import io.javaoperatorsdk.operator.api.reconciler.EventSourceContext; +import io.javaoperatorsdk.operator.api.reconciler.EventSourceInitializer; +import io.javaoperatorsdk.operator.api.reconciler.Reconciler; +import io.javaoperatorsdk.operator.api.reconciler.UpdateControl; +import io.javaoperatorsdk.operator.processing.event.source.EventSource; +import io.javaoperatorsdk.operator.processing.event.source.informer.InformerEventSource; +import io.javaoperatorsdk.operator.processing.event.source.informer.Mappers; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +import org.apache.spark.kubernetes.operator.Constants; +import org.apache.spark.kubernetes.operator.SparkApplication; +import org.apache.spark.kubernetes.operator.controller.SparkApplicationContext; +import org.apache.spark.kubernetes.operator.health.SentinelManager; +import org.apache.spark.kubernetes.operator.reconciler.observers.AppDriverReadyObserver; +import org.apache.spark.kubernetes.operator.reconciler.observers.AppDriverRunningObserver; +import org.apache.spark.kubernetes.operator.reconciler.observers.AppDriverStartObserver; +import org.apache.spark.kubernetes.operator.reconciler.observers.AppDriverTimeoutObserver; +import 
org.apache.spark.kubernetes.operator.reconciler.reconcilesteps.AppCleanUpStep; +import org.apache.spark.kubernetes.operator.reconciler.reconcilesteps.AppInitStep; +import org.apache.spark.kubernetes.operator.reconciler.reconcilesteps.AppReconcileStep; +import org.apache.spark.kubernetes.operator.reconciler.reconcilesteps.AppResourceObserveStep; +import org.apache.spark.kubernetes.operator.reconciler.reconcilesteps.AppRunningStep; +import org.apache.spark.kubernetes.operator.reconciler.reconcilesteps.AppTerminatedStep; +import org.apache.spark.kubernetes.operator.reconciler.reconcilesteps.AppValidateStep; +import org.apache.spark.kubernetes.operator.reconciler.reconcilesteps.UnknownStateStep; +import org.apache.spark.kubernetes.operator.utils.ApplicationStatusUtils; +import org.apache.spark.kubernetes.operator.utils.LoggingUtils; +import org.apache.spark.kubernetes.operator.utils.StatusRecorder; + +import static org.apache.spark.kubernetes.operator.reconciler.ReconcileProgress.completeAndDefaultRequeue; +import static org.apache.spark.kubernetes.operator.reconciler.SparkReconcilerUtils.commonResourceLabelsStr; + +/** + * Reconciler for Spark Application. + * Performs sanity check on the app, identify the reconcile steps based on App status + * and execute the steps. 
+ */ +@ControllerConfiguration +@Slf4j +@RequiredArgsConstructor +public class SparkApplicationReconciler + implements Reconciler<SparkApplication>, + ErrorStatusHandler<SparkApplication>, + EventSourceInitializer<SparkApplication>, + Cleaner<SparkApplication> { + private final StatusRecorder statusRecorder; + private final SentinelManager<SparkApplication> sentinelManager; + + @Override + public UpdateControl<SparkApplication> reconcile(SparkApplication sparkApplication, + Context<SparkApplication> context) + throws Exception { + LoggingUtils.TrackedMDC trackedMDC = new LoggingUtils.TrackedMDC(); + try { + trackedMDC.set(sparkApplication); Review Comment: Not sure if this is a workaround for some problem with the values added by default, but MDC is already filled with those values; see the docs: https://javaoperatorsdk.io/docs/features#contextual-info-for-logging-with-mdc ########## spark-operator/src/main/java/org/apache/spark/kubernetes/operator/utils/StatusRecorder.java: ########## @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + +package org.apache.spark.kubernetes.operator.utils; + +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; +import io.fabric8.kubernetes.client.KubernetesClient; +import io.fabric8.kubernetes.client.KubernetesClientException; +import io.javaoperatorsdk.operator.processing.event.ResourceID; +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; + +import org.apache.spark.kubernetes.operator.SparkApplication; +import org.apache.spark.kubernetes.operator.controller.SparkApplicationContext; +import org.apache.spark.kubernetes.operator.listeners.ApplicationStatusListener; +import org.apache.spark.kubernetes.operator.status.ApplicationStatus; + +import static org.apache.spark.kubernetes.operator.config.SparkOperatorConf.StatusPatchFailureBackoffSeconds; +import static org.apache.spark.kubernetes.operator.config.SparkOperatorConf.StatusPatchMaxRetry; + +/** + * <pre> + * Note - this is inspired by + * <a href="https://github.com/apache/flink-kubernetes-operator/blob/main/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/StatusRecorder.java">Flink Operator Status Recorder</a> + * </pre> + * Enables additional (extendable) observers for Spark App status. + * Cache & version locking might be removed in future version as batch app does not expect + * spec change after submitted. 
+ */ +@Slf4j +public class StatusRecorder { + protected final List<ApplicationStatusListener> appStatusListeners; + protected final ObjectMapper objectMapper = new ObjectMapper(); + protected final ConcurrentHashMap<ResourceID, ObjectNode> statusCache; + + public StatusRecorder(List<ApplicationStatusListener> appStatusListeners) { + this.appStatusListeners = appStatusListeners; + this.statusCache = new ConcurrentHashMap<>(); + } + + /** + * Update the status of the provided kubernetes resource on the k8s cluster. We use patch + * together with null resourceVersion to try to guarantee that the status update succeeds even + * if the underlying resource spec was update in the meantime. This is necessary for the correct + * operator behavior. + * + * @param resource Resource for which status update should be performed + */ + @SneakyThrows + private void patchAndCacheStatus(SparkApplication resource, KubernetesClient client) { + ObjectNode newStatusNode = + objectMapper.convertValue(resource.getStatus(), ObjectNode.class); + ResourceID resourceId = ResourceID.fromResource(resource); + ObjectNode previousStatusNode = statusCache.get(resourceId); + + if (newStatusNode.equals(previousStatusNode)) { + log.debug("No status change."); + return; + } + + ApplicationStatus prevStatus = + objectMapper.convertValue(previousStatusNode, ApplicationStatus.class); + + Exception err = null; + for (long i = 0; i < StatusPatchMaxRetry.getValue(); i++) { + // We retry the status update 3 times to avoid some intermittent connectivity errors + try { + replaceStatus(resource, prevStatus, client); + err = null; + } catch (KubernetesClientException e) { + log.error("Error while patching status, retrying {}/3...", (i + 1), e); + Thread.sleep( + TimeUnit.SECONDS.toMillis(StatusPatchFailureBackoffSeconds.getValue())); + err = e; + } + } + + if (err != null) { + throw err; + } + + statusCache.put(resourceId, newStatusNode); + appStatusListeners.forEach(listener -> { + 
listener.listenStatus(resource, prevStatus, resource.getStatus()); + }); + } + + public void persistStatus(SparkApplicationContext context, + ApplicationStatus newStatus) { + context.getSparkApplication().setStatus(newStatus); + patchAndCacheStatus(context.getSparkApplication(), context.getClient()); + } + + private void replaceStatus(SparkApplication resource, ApplicationStatus prevStatus, + KubernetesClient client) + throws JsonProcessingException { + int retries = 0; + while (true) { + try { + var updated = client.resource(resource).lockResourceVersion().updateStatus(); Review Comment: Just a very generic note about this; you might already be aware. There is no consensus among operators on where to store state. Another approach would be to add it to a `config map`/`secret`/`custom resource`. In dependent resources we took this approach instead, since we were able to do it in a loosely coupled way: https://javaoperatorsdk.io/docs/dependent-resources#external-state-tracking-dependent-resources ########## spark-operator/src/main/java/org/apache/spark/kubernetes/operator/reconciler/SparkApplicationReconciler.java: ########## @@ -0,0 +1,227 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + +package org.apache.spark.kubernetes.operator.reconciler; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import io.fabric8.kubernetes.api.model.Pod; +import io.javaoperatorsdk.operator.api.config.informer.InformerConfiguration; +import io.javaoperatorsdk.operator.api.reconciler.Cleaner; +import io.javaoperatorsdk.operator.api.reconciler.Context; +import io.javaoperatorsdk.operator.api.reconciler.ControllerConfiguration; +import io.javaoperatorsdk.operator.api.reconciler.DeleteControl; +import io.javaoperatorsdk.operator.api.reconciler.ErrorStatusHandler; +import io.javaoperatorsdk.operator.api.reconciler.ErrorStatusUpdateControl; +import io.javaoperatorsdk.operator.api.reconciler.EventSourceContext; +import io.javaoperatorsdk.operator.api.reconciler.EventSourceInitializer; +import io.javaoperatorsdk.operator.api.reconciler.Reconciler; +import io.javaoperatorsdk.operator.api.reconciler.UpdateControl; +import io.javaoperatorsdk.operator.processing.event.source.EventSource; +import io.javaoperatorsdk.operator.processing.event.source.informer.InformerEventSource; +import io.javaoperatorsdk.operator.processing.event.source.informer.Mappers; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +import org.apache.spark.kubernetes.operator.Constants; +import org.apache.spark.kubernetes.operator.SparkApplication; +import org.apache.spark.kubernetes.operator.controller.SparkApplicationContext; +import org.apache.spark.kubernetes.operator.health.SentinelManager; +import org.apache.spark.kubernetes.operator.reconciler.observers.AppDriverReadyObserver; +import org.apache.spark.kubernetes.operator.reconciler.observers.AppDriverRunningObserver; +import org.apache.spark.kubernetes.operator.reconciler.observers.AppDriverStartObserver; +import org.apache.spark.kubernetes.operator.reconciler.observers.AppDriverTimeoutObserver; +import 
org.apache.spark.kubernetes.operator.reconciler.reconcilesteps.AppCleanUpStep; +import org.apache.spark.kubernetes.operator.reconciler.reconcilesteps.AppInitStep; +import org.apache.spark.kubernetes.operator.reconciler.reconcilesteps.AppReconcileStep; +import org.apache.spark.kubernetes.operator.reconciler.reconcilesteps.AppResourceObserveStep; +import org.apache.spark.kubernetes.operator.reconciler.reconcilesteps.AppRunningStep; +import org.apache.spark.kubernetes.operator.reconciler.reconcilesteps.AppTerminatedStep; +import org.apache.spark.kubernetes.operator.reconciler.reconcilesteps.AppValidateStep; +import org.apache.spark.kubernetes.operator.reconciler.reconcilesteps.UnknownStateStep; +import org.apache.spark.kubernetes.operator.utils.ApplicationStatusUtils; +import org.apache.spark.kubernetes.operator.utils.LoggingUtils; +import org.apache.spark.kubernetes.operator.utils.StatusRecorder; + +import static org.apache.spark.kubernetes.operator.reconciler.ReconcileProgress.completeAndDefaultRequeue; +import static org.apache.spark.kubernetes.operator.reconciler.SparkReconcilerUtils.commonResourceLabelsStr; + +/** + * Reconciler for Spark Application. + * Performs sanity check on the app, identify the reconcile steps based on App status + * and execute the steps. 
+ */ +@ControllerConfiguration +@Slf4j +@RequiredArgsConstructor +public class SparkApplicationReconciler + implements Reconciler<SparkApplication>, + ErrorStatusHandler<SparkApplication>, + EventSourceInitializer<SparkApplication>, + Cleaner<SparkApplication> { + private final StatusRecorder statusRecorder; + private final SentinelManager<SparkApplication> sentinelManager; + + @Override + public UpdateControl<SparkApplication> reconcile(SparkApplication sparkApplication, + Context<SparkApplication> context) + throws Exception { + LoggingUtils.TrackedMDC trackedMDC = new LoggingUtils.TrackedMDC(); + try { + trackedMDC.set(sparkApplication); + if (sentinelManager.handleSentinelResourceReconciliation(sparkApplication, Review Comment: We briefly discussed sentinels/canaries with @gyfora. To my understanding, at least one aspect this handles is verifying there is no problem with informer watches (issues fabric8 had before); those should not be a problem anymore, especially not with Kubernetes version >= 1.25. Such validations of caches we could add directly to Java Operator SDK. I would also be interested in other use cases of this feature, and how many issues it catches after the fabric8 client informer fixes. ########## spark-operator/src/main/java/org/apache/spark/kubernetes/operator/reconciler/SparkApplicationReconciler.java: ########## @@ -0,0 +1,227 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.spark.kubernetes.operator.reconciler; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import io.fabric8.kubernetes.api.model.Pod; +import io.javaoperatorsdk.operator.api.config.informer.InformerConfiguration; +import io.javaoperatorsdk.operator.api.reconciler.Cleaner; +import io.javaoperatorsdk.operator.api.reconciler.Context; +import io.javaoperatorsdk.operator.api.reconciler.ControllerConfiguration; +import io.javaoperatorsdk.operator.api.reconciler.DeleteControl; +import io.javaoperatorsdk.operator.api.reconciler.ErrorStatusHandler; +import io.javaoperatorsdk.operator.api.reconciler.ErrorStatusUpdateControl; +import io.javaoperatorsdk.operator.api.reconciler.EventSourceContext; +import io.javaoperatorsdk.operator.api.reconciler.EventSourceInitializer; +import io.javaoperatorsdk.operator.api.reconciler.Reconciler; +import io.javaoperatorsdk.operator.api.reconciler.UpdateControl; +import io.javaoperatorsdk.operator.processing.event.source.EventSource; +import io.javaoperatorsdk.operator.processing.event.source.informer.InformerEventSource; +import io.javaoperatorsdk.operator.processing.event.source.informer.Mappers; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +import org.apache.spark.kubernetes.operator.Constants; +import org.apache.spark.kubernetes.operator.SparkApplication; +import org.apache.spark.kubernetes.operator.controller.SparkApplicationContext; +import org.apache.spark.kubernetes.operator.health.SentinelManager; 
+import org.apache.spark.kubernetes.operator.reconciler.observers.AppDriverReadyObserver; +import org.apache.spark.kubernetes.operator.reconciler.observers.AppDriverRunningObserver; +import org.apache.spark.kubernetes.operator.reconciler.observers.AppDriverStartObserver; +import org.apache.spark.kubernetes.operator.reconciler.observers.AppDriverTimeoutObserver; +import org.apache.spark.kubernetes.operator.reconciler.reconcilesteps.AppCleanUpStep; +import org.apache.spark.kubernetes.operator.reconciler.reconcilesteps.AppInitStep; +import org.apache.spark.kubernetes.operator.reconciler.reconcilesteps.AppReconcileStep; +import org.apache.spark.kubernetes.operator.reconciler.reconcilesteps.AppResourceObserveStep; +import org.apache.spark.kubernetes.operator.reconciler.reconcilesteps.AppRunningStep; +import org.apache.spark.kubernetes.operator.reconciler.reconcilesteps.AppTerminatedStep; +import org.apache.spark.kubernetes.operator.reconciler.reconcilesteps.AppValidateStep; +import org.apache.spark.kubernetes.operator.reconciler.reconcilesteps.UnknownStateStep; +import org.apache.spark.kubernetes.operator.utils.ApplicationStatusUtils; +import org.apache.spark.kubernetes.operator.utils.LoggingUtils; +import org.apache.spark.kubernetes.operator.utils.StatusRecorder; + +import static org.apache.spark.kubernetes.operator.reconciler.ReconcileProgress.completeAndDefaultRequeue; +import static org.apache.spark.kubernetes.operator.reconciler.SparkReconcilerUtils.commonResourceLabelsStr; + +/** + * Reconciler for Spark Application. + * Performs sanity check on the app, identify the reconcile steps based on App status + * and execute the steps. 
+ */ +@ControllerConfiguration +@Slf4j +@RequiredArgsConstructor +public class SparkApplicationReconciler + implements Reconciler<SparkApplication>, + ErrorStatusHandler<SparkApplication>, + EventSourceInitializer<SparkApplication>, + Cleaner<SparkApplication> { + private final StatusRecorder statusRecorder; + private final SentinelManager<SparkApplication> sentinelManager; + + @Override + public UpdateControl<SparkApplication> reconcile(SparkApplication sparkApplication, + Context<SparkApplication> context) + throws Exception { + LoggingUtils.TrackedMDC trackedMDC = new LoggingUtils.TrackedMDC(); + try { + trackedMDC.set(sparkApplication); + if (sentinelManager.handleSentinelResourceReconciliation(sparkApplication, + context.getClient())) { + return UpdateControl.noUpdate(); + } + log.debug("Start reconciliation."); + statusRecorder.updateStatusFromCache(sparkApplication); + SparkApplicationContext ctx = new SparkApplicationContext(sparkApplication, context); + List<AppReconcileStep> reconcileSteps = getReconcileSteps(sparkApplication); Review Comment: This is very interesting with the step abstraction. Just one note on this: I did not dive too deep into it, but JOSDK has an abstraction describing resource reconciliations, and workflows on top of that; it might be interesting to see if these steps could be fitted into that model, which might spare you some code. It also provides lots of optimizations regarding reconciliation cycles, see: https://javaoperatorsdk.io/docs/dependent-resources https://javaoperatorsdk.io/docs/workflows -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
