kevinrr888 commented on code in PR #4466:
URL: https://github.com/apache/accumulo/pull/4466#discussion_r1584871796
##########
test/src/main/java/org/apache/accumulo/test/compaction/CompactionPriorityQueueMetricsIT.java:
##########
@@ -407,4 +410,125 @@ public void testQueueMetrics() throws Exception {
shutdownTailer.set(true);
thread.join();
}
+
+ /**
+ * Test that the compaction queue is cleared when compactions no longer need
to happen.
+ */
+ @Test
+ public void testCompactionQueueClearedWhenNotNeeded() throws Exception {
+
+ // Metrics collector Thread setup
+ final LinkedBlockingQueue<TestStatsDSink.Metric> queueMetrics = new
LinkedBlockingQueue<>();
+ final AtomicBoolean shutdownTailer = new AtomicBoolean(false);
+ Thread thread = Threads.createThread("metric-tailer", () -> {
+ while (true) {
+ List<String> statsDMetrics = sink.getLines();
+ for (String s : statsDMetrics) {
+ if (shutdownTailer.get()) {
+ return;
+ }
+ if (s.startsWith(MetricsProducer.METRICS_COMPACTOR_PREFIX +
"queue")) {
+ queueMetrics.add(TestStatsDSink.parseStatsDMetric(s));
+ }
+ }
+ }
+ });
+ thread.start();
+
+ ServerContext context = getCluster().getServerContext();
+ try (AccumuloClient c =
Accumulo.newClient().from(getClientProps()).build()) {
+
+ String dir = getDir("/testBulkFile-");
+ FileSystem fs = getCluster().getFileSystem();
+ fs.mkdirs(new Path(dir));
+
+ // Create splits so there are two groupings of tablets with similar file
counts.
+ String splitString = "500 1000 1500 2000 3750 5500 7250 9000";
+ addSplits(c, tableName, splitString);
+
+ for (int i = 0; i < 100; i++) {
+ writeData(dir + "/f" + i + ".", aconf, i * 100, (i + 1) * 100 - 1);
+ }
+ c.tableOperations().importDirectory(dir).to(tableName).load();
+
+ try (TabletsMetadata tm =
context.getAmple().readTablets().forTable(tableId).build()) {
+ // Get each tablet's file sizes
+ for (TabletMetadata tablet : tm) {
+ long fileSize = tablet.getFiles().size();
+ log.debug("Number of files in tablet {}: {}",
tablet.getExtent().toString(), fileSize);
+ }
+ }
+ verifyData(c, tableName, 0, 100 * 100 - 1, false);
+ }
+
+ final int sleepMillis = 3500; // Current poll rate of the
TestStatsDRegistryFactory is 3 seconds
+
+ Wait.waitFor(() -> {
+ while (!queueMetrics.isEmpty()) {
+ var qm = queueMetrics.take();
+ if
(qm.getName().contains(MetricsProducer.METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_QUEUED)
+ && qm.getTags().containsValue(QUEUE1_METRIC_LABEL)) {
+ if (Integer.parseInt(qm.getValue()) > 0) {
+ return true;
+ }
+ }
+ }
+ return false;
+ }, 60_000, sleepMillis, "did not see Q1 metrics");
+
+ Wait.waitFor(() -> {
+ int queueSize = 0;
+ boolean sawQueues = false;
+ while (!queueMetrics.isEmpty()) {
+ var metric = queueMetrics.take();
+ if
(metric.getName().contains(MetricsProducer.METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_LENGTH)
+ && metric.getTags().containsValue(QUEUE1_METRIC_LABEL)) {
+ queueSize = Integer.parseInt(metric.getValue());
+ } else if (metric.getName()
+ .contains(MetricsProducer.METRICS_COMPACTOR_JOB_PRIORITY_QUEUES)) {
+ sawQueues = true;
+ } else {
+ log.debug("{}", metric);
Review Comment:
It would probably be good to add more context to the log message for this
metric, e.g. noting that it matched neither of the checks above.
##########
test/src/main/java/org/apache/accumulo/test/compaction/CompactionPriorityQueueMetricsIT.java:
##########
@@ -407,4 +410,125 @@ public void testQueueMetrics() throws Exception {
shutdownTailer.set(true);
thread.join();
}
+
+ /**
+ * Test that the compaction queue is cleared when compactions no longer need
to happen.
+ */
+ @Test
+ public void testCompactionQueueClearedWhenNotNeeded() throws Exception {
+
+ // Metrics collector Thread setup
+ final LinkedBlockingQueue<TestStatsDSink.Metric> queueMetrics = new
LinkedBlockingQueue<>();
+ final AtomicBoolean shutdownTailer = new AtomicBoolean(false);
+ Thread thread = Threads.createThread("metric-tailer", () -> {
+ while (true) {
+ List<String> statsDMetrics = sink.getLines();
+ for (String s : statsDMetrics) {
+ if (shutdownTailer.get()) {
+ return;
+ }
+ if (s.startsWith(MetricsProducer.METRICS_COMPACTOR_PREFIX +
"queue")) {
+ queueMetrics.add(TestStatsDSink.parseStatsDMetric(s));
+ }
+ }
+ }
+ });
+ thread.start();
+
+ ServerContext context = getCluster().getServerContext();
+ try (AccumuloClient c =
Accumulo.newClient().from(getClientProps()).build()) {
+
+ String dir = getDir("/testBulkFile-");
+ FileSystem fs = getCluster().getFileSystem();
+ fs.mkdirs(new Path(dir));
+
+ // Create splits so there are two groupings of tablets with similar file
counts.
+ String splitString = "500 1000 1500 2000 3750 5500 7250 9000";
+ addSplits(c, tableName, splitString);
+
+ for (int i = 0; i < 100; i++) {
+ writeData(dir + "/f" + i + ".", aconf, i * 100, (i + 1) * 100 - 1);
+ }
+ c.tableOperations().importDirectory(dir).to(tableName).load();
+
+ try (TabletsMetadata tm =
context.getAmple().readTablets().forTable(tableId).build()) {
+ // Get each tablet's file sizes
+ for (TabletMetadata tablet : tm) {
+ long fileSize = tablet.getFiles().size();
+ log.debug("Number of files in tablet {}: {}",
tablet.getExtent().toString(), fileSize);
+ }
+ }
+ verifyData(c, tableName, 0, 100 * 100 - 1, false);
+ }
+
+ final int sleepMillis = 3500; // Current poll rate of the
TestStatsDRegistryFactory is 3 seconds
Review Comment:
It might be safer to derive this value from the polling rate in
TestStatsDRegistryFactory, in case that rate changes.
##########
test/src/main/java/org/apache/accumulo/test/compaction/CompactionPriorityQueueMetricsIT.java:
##########
@@ -407,4 +410,125 @@ public void testQueueMetrics() throws Exception {
shutdownTailer.set(true);
thread.join();
}
+
+ /**
+ * Test that the compaction queue is cleared when compactions no longer need
to happen.
+ */
+ @Test
+ public void testCompactionQueueClearedWhenNotNeeded() throws Exception {
+
+ // Metrics collector Thread setup
+ final LinkedBlockingQueue<TestStatsDSink.Metric> queueMetrics = new
LinkedBlockingQueue<>();
+ final AtomicBoolean shutdownTailer = new AtomicBoolean(false);
+ Thread thread = Threads.createThread("metric-tailer", () -> {
+ while (true) {
Review Comment:
This doesn't need to be addressed, but I think `while (true)` loops should be
avoided if possible. It could maybe loop while `shutdownTailer` is false instead.
##########
test/src/main/java/org/apache/accumulo/test/compaction/CompactionPriorityQueueMetricsIT.java:
##########
@@ -407,4 +410,125 @@ public void testQueueMetrics() throws Exception {
shutdownTailer.set(true);
thread.join();
}
+
+ /**
+ * Test that the compaction queue is cleared when compactions no longer need
to happen.
+ */
+ @Test
+ public void testCompactionQueueClearedWhenNotNeeded() throws Exception {
+
+ // Metrics collector Thread setup
+ final LinkedBlockingQueue<TestStatsDSink.Metric> queueMetrics = new
LinkedBlockingQueue<>();
+ final AtomicBoolean shutdownTailer = new AtomicBoolean(false);
+ Thread thread = Threads.createThread("metric-tailer", () -> {
+ while (true) {
+ List<String> statsDMetrics = sink.getLines();
+ for (String s : statsDMetrics) {
+ if (shutdownTailer.get()) {
+ return;
+ }
+ if (s.startsWith(MetricsProducer.METRICS_COMPACTOR_PREFIX +
"queue")) {
+ queueMetrics.add(TestStatsDSink.parseStatsDMetric(s));
+ }
+ }
+ }
+ });
+ thread.start();
+
+ ServerContext context = getCluster().getServerContext();
+ try (AccumuloClient c =
Accumulo.newClient().from(getClientProps()).build()) {
+
+ String dir = getDir("/testBulkFile-");
+ FileSystem fs = getCluster().getFileSystem();
+ fs.mkdirs(new Path(dir));
+
+ // Create splits so there are two groupings of tablets with similar file
counts.
+ String splitString = "500 1000 1500 2000 3750 5500 7250 9000";
+ addSplits(c, tableName, splitString);
+
+ for (int i = 0; i < 100; i++) {
+ writeData(dir + "/f" + i + ".", aconf, i * 100, (i + 1) * 100 - 1);
+ }
+ c.tableOperations().importDirectory(dir).to(tableName).load();
+
+ try (TabletsMetadata tm =
context.getAmple().readTablets().forTable(tableId).build()) {
+ // Get each tablet's file sizes
+ for (TabletMetadata tablet : tm) {
+ long fileSize = tablet.getFiles().size();
+ log.debug("Number of files in tablet {}: {}",
tablet.getExtent().toString(), fileSize);
+ }
+ }
+ verifyData(c, tableName, 0, 100 * 100 - 1, false);
+ }
+
+ final int sleepMillis = 3500; // Current poll rate of the
TestStatsDRegistryFactory is 3 seconds
+
+ Wait.waitFor(() -> {
+ while (!queueMetrics.isEmpty()) {
+ var qm = queueMetrics.take();
+ if
(qm.getName().contains(MetricsProducer.METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_QUEUED)
+ && qm.getTags().containsValue(QUEUE1_METRIC_LABEL)) {
+ if (Integer.parseInt(qm.getValue()) > 0) {
+ return true;
+ }
+ }
+ }
+ return false;
+ }, 60_000, sleepMillis, "did not see Q1 metrics");
+
+ Wait.waitFor(() -> {
+ int queueSize = 0;
+ boolean sawQueues = false;
+ while (!queueMetrics.isEmpty()) {
+ var metric = queueMetrics.take();
+ if
(metric.getName().contains(MetricsProducer.METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_LENGTH)
+ && metric.getTags().containsValue(QUEUE1_METRIC_LABEL)) {
+ queueSize = Integer.parseInt(metric.getValue());
+ } else if (metric.getName()
+ .contains(MetricsProducer.METRICS_COMPACTOR_JOB_PRIORITY_QUEUES)) {
+ sawQueues = true;
+ } else {
+ log.debug("{}", metric);
+ }
+ }
+ return queueSize == QUEUE1_SIZE && sawQueues;
+ }, 60_000, sleepMillis, "did not see the expected number of queued
compactions");
+
+ // change compactor settings so that compactions no longer need to run
+ context.tableOperations().setProperty(tableName,
Property.TABLE_MAJC_RATIO.getKey(), "2000");
+ context.tableOperations().setProperty(tableName,
Property.TABLE_FILE_MAX.getKey(), "2000");
+
+ // wait for queue to clear
+ Wait.waitFor(() -> {
+ int jobsQueued = QUEUE1_SIZE;
+ int queueLength = 0;
+ long dequeued = 0;
+ long rejected = 0;
+ while (!queueMetrics.isEmpty()) {
+ var metric = queueMetrics.take();
+ if (metric.getName()
+
.contains(MetricsProducer.METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_QUEUED)
+ && metric.getTags().containsValue(QUEUE1_METRIC_LABEL)) {
+ jobsQueued = Integer.parseInt(metric.getValue());
+ } else if (metric.getName()
+
.contains(MetricsProducer.METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_LENGTH)
+ && metric.getTags().containsValue(QUEUE1_METRIC_LABEL)) {
+ queueLength = Integer.parseInt(metric.getValue());
+ } else if (metric.getName()
+
.contains(MetricsProducer.METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_REJECTED)
+ && metric.getTags().containsValue(QUEUE1_METRIC_LABEL)) {
+ rejected = Long.parseLong(metric.getValue());
+ } else if (metric.getName()
+
.contains(MetricsProducer.METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_DEQUEUED)
+ && metric.getTags().containsValue(QUEUE1_METRIC_LABEL)) {
+ dequeued = Long.parseLong(metric.getValue());
+ }
+ }
+ log.info("Queue size: {} Jobs Queued: {} Jobs Dequeued: {} Jobs
Rejected: {}", queueLength,
+ jobsQueued, dequeued, rejected);
+ return jobsQueued == 0;
+ }, 60_000, sleepMillis,
+ "expected job queue to be cleared once compactions no longer need to
happen");
Review Comment:
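Could wait for the metric-tailer thread to complete here, as `testQueueMetrics`
does with `shutdownTailer.set(true)` followed by `thread.join()`.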
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: notifications-unsubscribe@accumulo.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org