keith-turner commented on code in PR #3551:
URL: https://github.com/apache/accumulo/pull/3551#discussion_r1245908602
##########
server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueue.java:
##########
@@ -136,18 +143,35 @@ public synchronized boolean add(TabletMetadata
tabletMetadata, Collection<Compac
return true;
}
+ public long getSize() {
+ return maxSize;
+ }
+
+ public long getRejectedJobs() {
+ return rejectedJobs.get();
+ }
+
+ public long getDequeuedJobs() {
+ return dequeuedJobs.get();
+ }
+
+ public long getQueuedJobs() {
+ return queuedJobs.get();
+ }
Review Comment:
For this could we ask the queue for its size? Then don't need to worry about
keeping the atomic long up to date.
```suggestion
public synchronized long getQueuedJobs() {
return jobQueue.size();
}
```
##########
server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueue.java:
##########
@@ -136,18 +143,35 @@ public synchronized boolean add(TabletMetadata
tabletMetadata, Collection<Compac
return true;
}
+ public long getSize() {
Review Comment:
```suggestion
public long getMaxSize() {
```
##########
server/manager/src/main/java/org/apache/accumulo/manager/metrics/QueueMetrics.java:
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.manager.metrics;
+
+import static org.apache.accumulo.core.metrics.MetricsUtil.formatString;
+import static org.apache.accumulo.core.metrics.MetricsUtil.getCommonTags;
+
+import java.util.Map;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.accumulo.core.metrics.MetricsProducer;
+import org.apache.accumulo.core.spi.compaction.CompactionExecutorId;
+import org.apache.accumulo.core.util.threads.ThreadPools;
+import org.apache.accumulo.manager.compaction.queue.CompactionJobPriorityQueue;
+import org.apache.accumulo.manager.compaction.queue.CompactionJobQueues;
+
+import io.micrometer.core.instrument.Gauge;
+import io.micrometer.core.instrument.MeterRegistry;
+import io.micrometer.core.instrument.Tags;
+
+public class QueueMetrics implements MetricsProducer {
+ private static final long DEFAULT_MIN_REFRESH_DELAY =
TimeUnit.SECONDS.toMillis(5);
+ private MeterRegistry meterRegistry = null;
+ private CompactionJobQueues compactionJobQueues;
+ private AtomicLong queueCount;
+
+ public QueueMetrics(CompactionJobQueues compactionJobQueues) {
+ this.compactionJobQueues = compactionJobQueues;
+ ScheduledExecutorService scheduler = ThreadPools.getServerThreadPools()
+ .createScheduledExecutorService(1, "queueMetricsPoller", false);
+ Runtime.getRuntime().addShutdownHook(new Thread(scheduler::shutdownNow));
+
ThreadPools.watchNonCriticalScheduledTask(scheduler.scheduleAtFixedRate(this::update,
+ DEFAULT_MIN_REFRESH_DELAY, DEFAULT_MIN_REFRESH_DELAY,
TimeUnit.MILLISECONDS));
+ }
+
+ public void update() {
+ if (meterRegistry != null) {
+ queueCount = meterRegistry.gauge(METRICS_COMPACTOR_JOB_PRIORITY_QUEUES,
new AtomicLong(0));
+ }
+ if (queueCount != null) {
+ queueCount.set(compactionJobQueues.getQueueCount());
+ }
+
+ for (Map.Entry<CompactionExecutorId,CompactionJobPriorityQueue> entry :
compactionJobQueues
+ .getQueues()) {
+ String queueId = formatString(entry.getKey().toString(), true);
+ Gauge.builder(METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_LENGTH, entry, e ->
e.getValue().getSize())
Review Comment:
When a queue becomes empty its deleted and may be recreated later. This
code seems to bind to the internal queue objects that could stop being used.
Could instead never access the internal queue objects and only interact with
the top level object using ids. The following is an example of what I was
thinking.
```suggestion
for (CompactionExecutorId ceid : compactionJobQueues
.getQueues()) {
String queueId = formatString(entry.getKey().toString(), true);
Gauge.builder(METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_LENGTH, ceid, id ->
compactionJobQueues.getSize(id))
```
##########
server/manager/src/main/java/org/apache/accumulo/manager/metrics/QueueMetrics.java:
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.manager.metrics;
+
+import static org.apache.accumulo.core.metrics.MetricsUtil.formatString;
+import static org.apache.accumulo.core.metrics.MetricsUtil.getCommonTags;
+
+import java.util.Map;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.accumulo.core.metrics.MetricsProducer;
+import org.apache.accumulo.core.spi.compaction.CompactionExecutorId;
+import org.apache.accumulo.core.util.threads.ThreadPools;
+import org.apache.accumulo.manager.compaction.queue.CompactionJobPriorityQueue;
+import org.apache.accumulo.manager.compaction.queue.CompactionJobQueues;
+
+import io.micrometer.core.instrument.Gauge;
+import io.micrometer.core.instrument.MeterRegistry;
+import io.micrometer.core.instrument.Tags;
+
+public class QueueMetrics implements MetricsProducer {
+ private static final long DEFAULT_MIN_REFRESH_DELAY =
TimeUnit.SECONDS.toMillis(5);
+ private MeterRegistry meterRegistry = null;
+ private CompactionJobQueues compactionJobQueues;
+ private AtomicLong queueCount;
+
+ public QueueMetrics(CompactionJobQueues compactionJobQueues) {
+ this.compactionJobQueues = compactionJobQueues;
+ ScheduledExecutorService scheduler = ThreadPools.getServerThreadPools()
+ .createScheduledExecutorService(1, "queueMetricsPoller", false);
+ Runtime.getRuntime().addShutdownHook(new Thread(scheduler::shutdownNow));
+
ThreadPools.watchNonCriticalScheduledTask(scheduler.scheduleAtFixedRate(this::update,
+ DEFAULT_MIN_REFRESH_DELAY, DEFAULT_MIN_REFRESH_DELAY,
TimeUnit.MILLISECONDS));
+ }
+
+ public void update() {
+ if (meterRegistry != null) {
+ queueCount = meterRegistry.gauge(METRICS_COMPACTOR_JOB_PRIORITY_QUEUES,
new AtomicLong(0));
+ }
+ if (queueCount != null) {
+ queueCount.set(compactionJobQueues.getQueueCount());
+ }
+
+ for (Map.Entry<CompactionExecutorId,CompactionJobPriorityQueue> entry :
compactionJobQueues
+ .getQueues()) {
+ String queueId = formatString(entry.getKey().toString(), true);
+ Gauge.builder(METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_LENGTH, entry, e ->
e.getValue().getSize())
+ .description("Length of priority queues")
+ .tags(Tags.concat(getCommonTags(), "queue_id",
queueId)).register(meterRegistry);
+
+ Gauge
+ .builder(METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_QUEUED, entry,
+ e -> e.getValue().getQueuedJobs())
+ .description("Count of current queued jobs")
+ .tags(Tags.concat(getCommonTags(), "queue_id",
queueId)).register(meterRegistry);
+
+ Gauge
+ .builder(METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_DEQUEUED, entry,
+ e -> e.getValue().getDequeuedJobs())
+ .description("Count of jobs dequeued")
+ .tags(Tags.concat(getCommonTags(), "queue_id",
queueId)).register(meterRegistry);
+
+ Gauge
+ .builder(METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_REJECTED, entry,
+ e -> e.getValue().getRejectedJobs())
+ .description("Count of current rejected jobs")
Review Comment:
The dequeued description does not have the word `current`, should it be
removed here? Thinking they are both similar in that they continually increase.
```suggestion
.description("Count of rejected jobs")
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]