Michael Blow has uploaded a new change for review.
https://asterix-gerrit.ics.uci.edu/3274
Change subject: [NO ISSUE][HYR] EnsureAllCcTasksCompleted failure handling
......................................................................
[NO ISSUE][HYR] EnsureAllCcTasksCompleted failure handling
Don't halt the NC when interrupted while waiting for aborted CC tasks to complete,
or when interrupted while notifying the CC that all of its tasks have completed
Change-Id: I02819afcb80a0bcd645c3f79950c3fa12dba0274
---
M
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/EnsureAllCcTasksCompleted.java
M
hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExitUtil.java
2 files changed, 36 insertions(+), 28 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/74/3274/1
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/EnsureAllCcTasksCompleted.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/EnsureAllCcTasksCompleted.java
index 0f36c80..9e090f2 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/EnsureAllCcTasksCompleted.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/EnsureAllCcTasksCompleted.java
@@ -40,7 +40,7 @@
private final CcId ccId;
private final Deque<Task> runningTasks;
- public EnsureAllCcTasksCompleted(NodeControllerService ncs, CcId ccId,
Deque<Task> runningTasks) {
+ EnsureAllCcTasksCompleted(NodeControllerService ncs, CcId ccId,
Deque<Task> runningTasks) {
this.ncs = ncs;
this.ccId = ccId;
this.runningTasks = runningTasks;
@@ -48,40 +48,47 @@
@Override
public void run() {
+ LOGGER.info("Ensuring all tasks of CC {} have completed", ccId);
try {
- LOGGER.info("Ensuring all tasks of CC {} have completed", ccId);
- final Span maxWaitTime = Span.start(2, TimeUnit.MINUTES);
- while (!maxWaitTime.elapsed()) {
- removeCompleted();
- if (runningTasks.isEmpty()) {
- break;
- }
- LOGGER.info("{} tasks are still running", runningTasks.size());
- TimeUnit.SECONDS.sleep(1); // Check once a second
- }
+ waitForTaskCompletion();
+ } catch (InterruptedException e) {
+ LOGGER.info("interrupted waiting for CC tasks to complete; giving
up");
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ private void waitForTaskCompletion() throws InterruptedException {
+ final Span maxWaitTime = Span.start(TIMEOUT, TimeUnit.MILLISECONDS);
+ while (!maxWaitTime.elapsed()) {
+ removeCompleted();
if (runningTasks.isEmpty()) {
- LOGGER.info("All tasks of CC {} have completed", ccId);
- ncs.notifyTasksCompleted(ccId);
- } else {
- LOGGER.error("{} tasks associated with CC {} failed to
complete after {}ms. Giving up",
- runningTasks.size(), ccId, TIMEOUT);
- logPendingTasks();
-
ExitUtil.halt(ExitUtil.EC_NC_FAILED_TO_ABORT_ALL_PREVIOUS_TASKS);
+ break;
}
- } catch (Throwable th) {
- LOGGER.error("Failed to abort all previous tasks associated with
CC {}", ccId, th);
+ LOGGER.info("{} tasks are still running", runningTasks.size());
+ TimeUnit.SECONDS.sleep(1); // Check once a second
+ }
+ removeCompleted();
+ if (runningTasks.isEmpty()) {
+ LOGGER.info("all tasks of CC {} have completed", ccId);
+ try {
+ ncs.notifyTasksCompleted(ccId);
+ } catch (InterruptedException e) {
+ LOGGER.info("interrupted during notifyTasksCompleted");
+ throw e;
+ } catch (Exception e) {
+ LOGGER.error("unexpected error during notifyTasksCompleted",
e);
+ ExitUtil.halt(ExitUtil.EC_NC_FAILED_TO_NOTIFY_TASKS_COMPLETED);
+ }
+ } else {
+ LOGGER.error("{} tasks associated with CC {} failed to complete
after {}ms. Giving up", runningTasks.size(),
+ ccId, TIMEOUT);
+ logPendingTasks();
ExitUtil.halt(ExitUtil.EC_NC_FAILED_TO_ABORT_ALL_PREVIOUS_TASKS);
}
}
private void removeCompleted() {
- final int numTasks = runningTasks.size();
- for (int i = 0; i < numTasks; i++) {
- Task task = runningTasks.poll();
- if (!task.isCompleted()) {
- runningTasks.add(task);
- }
- }
+ runningTasks.removeIf(Task::isCompleted);
}
private void logPendingTasks() {
@@ -89,7 +96,7 @@
final List<Thread> pendingThreads = task.getPendingThreads();
LOGGER.error("task {} was stuck. Stuck thread count = {}",
task.getTaskAttemptId(), pendingThreads.size());
for (Thread thread : pendingThreads) {
- LOGGER.error("Stuck thread trace",
ExceptionUtils.fromThreadStack(thread));
+ LOGGER.error("stuck thread trace",
ExceptionUtils.fromThreadStack(thread));
}
}
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExitUtil.java
b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExitUtil.java
index 52c8f55..680d55e 100644
---
a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExitUtil.java
+++
b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExitUtil.java
@@ -51,6 +51,7 @@
public static final int EC_NETWORK_FAILURE = 16;
public static final int EC_ACTIVE_SUSPEND_FAILURE = 17;
public static final int EC_ACTIVE_RESUME_FAILURE = 18;
+ public static final int EC_NC_FAILED_TO_NOTIFY_TASKS_COMPLETED = 19;
public static final int EC_FAILED_TO_CANCEL_ACTIVE_START_STOP = 22;
public static final int EC_IMMEDIATE_HALT = 33;
public static final int EC_HALT_ABNORMAL_RESERVED_44 = 44;
--
To view, visit https://asterix-gerrit.ics.uci.edu/3274
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I02819afcb80a0bcd645c3f79950c3fa12dba0274
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Michael Blow <[email protected]>