keith-turner commented on code in PR #5798:
URL: https://github.com/apache/accumulo/pull/5798#discussion_r2289250189


##########
test/src/main/java/org/apache/accumulo/test/metrics/MetricsIT.java:
##########
@@ -232,6 +258,186 @@ public void confirmMetricsPublished() throws Exception {
     cluster.stop();
   }
 
+  @Test
+  public void testFateExecutorMetrics() throws Exception {
+    // Tests metrics for Fate's thread pools. Tests that metrics are seen as 
expected, and config
+    // changes to the thread pools are accurately reflected in the metrics. 
This includes checking
+    // that old thread pool metrics are removed, new ones are created, size 
changes to thread
+    // pools are reflected, and the ops assigned and instance type tags are 
seen as expected
+    final String table = getUniqueNames(1)[0];
+
+    // prevent any system initiated fate operations from running, which may 
interfere with our
+    // metrics gathering
+    getCluster().getClusterControl().stopAllServers(ServerType.COMPACTOR);
+    
getCluster().getClusterControl().stopAllServers(ServerType.GARBAGE_COLLECTOR);
+    try {
+      try (AccumuloClient client = 
Accumulo.newClient().from(getClientProperties()).build()) {
+        client.tableOperations().create(table);
+
+        SortedSet<Text> splits = new TreeSet<>();
+        splits.add(new Text("foo"));
+        // initiate 1 USER fate op which will execute slowly
+        client.tableOperations().addSplits(table, splits);
+        // initiate 1 META fate op which will execute slowly
+        client.tableOperations().addSplits(SystemTables.METADATA.tableName(), 
splits);
+
+        final AtomicBoolean sawExpectedInactiveUserThreads = new 
AtomicBoolean(false);
+        final AtomicBoolean sawTotalThreadsUserMetric = new 
AtomicBoolean(false);
+        final AtomicBoolean sawExpectedInactiveMetaThreads = new 
AtomicBoolean(false);
+        final AtomicBoolean sawTotalThreadsMetaMetric = new 
AtomicBoolean(false);
+        // should see 1 thread occupied for the pool for each FATE instance:
+        // inactive = configured size - num fate ops initiated
+        // total = configured size
+        Wait.waitFor(() -> {
+          for (var line : sink.getLines()) {
+            TestStatsDSink.Metric metric = 
TestStatsDSink.parseStatsDMetric(line);
+            // if the metric is one of the fate executor metrics...
+            if (metric.getName().equals(FATE_OPS_THREADS_INACTIVE.getName())
+                || metric.getName().equals(FATE_OPS_THREADS_TOTAL.getName())) {
+              var tags = metric.getTags();
+              var instanceType = FateInstanceType
+                  
.valueOf(tags.get(FateExecutorMetrics.INSTANCE_TYPE_TAG_KEY).toUpperCase());
+              var opsAssigned = 
tags.get(FateExecutorMetrics.OPS_ASSIGNED_TAG_KEY);
+
+              verifyFateMetricTags(opsAssigned, instanceType);
+
+              if (metric.getName().equals(FATE_OPS_THREADS_TOTAL.getName())) {
+                assertEquals(numFateThreadsPool1, 
Integer.parseInt(metric.getValue()));
+                if (instanceType == FateInstanceType.USER) {
+                  sawTotalThreadsUserMetric.set(true);
+                } else if (instanceType == FateInstanceType.META) {
+                  sawTotalThreadsMetaMetric.set(true);
+                }
+              } else if 
(metric.getName().equals(FATE_OPS_THREADS_INACTIVE.getName())) {
+                if (instanceType == FateInstanceType.USER
+                    && (Integer.parseInt(metric.getValue()) == 
numFateThreadsPool1 - 1)) {
+                  sawExpectedInactiveUserThreads.set(true);
+                } else if (instanceType == FateInstanceType.META
+                    && (Integer.parseInt(metric.getValue()) == 
numFateThreadsPool1 - 1)) {
+                  sawExpectedInactiveMetaThreads.set(true);
+                }
+              }
+            }
+          }
+
+          log.debug("sawExpectedInactiveUserThreads: " + 
sawExpectedInactiveUserThreads.get());
+          log.debug("sawExpectedInactiveMetaThreads: " + 
sawExpectedInactiveMetaThreads.get());
+          log.debug("sawTotalThreadsUserMetric: " + 
sawTotalThreadsUserMetric.get());
+          log.debug("sawTotalThreadsMetaMetric: " + 
sawTotalThreadsMetaMetric.get());
+          return sawExpectedInactiveUserThreads.get() && 
sawExpectedInactiveMetaThreads.get()
+              && sawTotalThreadsUserMetric.get() && 
sawTotalThreadsMetaMetric.get();
+        });
+
+        // Now change the config from:
+        // {<all fate ops>: numFateThreadsPool1}
+        // ->
+        // {<all fate ops except split>: numFateThreadsPool2,
+        // <split operation>: numFateThreadsPool3}
+        changeFateConfig(client, FateInstanceType.USER);
+        changeFateConfig(client, FateInstanceType.META);
+
+        // Wait for config changes to get picked up by FATE. Can do so by 
waiting for metrics to no
+        // longer include metrics of the first pool
+        Wait.waitFor(() -> {
+          boolean metricExistsForPool1 = false;
+          for (var line : sink.getLines()) {
+            TestStatsDSink.Metric metric = 
TestStatsDSink.parseStatsDMetric(line);
+            // if the metric is one of the fate executor metrics...
+            if (metric.getName().equals(FATE_OPS_THREADS_INACTIVE.getName())
+                || metric.getName().equals(FATE_OPS_THREADS_TOTAL.getName())) {
+              var tags = metric.getTags();
+              var instanceType = FateInstanceType
+                  
.valueOf(tags.get(FateExecutorMetrics.INSTANCE_TYPE_TAG_KEY).toUpperCase());
+              var opsAssigned = 
tags.get(FateExecutorMetrics.OPS_ASSIGNED_TAG_KEY);
+
+              Set<Fate.FateOperation> fateOpsFromMetric = 
gatherFateOpsFromTag(opsAssigned);
+
+              if (instanceType == FateInstanceType.USER
+                  && 
fateOpsFromMetric.equals(Fate.FateOperation.getAllUserFateOps())) {
+                metricExistsForPool1 = true;
+              } else if (instanceType == FateInstanceType.META
+                  && 
fateOpsFromMetric.equals(Fate.FateOperation.getAllMetaFateOps())) {
+                metricExistsForPool1 = true;
+              }
+            }
+          }
+          log.debug("Metrics still exist for pool 1: " + metricExistsForPool1);
+          return !metricExistsForPool1;
+        }, 60_000);
+
+        // At this point, we should only see metrics for our new pools (old 
metrics should have been
+        // removed)
+        for (var line : sink.getLines()) {
+          TestStatsDSink.Metric metric = 
TestStatsDSink.parseStatsDMetric(line);
+          // if the metric is one of the fate executor metrics...
+          if (metric.getName().equals(FATE_OPS_THREADS_INACTIVE.getName())

Review Comment:
   This loop may never match anything because metrics arrive periodically.



##########
core/src/main/java/org/apache/accumulo/core/fate/FateExecutorMetrics.java:
##########
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.fate;
+
+import java.util.Set;
+import java.util.concurrent.TransferQueue;
+
+import org.apache.accumulo.core.metrics.Metric;
+import org.apache.accumulo.core.metrics.MetricsProducer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.micrometer.core.instrument.Gauge;
+import io.micrometer.core.instrument.MeterRegistry;
+
+public class FateExecutorMetrics<T> implements MetricsProducer {
+  private static final Logger log = 
LoggerFactory.getLogger(FateExecutorMetrics.class.getName());
+  private final FateInstanceType type;
+  private final String operatesOn;
+  private final Set<FateExecutor<T>.TransactionRunner> runningTxRunners;
+  private final TransferQueue<FateId> workQueue;
+  private MeterRegistry registry;
+  public static final String INSTANCE_TYPE_TAG_KEY = "instanceType";
+  public static final String OPS_ASSIGNED_TAG_KEY = "ops.assigned";
+
+  protected FateExecutorMetrics(FateInstanceType type, String operatesOn,
+      Set<FateExecutor<T>.TransactionRunner> runningTxRunners, 
TransferQueue<FateId> workQueue) {
+    this.type = type;
+    this.operatesOn = operatesOn;
+    this.runningTxRunners = runningTxRunners;
+    this.workQueue = workQueue;
+  }
+
+  @Override
+  public void registerMetrics(MeterRegistry registry) {
+    Gauge.builder(Metric.FATE_OPS_THREADS_TOTAL.getName(), 
runningTxRunners::size)
+        .description(Metric.FATE_OPS_THREADS_TOTAL.getDescription())
+        .tag(INSTANCE_TYPE_TAG_KEY, 
type.name().toLowerCase()).tag(OPS_ASSIGNED_TAG_KEY, operatesOn)
+        .register(registry);
+    Gauge.builder(Metric.FATE_OPS_THREADS_INACTIVE.getName(), 
workQueue::getWaitingConsumerCount)
+        .description(Metric.FATE_OPS_THREADS_INACTIVE.getDescription())
+        .tag(INSTANCE_TYPE_TAG_KEY, 
type.name().toLowerCase()).tag(OPS_ASSIGNED_TAG_KEY, operatesOn)
+        .register(registry);
+    this.registry = registry;
+  }
+
+  public void clearMetrics() {

Review Comment:
   Should this method set `registry=null` somewhere?
   
   Instead of setting it to null, it may be better to have something like `enum 
{UNREGISTERED, REGISTERED, CLOSED}` to track the current state of things.  Then 
we can only register when in the UNREGISTERED state; when using null, we cannot 
differentiate between the UNREGISTERED and CLOSED states.  If we get a call to 
register when in the CLOSED or REGISTERED state, it can be ignored.  If we 
get a call to clear when in the UNREGISTERED or CLOSED state, it can be 
ignored.



##########
test/src/main/java/org/apache/accumulo/test/metrics/MetricsIT.java:
##########
@@ -232,6 +258,186 @@ public void confirmMetricsPublished() throws Exception {
     cluster.stop();
   }
 
+  @Test
+  public void testFateExecutorMetrics() throws Exception {
+    // Tests metrics for Fate's thread pools. Tests that metrics are seen as 
expected, and config
+    // changes to the thread pools are accurately reflected in the metrics. 
This includes checking
+    // that old thread pool metrics are removed, new ones are created, size 
changes to thread
+    // pools are reflected, and the ops assigned and instance type tags are 
seen as expected
+    final String table = getUniqueNames(1)[0];
+
+    // prevent any system initiated fate operations from running, which may 
interfere with our
+    // metrics gathering
+    getCluster().getClusterControl().stopAllServers(ServerType.COMPACTOR);
+    
getCluster().getClusterControl().stopAllServers(ServerType.GARBAGE_COLLECTOR);
+    try {
+      try (AccumuloClient client = 
Accumulo.newClient().from(getClientProperties()).build()) {
+        client.tableOperations().create(table);
+
+        SortedSet<Text> splits = new TreeSet<>();
+        splits.add(new Text("foo"));
+        // initiate 1 USER fate op which will execute slowly
+        client.tableOperations().addSplits(table, splits);
+        // initiate 1 META fate op which will execute slowly
+        client.tableOperations().addSplits(SystemTables.METADATA.tableName(), 
splits);
+
+        final AtomicBoolean sawExpectedInactiveUserThreads = new 
AtomicBoolean(false);
+        final AtomicBoolean sawTotalThreadsUserMetric = new 
AtomicBoolean(false);
+        final AtomicBoolean sawExpectedInactiveMetaThreads = new 
AtomicBoolean(false);
+        final AtomicBoolean sawTotalThreadsMetaMetric = new 
AtomicBoolean(false);
+        // should see 1 thread occupied for the pool for each FATE instance:
+        // inactive = configured size - num fate ops initiated
+        // total = configured size
+        Wait.waitFor(() -> {
+          for (var line : sink.getLines()) {
+            TestStatsDSink.Metric metric = 
TestStatsDSink.parseStatsDMetric(line);
+            // if the metric is one of the fate executor metrics...
+            if (metric.getName().equals(FATE_OPS_THREADS_INACTIVE.getName())
+                || metric.getName().equals(FATE_OPS_THREADS_TOTAL.getName())) {
+              var tags = metric.getTags();
+              var instanceType = FateInstanceType
+                  
.valueOf(tags.get(FateExecutorMetrics.INSTANCE_TYPE_TAG_KEY).toUpperCase());
+              var opsAssigned = 
tags.get(FateExecutorMetrics.OPS_ASSIGNED_TAG_KEY);
+
+              verifyFateMetricTags(opsAssigned, instanceType);
+
+              if (metric.getName().equals(FATE_OPS_THREADS_TOTAL.getName())) {
+                assertEquals(numFateThreadsPool1, 
Integer.parseInt(metric.getValue()));
+                if (instanceType == FateInstanceType.USER) {
+                  sawTotalThreadsUserMetric.set(true);
+                } else if (instanceType == FateInstanceType.META) {
+                  sawTotalThreadsMetaMetric.set(true);
+                }
+              } else if 
(metric.getName().equals(FATE_OPS_THREADS_INACTIVE.getName())) {
+                if (instanceType == FateInstanceType.USER
+                    && (Integer.parseInt(metric.getValue()) == 
numFateThreadsPool1 - 1)) {
+                  sawExpectedInactiveUserThreads.set(true);
+                } else if (instanceType == FateInstanceType.META
+                    && (Integer.parseInt(metric.getValue()) == 
numFateThreadsPool1 - 1)) {
+                  sawExpectedInactiveMetaThreads.set(true);
+                }
+              }
+            }
+          }
+
+          log.debug("sawExpectedInactiveUserThreads: " + 
sawExpectedInactiveUserThreads.get());
+          log.debug("sawExpectedInactiveMetaThreads: " + 
sawExpectedInactiveMetaThreads.get());
+          log.debug("sawTotalThreadsUserMetric: " + 
sawTotalThreadsUserMetric.get());
+          log.debug("sawTotalThreadsMetaMetric: " + 
sawTotalThreadsMetaMetric.get());
+          return sawExpectedInactiveUserThreads.get() && 
sawExpectedInactiveMetaThreads.get()
+              && sawTotalThreadsUserMetric.get() && 
sawTotalThreadsMetaMetric.get();
+        });
+
+        // Now change the config from:
+        // {<all fate ops>: numFateThreadsPool1}
+        // ->
+        // {<all fate ops except split>: numFateThreadsPool2,
+        // <split operation>: numFateThreadsPool3}
+        changeFateConfig(client, FateInstanceType.USER);
+        changeFateConfig(client, FateInstanceType.META);
+
+        // Wait for config changes to get picked up by FATE. Can do so by 
waiting for metrics to no
+        // longer include metrics of the first pool
+        Wait.waitFor(() -> {
+          boolean metricExistsForPool1 = false;
+          for (var line : sink.getLines()) {

Review Comment:
   The way this looks for the absence of something may not be correct 
because metrics arrive periodically.  When something is not seen, it could be 
because it has not arrived yet.  To know for sure that something is absent, the 
test may need to wait for some multiple of the statsd period and then look at 
the metrics that have built up.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscr...@accumulo.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to