keith-turner commented on code in PR #4767:
URL: https://github.com/apache/accumulo/pull/4767#discussion_r1693574729


##########
core/src/main/java/org/apache/accumulo/core/conf/Property.java:
##########
@@ -445,9 +445,14 @@ public enum Property {
   MANAGER_SPLIT_WORKER_THREADS("manager.split.inspection.threadpool.size", 
"8", PropertyType.COUNT,
       "The number of threads used to inspect tablets files to find split 
points.", "4.0.0"),
 
-  
MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_SIZE("manager.compaction.major.service.queue.size",
-      "10000", PropertyType.COUNT,
-      "The max size of each resource groups compaction job priority queue.", 
"4.0"),
+  MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_INITIAL_SIZE(
+      "manager.compaction.major.service.queue.initial.size", "10000", 
PropertyType.COUNT,
+      "The initial size of each resource groups compaction job priority 
queue.", "4.0.0"),
+  MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_SIZE_FACTOR(
+      "manager.compaction.major.service.queue.size.factor", "1.5", 
PropertyType.FRACTION,

Review Comment:
   A higher default gives the TGW more opportunity to buffer jobs in the
   case where job times are short.
   
   ```suggestion
         "manager.compaction.major.service.queue.size.factor", "3", PropertyType.FRACTION,
   ```
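
   As a rough illustration of what the factor changes, assuming the max queue
   size is computed as `(int) (aliveCompactorsForGroup * queueSizeFactor)` as
   in the CompactionCoordinator hunk of this PR. The class and method names
   below are hypothetical and exist only for this sketch.

   ```java
   // Hypothetical helper, not part of Accumulo: mirrors the arithmetic in the PR.
   class QueueSizeFactorSketch {

     // Scale the number of live compactors by the configured factor and
     // truncate toward zero, the same way the (int) cast in the PR does.
     static int maxQueueSize(int aliveCompactors, double queueSizeFactor) {
       return (int) (aliveCompactors * queueSizeFactor);
     }

     public static void main(String[] args) {
       int compactors = 10;
       // Compare the PR default of 1.5 with the suggested default of 3.
       System.out.println(maxQueueSize(compactors, 1.5));
       System.out.println(maxQueueSize(compactors, 3.0));
     }
   }
   ```

   With 10 live compactors that is room for 15 queued jobs at 1.5 versus 30
   at 3.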



##########
server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueue.java:
##########
@@ -304,4 +309,8 @@ private CjpqKey addJobToQueue(TabletMetadata 
tabletMetadata, CompactionJob job)
     jobQueue.put(key, new CompactionJobQueues.MetaJob(job, tabletMetadata));
     return key;
   }
+
+  public synchronized void clear() {
+    jobQueue.clear();

Review Comment:
   tabletJobs also needs to be cleared. It acts as an index for jobQueue
   that helps quickly find what a tablet has queued.
   
   ```suggestion
       jobQueue.clear();
       tabletJobs.clear();
   ```
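
   A minimal sketch of why both structures have to be cleared together. This
   is a simplified stand-in, not the real CompactionJobPriorityQueue: the key
   and value types, `add`, `queued` and `indexed` are assumptions made up for
   the example.

   ```java
   import java.util.HashMap;
   import java.util.Map;
   import java.util.TreeMap;

   // Hypothetical stand-in: a sorted queue plus a per-tablet index of queued keys.
   class IndexedQueueSketch {
     private final TreeMap<Integer,String> jobQueue = new TreeMap<>();
     private final Map<String,Integer> tabletJobs = new HashMap<>();
     private int nextKey = 0;

     synchronized void add(String tablet, String job) {
       // Use the index to find and replace whatever this tablet had queued.
       Integer oldKey = tabletJobs.remove(tablet);
       if (oldKey != null) {
         jobQueue.remove(oldKey);
       }
       int key = nextKey++;
       jobQueue.put(key, job);
       tabletJobs.put(tablet, key);
     }

     synchronized void clear() {
       jobQueue.clear();
       // Without this the index would still point at keys that are gone.
       tabletJobs.clear();
     }

     synchronized int queued() {
       return jobQueue.size();
     }

     synchronized int indexed() {
       return tabletJobs.size();
     }
   }
   ```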



##########
server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java:
##########
@@ -1070,21 +1071,30 @@ private void cleanUpCompactors() {
       var groups = zoorw.getChildren(compactorQueuesPath);
 
       for (String group : groups) {
-        String qpath = compactorQueuesPath + "/" + group;
-
-        var compactors = zoorw.getChildren(qpath);
+        final String qpath = compactorQueuesPath + "/" + group;
+        final CompactorGroupId cgid = CompactorGroupId.of(group);
+        final var compactors = zoorw.getChildren(qpath);
 
         if (compactors.isEmpty()) {
           deleteEmpty(zoorw, qpath);
-        }
-
-        for (String compactor : compactors) {
-          String cpath = compactorQueuesPath + "/" + group + "/" + compactor;
-          var lockNodes = zoorw.getChildren(compactorQueuesPath + "/" + group 
+ "/" + compactor);
-          if (lockNodes.isEmpty()) {
-            deleteEmpty(zoorw, cpath);
+          // Group has no compactors, we can clear its
+          // associated priority queue of jobs
+          getJobQueues().getQueue(cgid).clear();
+        } else {
+          int aliveCompactorsForGroup = 0;
+          for (String compactor : compactors) {
+            String cpath = compactorQueuesPath + "/" + group + "/" + compactor;
+            var lockNodes = zoorw.getChildren(compactorQueuesPath + "/" + 
group + "/" + compactor);
+            if (lockNodes.isEmpty()) {
+              deleteEmpty(zoorw, cpath);
+            } else {
+              aliveCompactorsForGroup++;
+            }
           }
+          getJobQueues().getQueue(cgid).setMaxSize(
+              Math.min((int) (aliveCompactorsForGroup * queueSizeFactor), 
Integer.MAX_VALUE));

Review Comment:
   It's possible that a queue does not exist if there are no jobs. Could add
   the following method to CompactionJobQueues. Suggesting this because there
   is already a getQueueMaxSize() method.
   
   ```java
     public void setQueueMaxSize(CompactorGroupId groupId, int size) {
       var prioQ = priorityQueues.get(groupId);
       // The queue may not exist if no jobs were queued for this group
       if (prioQ != null) {
         prioQ.setMaxSize(size);
       }
     }
   ```
   
   Then call it here.  
   
   ```suggestion
             getJobQueues().setQueueMaxSize(cgid,
                 Math.min((int) (aliveCompactorsForGroup * queueSizeFactor), Integer.MAX_VALUE));
   ```



##########
server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java:
##########
@@ -1070,21 +1071,30 @@ private void cleanUpCompactors() {
       var groups = zoorw.getChildren(compactorQueuesPath);
 
       for (String group : groups) {
-        String qpath = compactorQueuesPath + "/" + group;
-
-        var compactors = zoorw.getChildren(qpath);
+        final String qpath = compactorQueuesPath + "/" + group;
+        final CompactorGroupId cgid = CompactorGroupId.of(group);
+        final var compactors = zoorw.getChildren(qpath);
 
         if (compactors.isEmpty()) {
           deleteEmpty(zoorw, qpath);
-        }
-
-        for (String compactor : compactors) {
-          String cpath = compactorQueuesPath + "/" + group + "/" + compactor;
-          var lockNodes = zoorw.getChildren(compactorQueuesPath + "/" + group 
+ "/" + compactor);
-          if (lockNodes.isEmpty()) {
-            deleteEmpty(zoorw, cpath);
+          // Group has no compactors, we can clear its
+          // associated priority queue of jobs
+          getJobQueues().getQueue(cgid).clear();

Review Comment:
   It's possible that the queue does not exist, so this could throw an NPE.
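
   One way to avoid that, sketched under assumptions: look the queue up once
   and treat a missing queue as a no-op. The registry class, `clearQueue`,
   `add` and `size` below are hypothetical names for the example, and group
   ids are plain strings rather than CompactorGroupId.

   ```java
   import java.util.Map;
   import java.util.Queue;
   import java.util.concurrent.ConcurrentHashMap;
   import java.util.concurrent.ConcurrentLinkedQueue;

   // Hypothetical registry: queues are created lazily, so a lookup may return null.
   class QueueRegistrySketch {
     private final Map<String,Queue<String>> priorityQueues = new ConcurrentHashMap<>();

     void add(String group, String job) {
       // A queue only comes into existence when the first job is added.
       priorityQueues.computeIfAbsent(group, g -> new ConcurrentLinkedQueue<>()).add(job);
     }

     // Returns true if a queue existed and was cleared, false if there was none.
     boolean clearQueue(String group) {
       Queue<String> queue = priorityQueues.get(group);
       if (queue == null) {
         return false; // nothing queued for this group, nothing to clear
       }
       queue.clear();
       return true;
     }

     int size(String group) {
       Queue<String> queue = priorityQueues.get(group);
       return queue == null ? 0 : queue.size();
     }
   }
   ```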



##########
server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueue.java:
##########
@@ -201,8 +202,12 @@ public synchronized int add(TabletMetadata tabletMetadata, 
Collection<Compaction
     return jobsAdded;
   }
 
-  public long getMaxSize() {
-    return maxSize;
+  public int getMaxSize() {
+    return maxSize.get();
+  }
+
+  public void setMaxSize(int maxSize) {

Review Comment:
   Could make these synchronized for memory consistency. Also, everything
   else is synchronized, so making the set method synchronized ensures the
   max size does not change for another thread in the middle of a method,
   which makes things easier to reason about.
   
   ```suggestion
     public synchronized int getMaxSize() {
       return maxSize.get();
     }
   
     public synchronized void setMaxSize(int maxSize) {
   ```
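
   A small sketch of the reasoning, assuming a simplified queue (the class and
   `addAll` are made up for the example). Because every method holds the same
   monitor, the limit read inside the loop cannot change while `addAll` runs.

   ```java
   import java.util.ArrayDeque;
   import java.util.Deque;

   // Hypothetical bounded queue where all access goes through one monitor.
   class BoundedQueueSketch {
     private final Deque<String> jobs = new ArrayDeque<>();
     private int maxSize;

     BoundedQueueSketch(int maxSize) {
       this.maxSize = maxSize;
     }

     synchronized int getMaxSize() {
       return maxSize;
     }

     synchronized void setMaxSize(int maxSize) {
       this.maxSize = maxSize;
     }

     // Returns how many of the offered jobs were accepted.
     synchronized int addAll(String... offered) {
       int added = 0;
       for (String job : offered) {
         // maxSize is stable here: setMaxSize blocks until this method returns.
         if (jobs.size() < maxSize) {
           jobs.addLast(job);
           added++;
         }
       }
       return added;
     }
   }
   ```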



##########
server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueue.java:
##########
@@ -107,7 +108,7 @@ public boolean equals(Object o) {
   // efficiently removing entries from anywhere in the queue. Efficient 
removal is needed for the
   // case where tablets decided to issues different compaction jobs than what 
is currently queued.
   private final TreeMap<CjpqKey,CompactionJobQueues.MetaJob> jobQueue;
-  private final int maxSize;
+  private Supplier<Integer> maxSize;

Review Comment:
   The way this is used, it seems like it could be an AtomicInteger (or maybe
   a plain integer if only accessed in synchronized methods) instead of a
   Supplier. The supplier passed to the constructor seems to contain a fixed
   value or final constant, and the set method replaces the supplier.
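
   Roughly what the AtomicInteger variant could look like. This is only a
   sketch: the holder class is hypothetical and stands in for the relevant
   part of CompactionJobPriorityQueue.

   ```java
   import java.util.concurrent.atomic.AtomicInteger;

   // Hypothetical holder: the field reference is final, only its value changes.
   class MaxSizeHolderSketch {
     private final AtomicInteger maxSize;

     MaxSizeHolderSketch(int initialMaxSize) {
       this.maxSize = new AtomicInteger(initialMaxSize);
     }

     int getMaxSize() {
       return maxSize.get();
     }

     void setMaxSize(int newMaxSize) {
       // Updates the value in place instead of swapping in a new Supplier.
       maxSize.set(newMaxSize);
     }
   }
   ```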



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
