Murtadha Hubail has submitted this change and it was merged. Change subject: [NO ISSUE][CLUS] Complete NC Registration When Response is Received ......................................................................
[NO ISSUE][CLUS] Complete NC Registration When Response is Received - user model changes: no - storage format changes: no - interface changes: no Details: - Mark NC registration as completed when the registration response is received from CC. - Send NC startup tasks request to CC after all previous tasks complete. Change-Id: I4ff41f86a11b52cae894fe40ffa0353f2fb52138 Reviewed-on: https://asterix-gerrit.ics.uci.edu/2525 Reviewed-by: Michael Blow <[email protected]> Tested-by: Murtadha Hubail <[email protected]> --- M asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/ExceptionUtils.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/EnsureAllCcTasksCompleted.java 4 files changed, 51 insertions(+), 46 deletions(-) Approvals: Anon. E. Moose #1000171: Michael Blow: Looks good to me, approved Murtadha Hubail: Verified diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java index 57d080e..494198b 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java @@ -211,7 +211,6 @@ ? 
getCurrentSystemState() : SystemState.HEALTHY; RegistrationTasksRequestMessage.send(ccId, (NodeControllerService) ncServiceCtx.getControllerService(), currentStatus, systemState); - ncs.notifyRegistrationCompleted(ccId); } @Override diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/ExceptionUtils.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/ExceptionUtils.java index 444b08f..9302f46 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/ExceptionUtils.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/ExceptionUtils.java @@ -107,4 +107,16 @@ } return first; } + + /** + * Returns a throwable containing {@code thread} stacktrace + * + * @param thread + * @return The throwable with {@code thread} stacktrace + */ + public static Throwable fromThreadStack(Thread thread) { + final Throwable stackThrowable = new Throwable(thread.getName() + " Stack trace"); + stackThrowable.setStackTrace(thread.getStackTrace()); + return stackThrowable; + } } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java index 6d54843..a74a1ab 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java @@ -441,6 +441,8 @@ ccTimer.schedule(new ProfileDumpTask(ccs, ccId), 0, nodeParameters.getProfileDumpPeriod()); ccTimers.put(ccId, ccTimer); } + ccc.notifyRegistrationCompleted(); + LOGGER.info("Registering with Cluster Controller {} completed", ccc); return ccId; } @@ -661,6 +663,10 @@ return 
messagingNetManager; } + public void notifyTasksCompleted(CcId ccId) throws Exception { + application.onRegisterNode(ccId); + } + private static INCApplication getApplication(NCConfig config) throws ClassNotFoundException, IllegalAccessException, InstantiationException { if (config.getAppClass() != null) { @@ -722,11 +728,5 @@ public INCApplication getApplication() { return application; - } - - public void notifyRegistrationCompleted(CcId ccId) { - CcConnection ccc = getCcConnection(ccId); - ccc.notifyRegistrationCompleted(); - LOGGER.info("Registering with Cluster Controller {} complete", ccc); } } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/EnsureAllCcTasksCompleted.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/EnsureAllCcTasksCompleted.java index 156a5c9..5964c04 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/EnsureAllCcTasksCompleted.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/EnsureAllCcTasksCompleted.java @@ -18,84 +18,78 @@ */ package org.apache.hyracks.control.nc.work; -import java.util.Arrays; import java.util.Deque; import java.util.List; import java.util.concurrent.TimeUnit; import org.apache.hyracks.api.control.CcId; +import org.apache.hyracks.api.util.ExceptionUtils; import org.apache.hyracks.control.nc.NodeControllerService; import org.apache.hyracks.control.nc.Task; import org.apache.hyracks.util.ExitUtil; import org.apache.hyracks.util.Span; -import org.apache.logging.log4j.Level; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -@SuppressWarnings({ "squid:S1181", "squid:S1166" }) +@SuppressWarnings("squid:S1181") public class EnsureAllCcTasksCompleted implements Runnable { private static final Logger LOGGER = 
LogManager.getLogger(); private static final long TIMEOUT = TimeUnit.MINUTES.toMillis(2); private final NodeControllerService ncs; private final CcId ccId; - private final Deque<Task> abortedTasks; - private final Span span; + private final Deque<Task> runningTasks; - public EnsureAllCcTasksCompleted(NodeControllerService ncs, CcId ccId, Deque<Task> abortedTasks) { + public EnsureAllCcTasksCompleted(NodeControllerService ncs, CcId ccId, Deque<Task> runningTasks) { this.ncs = ncs; this.ccId = ccId; - this.abortedTasks = abortedTasks; - span = Span.start(2, TimeUnit.MINUTES); + this.runningTasks = runningTasks; } @Override public void run() { try { - LOGGER.log(Level.INFO, "Ensuring all tasks of {} have completed", ccId); - while (!span.elapsed()) { - removeAborted(); - if (abortedTasks.isEmpty()) { + LOGGER.info("Ensuring all tasks of CC {} have completed", ccId); + final Span maxWaitTime = Span.start(2, TimeUnit.MINUTES); + while (!maxWaitTime.elapsed()) { + removeCompleted(); + if (runningTasks.isEmpty()) { break; } - LOGGER.log(Level.INFO, "{} tasks are still running", abortedTasks.size()); - Thread.sleep(TimeUnit.SECONDS.toMillis(1)); // Check once a second + LOGGER.info("{} tasks are still running", runningTasks.size()); + TimeUnit.SECONDS.sleep(1); // Check once a second } - if (abortedTasks.isEmpty()) { - LOGGER.log(Level.INFO, "All tasks of {} have completed, Completing registration", ccId); - // all tasks has completed - ncs.getApplication().onRegisterNode(ccId); + if (runningTasks.isEmpty()) { + LOGGER.info("All tasks of CC {} have completed", ccId); + ncs.notifyTasksCompleted(ccId); } else { - LOGGER.log(Level.ERROR, - "Failed to abort all previous tasks associated with CC {} after {}ms. Giving up", ccId, - TIMEOUT); - LOGGER.log(Level.ERROR, "{} tasks failed to complete within timeout", abortedTasks.size()); - abortedTasks.forEach(task -> { - List<Thread> pendingThreads = task.getPendingThreads(); - LOGGER.log(Level.ERROR, "task {} was stuck. 
Stuck thread count = {}", task.getTaskAttemptId(), - pendingThreads.size()); - pendingThreads.forEach(thread -> { - LOGGER.log(Level.ERROR, "Stuck thread trace: {}", Arrays.toString(thread.getStackTrace())); - }); - }); + LOGGER.error("{} tasks associated with CC {} failed to complete after {}ms. Giving up", + runningTasks.size(), ccId, TIMEOUT); + logPendingTasks(); ExitUtil.halt(ExitUtil.NC_FAILED_TO_ABORT_ALL_PREVIOUS_TASKS); } } catch (Throwable th) { - try { - LOGGER.log(Level.ERROR, "Failed to abort all previous tasks associated with CC {}", ccId, th); - } catch (Throwable ignore) { - // Ignore logging errors - } + LOGGER.error("Failed to abort all previous tasks associated with CC {}", ccId, th); ExitUtil.halt(ExitUtil.NC_FAILED_TO_ABORT_ALL_PREVIOUS_TASKS); } } - private void removeAborted() { - int numTasks = abortedTasks.size(); + private void removeCompleted() { + final int numTasks = runningTasks.size(); for (int i = 0; i < numTasks; i++) { - Task task = abortedTasks.poll(); + Task task = runningTasks.poll(); if (!task.isCompleted()) { - abortedTasks.add(task); + runningTasks.add(task); + } + } + } + + private void logPendingTasks() { + for (Task task : runningTasks) { + final List<Thread> pendingThreads = task.getPendingThreads(); + LOGGER.error("task {} was stuck. Stuck thread count = {}", task.getTaskAttemptId(), pendingThreads.size()); + for (Thread thread : pendingThreads) { + LOGGER.error("Stuck thread trace", ExceptionUtils.fromThreadStack(thread)); } } } -- To view, visit https://asterix-gerrit.ics.uci.edu/2525 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: merged Gerrit-Change-Id: I4ff41f86a11b52cae894fe40ffa0353f2fb52138 Gerrit-PatchSet: 4 Gerrit-Project: asterixdb Gerrit-Branch: release-0.9.4-pre-rc Gerrit-Owner: Murtadha Hubail <[email protected]> Gerrit-Reviewer: Anon. E. Moose #1000171 Gerrit-Reviewer: Michael Blow <[email protected]> Gerrit-Reviewer: Murtadha Hubail <[email protected]>
