keith-turner commented on code in PR #5811:
URL: https://github.com/apache/accumulo/pull/5811#discussion_r2299034598


##########
server/base/src/main/java/org/apache/accumulo/server/AbstractServer.java:
##########
@@ -84,15 +84,6 @@ public static void startServer(AbstractServer server, Logger 
LOG) throws Excepti
       e.printStackTrace();
       LOG.error("{} died, exception thrown from runServer.", 
server.getClass().getSimpleName(), e);
       throw e;
-    } finally {

Review Comment:
   A bit further up, the catch could be changed to `catch (Exception e)` instead of 
`catch (Throwable e)`.  The only reason it was catching `Throwable` was in case 
the finally block threw an exception.



##########
server/base/src/main/java/org/apache/accumulo/server/AbstractServer.java:
##########
@@ -476,8 +470,22 @@ public void startServiceLockVerificationThread() {
 
   @Override
   public void close() {
-    if (context != null) {
-      context.close();
+
+    context.getLowMemoryDetector().logGCInfo(getConfiguration());
+
+    // Must set shutdown as completed before calling super.close().

Review Comment:
   This comment mentions calling `super.close()`, but I do not see that call.



##########
server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java:
##########
@@ -829,237 +829,241 @@ public void run() {
         
getConfiguration().getTimeInMillis(Property.COMPACTOR_CANCEL_CHECK_INTERVAL));
 
     LOG.info("Compactor started, waiting for work");
-    try {
-
-      final AtomicReference<Throwable> err = new AtomicReference<>();
-      final LogSorter logSorter = new LogSorter(this);
-      long nextSortLogsCheckTime = System.currentTimeMillis();
 
-      while (!isShutdownRequested()) {
-        if (Thread.currentThread().isInterrupted()) {
-          LOG.info("Server process thread has been interrupted, shutting 
down");
-          break;
-        }
-        try {
-          // mark compactor as idle while not in the compaction loop
-          updateIdleStatus(true);
+    final AtomicReference<Throwable> err = new AtomicReference<>();
+    final LogSorter logSorter = new LogSorter(this);
+    long nextSortLogsCheckTime = System.currentTimeMillis();
 
-          currentCompactionId.set(null);
-          err.set(null);
-          JOB_HOLDER.reset();
-
-          if (System.currentTimeMillis() > nextSortLogsCheckTime) {
-            // Attempt to process all existing log sorting work serially in 
this thread.
-            // When no work remains, this call will return so that we can look 
for compaction
-            // work.
-            LOG.debug("Checking to see if any recovery logs need sorting");
+    while (!isShutdownRequested()) {
+      if (Thread.currentThread().isInterrupted()) {
+        LOG.info("Server process thread has been interrupted, shutting down");
+        break;
+      }
+      try {
+        // mark compactor as idle while not in the compaction loop
+        updateIdleStatus(true);
+
+        currentCompactionId.set(null);
+        err.set(null);
+        JOB_HOLDER.reset();
+
+        if (System.currentTimeMillis() > nextSortLogsCheckTime) {
+          // Attempt to process all existing log sorting work serially in this 
thread.
+          // When no work remains, this call will return so that we can look 
for compaction
+          // work.
+          LOG.debug("Checking to see if any recovery logs need sorting");
+          try {
             nextSortLogsCheckTime = logSorter.sortLogsIfNeeded();
+          } catch (KeeperException e) {
+            LOG.error("Error sorting logs", e);
           }
+        }
 
-          performFailureProcessing(errorHistory);
+        performFailureProcessing(errorHistory);
 
-          TExternalCompactionJob job;
-          try {
-            TNextCompactionJob next = getNextJob(getNextId());
-            job = next.getJob();
-            if (!job.isSetExternalCompactionId()) {
-              LOG.trace("No external compactions in queue {}", 
this.getResourceGroup());
-              
UtilWaitThread.sleep(getWaitTimeBetweenCompactionChecks(next.getCompactorCount()));
-              continue;
-            }
-            if 
(!job.getExternalCompactionId().equals(currentCompactionId.get().toString())) {
-              throw new IllegalStateException("Returned eci " + 
job.getExternalCompactionId()
-                  + " does not match supplied eci " + 
currentCompactionId.get());
-            }
-          } catch (RetriesExceededException e2) {
-            LOG.warn("Retries exceeded getting next job. Retrying...");
+        TExternalCompactionJob job;
+        try {
+          TNextCompactionJob next = getNextJob(getNextId());
+          job = next.getJob();
+          if (!job.isSetExternalCompactionId()) {
+            LOG.trace("No external compactions in queue {}", 
this.getResourceGroup());
+            
UtilWaitThread.sleep(getWaitTimeBetweenCompactionChecks(next.getCompactorCount()));
             continue;
           }
-          LOG.debug("Received next compaction job: {}", job);
+          if 
(!job.getExternalCompactionId().equals(currentCompactionId.get().toString())) {
+            throw new IllegalStateException("Returned eci " + 
job.getExternalCompactionId()
+                + " does not match supplied eci " + currentCompactionId.get());
+          }
+        } catch (RetriesExceededException e2) {
+          LOG.warn("Retries exceeded getting next job. Retrying...");
+          continue;
+        }
+        LOG.debug("Received next compaction job: {}", job);
 
-          final LongAdder totalInputEntries = new LongAdder();
-          final LongAdder totalInputBytes = new LongAdder();
-          final CountDownLatch started = new CountDownLatch(1);
-          final CountDownLatch stopped = new CountDownLatch(1);
+        final LongAdder totalInputEntries = new LongAdder();
+        final LongAdder totalInputBytes = new LongAdder();
+        final CountDownLatch started = new CountDownLatch(1);
+        final CountDownLatch stopped = new CountDownLatch(1);
 
-          final FileCompactorRunnable fcr =
-              createCompactionJob(job, totalInputEntries, totalInputBytes, 
started, stopped, err);
+        final FileCompactorRunnable fcr =
+            createCompactionJob(job, totalInputEntries, totalInputBytes, 
started, stopped, err);
 
-          final Thread compactionThread = Threads.createNonCriticalThread(
-              "Compaction job for tablet " + job.getExtent().toString(), fcr);
+        final Thread compactionThread = Threads.createNonCriticalThread(
+            "Compaction job for tablet " + job.getExtent().toString(), fcr);
 
-          JOB_HOLDER.set(job, compactionThread, fcr.getFileCompactor());
+        JOB_HOLDER.set(job, compactionThread, fcr.getFileCompactor());
 
-          try {
-            // mark compactor as busy while compacting
-            updateIdleStatus(false);
+        try {
+          // mark compactor as busy while compacting
+          updateIdleStatus(false);
 
+          try {
             // Need to call FileCompactorRunnable.initialize after calling 
JOB_HOLDER.set
             fcr.initialize();
-
-            compactionThread.start(); // start the compactionThread
-            started.await(); // wait until the compactor is started
-            final long inputEntries = totalInputEntries.sum();
-            final long waitTime = 
calculateProgressCheckTime(totalInputBytes.sum());
-            LOG.debug("Progress checks will occur every {} seconds", waitTime);
-            String percentComplete = "unknown";
-
-            while (!stopped.await(waitTime, TimeUnit.SECONDS)) {
-              List<CompactionInfo> running =
-                  
org.apache.accumulo.server.compaction.FileCompactor.getRunningCompactions();
-              if (!running.isEmpty()) {
-                // Compaction has started. There should only be one in the list
-                CompactionInfo info = running.get(0);
-                if (info != null) {
-                  final long entriesRead = info.getEntriesRead();
-                  final long entriesWritten = info.getEntriesWritten();
-                  if (inputEntries > 0) {
-                    percentComplete = Float.toString((entriesRead / (float) 
inputEntries) * 100);
-                  }
-                  String message = String.format(
-                      "Compaction in progress, read %d of %d input entries ( 
%s %s ), written %d entries",
-                      entriesRead, inputEntries, percentComplete, "%", 
entriesWritten);
-                  watcher.run();
-                  try {
-                    LOG.debug("Updating coordinator with compaction progress: 
{}.", message);
-                    TCompactionStatusUpdate update = new 
TCompactionStatusUpdate(
-                        TCompactionState.IN_PROGRESS, message, inputEntries, 
entriesRead,
-                        entriesWritten, fcr.getCompactionAge().toNanos());
-                    updateCompactionState(job, update);
-                  } catch (RetriesExceededException e) {
-                    LOG.warn("Error updating coordinator with compaction 
progress, error: {}",
-                        e.getMessage());
-                  }
-                }
-              } else {
-                LOG.debug("Waiting on compaction thread to finish, but no 
RUNNING compaction");
-              }
-            }
-            compactionThread.join();
-            LOG.trace("Compaction thread finished.");
-            // Run the watcher again to clear out the finished compaction and 
set the
-            // stuck count to zero.
-            watcher.run();
-
-            if (err.get() != null) {
-              // maybe the error occured because the table was deleted or 
something like that, so
-              // force a cancel check to possibly reduce noise in the logs
-              checkIfCanceled();
+          } catch (RetriesExceededException e) {
+            LOG.error(
+                "Error starting FileCompactableRunnable, cancelling compaction 
and moving to next job.",
+                e);
+            try {
+              cancel(job.getExternalCompactionId());
+            } catch (TException e1) {
+              LOG.error("Error cancelling compaction.", e1);
             }
+            continue;
+          } finally {
+            currentCompactionId.set(null);

Review Comment:
   It does not seem like this should be cleared here, because it's used for dead 
compaction detection and it should remain set for the entire time that a compactor 
is running a compaction.



##########
test/src/main/java/org/apache/accumulo/test/functional/ExitCodesIT.java:
##########
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.test.functional;
+
+import static org.apache.accumulo.harness.AccumuloITBase.MINI_CLUSTER_ONLY;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.lang.reflect.Constructor;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.function.Function;
+import java.util.stream.Stream;
+
+import org.apache.accumulo.compactor.Compactor;
+import org.apache.accumulo.core.cli.ConfigOpts;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.conf.SiteConfiguration;
+import org.apache.accumulo.gc.SimpleGarbageCollector;
+import org.apache.accumulo.harness.SharedMiniClusterBase;
+import org.apache.accumulo.manager.Manager;
+import org.apache.accumulo.minicluster.ServerType;
+import org.apache.accumulo.miniclusterImpl.MiniAccumuloClusterImpl.ProcessInfo;
+import org.apache.accumulo.server.AbstractServer;
+import org.apache.accumulo.server.ServerContext;
+import 
org.apache.accumulo.test.functional.ExitCodesIT.ProcessProxy.TerminalBehavior;
+import org.apache.accumulo.test.util.Wait;
+import org.apache.accumulo.tserver.ScanServer;
+import org.apache.accumulo.tserver.TabletServer;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.slf4j.LoggerFactory;
+
+import net.bytebuddy.ByteBuddy;
+import net.bytebuddy.description.modifier.Visibility;
+import net.bytebuddy.dynamic.scaffold.subclass.ConstructorStrategy;
+import net.bytebuddy.implementation.ExceptionMethod;
+import net.bytebuddy.implementation.Implementation;
+import net.bytebuddy.implementation.MethodCall;
+import net.bytebuddy.matcher.ElementMatchers;
+
+@Tag(MINI_CLUSTER_ONLY)
+public class ExitCodesIT extends SharedMiniClusterBase {
+
+  public static class ServerContextFunction implements 
Function<SiteConfiguration,ServerContext> {
+
+    @Override
+    public ServerContext apply(SiteConfiguration site) {
+      return new ServerContext(site);
+    }
+
+  }
+
+  // Dynamic Proxy class for a server instance that will be invoked via
+  // Accumulo's Main class using MiniAccumuloClusterImpl._exec(). This
+  // ProcessProxy class will determine which class to instantiate
+  // and what the terminal behavior should be using two system
+  // properties. A subclass of the desired class is dynamically created
+  // and started.
+  public static class ProcessProxy {
+
+    public static enum TerminalBehavior {
+      SHUTDOWN, EXCEPTION, ERROR
+    };
+
+    public static final String PROXY_CLASS = "proxy.server";
+    public static final String PROXY_METHOD_BEHAVIOR = "proxy.method.behavior";
+
+    public static void main(String[] args) throws Exception {
+
+      final String proxyServer = System.getProperty(PROXY_CLASS);
+      Objects.requireNonNull(proxyServer, PROXY_CLASS + " must exist as an env 
var");
+      final String methodBehavior = System.getProperty(PROXY_METHOD_BEHAVIOR);
+      Objects.requireNonNull(methodBehavior, methodBehavior + " must exist as 
an env var");
+
+      final ServerType st = ServerType.valueOf(proxyServer);
+      final TerminalBehavior behavior = 
TerminalBehavior.valueOf(methodBehavior);
+
+      // Determine the constructor arguments and parameters for each server 
class.
+      // Find a method with no-args that does not return anything that is
+      // called during the servers run method that we can intercept to signal
+      // shutdown, exception, or error.
+      final Class<? extends AbstractServer> serverClass;
+      final String methodName;
+      final Class<?>[] ctorParams;
+      final Object[] ctorArgs;
+      switch (st) {
+        case COMPACTOR:
+          List<String> compactorArgs = new ArrayList<>();
+          compactorArgs.add("-o");
+          compactorArgs.add(Property.COMPACTOR_GROUP_NAME.getKey() + "=TEST");
+          serverClass = Compactor.class;
+          methodName = "updateIdleStatus";

Review Comment:
   What happens with the test if this method does not exist?  Does the test 
fail?  
   It seems like it might fail because the SHUTDOWN case would expect an 
exit code of zero but would see some other exit code.  But I'm not sure.  
   
   Wondering if, instead of this run-time overriding, it would be possible to 
do it at compile time with something like the following.  Then if someone 
changes a method name they will get a compile-time error.
   
   ```java
   class ExitCompactor extends  Compactor {
   
     private final TerminalBehavior terminalBehavior;
   
     protected ExitCompactor(ConfigOpts opts, String[] args, TerminalBehavior 
terminalBehavior) {
       super(opts, args);
       this.terminalBehavior = terminalBehavior;
     }
   
     @Override
     public void updateIdleStatus(boolean idle){
       switch (terminalBehavior){
         case SHUTDOWN:
           super.requestShutdownForTests();
           break;
         case ERROR:
           throw new StackOverflowError();
         case EXCEPTION:
           throw new RuntimeException();
       }
     }
   }
   ```
   
   



##########
server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java:
##########
@@ -829,237 +829,241 @@ public void run() {
         
getConfiguration().getTimeInMillis(Property.COMPACTOR_CANCEL_CHECK_INTERVAL));
 
     LOG.info("Compactor started, waiting for work");
-    try {
-
-      final AtomicReference<Throwable> err = new AtomicReference<>();
-      final LogSorter logSorter = new LogSorter(this);
-      long nextSortLogsCheckTime = System.currentTimeMillis();
 
-      while (!isShutdownRequested()) {
-        if (Thread.currentThread().isInterrupted()) {
-          LOG.info("Server process thread has been interrupted, shutting 
down");
-          break;
-        }
-        try {
-          // mark compactor as idle while not in the compaction loop
-          updateIdleStatus(true);
+    final AtomicReference<Throwable> err = new AtomicReference<>();
+    final LogSorter logSorter = new LogSorter(this);
+    long nextSortLogsCheckTime = System.currentTimeMillis();
 
-          currentCompactionId.set(null);
-          err.set(null);
-          JOB_HOLDER.reset();
-
-          if (System.currentTimeMillis() > nextSortLogsCheckTime) {
-            // Attempt to process all existing log sorting work serially in 
this thread.
-            // When no work remains, this call will return so that we can look 
for compaction
-            // work.
-            LOG.debug("Checking to see if any recovery logs need sorting");
+    while (!isShutdownRequested()) {
+      if (Thread.currentThread().isInterrupted()) {
+        LOG.info("Server process thread has been interrupted, shutting down");
+        break;
+      }
+      try {
+        // mark compactor as idle while not in the compaction loop
+        updateIdleStatus(true);
+
+        currentCompactionId.set(null);
+        err.set(null);
+        JOB_HOLDER.reset();
+
+        if (System.currentTimeMillis() > nextSortLogsCheckTime) {
+          // Attempt to process all existing log sorting work serially in this 
thread.
+          // When no work remains, this call will return so that we can look 
for compaction
+          // work.
+          LOG.debug("Checking to see if any recovery logs need sorting");
+          try {
             nextSortLogsCheckTime = logSorter.sortLogsIfNeeded();
+          } catch (KeeperException e) {
+            LOG.error("Error sorting logs", e);
           }
+        }
 
-          performFailureProcessing(errorHistory);
+        performFailureProcessing(errorHistory);
 
-          TExternalCompactionJob job;
-          try {
-            TNextCompactionJob next = getNextJob(getNextId());
-            job = next.getJob();
-            if (!job.isSetExternalCompactionId()) {
-              LOG.trace("No external compactions in queue {}", 
this.getResourceGroup());
-              
UtilWaitThread.sleep(getWaitTimeBetweenCompactionChecks(next.getCompactorCount()));
-              continue;
-            }
-            if 
(!job.getExternalCompactionId().equals(currentCompactionId.get().toString())) {
-              throw new IllegalStateException("Returned eci " + 
job.getExternalCompactionId()
-                  + " does not match supplied eci " + 
currentCompactionId.get());
-            }
-          } catch (RetriesExceededException e2) {
-            LOG.warn("Retries exceeded getting next job. Retrying...");
+        TExternalCompactionJob job;
+        try {
+          TNextCompactionJob next = getNextJob(getNextId());
+          job = next.getJob();
+          if (!job.isSetExternalCompactionId()) {
+            LOG.trace("No external compactions in queue {}", 
this.getResourceGroup());
+            
UtilWaitThread.sleep(getWaitTimeBetweenCompactionChecks(next.getCompactorCount()));
             continue;
           }
-          LOG.debug("Received next compaction job: {}", job);
+          if 
(!job.getExternalCompactionId().equals(currentCompactionId.get().toString())) {
+            throw new IllegalStateException("Returned eci " + 
job.getExternalCompactionId()
+                + " does not match supplied eci " + currentCompactionId.get());
+          }
+        } catch (RetriesExceededException e2) {
+          LOG.warn("Retries exceeded getting next job. Retrying...");
+          continue;
+        }
+        LOG.debug("Received next compaction job: {}", job);
 
-          final LongAdder totalInputEntries = new LongAdder();
-          final LongAdder totalInputBytes = new LongAdder();
-          final CountDownLatch started = new CountDownLatch(1);
-          final CountDownLatch stopped = new CountDownLatch(1);
+        final LongAdder totalInputEntries = new LongAdder();
+        final LongAdder totalInputBytes = new LongAdder();
+        final CountDownLatch started = new CountDownLatch(1);
+        final CountDownLatch stopped = new CountDownLatch(1);
 
-          final FileCompactorRunnable fcr =
-              createCompactionJob(job, totalInputEntries, totalInputBytes, 
started, stopped, err);
+        final FileCompactorRunnable fcr =
+            createCompactionJob(job, totalInputEntries, totalInputBytes, 
started, stopped, err);
 
-          final Thread compactionThread = Threads.createNonCriticalThread(
-              "Compaction job for tablet " + job.getExtent().toString(), fcr);
+        final Thread compactionThread = Threads.createNonCriticalThread(
+            "Compaction job for tablet " + job.getExtent().toString(), fcr);
 
-          JOB_HOLDER.set(job, compactionThread, fcr.getFileCompactor());
+        JOB_HOLDER.set(job, compactionThread, fcr.getFileCompactor());
 
-          try {
-            // mark compactor as busy while compacting
-            updateIdleStatus(false);
+        try {
+          // mark compactor as busy while compacting
+          updateIdleStatus(false);
 
+          try {
             // Need to call FileCompactorRunnable.initialize after calling 
JOB_HOLDER.set
             fcr.initialize();
-
-            compactionThread.start(); // start the compactionThread
-            started.await(); // wait until the compactor is started
-            final long inputEntries = totalInputEntries.sum();
-            final long waitTime = 
calculateProgressCheckTime(totalInputBytes.sum());
-            LOG.debug("Progress checks will occur every {} seconds", waitTime);
-            String percentComplete = "unknown";
-
-            while (!stopped.await(waitTime, TimeUnit.SECONDS)) {
-              List<CompactionInfo> running =
-                  
org.apache.accumulo.server.compaction.FileCompactor.getRunningCompactions();
-              if (!running.isEmpty()) {
-                // Compaction has started. There should only be one in the list
-                CompactionInfo info = running.get(0);
-                if (info != null) {
-                  final long entriesRead = info.getEntriesRead();
-                  final long entriesWritten = info.getEntriesWritten();
-                  if (inputEntries > 0) {
-                    percentComplete = Float.toString((entriesRead / (float) 
inputEntries) * 100);
-                  }
-                  String message = String.format(
-                      "Compaction in progress, read %d of %d input entries ( 
%s %s ), written %d entries",
-                      entriesRead, inputEntries, percentComplete, "%", 
entriesWritten);
-                  watcher.run();
-                  try {
-                    LOG.debug("Updating coordinator with compaction progress: 
{}.", message);
-                    TCompactionStatusUpdate update = new 
TCompactionStatusUpdate(
-                        TCompactionState.IN_PROGRESS, message, inputEntries, 
entriesRead,
-                        entriesWritten, fcr.getCompactionAge().toNanos());
-                    updateCompactionState(job, update);
-                  } catch (RetriesExceededException e) {
-                    LOG.warn("Error updating coordinator with compaction 
progress, error: {}",
-                        e.getMessage());
-                  }
-                }
-              } else {
-                LOG.debug("Waiting on compaction thread to finish, but no 
RUNNING compaction");
-              }
-            }
-            compactionThread.join();
-            LOG.trace("Compaction thread finished.");
-            // Run the watcher again to clear out the finished compaction and 
set the
-            // stuck count to zero.
-            watcher.run();
-
-            if (err.get() != null) {
-              // maybe the error occured because the table was deleted or 
something like that, so
-              // force a cancel check to possibly reduce noise in the logs
-              checkIfCanceled();
+          } catch (RetriesExceededException e) {
+            LOG.error(
+                "Error starting FileCompactableRunnable, cancelling compaction 
and moving to next job.",
+                e);
+            try {
+              cancel(job.getExternalCompactionId());
+            } catch (TException e1) {
+              LOG.error("Error cancelling compaction.", e1);
             }
+            continue;
+          } finally {
+            currentCompactionId.set(null);
+          }
 
-            if (compactionThread.isInterrupted() || JOB_HOLDER.isCancelled()
-                || (err.get() != null && 
err.get().getClass().equals(InterruptedException.class))) {
-              LOG.warn("Compaction thread was interrupted, sending CANCELLED 
state");
-              try {
-                TCompactionStatusUpdate update =
-                    new TCompactionStatusUpdate(TCompactionState.CANCELLED, 
"Compaction cancelled",
-                        -1, -1, -1, fcr.getCompactionAge().toNanos());
-                updateCompactionState(job, update);
-                updateCompactionFailed(job, 
InterruptedException.class.getName());
-                cancelled.incrementAndGet();
-              } catch (RetriesExceededException e) {
-                LOG.error("Error updating coordinator with compaction 
cancellation.", e);
-              } finally {
-                currentCompactionId.set(null);
-              }
-            } else if (err.get() != null) {
-              final KeyExtent fromThriftExtent = 
KeyExtent.fromThrift(job.getExtent());
-              try {
-                LOG.info("Updating coordinator with compaction failure: id: 
{}, extent: {}",
-                    job.getExternalCompactionId(), fromThriftExtent);
-                TCompactionStatusUpdate update = new TCompactionStatusUpdate(
-                    TCompactionState.FAILED, "Compaction failed due to: " + 
err.get().getMessage(),
-                    -1, -1, -1, fcr.getCompactionAge().toNanos());
-                updateCompactionState(job, update);
-                updateCompactionFailed(job, err.get().getClass().getName());
-                failed.incrementAndGet();
-                errorHistory.addError(fromThriftExtent.tableId(), err.get());
-              } catch (RetriesExceededException e) {
-                LOG.error("Error updating coordinator with compaction failure: 
id: {}, extent: {}",
-                    job.getExternalCompactionId(), fromThriftExtent, e);
-              } finally {
-                currentCompactionId.set(null);
-              }
-            } else {
-              try {
-                LOG.trace("Updating coordinator with compaction completion.");
-                updateCompactionCompleted(job, JOB_HOLDER.getStats());
-                completed.incrementAndGet();
-                // job completed successfully, clear the error history
-                errorHistory.clear();
-              } catch (RetriesExceededException e) {
-                LOG.error(
-                    "Error updating coordinator with compaction completion, 
cancelling compaction.",
-                    e);
+          compactionThread.start(); // start the compactionThread
+          started.await(); // wait until the compactor is started
+          final long inputEntries = totalInputEntries.sum();
+          final long waitTime = 
calculateProgressCheckTime(totalInputBytes.sum());
+          LOG.debug("Progress checks will occur every {} seconds", waitTime);
+          String percentComplete = "unknown";
+
+          while (!stopped.await(waitTime, TimeUnit.SECONDS)) {
+            List<CompactionInfo> running =
+                
org.apache.accumulo.server.compaction.FileCompactor.getRunningCompactions();
+            if (!running.isEmpty()) {
+              // Compaction has started. There should only be one in the list
+              CompactionInfo info = running.get(0);
+              if (info != null) {
+                final long entriesRead = info.getEntriesRead();
+                final long entriesWritten = info.getEntriesWritten();
+                if (inputEntries > 0) {
+                  percentComplete = Float.toString((entriesRead / (float) 
inputEntries) * 100);
+                }
+                String message = String.format(
+                    "Compaction in progress, read %d of %d input entries ( %s 
%s ), written %d entries",
+                    entriesRead, inputEntries, percentComplete, "%", 
entriesWritten);
+                watcher.run();
                 try {
-                  cancel(job.getExternalCompactionId());
-                } catch (TException e1) {
-                  LOG.error("Error cancelling compaction.", e1);
+                  LOG.debug("Updating coordinator with compaction progress: 
{}.", message);
+                  TCompactionStatusUpdate update = new TCompactionStatusUpdate(
+                      TCompactionState.IN_PROGRESS, message, inputEntries, 
entriesRead,
+                      entriesWritten, fcr.getCompactionAge().toNanos());
+                  updateCompactionState(job, update);
+                } catch (RetriesExceededException e) {
+                  LOG.warn("Error updating coordinator with compaction 
progress, error: {}",
+                      e.getMessage());
                 }
-              } finally {
-                currentCompactionId.set(null);
               }
+            } else {
+              LOG.debug("Waiting on compaction thread to finish, but no 
RUNNING compaction");
             }
-          } catch (RuntimeException e1) {
-            LOG.error(
-                "Compactor thread was interrupted waiting for compaction to 
start, cancelling job",
-                e1);
+          }
+          compactionThread.join();
+          LOG.trace("Compaction thread finished.");
+          // Run the watcher again to clear out the finished compaction and 
set the
+          // stuck count to zero.
+          watcher.run();
+
+          if (err.get() != null) {
+            // maybe the error occured because the table was deleted or 
something like that, so
+            // force a cancel check to possibly reduce noise in the logs
+            checkIfCanceled();
+          }
+
+          if (compactionThread.isInterrupted() || JOB_HOLDER.isCancelled()
+              || (err.get() != null && 
err.get().getClass().equals(InterruptedException.class))) {
+            LOG.warn("Compaction thread was interrupted, sending CANCELLED 
state");
             try {
-              cancel(job.getExternalCompactionId());
-            } catch (TException e2) {
-              LOG.error("Error cancelling compaction.", e2);
+              TCompactionStatusUpdate update =
+                  new TCompactionStatusUpdate(TCompactionState.CANCELLED, 
"Compaction cancelled",
+                      -1, -1, -1, fcr.getCompactionAge().toNanos());
+              updateCompactionState(job, update);
+              updateCompactionFailed(job, 
InterruptedException.class.getName());
+              cancelled.incrementAndGet();
+            } catch (RetriesExceededException e) {
+              LOG.error("Error updating coordinator with compaction 
cancellation.", e);
+            } finally {
+              currentCompactionId.set(null);
             }
-          } finally {
-            currentCompactionId.set(null);
+          } else if (err.get() != null) {
+            final KeyExtent fromThriftExtent = 
KeyExtent.fromThrift(job.getExtent());
+            try {
+              LOG.info("Updating coordinator with compaction failure: id: {}, 
extent: {}",
+                  job.getExternalCompactionId(), fromThriftExtent);
+              TCompactionStatusUpdate update = new 
TCompactionStatusUpdate(TCompactionState.FAILED,
+                  "Compaction failed due to: " + err.get().getMessage(), -1, 
-1, -1,
+                  fcr.getCompactionAge().toNanos());
+              updateCompactionState(job, update);
+              updateCompactionFailed(job, err.get().getClass().getName());
+              failed.incrementAndGet();
+              errorHistory.addError(fromThriftExtent.tableId(), err.get());
+            } catch (RetriesExceededException e) {
+              LOG.error("Error updating coordinator with compaction failure: 
id: {}, extent: {}",
+                  job.getExternalCompactionId(), fromThriftExtent, e);
+            } finally {
+              currentCompactionId.set(null);
+            }
+          } else {
+            try {
+              LOG.trace("Updating coordinator with compaction completion.");
+              updateCompactionCompleted(job, JOB_HOLDER.getStats());
+              completed.incrementAndGet();
+              // job completed successfully, clear the error history
+              errorHistory.clear();
+            } catch (RetriesExceededException e) {
+              LOG.error(
+                  "Error updating coordinator with compaction completion, 
cancelling compaction.",
+                  e);
+              try {
+                cancel(job.getExternalCompactionId());
+              } catch (TException e1) {
+                LOG.error("Error cancelling compaction.", e1);
+              }
+            } finally {
+              currentCompactionId.set(null);
+            }
+          }
+        } catch (RuntimeException e1) {
+          LOG.error(
+              "Compactor thread was interrupted waiting for compaction to 
start, cancelling job",
+              e1);
+          try {
+            cancel(job.getExternalCompactionId());
+          } catch (TException e2) {
+            LOG.error("Error cancelling compaction.", e2);
+          }
+        } finally {
+          currentCompactionId.set(null);
 
-            // mark compactor as idle after compaction completes
-            updateIdleStatus(true);
+          // mark compactor as idle after compaction completes
+          updateIdleStatus(true);
 
-            // In the case where there is an error in the foreground code the 
background compaction
-            // may still be running. Must cancel it before starting another 
iteration of the loop to
-            // avoid multiple threads updating shared state.
-            while (compactionThread.isAlive()) {
-              compactionThread.interrupt();
-              compactionThread.join(1000);
-            }
+          // In the case where there is an error in the foreground code the 
background compaction
+          // may still be running. Must cancel it before starting another 
iteration of the loop to
+          // avoid multiple threads updating shared state.
+          while (compactionThread.isAlive()) {
+            compactionThread.interrupt();
+            compactionThread.join(1000);
           }
-        } catch (InterruptedException e) {
-          LOG.info("Interrupt Exception received, shutting down");
-          gracefulShutdown(getContext().rpcCreds());
         }
-      } // end while
-    } catch (Exception e) {
-      LOG.error("Unhandled error occurred in Compactor", e);
-    } finally {
-      // Shutdown local thrift server
-      LOG.debug("Stopping Thrift Servers");
-      if (getThriftServer() != null) {
-        getThriftServer().stop();
+      } catch (InterruptedException e) {
+        LOG.info("Interrupt Exception received, shutting down");
+        gracefulShutdown(getContext().rpcCreds());
       }
+    } // end while, shutdown requested
 
-      try {
-        LOG.debug("Closing filesystems");
-        VolumeManager mgr = getContext().getVolumeManager();
-        if (null != mgr) {
-          mgr.close();
-        }
-      } catch (IOException e) {
-        LOG.warn("Failed to close filesystem : {}", e.getMessage(), e);
-      }
+    // Shutdown local thrift server
+    LOG.debug("Stopping Thrift Servers");
+    if (getThriftServer() != null) {
+      getThriftServer().stop();
+    }
 
-      getContext().getLowMemoryDetector().logGCInfo(getConfiguration());
-      super.close();
-      getShutdownComplete().set(true);
-      LOG.info("stop requested. exiting ... ");
-      try {
-        if (null != compactorLock) {
-          compactorLock.unlock();
-        }
-      } catch (Exception e) {
-        LOG.warn("Failed to release compactor lock", e);
+    try {
+      LOG.debug("Closing filesystems");
+      VolumeManager mgr = getContext().getVolumeManager();
+      if (null != mgr) {
+        mgr.close();
       }
+    } catch (IOException e) {
+      LOG.warn("Failed to close filesystem : {}", e.getMessage(), e);
     }
 
+    super.close();

Review Comment:
   Seems like the new pattern in this PR is that super.close() is always 
called at the end of each run() method.  Wondering if it would make sense to 
push this into AbstractServer like the following.  This would centralize the 
pattern to one place in the code.
   
   ```java
     public void runServer() throws Exception {
       final AtomicReference<Throwable> err = new AtomicReference<>();
       serverThread = new Thread(TraceUtil.wrap(()->{
         this.run();
         close();
       }), applicationName);
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscr...@accumulo.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to