keith-turner commented on code in PR #5798: URL: https://github.com/apache/accumulo/pull/5798#discussion_r2289250189
########## test/src/main/java/org/apache/accumulo/test/metrics/MetricsIT.java: ########## @@ -232,6 +258,186 @@ public void confirmMetricsPublished() throws Exception { cluster.stop(); } + @Test + public void testFateExecutorMetrics() throws Exception { + // Tests metrics for Fate's thread pools. Tests that metrics are seen as expected, and config + // changes to the thread pools are accurately reflected in the metrics. This includes checking + // that old thread pool metrics are removed, new ones are created, size changes to thread + // pools are reflected, and the ops assigned and instance type tags are seen as expected + final String table = getUniqueNames(1)[0]; + + // prevent any system initiated fate operations from running, which may interfere with our + // metrics gathering + getCluster().getClusterControl().stopAllServers(ServerType.COMPACTOR); + getCluster().getClusterControl().stopAllServers(ServerType.GARBAGE_COLLECTOR); + try { + try (AccumuloClient client = Accumulo.newClient().from(getClientProperties()).build()) { + client.tableOperations().create(table); + + SortedSet<Text> splits = new TreeSet<>(); + splits.add(new Text("foo")); + // initiate 1 USER fate op which will execute slowly + client.tableOperations().addSplits(table, splits); + // initiate 1 META fate op which will execute slowly + client.tableOperations().addSplits(SystemTables.METADATA.tableName(), splits); + + final AtomicBoolean sawExpectedInactiveUserThreads = new AtomicBoolean(false); + final AtomicBoolean sawTotalThreadsUserMetric = new AtomicBoolean(false); + final AtomicBoolean sawExpectedInactiveMetaThreads = new AtomicBoolean(false); + final AtomicBoolean sawTotalThreadsMetaMetric = new AtomicBoolean(false); + // should see 1 thread occupied for the pool for each FATE instance: + // inactive = configured size - num fate ops initiated + // total = configured size + Wait.waitFor(() -> { + for (var line : sink.getLines()) { + TestStatsDSink.Metric metric = 
TestStatsDSink.parseStatsDMetric(line); + // if the metric is one of the fate executor metrics... + if (metric.getName().equals(FATE_OPS_THREADS_INACTIVE.getName()) + || metric.getName().equals(FATE_OPS_THREADS_TOTAL.getName())) { + var tags = metric.getTags(); + var instanceType = FateInstanceType + .valueOf(tags.get(FateExecutorMetrics.INSTANCE_TYPE_TAG_KEY).toUpperCase()); + var opsAssigned = tags.get(FateExecutorMetrics.OPS_ASSIGNED_TAG_KEY); + + verifyFateMetricTags(opsAssigned, instanceType); + + if (metric.getName().equals(FATE_OPS_THREADS_TOTAL.getName())) { + assertEquals(numFateThreadsPool1, Integer.parseInt(metric.getValue())); + if (instanceType == FateInstanceType.USER) { + sawTotalThreadsUserMetric.set(true); + } else if (instanceType == FateInstanceType.META) { + sawTotalThreadsMetaMetric.set(true); + } + } else if (metric.getName().equals(FATE_OPS_THREADS_INACTIVE.getName())) { + if (instanceType == FateInstanceType.USER + && (Integer.parseInt(metric.getValue()) == numFateThreadsPool1 - 1)) { + sawExpectedInactiveUserThreads.set(true); + } else if (instanceType == FateInstanceType.META + && (Integer.parseInt(metric.getValue()) == numFateThreadsPool1 - 1)) { + sawExpectedInactiveMetaThreads.set(true); + } + } + } + } + + log.debug("sawExpectedInactiveUserThreads: " + sawExpectedInactiveUserThreads.get()); + log.debug("sawExpectedInactiveMetaThreads: " + sawExpectedInactiveMetaThreads.get()); + log.debug("sawTotalThreadsUserMetric: " + sawTotalThreadsUserMetric.get()); + log.debug("sawTotalThreadsMetaMetric: " + sawTotalThreadsMetaMetric.get()); + return sawExpectedInactiveUserThreads.get() && sawExpectedInactiveMetaThreads.get() + && sawTotalThreadsUserMetric.get() && sawTotalThreadsMetaMetric.get(); + }); + + // Now change the config from: + // {<all fate ops>: numFateThreadsPool1} + // -> + // {<all fate ops except split>: numFateThreadsPool2, + // <split operation>: numFateThreadsPool3} + changeFateConfig(client, FateInstanceType.USER); + 
changeFateConfig(client, FateInstanceType.META); + + // Wait for config changes to get picked up by FATE. Can do so by waiting for metrics to no + // longer include metrics of the first pool + Wait.waitFor(() -> { + boolean metricExistsForPool1 = false; + for (var line : sink.getLines()) { + TestStatsDSink.Metric metric = TestStatsDSink.parseStatsDMetric(line); + // if the metric is one of the fate executor metrics... + if (metric.getName().equals(FATE_OPS_THREADS_INACTIVE.getName()) + || metric.getName().equals(FATE_OPS_THREADS_TOTAL.getName())) { + var tags = metric.getTags(); + var instanceType = FateInstanceType + .valueOf(tags.get(FateExecutorMetrics.INSTANCE_TYPE_TAG_KEY).toUpperCase()); + var opsAssigned = tags.get(FateExecutorMetrics.OPS_ASSIGNED_TAG_KEY); + + Set<Fate.FateOperation> fateOpsFromMetric = gatherFateOpsFromTag(opsAssigned); + + if (instanceType == FateInstanceType.USER + && fateOpsFromMetric.equals(Fate.FateOperation.getAllUserFateOps())) { + metricExistsForPool1 = true; + } else if (instanceType == FateInstanceType.META + && fateOpsFromMetric.equals(Fate.FateOperation.getAllMetaFateOps())) { + metricExistsForPool1 = true; + } + } + } + log.debug("Metrics still exist for pool 1: " + metricExistsForPool1); + return !metricExistsForPool1; + }, 60_000); + + // At this point, we should only see metrics for our new pools (old metrics should have been + // removed) + for (var line : sink.getLines()) { + TestStatsDSink.Metric metric = TestStatsDSink.parseStatsDMetric(line); + // if the metric is one of the fate executor metrics... + if (metric.getName().equals(FATE_OPS_THREADS_INACTIVE.getName()) Review Comment: This loop may never match anything because metrics arrive periodically. ########## core/src/main/java/org/apache/accumulo/core/fate/FateExecutorMetrics.java: ########## @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.core.fate; + +import java.util.Set; +import java.util.concurrent.TransferQueue; + +import org.apache.accumulo.core.metrics.Metric; +import org.apache.accumulo.core.metrics.MetricsProducer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.micrometer.core.instrument.Gauge; +import io.micrometer.core.instrument.MeterRegistry; + +public class FateExecutorMetrics<T> implements MetricsProducer { + private static final Logger log = LoggerFactory.getLogger(FateExecutorMetrics.class.getName()); + private final FateInstanceType type; + private final String operatesOn; + private final Set<FateExecutor<T>.TransactionRunner> runningTxRunners; + private final TransferQueue<FateId> workQueue; + private MeterRegistry registry; + public static final String INSTANCE_TYPE_TAG_KEY = "instanceType"; + public static final String OPS_ASSIGNED_TAG_KEY = "ops.assigned"; + + protected FateExecutorMetrics(FateInstanceType type, String operatesOn, + Set<FateExecutor<T>.TransactionRunner> runningTxRunners, TransferQueue<FateId> workQueue) { + this.type = type; + this.operatesOn = operatesOn; + this.runningTxRunners = runningTxRunners; + this.workQueue = workQueue; + } + + @Override + public void registerMetrics(MeterRegistry registry) { + 
Gauge.builder(Metric.FATE_OPS_THREADS_TOTAL.getName(), runningTxRunners::size) + .description(Metric.FATE_OPS_THREADS_TOTAL.getDescription()) + .tag(INSTANCE_TYPE_TAG_KEY, type.name().toLowerCase()).tag(OPS_ASSIGNED_TAG_KEY, operatesOn) + .register(registry); + Gauge.builder(Metric.FATE_OPS_THREADS_INACTIVE.getName(), workQueue::getWaitingConsumerCount) + .description(Metric.FATE_OPS_THREADS_INACTIVE.getDescription()) + .tag(INSTANCE_TYPE_TAG_KEY, type.name().toLowerCase()).tag(OPS_ASSIGNED_TAG_KEY, operatesOn) + .register(registry); + this.registry = registry; + } + + public void clearMetrics() { Review Comment: Should this method set `registry=null` somewhere? Instead of setting it to null, it may be better to have something like `enum {UNREGISTERED, REGISTERED, CLOSED}` to track the current state. Then registration can only happen in the UNREGISTERED state; with null alone, the UNREGISTERED and CLOSED states cannot be distinguished. If we get a call to register when in the CLOSED or REGISTERED state, it can be ignored. If we get a call to clear when in the UNREGISTERED or CLOSED state, it can be ignored. ########## test/src/main/java/org/apache/accumulo/test/metrics/MetricsIT.java: ########## @@ -232,6 +258,186 @@ public void confirmMetricsPublished() throws Exception { cluster.stop(); } + @Test + public void testFateExecutorMetrics() throws Exception { + // Tests metrics for Fate's thread pools. Tests that metrics are seen as expected, and config + // changes to the thread pools are accurately reflected in the metrics.
This includes checking + // that old thread pool metrics are removed, new ones are created, size changes to thread + // pools are reflected, and the ops assigned and instance type tags are seen as expected + final String table = getUniqueNames(1)[0]; + + // prevent any system initiated fate operations from running, which may interfere with our + // metrics gathering + getCluster().getClusterControl().stopAllServers(ServerType.COMPACTOR); + getCluster().getClusterControl().stopAllServers(ServerType.GARBAGE_COLLECTOR); + try { + try (AccumuloClient client = Accumulo.newClient().from(getClientProperties()).build()) { + client.tableOperations().create(table); + + SortedSet<Text> splits = new TreeSet<>(); + splits.add(new Text("foo")); + // initiate 1 USER fate op which will execute slowly + client.tableOperations().addSplits(table, splits); + // initiate 1 META fate op which will execute slowly + client.tableOperations().addSplits(SystemTables.METADATA.tableName(), splits); + + final AtomicBoolean sawExpectedInactiveUserThreads = new AtomicBoolean(false); + final AtomicBoolean sawTotalThreadsUserMetric = new AtomicBoolean(false); + final AtomicBoolean sawExpectedInactiveMetaThreads = new AtomicBoolean(false); + final AtomicBoolean sawTotalThreadsMetaMetric = new AtomicBoolean(false); + // should see 1 thread occupied for the pool for each FATE instance: + // inactive = configured size - num fate ops initiated + // total = configured size + Wait.waitFor(() -> { + for (var line : sink.getLines()) { + TestStatsDSink.Metric metric = TestStatsDSink.parseStatsDMetric(line); + // if the metric is one of the fate executor metrics... 
+ if (metric.getName().equals(FATE_OPS_THREADS_INACTIVE.getName()) + || metric.getName().equals(FATE_OPS_THREADS_TOTAL.getName())) { + var tags = metric.getTags(); + var instanceType = FateInstanceType + .valueOf(tags.get(FateExecutorMetrics.INSTANCE_TYPE_TAG_KEY).toUpperCase()); + var opsAssigned = tags.get(FateExecutorMetrics.OPS_ASSIGNED_TAG_KEY); + + verifyFateMetricTags(opsAssigned, instanceType); + + if (metric.getName().equals(FATE_OPS_THREADS_TOTAL.getName())) { + assertEquals(numFateThreadsPool1, Integer.parseInt(metric.getValue())); + if (instanceType == FateInstanceType.USER) { + sawTotalThreadsUserMetric.set(true); + } else if (instanceType == FateInstanceType.META) { + sawTotalThreadsMetaMetric.set(true); + } + } else if (metric.getName().equals(FATE_OPS_THREADS_INACTIVE.getName())) { + if (instanceType == FateInstanceType.USER + && (Integer.parseInt(metric.getValue()) == numFateThreadsPool1 - 1)) { + sawExpectedInactiveUserThreads.set(true); + } else if (instanceType == FateInstanceType.META + && (Integer.parseInt(metric.getValue()) == numFateThreadsPool1 - 1)) { + sawExpectedInactiveMetaThreads.set(true); + } + } + } + } + + log.debug("sawExpectedInactiveUserThreads: " + sawExpectedInactiveUserThreads.get()); + log.debug("sawExpectedInactiveMetaThreads: " + sawExpectedInactiveMetaThreads.get()); + log.debug("sawTotalThreadsUserMetric: " + sawTotalThreadsUserMetric.get()); + log.debug("sawTotalThreadsMetaMetric: " + sawTotalThreadsMetaMetric.get()); + return sawExpectedInactiveUserThreads.get() && sawExpectedInactiveMetaThreads.get() + && sawTotalThreadsUserMetric.get() && sawTotalThreadsMetaMetric.get(); + }); + + // Now change the config from: + // {<all fate ops>: numFateThreadsPool1} + // -> + // {<all fate ops except split>: numFateThreadsPool2, + // <split operation>: numFateThreadsPool3} + changeFateConfig(client, FateInstanceType.USER); + changeFateConfig(client, FateInstanceType.META); + + // Wait for config changes to get picked up by FATE. 
Can do so by waiting for metrics to no + // longer include metrics of the first pool + Wait.waitFor(() -> { + boolean metricExistsForPool1 = false; + for (var line : sink.getLines()) { Review Comment: The way this checks for the absence of something may not be correct, because metrics arrive periodically. When something is not seen, it could simply be because it has not arrived yet. To know for sure that something is absent, the test may need to wait for some multiple of the StatsD reporting period and then examine the metrics that have accumulated. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: notifications-unsubscr...@accumulo.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org