kevinrr888 commented on code in PR #4466:
URL: https://github.com/apache/accumulo/pull/4466#discussion_r1584871796


##########
test/src/main/java/org/apache/accumulo/test/compaction/CompactionPriorityQueueMetricsIT.java:
##########
@@ -407,4 +410,125 @@ public void testQueueMetrics() throws Exception {
     shutdownTailer.set(true);
     thread.join();
   }
+
+  /**
+   * Test that the compaction queue is cleared when compactions no longer need 
to happen.
+   */
+  @Test
+  public void testCompactionQueueClearedWhenNotNeeded() throws Exception {
+
+    // Metrics collector Thread setup
+    final LinkedBlockingQueue<TestStatsDSink.Metric> queueMetrics = new 
LinkedBlockingQueue<>();
+    final AtomicBoolean shutdownTailer = new AtomicBoolean(false);
+    Thread thread = Threads.createThread("metric-tailer", () -> {
+      while (true) {
+        List<String> statsDMetrics = sink.getLines();
+        for (String s : statsDMetrics) {
+          if (shutdownTailer.get()) {
+            return;
+          }
+          if (s.startsWith(MetricsProducer.METRICS_COMPACTOR_PREFIX + 
"queue")) {
+            queueMetrics.add(TestStatsDSink.parseStatsDMetric(s));
+          }
+        }
+      }
+    });
+    thread.start();
+
+    ServerContext context = getCluster().getServerContext();
+    try (AccumuloClient c = 
Accumulo.newClient().from(getClientProps()).build()) {
+
+      String dir = getDir("/testBulkFile-");
+      FileSystem fs = getCluster().getFileSystem();
+      fs.mkdirs(new Path(dir));
+
+      // Create splits so there are two groupings of tablets with similar file 
counts.
+      String splitString = "500 1000 1500 2000 3750 5500 7250 9000";
+      addSplits(c, tableName, splitString);
+
+      for (int i = 0; i < 100; i++) {
+        writeData(dir + "/f" + i + ".", aconf, i * 100, (i + 1) * 100 - 1);
+      }
+      c.tableOperations().importDirectory(dir).to(tableName).load();
+
+      try (TabletsMetadata tm = 
context.getAmple().readTablets().forTable(tableId).build()) {
+        // Get each tablet's file sizes
+        for (TabletMetadata tablet : tm) {
+          long fileSize = tablet.getFiles().size();
+          log.debug("Number of files in tablet {}: {}", 
tablet.getExtent().toString(), fileSize);
+        }
+      }
+      verifyData(c, tableName, 0, 100 * 100 - 1, false);
+    }
+
+    final int sleepMillis = 3500; // Current poll rate of the 
TestStatsDRegistryFactory is 3 seconds
+
+    Wait.waitFor(() -> {
+      while (!queueMetrics.isEmpty()) {
+        var qm = queueMetrics.take();
+        if 
(qm.getName().contains(MetricsProducer.METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_QUEUED)
+            && qm.getTags().containsValue(QUEUE1_METRIC_LABEL)) {
+          if (Integer.parseInt(qm.getValue()) > 0) {
+            return true;
+          }
+        }
+      }
+      return false;
+    }, 60_000, sleepMillis, "did not see Q1 metrics");
+
+    Wait.waitFor(() -> {
+      int queueSize = 0;
+      boolean sawQueues = false;
+      while (!queueMetrics.isEmpty()) {
+        var metric = queueMetrics.take();
+        if 
(metric.getName().contains(MetricsProducer.METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_LENGTH)
+            && metric.getTags().containsValue(QUEUE1_METRIC_LABEL)) {
+          queueSize = Integer.parseInt(metric.getValue());
+        } else if (metric.getName()
+            .contains(MetricsProducer.METRICS_COMPACTOR_JOB_PRIORITY_QUEUES)) {
+          sawQueues = true;
+        } else {
+          log.debug("{}", metric);

Review Comment:
   It would probably be good to include more context of the logging of this 
metric.



##########
test/src/main/java/org/apache/accumulo/test/compaction/CompactionPriorityQueueMetricsIT.java:
##########
@@ -407,4 +410,125 @@ public void testQueueMetrics() throws Exception {
     shutdownTailer.set(true);
     thread.join();
   }
+
+  /**
+   * Test that the compaction queue is cleared when compactions no longer need 
to happen.
+   */
+  @Test
+  public void testCompactionQueueClearedWhenNotNeeded() throws Exception {
+
+    // Metrics collector Thread setup
+    final LinkedBlockingQueue<TestStatsDSink.Metric> queueMetrics = new 
LinkedBlockingQueue<>();
+    final AtomicBoolean shutdownTailer = new AtomicBoolean(false);
+    Thread thread = Threads.createThread("metric-tailer", () -> {
+      while (true) {
+        List<String> statsDMetrics = sink.getLines();
+        for (String s : statsDMetrics) {
+          if (shutdownTailer.get()) {
+            return;
+          }
+          if (s.startsWith(MetricsProducer.METRICS_COMPACTOR_PREFIX + 
"queue")) {
+            queueMetrics.add(TestStatsDSink.parseStatsDMetric(s));
+          }
+        }
+      }
+    });
+    thread.start();
+
+    ServerContext context = getCluster().getServerContext();
+    try (AccumuloClient c = 
Accumulo.newClient().from(getClientProps()).build()) {
+
+      String dir = getDir("/testBulkFile-");
+      FileSystem fs = getCluster().getFileSystem();
+      fs.mkdirs(new Path(dir));
+
+      // Create splits so there are two groupings of tablets with similar file 
counts.
+      String splitString = "500 1000 1500 2000 3750 5500 7250 9000";
+      addSplits(c, tableName, splitString);
+
+      for (int i = 0; i < 100; i++) {
+        writeData(dir + "/f" + i + ".", aconf, i * 100, (i + 1) * 100 - 1);
+      }
+      c.tableOperations().importDirectory(dir).to(tableName).load();
+
+      try (TabletsMetadata tm = 
context.getAmple().readTablets().forTable(tableId).build()) {
+        // Get each tablet's file sizes
+        for (TabletMetadata tablet : tm) {
+          long fileSize = tablet.getFiles().size();
+          log.debug("Number of files in tablet {}: {}", 
tablet.getExtent().toString(), fileSize);
+        }
+      }
+      verifyData(c, tableName, 0, 100 * 100 - 1, false);
+    }
+
+    final int sleepMillis = 3500; // Current poll rate of the 
TestStatsDRegistryFactory is 3 seconds

Review Comment:
   Might be safer to factor in the polling rate from TestStatsDRegistryFactory 
here in case it changes



##########
test/src/main/java/org/apache/accumulo/test/compaction/CompactionPriorityQueueMetricsIT.java:
##########
@@ -407,4 +410,125 @@ public void testQueueMetrics() throws Exception {
     shutdownTailer.set(true);
     thread.join();
   }
+
+  /**
+   * Test that the compaction queue is cleared when compactions no longer need 
to happen.
+   */
+  @Test
+  public void testCompactionQueueClearedWhenNotNeeded() throws Exception {
+
+    // Metrics collector Thread setup
+    final LinkedBlockingQueue<TestStatsDSink.Metric> queueMetrics = new 
LinkedBlockingQueue<>();
+    final AtomicBoolean shutdownTailer = new AtomicBoolean(false);
+    Thread thread = Threads.createThread("metric-tailer", () -> {
+      while (true) {

Review Comment:
   Doesn't need to be addressed but I think `while (true)` loops should be 
avoided if possible. Could maybe loop while `shutdownTailer` is false



##########
test/src/main/java/org/apache/accumulo/test/compaction/CompactionPriorityQueueMetricsIT.java:
##########
@@ -407,4 +410,125 @@ public void testQueueMetrics() throws Exception {
     shutdownTailer.set(true);
     thread.join();
   }
+
+  /**
+   * Test that the compaction queue is cleared when compactions no longer need 
to happen.
+   */
+  @Test
+  public void testCompactionQueueClearedWhenNotNeeded() throws Exception {
+
+    // Metrics collector Thread setup
+    final LinkedBlockingQueue<TestStatsDSink.Metric> queueMetrics = new 
LinkedBlockingQueue<>();
+    final AtomicBoolean shutdownTailer = new AtomicBoolean(false);
+    Thread thread = Threads.createThread("metric-tailer", () -> {
+      while (true) {
+        List<String> statsDMetrics = sink.getLines();
+        for (String s : statsDMetrics) {
+          if (shutdownTailer.get()) {
+            return;
+          }
+          if (s.startsWith(MetricsProducer.METRICS_COMPACTOR_PREFIX + 
"queue")) {
+            queueMetrics.add(TestStatsDSink.parseStatsDMetric(s));
+          }
+        }
+      }
+    });
+    thread.start();
+
+    ServerContext context = getCluster().getServerContext();
+    try (AccumuloClient c = 
Accumulo.newClient().from(getClientProps()).build()) {
+
+      String dir = getDir("/testBulkFile-");
+      FileSystem fs = getCluster().getFileSystem();
+      fs.mkdirs(new Path(dir));
+
+      // Create splits so there are two groupings of tablets with similar file 
counts.
+      String splitString = "500 1000 1500 2000 3750 5500 7250 9000";
+      addSplits(c, tableName, splitString);
+
+      for (int i = 0; i < 100; i++) {
+        writeData(dir + "/f" + i + ".", aconf, i * 100, (i + 1) * 100 - 1);
+      }
+      c.tableOperations().importDirectory(dir).to(tableName).load();
+
+      try (TabletsMetadata tm = 
context.getAmple().readTablets().forTable(tableId).build()) {
+        // Get each tablet's file sizes
+        for (TabletMetadata tablet : tm) {
+          long fileSize = tablet.getFiles().size();
+          log.debug("Number of files in tablet {}: {}", 
tablet.getExtent().toString(), fileSize);
+        }
+      }
+      verifyData(c, tableName, 0, 100 * 100 - 1, false);
+    }
+
+    final int sleepMillis = 3500; // Current poll rate of the 
TestStatsDRegistryFactory is 3 seconds
+
+    Wait.waitFor(() -> {
+      while (!queueMetrics.isEmpty()) {
+        var qm = queueMetrics.take();
+        if 
(qm.getName().contains(MetricsProducer.METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_QUEUED)
+            && qm.getTags().containsValue(QUEUE1_METRIC_LABEL)) {
+          if (Integer.parseInt(qm.getValue()) > 0) {
+            return true;
+          }
+        }
+      }
+      return false;
+    }, 60_000, sleepMillis, "did not see Q1 metrics");
+
+    Wait.waitFor(() -> {
+      int queueSize = 0;
+      boolean sawQueues = false;
+      while (!queueMetrics.isEmpty()) {
+        var metric = queueMetrics.take();
+        if 
(metric.getName().contains(MetricsProducer.METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_LENGTH)
+            && metric.getTags().containsValue(QUEUE1_METRIC_LABEL)) {
+          queueSize = Integer.parseInt(metric.getValue());
+        } else if (metric.getName()
+            .contains(MetricsProducer.METRICS_COMPACTOR_JOB_PRIORITY_QUEUES)) {
+          sawQueues = true;
+        } else {
+          log.debug("{}", metric);
+        }
+      }
+      return queueSize == QUEUE1_SIZE && sawQueues;
+    }, 60_000, sleepMillis, "did not see the expected number of queued 
compactions");
+
+    // change compactor settings so that compactions no longer need to run
+    context.tableOperations().setProperty(tableName, 
Property.TABLE_MAJC_RATIO.getKey(), "2000");
+    context.tableOperations().setProperty(tableName, 
Property.TABLE_FILE_MAX.getKey(), "2000");
+
+    // wait for queue to clear
+    Wait.waitFor(() -> {
+      int jobsQueued = QUEUE1_SIZE;
+      int queueLength = 0;
+      long dequeued = 0;
+      long rejected = 0;
+      while (!queueMetrics.isEmpty()) {
+        var metric = queueMetrics.take();
+        if (metric.getName()
+            
.contains(MetricsProducer.METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_QUEUED)
+            && metric.getTags().containsValue(QUEUE1_METRIC_LABEL)) {
+          jobsQueued = Integer.parseInt(metric.getValue());
+        } else if (metric.getName()
+            
.contains(MetricsProducer.METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_LENGTH)
+            && metric.getTags().containsValue(QUEUE1_METRIC_LABEL)) {
+          queueLength = Integer.parseInt(metric.getValue());
+        } else if (metric.getName()
+            
.contains(MetricsProducer.METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_REJECTED)
+            && metric.getTags().containsValue(QUEUE1_METRIC_LABEL)) {
+          rejected = Long.parseLong(metric.getValue());
+        } else if (metric.getName()
+            
.contains(MetricsProducer.METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_DEQUEUED)
+            && metric.getTags().containsValue(QUEUE1_METRIC_LABEL)) {
+          dequeued = Long.parseLong(metric.getValue());
+        }
+      }
+      log.info("Queue size: {} Jobs Queued: {} Jobs Dequeued: {} Jobs 
Rejected: {}", queueLength,
+          jobsQueued, dequeued, rejected);
+      return jobsQueued == 0;
+    }, 60_000, sleepMillis,
+        "expected job queue to be cleared once compactions no longer need to 
happen");

Review Comment:
   Could wait for thread completion here



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to