keith-turner commented on code in PR #5726:
URL: https://github.com/apache/accumulo/pull/5726#discussion_r2208018904


##########
core/src/main/thrift/compaction-coordinator.thrift:
##########
@@ -112,6 +112,7 @@ service CompactionCoordinatorService {
     2:security.TCredentials credentials
     3:string externalCompactionId
     4:data.TKeyExtent extent
+    5:string exceptionClassName

Review Comment:
   This may be a breaking change to the Thrift RPC. Or maybe the new field will
be null/ignored when the two sides are on different versions, so maybe it's OK.
Also, the coordinator is experimental.



##########
core/src/main/java/org/apache/accumulo/core/conf/Property.java:
##########
@@ -1556,6 +1556,34 @@ public enum Property {
   COMPACTOR_CLIENTPORT("compactor.port.client", "9133", PropertyType.PORT,
       "The port used for handling client connections on the compactor servers.", "2.1.0"),
   @Experimental
+  COMPACTOR_FAILURE_BACKOFF_THRESHOLD("compactor.failure.backoff.threshold", "3",
+      PropertyType.COUNT,
+      "The number of consecutive failures that must occur before the Compactor starts to back off"
+          + " processing compactions.",
+      "2.1.4"),
+  @Experimental
+  COMPACTOR_FAILURE_BACKOFF_INTERVAL("compactor.failure.backoff.interval", "0",
+      PropertyType.TIMEDURATION,
+      "The time basis for computing the wait time for compaction failure backoff. A value of zero disables"
+          + " the backoff feature. When a non-zero value is supplied, then after compactor.failure.backoff.threshold"
+          + " failures have occurred, the compactor will wait compactor.failure.backoff.interval * the number of"
+          + " failures seconds before executing the next compaction. For example, if this value is 10s, then after"
+          + " three failures the Compactor will wait 30s before starting the next compaction. If the compaction fails"
+          + " again, then it will wait 40s before starting the next compaction.",
+      "2.1.4"),
+  @Experimental
+  COMPACTOR_FAILURE_BACKOFF_RESET("compactor.failure.backoff.reset", "10m",

Review Comment:
   Not recommending any changes here, just pondering something. Another way
this could work is to set a max backoff time instead of a reset time. Once the
backoff reaches that max we stop incrementing, but do not reset until a success
is seen. I am not coming up with any advantages for this other approach,
though. Is there any particular reason this reset-after-time approach was
chosen?
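
To make the comparison concrete, here is a minimal sketch of the wait computation described in the property text next to the capped alternative pondered above. It is not the code in the PR: the class `BackoffSketch`, both method names, and the `maxWait` parameter are hypothetical, and the reset-after-time behavior is not modeled.

```java
import java.time.Duration;

// Hypothetical sketch, not the PR's implementation.
final class BackoffSketch {

  // Linear backoff as the property description reads: no wait until the
  // threshold is reached, then interval * consecutive failures.
  // A zero interval disables the backoff.
  static Duration linearWait(Duration interval, int threshold, long consecutiveFailures) {
    if (interval.isZero() || consecutiveFailures < threshold) {
      return Duration.ZERO;
    }
    return interval.multipliedBy(consecutiveFailures);
  }

  // Alternative from the comment: same growth, but the wait stops growing
  // at maxWait (an assumed extra setting). The failure counter would only
  // be cleared by a successful compaction.
  static Duration cappedWait(Duration interval, int threshold, long consecutiveFailures,
      Duration maxWait) {
    Duration wait = linearWait(interval, threshold, consecutiveFailures);
    return wait.compareTo(maxWait) > 0 ? maxWait : wait;
  }
}
```

With an interval of 10s and a threshold of 3, `linearWait` gives 30s after three failures and 40s after four, matching the example in the property description; `cappedWait` with a 35s cap would hold at 35s from the fourth failure onward.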



##########
server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java:
##########
@@ -611,18 +620,89 @@ public void compactionCompleted(TInfo tinfo, TCredentials credentials,
 
   @Override
   public void compactionFailed(TInfo tinfo, TCredentials credentials, String externalCompactionId,
-      TKeyExtent extent) throws ThriftSecurityException {
+      TKeyExtent extent, String exceptionClassName) throws ThriftSecurityException {
     // do not expect users to call this directly, expect other tservers to call this method
     if (!security.canPerformSystemActions(credentials)) {
       throw new AccumuloSecurityException(credentials.getPrincipal(),
           SecurityErrorCode.PERMISSION_DENIED).asThriftException();
     }
     KeyExtent fromThriftExtent = KeyExtent.fromThrift(extent);
-    LOG.info("Compaction failed: id: {}, extent: {}", externalCompactionId, fromThriftExtent);
+    LOG.info("Compaction failed: id: {}, extent: {}, compactor exception:{}", externalCompactionId,
+        fromThriftExtent, exceptionClassName);
     final var ecid = ExternalCompactionId.of(externalCompactionId);
+    if (exceptionClassName != null) {
+      captureFailure(ecid, fromThriftExtent);
+    }
     compactionFailed(Map.of(ecid, KeyExtent.fromThrift(extent)));
   }
 
+  private void captureFailure(ExternalCompactionId ecid, KeyExtent extent) {
+    var rc = RUNNING_CACHE.get(ecid);
+    if (rc != null) {
+      final String queue = rc.getQueueName();
+      failingQueues.computeIfAbsent(queue, q -> new AtomicLong(0)).incrementAndGet();
+      final String compactor = rc.getCompactorAddress();
+      failingCompactors.computeIfAbsent(compactor, c -> new AtomicLong(0)).incrementAndGet();
+    }
+    failingTables.computeIfAbsent(extent.tableId(), t -> new AtomicLong(0)).incrementAndGet();
+  }
+
+  protected void startFailureSummaryLogging() {
+    ScheduledFuture<?> future = getContext().getScheduledExecutor()
+        .scheduleWithFixedDelay(this::printFailures, 0, 5, TimeUnit.MINUTES);
+    ThreadPools.watchNonCriticalScheduledTask(future);
+  }
+
+  private void printFailures() {
+
+    // Remove down compactors from failing list
+    Map<String,List<HostAndPort>> allCompactors =
+        ExternalCompactionUtil.getCompactorAddrs(getContext());
+    Set<String> allCompactorAddrs = new HashSet<>();
+    allCompactors.values().forEach(l -> l.forEach(c -> allCompactorAddrs.add(c.toString())));
+    failingCompactors.keySet().retainAll(allCompactorAddrs);
+

Review Comment:
   Another way this tracking could work is to count both successes and
failures. Each time it logs, it takes a snapshot of the counts, logs them, and
then deducts the snapshot it logged. This gives an indication of the successes
and failures since the last time this function ran. It could also log only
when there are failures, and maybe log one line per thing (interpreting large
maps in the log can be difficult). Maybe something like the following.
   
   ```java
   // This is not really correct: it assumes the copy also snapshots the values
   // in the map, which is not true because the counters are still mutable.
   Map<String,SuccessFailureCounts> queueSnapshot = Map.copyOf(failingQueues);

   queueSnapshot.forEach((queue, counts) -> {
     if (counts.failures > 0) {
       LOG.warn("Queue {} had {} successes and {} failures in the last {}ms", queue,
           counts.successes, counts.failures, logInterval);
       // TODO decrement the logged counts from failingQueues; by decrementing only the
       // counts logged we do not lose any concurrent increments made while logging
     }
   });
   ```
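
The snapshot-and-deduct idea above can be sketched in a self-contained form as follows. This is only an illustration under assumptions: `FailureSummarySketch`, `Counts`, and the method names are hypothetical, the summary lines are returned rather than written to a logger, and the sketch chooses to deduct the counts on every pass (not only when failures were logged) so that each line covers a single interval.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of per-queue success/failure tracking.
final class FailureSummarySketch {

  static final class Counts {
    final AtomicLong successes = new AtomicLong();
    final AtomicLong failures = new AtomicLong();
  }

  private final Map<String,Counts> queues = new ConcurrentHashMap<>();

  void recordSuccess(String queue) {
    queues.computeIfAbsent(queue, q -> new Counts()).successes.incrementAndGet();
  }

  void recordFailure(String queue) {
    queues.computeIfAbsent(queue, q -> new Counts()).failures.incrementAndGet();
  }

  // Builds one summary line per queue that saw failures, then deducts exactly
  // the values that were read. Increments made concurrently after the read
  // stay in the counters and show up in the next pass.
  List<String> summarize(long intervalMs) {
    List<String> lines = new ArrayList<>();
    queues.forEach((queue, counts) -> {
      // The two reads are not atomic as a pair, so a summary can be off by
      // an in-flight update; no count is ever lost, though.
      long failures = counts.failures.get();
      long successes = counts.successes.get();
      if (failures > 0) {
        lines.add(String.format("Queue %s had %d successes and %d failures in the last %dms",
            queue, successes, failures, intervalMs));
      }
      counts.failures.addAndGet(-failures);
      counts.successes.addAndGet(-successes);
    });
    return lines;
  }
}
```

A scheduled task could call `summarize` with its own period and pass each returned line to `LOG.warn`, which gives one line per failing queue instead of one large map.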
   
   



##########
core/src/main/java/org/apache/accumulo/core/spi/common/ContextClassLoaderFactory.java:
##########
@@ -66,5 +68,5 @@ default void init(ContextClassLoaderEnvironment env) {}
    *        consulting this plugin.
    * @return the class loader for the given contextName
    */
-  ClassLoader getClassLoader(String contextName);
+  ClassLoader getClassLoader(String contextName) throws IOException, ReflectiveOperationException;

Review Comment:
   Is there a benefit to these two specific exceptions? If we want information
to travel through the code via a checked exception, then it may be better to
create a very specific exception for this SPI. That lets callers know that
class loader creation failed without trying to guess at the specific
reasons/exceptions for the failure; the specific reason should be in the
cause. In general we want to know that this type of failure happened, but we
probably do not care much why it happened. Whenever it happens, for any
reason, it's not good.
   
   ```java
   // maybe this should extend Exception
   /**
    * @since 2.1.4
    */
   public static class ClassLoaderCreationFailed extends AccumuloException {
     public ClassLoaderCreationFailed(Throwable cause) {
       super(cause);
     }

     public ClassLoaderCreationFailed(String msg, Throwable cause) {
       super(msg, cause);
     }
   }

   ClassLoader getClassLoader(String contextName) throws ClassLoaderCreationFailed;
   ```
   
   
   We could also leave this SPI as is and create a new internal exception that
is always thrown when class loader creation fails. This allows this very
specific and important information to travel through the internal code. We
could make the more minimal change below in 2.1 and add the checked exception
to the SPI in 4.0. I am not opposed to adding a checked exception to the SPI in
2.1.4, though; we would need to document the breaking change in the release
notes.
   
   ```java
   public class ClassLoaderUtil {
     // Create this class outside of the public API. Any code in this class that attempts to
     // create a classloader and fails should throw this exception.
     // This could be a checked or runtime exception; not sure which is best.
     public static class ClassLoaderCreationFailed extends RuntimeException {
       public ClassLoaderCreationFailed(Throwable cause) {
         super(cause);
       }
     }

     public static ClassLoader getClassLoader(String context) {
       try {
         return FACTORY.getClassLoader(context);
       } catch (Exception e) {
         throw new ClassLoaderCreationFailed(e);
       }
     }
   }
   ```
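
One way internal code could consume such an exception is to walk the cause chain of whatever failure it eventually sees. The sketch below only illustrates that idea: `CauseChainSketch` and `causedByClassLoaderFailure` are hypothetical names, the nested exception stands in for the one proposed above, and the depth bound of 32 is an arbitrary guard against cyclic cause chains.

```java
// Hypothetical sketch; the nested exception mirrors the internal
// RuntimeException variant proposed above.
final class CauseChainSketch {

  static class ClassLoaderCreationFailed extends RuntimeException {
    ClassLoaderCreationFailed(Throwable cause) {
      super(cause);
    }
  }

  // Returns true if the throwable, or anything in its cause chain, is a
  // ClassLoaderCreationFailed. The walk is bounded in case of a cyclic chain.
  static boolean causedByClassLoaderFailure(Throwable t) {
    for (int depth = 0; t != null && depth < 32; depth++) {
      if (t instanceof ClassLoaderCreationFailed) {
        return true;
      }
      t = t.getCause();
    }
    return false;
  }
}
```

A caller that catches a generic failure could use this check to tell a class loader problem apart from other errors without knowing whether the root cause was an `IOException` or a `ReflectiveOperationException`.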



##########
server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java:
##########
@@ -712,6 +717,44 @@ public void run() {
           err.set(null);
           JOB_HOLDER.reset();
 
+          // consecutive failure processing
+          final long totalFailures =
+              errorHistory.values().stream().mapToLong(p -> p.getSecond().get()).sum();
+          if (totalFailures > 0) {
+            LOG.warn("This Compactor has had {} consecutive failures. Failures: {}", totalFailures,
+                errorHistory);
+                errorHistory);

Review Comment:
   Wondering how this will look, since it always relogs the entire error
history. Also, it seems like it will only log the first exception seen for a
table; is that the intent? I will know more about how this looks when I run
some tests.



##########
server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java:
##########
@@ -712,6 +717,44 @@ public void run() {
           err.set(null);
           JOB_HOLDER.reset();
 
+          // consecutive failure processing
+          final long totalFailures =
+              errorHistory.values().stream().mapToLong(p -> p.getSecond().get()).sum();
+          if (totalFailures > 0) {

Review Comment:
   Could all of this be moved into a function? The body of this while loop is
already long, and it is already difficult to work out its higher-level
structure.


