keith-turner commented on code in PR #5726:
URL: https://github.com/apache/accumulo/pull/5726#discussion_r2208018904
##########
core/src/main/thrift/compaction-coordinator.thrift:
##########
@@ -112,6 +112,7 @@ service CompactionCoordinatorService {
     2:security.TCredentials credentials
     3:string externalCompactionId
     4:data.TKeyExtent extent
+    5:string exceptionClassName

Review Comment:
This may be a breaking change to the thrift RPC? Or maybe the new field will be null/ignored when the versions differ, so maybe it's ok. Also, the coordinator is experimental.

##########
core/src/main/java/org/apache/accumulo/core/conf/Property.java:
##########
@@ -1556,6 +1556,34 @@ public enum Property {
   COMPACTOR_CLIENTPORT("compactor.port.client", "9133", PropertyType.PORT,
       "The port used for handling client connections on the compactor servers.", "2.1.0"),
   @Experimental
+  COMPACTOR_FAILURE_BACKOFF_THRESHOLD("compactor.failure.backoff.threshold", "3",
+      PropertyType.COUNT,
+      "The number of consecutive failures that must occur before the Compactor starts to back off"
+          + " processing compactions.",
+      "2.1.4"),
+  @Experimental
+  COMPACTOR_FAILURE_BACKOFF_INTERVAL("compactor.failure.backoff.interval", "0",
+      PropertyType.TIMEDURATION,
+      "The time basis for computing the wait time for compaction failure backoff. A value of zero disables"
+          + " the backoff feature. When a non-zero value is supplied, then after compactor.failure.backoff.threshold"
+          + " failures have occurred, the compactor will wait compactor.failure.backoff.interval * the number of"
+          + " failures seconds before executing the next compaction. For example, if this value is 10s, then after"
+          + " three failures the Compactor will wait 30s before starting the next compaction. If the compaction fails"
+          + " again, then it will wait 40s before starting the next compaction.",
+      "2.1.4"),
+  @Experimental
+  COMPACTOR_FAILURE_BACKOFF_RESET("compactor.failure.backoff.reset", "10m",

Review Comment:
Not recommending any changes here, was just pondering something. Another way this could work is to set a max backoff time instead of a reset time: once we get to that max time we stop incrementing, but do not reset until a success is seen. Not coming up with any advantages for this other approach though. Wondering if there is any particular reason this reset-after-time approach was chosen?
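For concreteness, here is a minimal sketch of the wait-time computation the property docs describe (interval * consecutive failures once the threshold is crossed), next to the capped-until-success alternative pondered above. The class and method names are hypothetical illustrations, not the PR's actual implementation.

```java
import java.time.Duration;

// Hypothetical names; this is a sketch, not the PR's code.
class BackoffPolicySketch {

  // The documented behavior: once compactor.failure.backoff.threshold consecutive
  // failures occur, wait interval * failures. A zero interval disables backoff.
  static Duration waitTime(Duration interval, int threshold, int consecutiveFailures) {
    if (interval.isZero() || consecutiveFailures < threshold) {
      return Duration.ZERO;
    }
    return interval.multipliedBy(consecutiveFailures);
  }

  // The alternative pondered above: grow to a max and stay there until a success
  // resets the count, instead of resetting after compactor.failure.backoff.reset.
  static Duration cappedWaitTime(Duration interval, int threshold, int consecutiveFailures,
      Duration max) {
    Duration d = waitTime(interval, threshold, consecutiveFailures);
    return d.compareTo(max) > 0 ? max : d;
  }

  public static void main(String[] args) {
    // Matches the property docs: a 10s interval with threshold 3 gives a 30s wait
    // after three failures and a 40s wait after a fourth.
    System.out.println(waitTime(Duration.ofSeconds(10), 3, 3)); // PT30S
    System.out.println(waitTime(Duration.ofSeconds(10), 3, 4)); // PT40S
  }
}
```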
##########
server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java:
##########
@@ -611,18 +620,89 @@ public void compactionCompleted(TInfo tinfo, TCredentials credentials,
   @Override
   public void compactionFailed(TInfo tinfo, TCredentials credentials, String externalCompactionId,
-      TKeyExtent extent) throws ThriftSecurityException {
+      TKeyExtent extent, String exceptionClassName) throws ThriftSecurityException {
     // do not expect users to call this directly, expect other tservers to call this method
     if (!security.canPerformSystemActions(credentials)) {
       throw new AccumuloSecurityException(credentials.getPrincipal(),
           SecurityErrorCode.PERMISSION_DENIED).asThriftException();
     }
     KeyExtent fromThriftExtent = KeyExtent.fromThrift(extent);
-    LOG.info("Compaction failed: id: {}, extent: {}", externalCompactionId, fromThriftExtent);
+    LOG.info("Compaction failed: id: {}, extent: {}, compactor exception:{}", externalCompactionId,
+        fromThriftExtent, exceptionClassName);
     final var ecid = ExternalCompactionId.of(externalCompactionId);
+    if (exceptionClassName != null) {
+      captureFailure(ecid, fromThriftExtent);
+    }
     compactionFailed(Map.of(ecid, KeyExtent.fromThrift(extent)));
   }
+
+  private void captureFailure(ExternalCompactionId ecid, KeyExtent extent) {
+    var rc = RUNNING_CACHE.get(ecid);
+    if (rc != null) {
+      final String queue = rc.getQueueName();
+      failingQueues.computeIfAbsent(queue, q -> new AtomicLong(0)).incrementAndGet();
+      final String compactor = rc.getCompactorAddress();
+      failingCompactors.computeIfAbsent(compactor, c -> new AtomicLong(0)).incrementAndGet();
+    }
+    failingTables.computeIfAbsent(extent.tableId(), t -> new AtomicLong(0)).incrementAndGet();
+  }
+
+  protected void startFailureSummaryLogging() {
+    ScheduledFuture<?> future = getContext().getScheduledExecutor()
+        .scheduleWithFixedDelay(this::printFailures, 0, 5, TimeUnit.MINUTES);
+    ThreadPools.watchNonCriticalScheduledTask(future);
+  }
+
+  private void printFailures() {
+
+    // Remove down compactors from failing list
+    Map<String,List<HostAndPort>> allCompactors =
+        ExternalCompactionUtil.getCompactorAddrs(getContext());
+    Set<String> allCompactorAddrs = new HashSet<>();
+    allCompactors.values().forEach(l -> l.forEach(c -> allCompactorAddrs.add(c.toString())));
+    failingCompactors.keySet().retainAll(allCompactorAddrs);
+

Review Comment:
Another way this tracking could work is that it counts both successes and failures. Each time it logs, it takes a snapshot of the counts, logs them, and then deducts the snapshot it logged. This would give an indication of the successes and failures since the last time this function ran. It could also log only when there is a failure, and maybe log a line per thing (interpreting large maps in the log can be difficult). Maybe something like the following.

```java
// this is not really correct, it's assuming we also get a snapshot of the values in the
// map, but that is not really true
Map<String,SuccessFailureCounts> queueSnapshot = Map.copyOf(failingQueues);
queueSnapshot.forEach((queue, counts) -> {
  if (counts.failures > 0) {
    LOG.warn("Queue {} had {} successes and {} failures in the last {}ms", queue,
        counts.successes, counts.failures, logInterval);
    // TODO decrement counts logged from failingQueues, by decrementing only the
    // counts logged we do not lose any concurrent increments made while logging
  }
});
```
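The caveat in the sketch above (Map.copyOf does not snapshot mutable values) and its TODO can be addressed by reading each counter, logging, and then subtracting exactly the value that was logged, which preserves concurrent increments without needing a deep copy. A minimal self-contained sketch under those assumptions; SuccessFailureCounts and the other names are hypothetical stand-ins for the coordinator's fields (the PR currently tracks only failures):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class QueueFailureSummary {

  static class SuccessFailureCounts {
    final AtomicLong successes = new AtomicLong();
    final AtomicLong failures = new AtomicLong();
  }

  private static final Logger LOG = LoggerFactory.getLogger(QueueFailureSummary.class);
  private final Map<String,SuccessFailureCounts> failingQueues = new ConcurrentHashMap<>();
  private final long logIntervalMs;

  QueueFailureSummary(long logIntervalMs) {
    this.logIntervalMs = logIntervalMs;
  }

  void recordSuccess(String queue) {
    counts(queue).successes.incrementAndGet();
  }

  void recordFailure(String queue) {
    counts(queue).failures.incrementAndGet();
  }

  private SuccessFailureCounts counts(String queue) {
    return failingQueues.computeIfAbsent(queue, q -> new SuccessFailureCounts());
  }

  // Called periodically: read the current counts, log one line per queue that had a
  // failure, then subtract exactly what was logged so concurrent increments made
  // while logging are not lost.
  void logAndDeduct() {
    failingQueues.forEach((queue, counts) -> {
      long f = counts.failures.get();
      long s = counts.successes.get();
      if (f > 0) {
        LOG.warn("Queue {} had {} successes and {} failures in the last {}ms", queue, s, f,
            logIntervalMs);
        counts.failures.addAndGet(-f);
        counts.successes.addAndGet(-s);
      }
    });
  }
}
```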
##########
core/src/main/java/org/apache/accumulo/core/spi/common/ContextClassLoaderFactory.java:
##########
@@ -66,5 +68,5 @@ default void init(ContextClassLoaderEnvironment env) {}
    * consulting this plugin.
    * @return the class loader for the given contextName
    */
-  ClassLoader getClassLoader(String contextName);
+  ClassLoader getClassLoader(String contextName) throws IOException, ReflectiveOperationException;

Review Comment:
Is there a benefit to these two specific exceptions? If we want information to travel through the code via a checked exception, then it may be better to create a very specific exception related to this SPI. This allows knowing that class loader creation failed without trying to guess at the specific reasons/exceptions that could cause it; the specific reason should be in the cause. In general we may want to know this type of failure happened, but we probably do not care too much why it happened. Whenever it happens, for any reason, it's not good.

```java
// maybe this should extend Exception
/**
 * @since 2.1.4
 */
public static class ClassLoaderCreationFailed extends AccumuloException {
  public ClassLoaderCreationFailed(Throwable cause) {
    super(cause);
  }

  public ClassLoaderCreationFailed(String msg, Throwable cause) {
    super(msg, cause);
  }
}

ClassLoader getClassLoader(String contextName) throws ClassLoaderCreationFailed;
```

We could also leave this SPI as is and create a new internal exception that is always thrown when class loader creation fails. This still allows this very specific and important information to travel through the internal code. We could do the more minimal change below in 2.1 and add the checked exception to the SPI in 4.0. Not opposed to adding the checked exception to the SPI in 2.1.4 though; we would just need to document the breaking change in the release notes.

```java
public class ClassLoaderUtil {

  // Create this class outside of the public API. Any code in this class that attempts
  // to create a classloader and fails should throw this exception. It could be a
  // checked or runtime exception; not sure which is best.
  public static class ClassLoaderCreationFailed extends RuntimeException {
    public ClassLoaderCreationFailed(Throwable cause) {
      super(cause);
    }
  }

  public static ClassLoader getClassLoader(String context) {
    try {
      return FACTORY.getClassLoader(context);
    } catch (Exception e) {
      throw new ClassLoaderCreationFailed(e);
    }
  }
}
```

##########
server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java:
##########
@@ -712,6 +717,44 @@ public void run() {
           err.set(null);
           JOB_HOLDER.reset();

+          // consecutive failure processing
+          final long totalFailures =
+              errorHistory.values().stream().mapToLong(p -> p.getSecond().get()).sum();
+          if (totalFailures > 0) {
+            LOG.warn("This Compactor has had {} consecutive failures. Failures: {}", totalFailures,
+                errorHistory);

Review Comment:
Wondering how this will look; it's always relogging the entire error history. Also, it seems like it will only log the first exception seen for a table, is that the intent? I will know more about how this looks when I run some tests.
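If relogging the whole map is the concern, one option is to track the most recent exception per table and emit one line per table. A minimal sketch under those assumptions; the types below are hypothetical stand-ins for the PR's errorHistory, not its actual shape:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class TableErrorHistory {

  private static final Logger LOG = LoggerFactory.getLogger(TableErrorHistory.class);

  private static class Entry {
    final AtomicLong failures = new AtomicLong();
    final AtomicReference<String> lastException = new AtomicReference<>();
  }

  private final Map<String,Entry> errors = new ConcurrentHashMap<>();

  void recordFailure(String tableId, Throwable t) {
    Entry e = errors.computeIfAbsent(tableId, id -> new Entry());
    e.failures.incrementAndGet();
    // keep the most recent exception rather than only the first one seen
    e.lastException.set(t.getClass().getName());
  }

  void recordSuccess(String tableId) {
    errors.remove(tableId); // a success ends the consecutive-failure streak
  }

  // one line per failing table instead of dumping the whole map in one message
  void logSummary() {
    errors.forEach((tableId, e) -> LOG.warn(
        "Table {} has {} consecutive compaction failures, most recent exception: {}", tableId,
        e.failures.get(), e.lastException.get()));
  }
}
```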
##########
server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java:
##########
@@ -712,6 +717,44 @@ public void run() {
           err.set(null);
           JOB_HOLDER.reset();

+          // consecutive failure processing
+          final long totalFailures =
+              errorHistory.values().stream().mapToLong(p -> p.getSecond().get()).sum();
+          if (totalFailures > 0) {

Review Comment:
Could all of this be moved into a function? The body of this while loop is already long, and it is already difficult to work out its higher-level structure.
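A minimal sketch of what that extraction could look like, so run() makes a single call per iteration. The fields and the simplified per-table errorHistory shape are hypothetical stand-ins for the PR's code (which stores pairs, not bare counters):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class ConsecutiveFailureBackoff {

  private static final Logger LOG = LoggerFactory.getLogger(ConsecutiveFailureBackoff.class);

  // tableId -> consecutive failure count (simplified stand-in for errorHistory)
  private final Map<String,AtomicLong> errorHistory = new ConcurrentHashMap<>();
  private final long backoffIntervalMs; // compactor.failure.backoff.interval
  private final long backoffThreshold;  // compactor.failure.backoff.threshold

  ConsecutiveFailureBackoff(long backoffIntervalMs, long backoffThreshold) {
    this.backoffIntervalMs = backoffIntervalMs;
    this.backoffThreshold = backoffThreshold;
  }

  // One call per loop iteration replaces the inline block in run(), keeping the
  // loop body focused on the compaction work itself.
  void backoffOnConsecutiveFailures() throws InterruptedException {
    final long totalFailures = errorHistory.values().stream().mapToLong(AtomicLong::get).sum();
    if (totalFailures == 0) {
      return;
    }
    LOG.warn("This Compactor has had {} consecutive failures", totalFailures);
    // an interval of zero disables the backoff feature entirely
    if (backoffIntervalMs > 0 && totalFailures >= backoffThreshold) {
      Thread.sleep(backoffIntervalMs * totalFailures);
    }
  }
}
```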