dlmarion commented on code in PR #5811: URL: https://github.com/apache/accumulo/pull/5811#discussion_r2304602754
########## server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java: ########## @@ -829,237 +829,241 @@ public void run() { getConfiguration().getTimeInMillis(Property.COMPACTOR_CANCEL_CHECK_INTERVAL)); LOG.info("Compactor started, waiting for work"); - try { - - final AtomicReference<Throwable> err = new AtomicReference<>(); - final LogSorter logSorter = new LogSorter(this); - long nextSortLogsCheckTime = System.currentTimeMillis(); - while (!isShutdownRequested()) { - if (Thread.currentThread().isInterrupted()) { - LOG.info("Server process thread has been interrupted, shutting down"); - break; - } - try { - // mark compactor as idle while not in the compaction loop - updateIdleStatus(true); + final AtomicReference<Throwable> err = new AtomicReference<>(); + final LogSorter logSorter = new LogSorter(this); + long nextSortLogsCheckTime = System.currentTimeMillis(); - currentCompactionId.set(null); - err.set(null); - JOB_HOLDER.reset(); - - if (System.currentTimeMillis() > nextSortLogsCheckTime) { - // Attempt to process all existing log sorting work serially in this thread. - // When no work remains, this call will return so that we can look for compaction - // work. - LOG.debug("Checking to see if any recovery logs need sorting"); + while (!isShutdownRequested()) { + if (Thread.currentThread().isInterrupted()) { + LOG.info("Server process thread has been interrupted, shutting down"); + break; + } + try { + // mark compactor as idle while not in the compaction loop + updateIdleStatus(true); + + currentCompactionId.set(null); + err.set(null); + JOB_HOLDER.reset(); + + if (System.currentTimeMillis() > nextSortLogsCheckTime) { + // Attempt to process all existing log sorting work serially in this thread. + // When no work remains, this call will return so that we can look for compaction + // work. + LOG.debug("Checking to see if any recovery logs need sorting"); + try { nextSortLogsCheckTime = logSorter.sortLogsIfNeeded(); + } catch (KeeperException e) { + LOG.error("Error sorting logs", e); } + } - performFailureProcessing(errorHistory); + performFailureProcessing(errorHistory); - TExternalCompactionJob job; - try { - TNextCompactionJob next = getNextJob(getNextId()); - job = next.getJob(); - if (!job.isSetExternalCompactionId()) { - LOG.trace("No external compactions in queue {}", this.getResourceGroup()); - UtilWaitThread.sleep(getWaitTimeBetweenCompactionChecks(next.getCompactorCount())); - continue; - } - if (!job.getExternalCompactionId().equals(currentCompactionId.get().toString())) { - throw new IllegalStateException("Returned eci " + job.getExternalCompactionId() - + " does not match supplied eci " + currentCompactionId.get()); - } - } catch (RetriesExceededException e2) { - LOG.warn("Retries exceeded getting next job. Retrying..."); + TExternalCompactionJob job; + try { + TNextCompactionJob next = getNextJob(getNextId()); + job = next.getJob(); + if (!job.isSetExternalCompactionId()) { + LOG.trace("No external compactions in queue {}", this.getResourceGroup()); + UtilWaitThread.sleep(getWaitTimeBetweenCompactionChecks(next.getCompactorCount())); continue; } - LOG.debug("Received next compaction job: {}", job); + if (!job.getExternalCompactionId().equals(currentCompactionId.get().toString())) { + throw new IllegalStateException("Returned eci " + job.getExternalCompactionId() + + " does not match supplied eci " + currentCompactionId.get()); + } + } catch (RetriesExceededException e2) { + LOG.warn("Retries exceeded getting next job. Retrying..."); + continue; + } + LOG.debug("Received next compaction job: {}", job); - final LongAdder totalInputEntries = new LongAdder(); - final LongAdder totalInputBytes = new LongAdder(); - final CountDownLatch started = new CountDownLatch(1); - final CountDownLatch stopped = new CountDownLatch(1); + final LongAdder totalInputEntries = new LongAdder(); + final LongAdder totalInputBytes = new LongAdder(); + final CountDownLatch started = new CountDownLatch(1); + final CountDownLatch stopped = new CountDownLatch(1); - final FileCompactorRunnable fcr = - createCompactionJob(job, totalInputEntries, totalInputBytes, started, stopped, err); + final FileCompactorRunnable fcr = + createCompactionJob(job, totalInputEntries, totalInputBytes, started, stopped, err); - final Thread compactionThread = Threads.createNonCriticalThread( - "Compaction job for tablet " + job.getExtent().toString(), fcr); + final Thread compactionThread = Threads.createNonCriticalThread( + "Compaction job for tablet " + job.getExtent().toString(), fcr); - JOB_HOLDER.set(job, compactionThread, fcr.getFileCompactor()); + JOB_HOLDER.set(job, compactionThread, fcr.getFileCompactor()); - try { - // mark compactor as busy while compacting - updateIdleStatus(false); + try { + // mark compactor as busy while compacting + updateIdleStatus(false); + try { // Need to call FileCompactorRunnable.initialize after calling JOB_HOLDER.set fcr.initialize(); - - compactionThread.start(); // start the compactionThread - started.await(); // wait until the compactor is started - final long inputEntries = totalInputEntries.sum(); - final long waitTime = calculateProgressCheckTime(totalInputBytes.sum()); - LOG.debug("Progress checks will occur every {} seconds", waitTime); - String percentComplete = "unknown"; - - while (!stopped.await(waitTime, TimeUnit.SECONDS)) { - List<CompactionInfo> running = - org.apache.accumulo.server.compaction.FileCompactor.getRunningCompactions(); - if (!running.isEmpty()) { - // Compaction has started. There should only be one in the list - CompactionInfo info = running.get(0); - if (info != null) { - final long entriesRead = info.getEntriesRead(); - final long entriesWritten = info.getEntriesWritten(); - if (inputEntries > 0) { - percentComplete = Float.toString((entriesRead / (float) inputEntries) * 100); - } - String message = String.format( - "Compaction in progress, read %d of %d input entries ( %s %s ), written %d entries", - entriesRead, inputEntries, percentComplete, "%", entriesWritten); - watcher.run(); - try { - LOG.debug("Updating coordinator with compaction progress: {}.", message); - TCompactionStatusUpdate update = new TCompactionStatusUpdate( - TCompactionState.IN_PROGRESS, message, inputEntries, entriesRead, - entriesWritten, fcr.getCompactionAge().toNanos()); - updateCompactionState(job, update); - } catch (RetriesExceededException e) { - LOG.warn("Error updating coordinator with compaction progress, error: {}", - e.getMessage()); - } - } - } else { - LOG.debug("Waiting on compaction thread to finish, but no RUNNING compaction"); - } - } - compactionThread.join(); - LOG.trace("Compaction thread finished."); - // Run the watcher again to clear out the finished compaction and set the - // stuck count to zero. - watcher.run(); - - if (err.get() != null) { - // maybe the error occured because the table was deleted or something like that, so - // force a cancel check to possibly reduce noise in the logs - checkIfCanceled(); + } catch (RetriesExceededException e) { + LOG.error( + "Error starting FileCompactableRunnable, cancelling compaction and moving to next job.", + e); + try { + cancel(job.getExternalCompactionId()); + } catch (TException e1) { + LOG.error("Error cancelling compaction.", e1); } + continue; + } finally { + currentCompactionId.set(null); + } - if (compactionThread.isInterrupted() || JOB_HOLDER.isCancelled() - || (err.get() != null && err.get().getClass().equals(InterruptedException.class))) { - LOG.warn("Compaction thread was interrupted, sending CANCELLED state"); - try { - TCompactionStatusUpdate update = - new TCompactionStatusUpdate(TCompactionState.CANCELLED, "Compaction cancelled", - -1, -1, -1, fcr.getCompactionAge().toNanos()); - updateCompactionState(job, update); - updateCompactionFailed(job, InterruptedException.class.getName()); - cancelled.incrementAndGet(); - } catch (RetriesExceededException e) { - LOG.error("Error updating coordinator with compaction cancellation.", e); - } finally { - currentCompactionId.set(null); - } - } else if (err.get() != null) { - final KeyExtent fromThriftExtent = KeyExtent.fromThrift(job.getExtent()); - try { - LOG.info("Updating coordinator with compaction failure: id: {}, extent: {}", - job.getExternalCompactionId(), fromThriftExtent); - TCompactionStatusUpdate update = new TCompactionStatusUpdate( - TCompactionState.FAILED, "Compaction failed due to: " + err.get().getMessage(), - -1, -1, -1, fcr.getCompactionAge().toNanos()); - updateCompactionState(job, update); - updateCompactionFailed(job, err.get().getClass().getName()); - failed.incrementAndGet(); - errorHistory.addError(fromThriftExtent.tableId(), err.get()); - } catch (RetriesExceededException e) { - LOG.error("Error updating coordinator with compaction failure: id: {}, extent: {}", - job.getExternalCompactionId(), fromThriftExtent, e); - } finally { - currentCompactionId.set(null); - } - } else { - try { - LOG.trace("Updating coordinator with compaction completion."); - updateCompactionCompleted(job, JOB_HOLDER.getStats()); - completed.incrementAndGet(); - // job completed successfully, clear the error history - errorHistory.clear(); - } catch (RetriesExceededException e) { - LOG.error( - "Error updating coordinator with compaction completion, cancelling compaction.", - e); + compactionThread.start(); // start the compactionThread + started.await(); // wait until the compactor is started + final long inputEntries = totalInputEntries.sum(); + final long waitTime = calculateProgressCheckTime(totalInputBytes.sum()); + LOG.debug("Progress checks will occur every {} seconds", waitTime); + String percentComplete = "unknown"; + + while (!stopped.await(waitTime, TimeUnit.SECONDS)) { + List<CompactionInfo> running = + org.apache.accumulo.server.compaction.FileCompactor.getRunningCompactions(); + if (!running.isEmpty()) { + // Compaction has started. There should only be one in the list + CompactionInfo info = running.get(0); + if (info != null) { + final long entriesRead = info.getEntriesRead(); + final long entriesWritten = info.getEntriesWritten(); + if (inputEntries > 0) { + percentComplete = Float.toString((entriesRead / (float) inputEntries) * 100); + } + String message = String.format( + "Compaction in progress, read %d of %d input entries ( %s %s ), written %d entries", + entriesRead, inputEntries, percentComplete, "%", entriesWritten); + watcher.run(); try { - cancel(job.getExternalCompactionId()); - } catch (TException e1) { - LOG.error("Error cancelling compaction.", e1); + LOG.debug("Updating coordinator with compaction progress: {}.", message); + TCompactionStatusUpdate update = new TCompactionStatusUpdate( + TCompactionState.IN_PROGRESS, message, inputEntries, entriesRead, + entriesWritten, fcr.getCompactionAge().toNanos()); + updateCompactionState(job, update); + } catch (RetriesExceededException e) { + LOG.warn("Error updating coordinator with compaction progress, error: {}", + e.getMessage()); } - } finally { - currentCompactionId.set(null); } + } else { + LOG.debug("Waiting on compaction thread to finish, but no RUNNING compaction"); } - } catch (RuntimeException e1) { - LOG.error( - "Compactor thread was interrupted waiting for compaction to start, cancelling job", - e1); + } + compactionThread.join(); + LOG.trace("Compaction thread finished."); + // Run the watcher again to clear out the finished compaction and set the + // stuck count to zero. + watcher.run(); + + if (err.get() != null) { + // maybe the error occured because the table was deleted or something like that, so + // force a cancel check to possibly reduce noise in the logs + checkIfCanceled(); + } + + if (compactionThread.isInterrupted() || JOB_HOLDER.isCancelled() + || (err.get() != null && err.get().getClass().equals(InterruptedException.class))) { + LOG.warn("Compaction thread was interrupted, sending CANCELLED state"); try { - cancel(job.getExternalCompactionId()); - } catch (TException e2) { - LOG.error("Error cancelling compaction.", e2); + TCompactionStatusUpdate update = + new TCompactionStatusUpdate(TCompactionState.CANCELLED, "Compaction cancelled", + -1, -1, -1, fcr.getCompactionAge().toNanos()); + updateCompactionState(job, update); + updateCompactionFailed(job, InterruptedException.class.getName()); + cancelled.incrementAndGet(); + } catch (RetriesExceededException e) { + LOG.error("Error updating coordinator with compaction cancellation.", e); + } finally { + currentCompactionId.set(null); } - } finally { - currentCompactionId.set(null); + } else if (err.get() != null) { + final KeyExtent fromThriftExtent = KeyExtent.fromThrift(job.getExtent()); + try { + LOG.info("Updating coordinator with compaction failure: id: {}, extent: {}", + job.getExternalCompactionId(), fromThriftExtent); + TCompactionStatusUpdate update = new TCompactionStatusUpdate(TCompactionState.FAILED, + "Compaction failed due to: " + err.get().getMessage(), -1, -1, -1, + fcr.getCompactionAge().toNanos()); + updateCompactionState(job, update); + updateCompactionFailed(job, err.get().getClass().getName()); + failed.incrementAndGet(); + errorHistory.addError(fromThriftExtent.tableId(), err.get()); + } catch (RetriesExceededException e) { + LOG.error("Error updating coordinator with compaction failure: id: {}, extent: {}", + job.getExternalCompactionId(), fromThriftExtent, e); + } finally { + currentCompactionId.set(null); + } + } else { + try { + LOG.trace("Updating coordinator with compaction completion."); + updateCompactionCompleted(job, JOB_HOLDER.getStats()); + completed.incrementAndGet(); + // job completed successfully, clear the error history + errorHistory.clear(); + } catch (RetriesExceededException e) { + LOG.error( + "Error updating coordinator with compaction completion, cancelling compaction.", + e); + try { + cancel(job.getExternalCompactionId()); + } catch (TException e1) { + LOG.error("Error cancelling compaction.", e1); + } + } finally { + currentCompactionId.set(null); + } + } + } catch (RuntimeException e1) { + LOG.error( + "Compactor thread was interrupted waiting for compaction to start, cancelling job", + e1); + try { + cancel(job.getExternalCompactionId()); + } catch (TException e2) { + LOG.error("Error cancelling compaction.", e2); + } + } finally { + currentCompactionId.set(null); - // mark compactor as idle after compaction completes - updateIdleStatus(true); + // mark compactor as idle after compaction completes + updateIdleStatus(true); - // In the case where there is an error in the foreground code the background compaction - // may still be running. Must cancel it before starting another iteration of the loop to - // avoid multiple threads updating shared state. - while (compactionThread.isAlive()) { - compactionThread.interrupt(); - compactionThread.join(1000); - } + // In the case where there is an error in the foreground code the background compaction + // may still be running. Must cancel it before starting another iteration of the loop to + // avoid multiple threads updating shared state. + while (compactionThread.isAlive()) { + compactionThread.interrupt(); + compactionThread.join(1000); } - } catch (InterruptedException e) { - LOG.info("Interrupt Exception received, shutting down"); - gracefulShutdown(getContext().rpcCreds()); } - } // end while - } catch (Exception e) { - LOG.error("Unhandled error occurred in Compactor", e); - } finally { - // Shutdown local thrift server - LOG.debug("Stopping Thrift Servers"); - if (getThriftServer() != null) { - getThriftServer().stop(); + } catch (InterruptedException e) { + LOG.info("Interrupt Exception received, shutting down"); + gracefulShutdown(getContext().rpcCreds()); } + } // end while, shutdown requested - try { - LOG.debug("Closing filesystems"); - VolumeManager mgr = getContext().getVolumeManager(); - if (null != mgr) { - mgr.close(); - } - } catch (IOException e) { - LOG.warn("Failed to close filesystem : {}", e.getMessage(), e); - } + // Shutdown local thrift server + LOG.debug("Stopping Thrift Servers"); + if (getThriftServer() != null) { + getThriftServer().stop(); + } - getContext().getLowMemoryDetector().logGCInfo(getConfiguration()); - super.close(); - getShutdownComplete().set(true); - LOG.info("stop requested. exiting ... "); - try { - if (null != compactorLock) { - compactorLock.unlock(); - } - } catch (Exception e) { - LOG.warn("Failed to release compactor lock", e); + try { + LOG.debug("Closing filesystems"); + VolumeManager mgr = getContext().getVolumeManager(); + if (null != mgr) { + mgr.close(); } + } catch (IOException e) { + LOG.warn("Failed to close filesystem : {}", e.getMessage(), e); } + super.close(); Review Comment: Implemented in ec5cd1d ########## server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java: ########## @@ -829,237 +829,241 @@ public void run() { getConfiguration().getTimeInMillis(Property.COMPACTOR_CANCEL_CHECK_INTERVAL)); LOG.info("Compactor started, waiting for work"); - try { - - final AtomicReference<Throwable> err = new AtomicReference<>(); - final LogSorter logSorter = new LogSorter(this); - long nextSortLogsCheckTime = System.currentTimeMillis(); - while (!isShutdownRequested()) { - if (Thread.currentThread().isInterrupted()) { - LOG.info("Server process thread has been interrupted, shutting down"); - break; - } - try { - // mark compactor as idle while not in the compaction loop - updateIdleStatus(true); + final AtomicReference<Throwable> err = new AtomicReference<>(); + final LogSorter logSorter = new LogSorter(this); + long nextSortLogsCheckTime = System.currentTimeMillis(); - currentCompactionId.set(null); - err.set(null); - JOB_HOLDER.reset(); - - if (System.currentTimeMillis() > nextSortLogsCheckTime) { - // Attempt to process all existing log sorting work serially in this thread. - // When no work remains, this call will return so that we can look for compaction - // work. - LOG.debug("Checking to see if any recovery logs need sorting"); + while (!isShutdownRequested()) { + if (Thread.currentThread().isInterrupted()) { + LOG.info("Server process thread has been interrupted, shutting down"); + break; + } + try { + // mark compactor as idle while not in the compaction loop + updateIdleStatus(true); + + currentCompactionId.set(null); + err.set(null); + JOB_HOLDER.reset(); + + if (System.currentTimeMillis() > nextSortLogsCheckTime) { + // Attempt to process all existing log sorting work serially in this thread. + // When no work remains, this call will return so that we can look for compaction + // work. + LOG.debug("Checking to see if any recovery logs need sorting"); + try { nextSortLogsCheckTime = logSorter.sortLogsIfNeeded(); + } catch (KeeperException e) { + LOG.error("Error sorting logs", e); } + } - performFailureProcessing(errorHistory); + performFailureProcessing(errorHistory); - TExternalCompactionJob job; - try { - TNextCompactionJob next = getNextJob(getNextId()); - job = next.getJob(); - if (!job.isSetExternalCompactionId()) { - LOG.trace("No external compactions in queue {}", this.getResourceGroup()); - UtilWaitThread.sleep(getWaitTimeBetweenCompactionChecks(next.getCompactorCount())); - continue; - } - if (!job.getExternalCompactionId().equals(currentCompactionId.get().toString())) { - throw new IllegalStateException("Returned eci " + job.getExternalCompactionId() - + " does not match supplied eci " + currentCompactionId.get()); - } - } catch (RetriesExceededException e2) { - LOG.warn("Retries exceeded getting next job. Retrying..."); + TExternalCompactionJob job; + try { + TNextCompactionJob next = getNextJob(getNextId()); + job = next.getJob(); + if (!job.isSetExternalCompactionId()) { + LOG.trace("No external compactions in queue {}", this.getResourceGroup()); + UtilWaitThread.sleep(getWaitTimeBetweenCompactionChecks(next.getCompactorCount())); continue; } - LOG.debug("Received next compaction job: {}", job); + if (!job.getExternalCompactionId().equals(currentCompactionId.get().toString())) { + throw new IllegalStateException("Returned eci " + job.getExternalCompactionId() + + " does not match supplied eci " + currentCompactionId.get()); + } + } catch (RetriesExceededException e2) { + LOG.warn("Retries exceeded getting next job. Retrying..."); + continue; + } + LOG.debug("Received next compaction job: {}", job); - final LongAdder totalInputEntries = new LongAdder(); - final LongAdder totalInputBytes = new LongAdder(); - final CountDownLatch started = new CountDownLatch(1); - final CountDownLatch stopped = new CountDownLatch(1); + final LongAdder totalInputEntries = new LongAdder(); + final LongAdder totalInputBytes = new LongAdder(); + final CountDownLatch started = new CountDownLatch(1); + final CountDownLatch stopped = new CountDownLatch(1); - final FileCompactorRunnable fcr = - createCompactionJob(job, totalInputEntries, totalInputBytes, started, stopped, err); + final FileCompactorRunnable fcr = + createCompactionJob(job, totalInputEntries, totalInputBytes, started, stopped, err); - final Thread compactionThread = Threads.createNonCriticalThread( - "Compaction job for tablet " + job.getExtent().toString(), fcr); + final Thread compactionThread = Threads.createNonCriticalThread( + "Compaction job for tablet " + job.getExtent().toString(), fcr); - JOB_HOLDER.set(job, compactionThread, fcr.getFileCompactor()); + JOB_HOLDER.set(job, compactionThread, fcr.getFileCompactor()); - try { - // mark compactor as busy while compacting - updateIdleStatus(false); + try { + // mark compactor as busy while compacting + updateIdleStatus(false); + try { // Need to call FileCompactorRunnable.initialize after calling JOB_HOLDER.set fcr.initialize(); - - compactionThread.start(); // start the compactionThread - started.await(); // wait until the compactor is started - final long inputEntries = totalInputEntries.sum(); - final long waitTime = calculateProgressCheckTime(totalInputBytes.sum()); - LOG.debug("Progress checks will occur every {} seconds", waitTime); - String percentComplete = "unknown"; - - while (!stopped.await(waitTime, TimeUnit.SECONDS)) { - List<CompactionInfo> running = - org.apache.accumulo.server.compaction.FileCompactor.getRunningCompactions(); - if (!running.isEmpty()) { - // Compaction has started. There should only be one in the list - CompactionInfo info = running.get(0); - if (info != null) { - final long entriesRead = info.getEntriesRead(); - final long entriesWritten = info.getEntriesWritten(); - if (inputEntries > 0) { - percentComplete = Float.toString((entriesRead / (float) inputEntries) * 100); - } - String message = String.format( - "Compaction in progress, read %d of %d input entries ( %s %s ), written %d entries", - entriesRead, inputEntries, percentComplete, "%", entriesWritten); - watcher.run(); - try { - LOG.debug("Updating coordinator with compaction progress: {}.", message); - TCompactionStatusUpdate update = new TCompactionStatusUpdate( - TCompactionState.IN_PROGRESS, message, inputEntries, entriesRead, - entriesWritten, fcr.getCompactionAge().toNanos()); - updateCompactionState(job, update); - } catch (RetriesExceededException e) { - LOG.warn("Error updating coordinator with compaction progress, error: {}", - e.getMessage()); - } - } - } else { - LOG.debug("Waiting on compaction thread to finish, but no RUNNING compaction"); - } - } - compactionThread.join(); - LOG.trace("Compaction thread finished."); - // Run the watcher again to clear out the finished compaction and set the - // stuck count to zero. - watcher.run(); - - if (err.get() != null) { - // maybe the error occured because the table was deleted or something like that, so - // force a cancel check to possibly reduce noise in the logs - checkIfCanceled(); + } catch (RetriesExceededException e) { + LOG.error( + "Error starting FileCompactableRunnable, cancelling compaction and moving to next job.", + e); + try { + cancel(job.getExternalCompactionId()); + } catch (TException e1) { + LOG.error("Error cancelling compaction.", e1); } + continue; + } finally { + currentCompactionId.set(null); Review Comment: Removed finally block in ec5cd1d -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: notifications-unsubscr...@accumulo.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org