DomGarguilo commented on code in PR #4466:
URL: https://github.com/apache/accumulo/pull/4466#discussion_r1571121931
##########
test/src/main/java/org/apache/accumulo/test/compaction/CompactionPriorityQueueMetricsIT.java:
##########
@@ -407,4 +405,159 @@ public void testQueueMetrics() throws Exception {
shutdownTailer.set(true);
thread.join();
}
+
+ @Test
+ public void newTest() throws Exception {
+ // Metrics collector Thread
+ final LinkedBlockingQueue<TestStatsDSink.Metric> queueMetrics = new
LinkedBlockingQueue<>();
+ final AtomicBoolean shutdownTailer = new AtomicBoolean(false);
+
+ Thread thread = Threads.createThread("metric-tailer", () -> {
+ while (!shutdownTailer.get()) {
+ List<String> statsDMetrics = sink.getLines();
+ for (String s : statsDMetrics) {
+ if (shutdownTailer.get()) {
+ break;
+ }
+ if (s.startsWith(MetricsProducer.METRICS_COMPACTOR_PREFIX +
"queue")) {
+ queueMetrics.add(TestStatsDSink.parseStatsDMetric(s));
+ }
+ }
+ }
+ });
+ thread.start();
+
+ long highestFileCount = 0L;
+ ServerContext context = getCluster().getServerContext();
+ try (AccumuloClient c =
Accumulo.newClient().from(getClientProps()).build()) {
+
+ String dir = getDir("/testBulkFile-");
+ FileSystem fs = getCluster().getFileSystem();
+ fs.mkdirs(new Path(dir));
+
+ // Create splits so there are two groupings of tablets with similar file
counts.
+ String splitString = "500 1000 1500 2000 3750 5500 7250 9000";
+ addSplits(c, tableName, splitString);
+
+ for (int i = 0; i < 100; i++) {
+ writeData(dir + "/f" + i + ".", aconf, i * 100, (i + 1) * 100 - 1);
+ }
+ c.tableOperations().importDirectory(dir).to(tableName).load();
+
+ IteratorSetting iterSetting = new IteratorSetting(100,
CompactionIT.TestFilter.class);
+ iterSetting.addOption("expectedQ", QUEUE1);
+ iterSetting.addOption("modulus", 3 + "");
+ CompactionConfig config =
+ new
CompactionConfig().setIterators(List.of(iterSetting)).setWait(false);
+ c.tableOperations().compact(tableName, config);
+
+ try (TabletsMetadata tm =
context.getAmple().readTablets().forTable(tableId).build()) {
+ // Get each tablet's file sizes
+ for (TabletMetadata tablet : tm) {
+ long fileSize = tablet.getFiles().size();
+ log.info("Number of files in tablet {}: {}",
tablet.getExtent().toString(), fileSize);
+ highestFileCount = Math.max(highestFileCount, fileSize);
+ }
+ }
+ verifyData(c, tableName, 0, 100 * 100 - 1, false);
+ }
+
+ boolean sawMetricsQ1 = false;
+ while (!sawMetricsQ1) {
+ while (!queueMetrics.isEmpty()) {
+ var qm = queueMetrics.take();
+ if
(qm.getName().contains(MetricsProducer.METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_QUEUED)
+ && qm.getTags().containsValue(QUEUE1_METRIC_LABEL)) {
+ if (Integer.parseInt(qm.getValue()) > 0) {
+ sawMetricsQ1 = true;
+ }
+ }
+ }
+ // Current poll rate of the TestStatsDRegistryFactory is 3 seconds
+ // If metrics are not found in the queue, sleep until the next poll.
+ UtilWaitThread.sleep(3500);
+ }
+
+ // Set lowest priority to the lowest possible system compaction priority
+ long lowestPriority = Short.MIN_VALUE;
+ long rejectedCount = 0L;
+ int queueSize = 0;
+
+ boolean sawQueues = false;
+ // An empty queue means that the last known value is the most recent.
+ while (!queueMetrics.isEmpty()) {
+ var metric = queueMetrics.take();
+ if (metric.getName()
+
.contains(MetricsProducer.METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_REJECTED)
+ && metric.getTags().containsValue(QUEUE1_METRIC_LABEL)) {
+ rejectedCount = Long.parseLong(metric.getValue());
+ } else if (metric.getName()
+
.contains(MetricsProducer.METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_PRIORITY)
+ && metric.getTags().containsValue(QUEUE1_METRIC_LABEL)) {
+ lowestPriority = Math.max(lowestPriority,
Long.parseLong(metric.getValue()));
+ } else if (metric.getName()
+
.contains(MetricsProducer.METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_LENGTH)
+ && metric.getTags().containsValue(QUEUE1_METRIC_LABEL)) {
+ queueSize = Integer.parseInt(metric.getValue());
+ } else if
(metric.getName().contains(MetricsProducer.METRICS_COMPACTOR_JOB_PRIORITY_QUEUES))
{
+ sawQueues = true;
+ } else {
+ log.debug("{}", metric);
+ }
+ }
+
+ // Confirm metrics were generated and in some cases, validate contents.
+ assertTrue(rejectedCount > 0L);
+
+ // Priority is the file counts + number of compactions for that tablet.
+ // The lowestPriority job in the queue should have been
+ // at least 1 count higher than the highest file count.
+ short highestFileCountPrio = CompactionJobPrioritizer.createPriority(
+ getCluster().getServerContext().getTableId(tableName),
CompactionKind.USER,
+ (int) highestFileCount, 0);
+ assertTrue(lowestPriority > highestFileCountPrio,
+ lowestPriority + " " + highestFileCount + " " + highestFileCountPrio);
+
+ // Multiple Queues have been created
+ assertTrue(sawQueues);
+
+ // Queue size matches the intended queue size
+ assertEquals(QUEUE1_SIZE, queueSize);
Review Comment:
Addressed in 240c310
##########
test/src/main/java/org/apache/accumulo/test/compaction/CompactionPriorityQueueMetricsIT.java:
##########
@@ -407,4 +405,159 @@ public void testQueueMetrics() throws Exception {
shutdownTailer.set(true);
thread.join();
}
+
+ @Test
+ public void newTest() throws Exception {
+ // Metrics collector Thread
+ final LinkedBlockingQueue<TestStatsDSink.Metric> queueMetrics = new
LinkedBlockingQueue<>();
Review Comment:
Addressed in bd4d08e
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]