kevinrr888 commented on code in PR #5798: URL: https://github.com/apache/accumulo/pull/5798#discussion_r2304918755
########## test/src/main/java/org/apache/accumulo/test/metrics/MetricsIT.java: ########## @@ -232,6 +256,223 @@ public void confirmMetricsPublished() throws Exception { cluster.stop(); } + @Test + public void testFateExecutorMetrics() throws Exception { + // Tests metrics for Fate's thread pools. Tests that metrics are seen as expected, and config + // changes to the thread pools are accurately reflected in the metrics. This includes checking + // that old thread pool metrics are removed, new ones are created, size changes to thread + // pools are reflected, and the ops assigned and instance type tags are seen as expected + final String table = getUniqueNames(1)[0]; + + // prevent any system initiated fate operations from running, which may interfere with our + // metrics gathering + getCluster().getClusterControl().stopAllServers(ServerType.COMPACTOR); + getCluster().getClusterControl().stopAllServers(ServerType.GARBAGE_COLLECTOR); + try { + try (AccumuloClient client = Accumulo.newClient().from(getClientProperties()).build()) { + client.tableOperations().create(table); + + SortedSet<Text> splits = new TreeSet<>(); + splits.add(new Text("foo")); + // initiate 1 USER fate op which will execute slowly + client.tableOperations().addSplits(table, splits); + // initiate 1 META fate op which will execute slowly + client.tableOperations().addSplits(SystemTables.METADATA.tableName(), splits); + + // let metrics build up + Thread.sleep(TestStatsDRegistryFactory.pollingFrequency.toMillis() * 3); + + boolean sawExpectedTotalThreadsUserMetric = false; + boolean sawExpectedTotalThreadsMetaMetric = false; + boolean sawExpectedInactiveUserThreads = false; + boolean sawExpectedInactiveMetaThreads = false; + // For each FATE instance, should see at least one metric for the following: + // inactive = configured size - num fate ops initiated = configured size - 1 + // total = configured size + for (var line : sink.getLines()) { + TestStatsDSink.Metric metric = 
TestStatsDSink.parseStatsDMetric(line); + // if the metric is not one of the fate executor metrics... + if (!metric.getName().equals(FATE_OPS_THREADS_INACTIVE.getName()) + && !metric.getName().equals(FATE_OPS_THREADS_TOTAL.getName())) { + continue; + } + var tags = metric.getTags(); + var instanceType = FateInstanceType + .valueOf(tags.get(FateExecutorMetrics.INSTANCE_TYPE_TAG_KEY).toUpperCase()); + var opsAssigned = tags.get(FateExecutorMetrics.OPS_ASSIGNED_TAG_KEY); + + verifyFateMetricTags(opsAssigned, instanceType); + + if (metric.getName().equals(FATE_OPS_THREADS_TOTAL.getName()) + && numFateThreadsPool1 == Integer.parseInt(metric.getValue())) { + if (instanceType == FateInstanceType.USER) { + sawExpectedTotalThreadsUserMetric = true; + } else if (instanceType == FateInstanceType.META) { + sawExpectedTotalThreadsMetaMetric = true; + } + } else if (metric.getName().equals(FATE_OPS_THREADS_INACTIVE.getName()) + && (Integer.parseInt(metric.getValue()) == numFateThreadsPool1 - 1)) { + if (instanceType == FateInstanceType.USER) { + sawExpectedInactiveUserThreads = true; + } else if (instanceType == FateInstanceType.META) { + sawExpectedInactiveMetaThreads = true; + } + } + } + assertTrue(sawExpectedInactiveUserThreads); + assertTrue(sawExpectedInactiveMetaThreads); + assertTrue(sawExpectedTotalThreadsUserMetric); + assertTrue(sawExpectedTotalThreadsMetaMetric); + + // Now change the config from: + // {<all fate ops>: numFateThreadsPool1} + // -> + // {<all fate ops except split>: numFateThreadsPool2, + // <split operation>: numFateThreadsPool3} + changeFateConfig(client, FateInstanceType.USER); + changeFateConfig(client, FateInstanceType.META); + + // Allow FATE config changes to be picked up. Will take at most POOL_WATCHER_DELAY to + // commence, provide a buffer to allow to complete. 
+ Thread.sleep(Fate.POOL_WATCHER_DELAY.toMillis() + 5_000); + // sink metrics, expect from this point onward that we will no longer see metrics for the + // old pool (pool1), only metrics for new pools (pool2 and pool3) + sink.getLines(); + + // let metrics build back up + Thread.sleep(TestStatsDRegistryFactory.pollingFrequency.toMillis() * 3); + + boolean sawAnyMetricPool1 = false; + + boolean sawExpectedTotalThreadsUserMetricPool2 = false; + boolean sawExpectedTotalThreadsMetaMetricPool2 = false; + boolean sawExpectedInactiveUserThreadsPool2 = false; + boolean sawExpectedInactiveMetaThreadsPool2 = false; + + boolean sawExpectedTotalThreadsUserMetricPool3 = false; + boolean sawExpectedTotalThreadsMetaMetricPool3 = false; + boolean sawExpectedInactiveUserThreadsPool3 = false; + boolean sawExpectedInactiveMetaThreadsPool3 = false; + for (var line : sink.getLines()) { + TestStatsDSink.Metric metric = TestStatsDSink.parseStatsDMetric(line); + // if the metric is not one of the fate executor metrics... 
+ if (!metric.getName().equals(FATE_OPS_THREADS_INACTIVE.getName()) + && !metric.getName().equals(FATE_OPS_THREADS_TOTAL.getName())) { + continue; + } + var tags = metric.getTags(); + var instanceType = FateInstanceType + .valueOf(tags.get(FateExecutorMetrics.INSTANCE_TYPE_TAG_KEY).toUpperCase()); + var opsAssigned = tags.get(FateExecutorMetrics.OPS_ASSIGNED_TAG_KEY); + + verifyFateMetricTags(opsAssigned, instanceType); + + Set<Fate.FateOperation> fateOpsFromMetric = gatherFateOpsFromTag(opsAssigned); + + if (fateOpsFromMetric.equals(Fate.FateOperation.getAllUserFateOps()) + || fateOpsFromMetric.equals(Fate.FateOperation.getAllMetaFateOps())) { + sawAnyMetricPool1 = true; + } else if (fateOpsFromMetric.equals(Arrays.stream(Fate.FateOperation.values()) + .filter(fo -> !fo.equals(SlowFateSplitManager.SLOW_OP)).collect(Collectors.toSet())) + && numFateThreadsPool2 == Integer.parseInt(metric.getValue())) { + // pool2 + // total = inactive = size pool2 + if (metric.getName().equals(FATE_OPS_THREADS_INACTIVE.getName())) { + if (instanceType == FateInstanceType.USER) { + sawExpectedInactiveUserThreadsPool2 = true; + } else if (instanceType == FateInstanceType.META) { + sawExpectedInactiveMetaThreadsPool2 = true; + } + } else if (metric.getName().equals(FATE_OPS_THREADS_TOTAL.getName())) { + if (instanceType == FateInstanceType.USER) { + sawExpectedTotalThreadsUserMetricPool2 = true; + } else if (instanceType == FateInstanceType.META) { + sawExpectedTotalThreadsMetaMetricPool2 = true; + } + } + } else if (fateOpsFromMetric.equals(Set.of(Fate.FateOperation.TABLE_SPLIT)) + && numFateThreadsPool3 == Integer.parseInt(metric.getValue())) { + // pool3 + // total = inactive = size pool3 + if (metric.getName().equals(FATE_OPS_THREADS_INACTIVE.getName())) { + if (instanceType == FateInstanceType.USER) { + sawExpectedInactiveUserThreadsPool3 = true; + } else if (instanceType == FateInstanceType.META) { + sawExpectedInactiveMetaThreadsPool3 = true; + } + } else if 
(metric.getName().equals(FATE_OPS_THREADS_TOTAL.getName())) { + if (instanceType == FateInstanceType.USER) { + sawExpectedTotalThreadsUserMetricPool3 = true; + } else if (instanceType == FateInstanceType.META) { + sawExpectedTotalThreadsMetaMetricPool3 = true; + } + } + } Review Comment: I think in this specific case (if you're talking about line 410), the test should automatically fail. I can add this. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: notifications-unsubscr...@accumulo.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org