kevinrr888 commented on code in PR #5798:
URL: https://github.com/apache/accumulo/pull/5798#discussion_r2304918755


##########
test/src/main/java/org/apache/accumulo/test/metrics/MetricsIT.java:
##########
@@ -232,6 +256,223 @@ public void confirmMetricsPublished() throws Exception {
     cluster.stop();
   }
 
+  @Test
+  public void testFateExecutorMetrics() throws Exception {
+    // Tests metrics for Fate's thread pools. Tests that metrics are seen as 
expected, and config
+    // changes to the thread pools are accurately reflected in the metrics. 
This includes checking
+    // that old thread pool metrics are removed, new ones are created, size 
changes to thread
+    // pools are reflected, and the ops assigned and instance type tags are 
seen as expected
+    final String table = getUniqueNames(1)[0];
+
+    // prevent any system initiated fate operations from running, which may 
interfere with our
+    // metrics gathering
+    getCluster().getClusterControl().stopAllServers(ServerType.COMPACTOR);
+    
getCluster().getClusterControl().stopAllServers(ServerType.GARBAGE_COLLECTOR);
+    try {
+      try (AccumuloClient client = 
Accumulo.newClient().from(getClientProperties()).build()) {
+        client.tableOperations().create(table);
+
+        SortedSet<Text> splits = new TreeSet<>();
+        splits.add(new Text("foo"));
+        // initiate 1 USER fate op which will execute slowly
+        client.tableOperations().addSplits(table, splits);
+        // initiate 1 META fate op which will execute slowly
+        client.tableOperations().addSplits(SystemTables.METADATA.tableName(), 
splits);
+
+        // let metrics build up
+        Thread.sleep(TestStatsDRegistryFactory.pollingFrequency.toMillis() * 
3);
+
+        boolean sawExpectedTotalThreadsUserMetric = false;
+        boolean sawExpectedTotalThreadsMetaMetric = false;
+        boolean sawExpectedInactiveUserThreads = false;
+        boolean sawExpectedInactiveMetaThreads = false;
+        // For each FATE instance, should see at least one metric for the 
following:
+        // inactive = configured size - num fate ops initiated = configured 
size - 1
+        // total = configured size
+        for (var line : sink.getLines()) {
+          TestStatsDSink.Metric metric = 
TestStatsDSink.parseStatsDMetric(line);
+          // if the metric is not one of the fate executor metrics...
+          if (!metric.getName().equals(FATE_OPS_THREADS_INACTIVE.getName())
+              && !metric.getName().equals(FATE_OPS_THREADS_TOTAL.getName())) {
+            continue;
+          }
+          var tags = metric.getTags();
+          var instanceType = FateInstanceType
+              
.valueOf(tags.get(FateExecutorMetrics.INSTANCE_TYPE_TAG_KEY).toUpperCase());
+          var opsAssigned = tags.get(FateExecutorMetrics.OPS_ASSIGNED_TAG_KEY);
+
+          verifyFateMetricTags(opsAssigned, instanceType);
+
+          if (metric.getName().equals(FATE_OPS_THREADS_TOTAL.getName())
+              && numFateThreadsPool1 == Integer.parseInt(metric.getValue())) {
+            if (instanceType == FateInstanceType.USER) {
+              sawExpectedTotalThreadsUserMetric = true;
+            } else if (instanceType == FateInstanceType.META) {
+              sawExpectedTotalThreadsMetaMetric = true;
+            }
+          } else if 
(metric.getName().equals(FATE_OPS_THREADS_INACTIVE.getName())
+              && (Integer.parseInt(metric.getValue()) == numFateThreadsPool1 - 
1)) {
+            if (instanceType == FateInstanceType.USER) {
+              sawExpectedInactiveUserThreads = true;
+            } else if (instanceType == FateInstanceType.META) {
+              sawExpectedInactiveMetaThreads = true;
+            }
+          }
+        }
+        assertTrue(sawExpectedInactiveUserThreads);
+        assertTrue(sawExpectedInactiveMetaThreads);
+        assertTrue(sawExpectedTotalThreadsUserMetric);
+        assertTrue(sawExpectedTotalThreadsMetaMetric);
+
+        // Now change the config from:
+        // {<all fate ops>: numFateThreadsPool1}
+        // ->
+        // {<all fate ops except split>: numFateThreadsPool2,
+        // <split operation>: numFateThreadsPool3}
+        changeFateConfig(client, FateInstanceType.USER);
+        changeFateConfig(client, FateInstanceType.META);
+
+        // Allow FATE config changes to be picked up. Will take at most 
POOL_WATCHER_DELAY to
+        // commence, provide a buffer to allow to complete.
+        Thread.sleep(Fate.POOL_WATCHER_DELAY.toMillis() + 5_000);
+        // sink metrics, expect from this point onward that we will no longer 
see metrics for the
+        // old pool (pool1), only metrics for new pools (pool2 and pool3)
+        sink.getLines();
+
+        // let metrics build back up
+        Thread.sleep(TestStatsDRegistryFactory.pollingFrequency.toMillis() * 
3);
+
+        boolean sawAnyMetricPool1 = false;
+
+        boolean sawExpectedTotalThreadsUserMetricPool2 = false;
+        boolean sawExpectedTotalThreadsMetaMetricPool2 = false;
+        boolean sawExpectedInactiveUserThreadsPool2 = false;
+        boolean sawExpectedInactiveMetaThreadsPool2 = false;
+
+        boolean sawExpectedTotalThreadsUserMetricPool3 = false;
+        boolean sawExpectedTotalThreadsMetaMetricPool3 = false;
+        boolean sawExpectedInactiveUserThreadsPool3 = false;
+        boolean sawExpectedInactiveMetaThreadsPool3 = false;
+        for (var line : sink.getLines()) {
+          TestStatsDSink.Metric metric = 
TestStatsDSink.parseStatsDMetric(line);
+          // if the metric is not one of the fate executor metrics...
+          if (!metric.getName().equals(FATE_OPS_THREADS_INACTIVE.getName())
+              && !metric.getName().equals(FATE_OPS_THREADS_TOTAL.getName())) {
+            continue;
+          }
+          var tags = metric.getTags();
+          var instanceType = FateInstanceType
+              
.valueOf(tags.get(FateExecutorMetrics.INSTANCE_TYPE_TAG_KEY).toUpperCase());
+          var opsAssigned = tags.get(FateExecutorMetrics.OPS_ASSIGNED_TAG_KEY);
+
+          verifyFateMetricTags(opsAssigned, instanceType);
+
+          Set<Fate.FateOperation> fateOpsFromMetric = 
gatherFateOpsFromTag(opsAssigned);
+
+          if (fateOpsFromMetric.equals(Fate.FateOperation.getAllUserFateOps())
+              || 
fateOpsFromMetric.equals(Fate.FateOperation.getAllMetaFateOps())) {
+            sawAnyMetricPool1 = true;
+          } else if 
(fateOpsFromMetric.equals(Arrays.stream(Fate.FateOperation.values())
+              .filter(fo -> 
!fo.equals(SlowFateSplitManager.SLOW_OP)).collect(Collectors.toSet()))
+              && numFateThreadsPool2 == Integer.parseInt(metric.getValue())) {
+            // pool2
+            // total = inactive = size pool2
+            if (metric.getName().equals(FATE_OPS_THREADS_INACTIVE.getName())) {
+              if (instanceType == FateInstanceType.USER) {
+                sawExpectedInactiveUserThreadsPool2 = true;
+              } else if (instanceType == FateInstanceType.META) {
+                sawExpectedInactiveMetaThreadsPool2 = true;
+              }
+            } else if 
(metric.getName().equals(FATE_OPS_THREADS_TOTAL.getName())) {
+              if (instanceType == FateInstanceType.USER) {
+                sawExpectedTotalThreadsUserMetricPool2 = true;
+              } else if (instanceType == FateInstanceType.META) {
+                sawExpectedTotalThreadsMetaMetricPool2 = true;
+              }
+            }
+          } else if 
(fateOpsFromMetric.equals(Set.of(Fate.FateOperation.TABLE_SPLIT))
+              && numFateThreadsPool3 == Integer.parseInt(metric.getValue())) {
+            // pool3
+            // total = inactive = size pool3
+            if (metric.getName().equals(FATE_OPS_THREADS_INACTIVE.getName())) {
+              if (instanceType == FateInstanceType.USER) {
+                sawExpectedInactiveUserThreadsPool3 = true;
+              } else if (instanceType == FateInstanceType.META) {
+                sawExpectedInactiveMetaThreadsPool3 = true;
+              }
+            } else if 
(metric.getName().equals(FATE_OPS_THREADS_TOTAL.getName())) {
+              if (instanceType == FateInstanceType.USER) {
+                sawExpectedTotalThreadsUserMetricPool3 = true;
+              } else if (instanceType == FateInstanceType.META) {
+                sawExpectedTotalThreadsMetaMetricPool3 = true;
+              }
+            }
+          }

Review Comment:
   I think in this specific case (if you're talking about line 410), the test 
should fail automatically. I can add this.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscr...@accumulo.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to