abdullah alamoudi has submitted this change and it was merged. Change subject: [NO ISSUE][ING] Improvements to active retry policy ......................................................................
[NO ISSUE][ING] Improvements to active retry policy - user model changes: no - storage format changes: no - interface changes: yes - IRetryPolicy.retry now takes a Throwable parameter Details: - This change improves the retry policy for active entities by passing the policy the cause of the last failure. - The change also removes the lock on the active notification handler in the recover call. Change-Id: I4246e2a276e1f80569a07630e182aab8f49dd115 Reviewed-on: https://asterix-gerrit.ics.uci.edu/2665 Reviewed-by: Michael Blow <[email protected]> Contrib: Michael Blow <[email protected]> Integration-Tests: Michael Blow <[email protected]> Tested-by: Michael Blow <[email protected]> Contrib: Jenkins <[email protected]> Tested-by: Jenkins <[email protected]> Integration-Tests: Jenkins <[email protected]> --- M asterixdb/asterix-active/src/main/java/org/apache/asterix/active/CountRetryPolicy.java M asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IRetryPolicy.java M asterixdb/asterix-active/src/main/java/org/apache/asterix/active/InfiniteRetryPolicy.java M asterixdb/asterix-active/src/main/java/org/apache/asterix/active/NoRetryPolicyFactory.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/RecoveryTask.java M asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/util/LangRecordParseUtil.java M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/EnsureAllCcTasksCompleted.java M hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExitUtil.java 10 files changed, 70 insertions(+), 41 deletions(-) Approvals: Jenkins: Verified; ; Verified Michael Blow: Looks good to me, approved; Verified; No violations found; Verified Objections: 
Anon. E. Moose #1000171: Violations found Jenkins: Violations found diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/CountRetryPolicy.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/CountRetryPolicy.java index 6633810..b964430 100644 --- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/CountRetryPolicy.java +++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/CountRetryPolicy.java @@ -28,7 +28,7 @@ } @Override - public boolean retry() { + public boolean retry(Throwable failure) { if (attempted < count) { attempted++; return true; diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IRetryPolicy.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IRetryPolicy.java index a010984..1daf07e 100644 --- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IRetryPolicy.java +++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IRetryPolicy.java @@ -21,7 +21,9 @@ @FunctionalInterface public interface IRetryPolicy { /** + * @param failure + * the cause of the active entity failure * @return true if one more attempt should be done */ - boolean retry(); + boolean retry(Throwable failure); } diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/InfiniteRetryPolicy.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/InfiniteRetryPolicy.java index 074f8f4..fde67e6 100644 --- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/InfiniteRetryPolicy.java +++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/InfiniteRetryPolicy.java @@ -27,7 +27,7 @@ } @Override - public boolean retry() { + public boolean retry(Throwable failure) { synchronized (listener) { try { listener.wait(5000); //NOSONAR this method is being called in a while loop diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/NoRetryPolicyFactory.java 
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/NoRetryPolicyFactory.java index 1596c17..a48283a 100644 --- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/NoRetryPolicyFactory.java +++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/NoRetryPolicyFactory.java @@ -20,7 +20,7 @@ public class NoRetryPolicyFactory implements IRetryPolicyFactory { public static final NoRetryPolicyFactory INSTANCE = new NoRetryPolicyFactory(); - private static final IRetryPolicy policy = () -> false; + private static final IRetryPolicy policy = failure -> false; private NoRetryPolicyFactory() { } diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java index 053e6cd..1b7d5b9 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java @@ -207,9 +207,9 @@ } @Override - public synchronized void recover() { + public void recover() { LOGGER.log(level, "Starting active recovery"); - for (IActiveEntityEventsListener listener : entityEventListeners.values()) { + for (IActiveEntityEventsListener listener : getEventListeners()) { synchronized (listener) { LOGGER.log(level, "Entity " + listener.getEntityId() + " is " + listener.getStats()); listener.notifyAll(); diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/RecoveryTask.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/RecoveryTask.java index 2de8319..1f72856 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/RecoveryTask.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/RecoveryTask.java @@ -93,6 +93,7 @@ synchronized (listener) { if (!cancelRecovery) { 
listener.setState(ActivityState.PERMANENTLY_FAILED); + listener.setRunning(metadataProvider, false); } } } else { @@ -112,7 +113,7 @@ return null; } LOGGER.log(level, "calling the policy"); - while (policy.retry()) { + while (policy.retry(failure)) { synchronized (listener) { if (cancelRecovery) { return null; @@ -170,7 +171,9 @@ return null; } if (listener.getState() == ActivityState.TEMPORARILY_FAILED) { + LOGGER.warn("Recovery for {} permanently failed", listener.getEntityId()); listener.setState(ActivityState.PERMANENTLY_FAILED); + listener.setRunning(metadataProvider, false); } listener.notifyAll(); } diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/util/LangRecordParseUtil.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/util/LangRecordParseUtil.java index c4b23ef..8367fa0 100644 --- a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/util/LangRecordParseUtil.java +++ b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/util/LangRecordParseUtil.java @@ -213,6 +213,8 @@ return aOrderedListToString((AOrderedList) aObj); case STRING: return ((AString) aObj).getStringValue(); + case BOOLEAN: + return ((ABoolean) aObj).getBoolean().toString(); default: throw new AlgebricksException("value of type " + aObj.getType() + " is not supported yet"); } diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java index cfa6f78..eaec3c3 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java @@ -84,8 +84,8 @@ private volatile long reqId = 0L; - private final ExecutorService uninterruptibleExecutor = 
Executors.newFixedThreadPool(2, - r -> new Thread(r, "HyracksConnection Uninterrubtible thread: " + r.getClass().getSimpleName())); + private final ExecutorService uninterruptibleExecutor = + Executors.newFixedThreadPool(2, r -> new Thread(r, "HyracksConnection Uninterrubtible thread: ")); private final BlockingQueue<UnInterruptibleRequest<?>> uninterruptibles = new ArrayBlockingQueue<>(1); @@ -367,6 +367,11 @@ return null; } + @Override + public String toString() { + return "CancelJobRequest: " + jobId.toString(); + } + } private class StartDeployedJobRequest extends UnInterruptibleRequest<JobId> { @@ -407,24 +412,35 @@ } } + @Override + public String toString() { + return "StartJobRequest"; + } + } private class UninterrubtileRequestHandler implements Runnable { @SuppressWarnings({ "squid:S2189", "squid:S2142" }) @Override public void run() { - while (true) { - try { - UnInterruptibleRequest<?> next = uninterruptibles.take(); - reqId++; - running = true; - next.handle(); - } catch (InterruptedException e) { - LOGGER.log(Level.WARN, "Ignoring interrupt. This thread should never be interrupted."); - continue; - } finally { - running = false; + String nameBefore = Thread.currentThread().getName(); + Thread.currentThread().setName(nameBefore + getClass().getSimpleName()); + try { + while (true) { + try { + UnInterruptibleRequest<?> current = uninterruptibles.take(); + reqId++; + running = true; + current.handle(); + } catch (InterruptedException e) { + LOGGER.log(Level.WARN, "Ignoring interrupt. This thread should never be interrupted."); + continue; + } finally { + running = false; + } } + } finally { + Thread.currentThread().setName(nameBefore); } } } @@ -433,25 +449,31 @@ @Override @SuppressWarnings({ "squid:S2189", "squid:S2142" }) public void run() { - long currentReqId = 0L; - long currentTime = System.nanoTime(); - while (true) { - try { - TimeUnit.MINUTES.sleep(1); - } catch (InterruptedException e) { - LOGGER.log(Level.WARN, "Ignoring interrupt. 
This thread should never be interrupted."); - continue; - } - if (running) { - if (reqId == currentReqId) { - if (TimeUnit.NANOSECONDS.toMinutes(System.nanoTime() - currentTime) > 0) { - ExitUtil.halt(ExitUtil.EC_FAILED_TO_PROCESS_UN_INTERRUPTIBLE_REQUEST); + String nameBefore = Thread.currentThread().getName(); + Thread.currentThread().setName(nameBefore + getClass().getSimpleName()); + try { + long currentReqId = 0L; + long currentTime = System.nanoTime(); + while (true) { + try { + TimeUnit.MINUTES.sleep(1); + } catch (InterruptedException e) { + LOGGER.log(Level.WARN, "Ignoring interrupt. This thread should never be interrupted."); + continue; + } + if (running) { + if (reqId == currentReqId) { + if (TimeUnit.NANOSECONDS.toMinutes(System.nanoTime() - currentTime) > 0) { + ExitUtil.halt(ExitUtil.EC_FAILED_TO_PROCESS_UN_INTERRUPTIBLE_REQUEST); + } + } else { + currentReqId = reqId; + currentTime = System.nanoTime(); } - } else { - currentReqId = reqId; - currentTime = System.nanoTime(); } } + } finally { + Thread.currentThread().setName(nameBefore); } } } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/EnsureAllCcTasksCompleted.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/EnsureAllCcTasksCompleted.java index 5964c04..0f36c80 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/EnsureAllCcTasksCompleted.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/EnsureAllCcTasksCompleted.java @@ -66,11 +66,11 @@ LOGGER.error("{} tasks associated with CC {} failed to complete after {}ms. 
Giving up", runningTasks.size(), ccId, TIMEOUT); logPendingTasks(); - ExitUtil.halt(ExitUtil.NC_FAILED_TO_ABORT_ALL_PREVIOUS_TASKS); + ExitUtil.halt(ExitUtil.EC_NC_FAILED_TO_ABORT_ALL_PREVIOUS_TASKS); } } catch (Throwable th) { LOGGER.error("Failed to abort all previous tasks associated with CC {}", ccId, th); - ExitUtil.halt(ExitUtil.NC_FAILED_TO_ABORT_ALL_PREVIOUS_TASKS); + ExitUtil.halt(ExitUtil.EC_NC_FAILED_TO_ABORT_ALL_PREVIOUS_TASKS); } } diff --git a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExitUtil.java b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExitUtil.java index c8b9112..8500842 100644 --- a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExitUtil.java +++ b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExitUtil.java @@ -33,7 +33,7 @@ public static final int EC_ABNORMAL_TERMINATION = 1; public static final int EC_FAILED_TO_STARTUP = 2; public static final int EC_FAILED_TO_RECOVER = 3; - public static final int NC_FAILED_TO_ABORT_ALL_PREVIOUS_TASKS = 4; + public static final int EC_NC_FAILED_TO_ABORT_ALL_PREVIOUS_TASKS = 4; public static final int EC_FAILED_TO_PROCESS_UN_INTERRUPTIBLE_REQUEST = 5; public static final int EC_UNHANDLED_EXCEPTION = 11; public static final int EC_IMMEDIATE_HALT = 33; @@ -75,8 +75,8 @@ exit(status); } - public static void halt(int status) { - LOGGER.fatal("JVM halting with status " + status + "; bye!", new Throwable("halt stacktrace")); + public static synchronized void halt(int status) { + LOGGER.fatal("JVM halting with status {}; thread dump at halt: {}", status, ThreadDumpUtil.takeDumpString()); // try to give time for the log to be emitted... 
LogManager.shutdown(); Runtime.getRuntime().halt(status); -- To view, visit https://asterix-gerrit.ics.uci.edu/2665 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: merged Gerrit-Change-Id: I4246e2a276e1f80569a07630e182aab8f49dd115 Gerrit-PatchSet: 9 Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Owner: abdullah alamoudi <[email protected]> Gerrit-Reviewer: Anon. E. Moose #1000171 Gerrit-Reviewer: Jenkins <[email protected]> Gerrit-Reviewer: Michael Blow <[email protected]> Gerrit-Reviewer: Murtadha Hubail <[email protected]> Gerrit-Reviewer: Till Westmann <[email protected]> Gerrit-Reviewer: abdullah alamoudi <[email protected]>
