jiangzho commented on code in PR #23:
URL: 
https://github.com/apache/spark-kubernetes-operator/pull/23#discussion_r1690622477


##########
spark-operator/src/main/java/org/apache/spark/k8s/operator/metrics/source/OperatorJosdkMetrics.java:
##########
@@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.k8s.operator.metrics.source;
+
+import static 
io.javaoperatorsdk.operator.api.reconciler.Constants.CONTROLLER_NAME;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.Histogram;
+import com.codahale.metrics.MetricRegistry;
+import io.fabric8.kubernetes.api.model.HasMetadata;
+import io.javaoperatorsdk.operator.api.monitoring.Metrics;
+import io.javaoperatorsdk.operator.api.reconciler.Constants;
+import io.javaoperatorsdk.operator.api.reconciler.RetryInfo;
+import io.javaoperatorsdk.operator.processing.Controller;
+import io.javaoperatorsdk.operator.processing.GroupVersionKind;
+import io.javaoperatorsdk.operator.processing.event.Event;
+import io.javaoperatorsdk.operator.processing.event.ResourceID;
+import 
io.javaoperatorsdk.operator.processing.event.source.controller.ResourceAction;
+import 
io.javaoperatorsdk.operator.processing.event.source.controller.ResourceEvent;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.spark.k8s.operator.BaseResource;
+import org.apache.spark.k8s.operator.SparkApplication;
+import org.apache.spark.metrics.source.Source;
+import org.apache.spark.util.Clock;
+import org.apache.spark.util.SystemClock;
+
+@Slf4j
+public class OperatorJosdkMetrics implements Source, Metrics {
+  public static final String FINISHED = "finished";
+  public static final String CLEANUP = "cleanup";
+  public static final String FAILED = "failed";
+  public static final String RETRIES = "retries";
+  private final Map<String, Histogram> histograms = new ConcurrentHashMap<>();
+  private final Map<String, Counter> counters = new ConcurrentHashMap<>();
+  private final Map<String, Gauge<?>> gauges = new ConcurrentHashMap<>();
+  private static final String RECONCILIATION = "reconciliation";
+  private static final String RESOURCE = "resource";
+  private static final String EVENT = "event";
+  private static final String SUCCESS = "success";
+  private static final String FAILURE = "failure";
+  private static final String EXCEPTION = "exception";
+  private static final String PREFIX = "operator.sdk";
+  private static final String RECONCILIATIONS = "reconciliations";
+  private static final String RECONCILIATIONS_EXECUTIONS = RECONCILIATIONS + 
".executions";
+  private static final String RECONCILIATIONS_QUEUE_SIZE = RECONCILIATIONS + 
".queue.size";
+  private static final String SIZE = "size";
+
+  private final Clock clock;
+  private final MetricRegistry metricRegistry;
+
+  public OperatorJosdkMetrics() {
+    this.clock = new SystemClock();
+    this.metricRegistry = new MetricRegistry();
+  }
+
+  @Override
+  public String sourceName() {
+    return PREFIX;
+  }
+
+  @Override
+  public MetricRegistry metricRegistry() {
+    return metricRegistry;
+  }
+
+  @Override
+  public void controllerRegistered(Controller<? extends HasMetadata> 
controller) {
+    // no-op
+    log.debug("Controller has been registered");
+  }
+
+  @Override
+  public void receivedEvent(Event event, Map<String, Object> metadata) {
+    log.debug("received event {}, metadata {}", event, metadata);
+    if (event instanceof ResourceEvent) {
+      final ResourceAction action = ((ResourceEvent) event).getAction();
+      final Optional<Class<? extends BaseResource<?, ?, ?, ?, ?>>> resource =
+          getResourceClass(metadata);
+      final Optional<String> namespaceOptional = 
event.getRelatedCustomResourceID().getNamespace();
+      resource.ifPresent(
+          aClass -> getCounter(aClass, action.name().toLowerCase(), RESOURCE, 
EVENT).inc());
+      if (resource.isPresent() && namespaceOptional.isPresent()) {
+        getCounter(
+                resource.get(),
+                namespaceOptional.get(),
+                action.name().toLowerCase(),
+                RESOURCE,
+                EVENT)
+            .inc();
+      }
+    }
+  }
+
+  @Override
+  public <T> T timeControllerExecution(ControllerExecution<T> execution) 
throws Exception {
+    log.debug("Time controller execution");
+    final String name = execution.controllerName();
+    final ResourceID resourceID = execution.resourceID();
+    final Optional<String> namespaceOptional = resourceID.getNamespace();
+    final Map<String, Object> metadata = execution.metadata();
+    final Optional<Class<? extends BaseResource<?, ?, ?, ?, ?>>> resourceClass 
=
+        getResourceClass(metadata);
+    final String execName = execution.name();
+
+    long startTime = clock.getTimeMillis();
+    try {
+      T result = execution.execute();
+      final String successType = execution.successTypeName(result);
+      if (resourceClass.isPresent()) {
+        getHistogram(resourceClass.get(), name, execName, 
successType).update(toSeconds(startTime));
+        getCounter(resourceClass.get(), name, execName, SUCCESS, 
successType).inc();
+        if (namespaceOptional.isPresent()) {
+          getHistogram(resourceClass.get(), namespaceOptional.get(), name, 
execName, successType)
+              .update(toSeconds(startTime));
+          getCounter(
+                  resourceClass.get(),
+                  namespaceOptional.get(),
+                  name,
+                  execName,
+                  SUCCESS,
+                  successType)
+              .inc();
+        }
+      }
+      return result;
+    } catch (Exception e) {

Review Comment:
   This catch was intentionally not narrowed down; it is kept broad for metrics
purposes only (so that any exception is captured by the metrics). The exception
is still thrown at the end to ensure it's handled. A unit test was added for
this purpose as well.



##########
spark-operator/src/main/java/org/apache/spark/k8s/operator/metrics/MetricsSystem.java:
##########
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.k8s.operator.metrics;
+
+import java.lang.reflect.InvocationTargetException;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import com.codahale.metrics.MetricFilter;
+import com.codahale.metrics.MetricRegistry;
+import lombok.Data;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.spark.k8s.operator.metrics.source.OperatorJvmSource;
+import org.apache.spark.metrics.sink.Sink;
+import org.apache.spark.metrics.source.Source;
+
+@Slf4j
+public class MetricsSystem {
+  private final AtomicBoolean running = new AtomicBoolean(false);
+  @Getter private final Set<Sink> sinks;
+  @Getter private final Set<Source> sources;
+  @Getter private final MetricRegistry registry;
+  @Getter private final Properties properties;
+  // PrometheusPullModelHandler is registered by default, metrics exposed via 
http port
+  @Getter private final PrometheusPullModelHandler prometheusPullModelHandler;
+  private final Map<String, SinkProps> sinkPropertiesMap;
+
+  public MetricsSystem() {
+    this(new Properties());
+  }
+
+  public MetricsSystem(Properties properties) {
+    this.sources = new HashSet<>();
+    this.sinks = new HashSet<>();
+    this.registry = new MetricRegistry();
+    this.properties = properties;
+    this.sinkPropertiesMap = 
MetricsSystemFactory.parseSinkProperties(this.properties);
+    // Add default sinks
+    this.prometheusPullModelHandler = new PrometheusPullModelHandler(new 
Properties(), registry);
+    this.sinks.add(prometheusPullModelHandler);
+  }
+
+  public void start() {
+    if (running.get()) {
+      throw new IllegalStateException(
+          "Attempting to start a MetricsSystem that is already running");
+    }
+    running.set(true);
+    registerDefaultSources();
+    registerSinks();
+    sinks.forEach(Sink::start);
+  }
+
+  public void stop() {
+    if (running.get()) {
+      sinks.forEach(Sink::stop);
+      registry.removeMatching(MetricFilter.ALL);
+    } else {
+      log.error("Stopping a MetricsSystem that is not running");

Review Comment:
   As recommended by PMD, we enabled the corresponding Gradle task to enforce the
[guard log statement](https://pmd.github.io/pmd/pmd_rules_java_bestpractices.html#guardlogstatement)
rule when a log call involves String creation and manipulation. I think this is
not reported here as no string manipulation [… message truncated]



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to