Murtadha Hubail has uploaded a new change for review.
https://asterix-gerrit.ics.uci.edu/2525
Change subject: [NO ISSUE][CLUS] Complete NC Registration When Response is
Received
......................................................................
[NO ISSUE][CLUS] Complete NC Registration When Response is Received
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
- Mark NC registration as completed when the registration
response is received from CC.
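- Send the NC startup tasks request to CC only after all
previous tasks associated with that CC complete.
- Exclude nodes pending removal from the cluster partition
constraint.
- Log dataset rebalance progress, and log the stack traces
of threads stuck in tasks that fail to complete in time.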
Change-Id: I4ff41f86a11b52cae894fe40ffa0353f2fb52138
---
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java
M
asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
M
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/ExceptionUtils.java
M
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
M
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/EnsureAllCcTasksCompleted.java
6 files changed, 59 insertions(+), 46 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/25/2525/1
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
index 57d080e..494198b 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
@@ -211,7 +211,6 @@
? getCurrentSystemState() : SystemState.HEALTHY;
RegistrationTasksRequestMessage.send(ccId, (NodeControllerService)
ncServiceCtx.getControllerService(),
currentStatus, systemState);
- ncs.notifyRegistrationCompleted(ccId);
}
@Override
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java
index 7bb917f..702b6b4 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java
@@ -124,6 +124,9 @@
// The target dataset for rebalance.
targetDataset =
sourceDataset.getTargetDatasetForRebalance(nodeGroupName);
+ LOGGER.info("Rebalancing dataset {} from node group {} with
nodes {} to node group {} with nodes {}",
+ sourceDataset.getDatasetName(),
sourceDataset.getNodeGroupName(), sourceNodes,
+ targetDataset.getNodeGroupName(), targetNcNames);
// Rebalances the source dataset into the target dataset.
rebalance(sourceDataset, targetDataset, metadataProvider, hcc,
datasetRebalanceCallback);
} else {
@@ -158,6 +161,7 @@
// the source dataset.
runMetadataTransaction(metadataProvider, () ->
dropSourceDataset(sourceDataset, metadataProvider, hcc));
});
+ LOGGER.info("Dataset {} rebalance completed successfully",
datasetName);
}
@FunctionalInterface
@@ -238,6 +242,8 @@
(ActiveNotificationHandler)
appCtx.getActiveNotificationHandler();
IMetadataLockManager lockManager = appCtx.getMetadataLockManager();
lockManager.upgradeDatasetLockToWrite(metadataProvider.getLocks(),
DatasetUtil.getFullyQualifiedName(source));
+ LOGGER.info("Updating dataset {} node group from {} to {}",
source.getDatasetName(), source.getNodeGroupName(),
+ target.getNodeGroupName());
try {
// Updates the dataset entry in the metadata storage
MetadataManager.INSTANCE.updateDataset(mdTxnCtx, target);
@@ -248,6 +254,7 @@
}
}
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+ LOGGER.info("dataset {} node group updated to {}",
target.getDatasetName(), target.getNodeGroupName());
} finally {
lockManager.downgradeDatasetLockToExclusiveModify(metadataProvider.getLocks(),
DatasetUtil.getFullyQualifiedName(target));
diff --git
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
index 7182204..73d6705 100644
---
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
+++
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
@@ -280,6 +280,7 @@
clusterActiveLocations.add(p.getActiveNodeId());
}
}
+ clusterActiveLocations.removeAll(pendingRemoval);
clusterPartitionConstraint =
new
AlgebricksAbsolutePartitionConstraint(clusterActiveLocations.toArray(new
String[] {}));
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/ExceptionUtils.java
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/ExceptionUtils.java
index 444b08f..9302f46 100644
---
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/ExceptionUtils.java
+++
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/ExceptionUtils.java
@@ -107,4 +107,16 @@
}
return first;
}
+
+ /**
+ * Returns a throwable containing {@code thread} stacktrace
+ *
+ * @param thread
+ * @return The throwable with {@code thread} stacktrace
+ */
+ public static Throwable fromThreadStack(Thread thread) {
+ final Throwable stackThrowable = new Throwable(thread.getName() + "
Stack trace");
+ stackThrowable.setStackTrace(thread.getStackTrace());
+ return stackThrowable;
+ }
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
index 6d54843..a74a1ab 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
@@ -441,6 +441,8 @@
ccTimer.schedule(new ProfileDumpTask(ccs, ccId), 0,
nodeParameters.getProfileDumpPeriod());
ccTimers.put(ccId, ccTimer);
}
+ ccc.notifyRegistrationCompleted();
+ LOGGER.info("Registering with Cluster Controller {} completed", ccc);
return ccId;
}
@@ -661,6 +663,10 @@
return messagingNetManager;
}
+ public void notifyTasksCompleted(CcId ccId) throws Exception {
+ application.onRegisterNode(ccId);
+ }
+
private static INCApplication getApplication(NCConfig config)
throws ClassNotFoundException, IllegalAccessException,
InstantiationException {
if (config.getAppClass() != null) {
@@ -722,11 +728,5 @@
public INCApplication getApplication() {
return application;
- }
-
- public void notifyRegistrationCompleted(CcId ccId) {
- CcConnection ccc = getCcConnection(ccId);
- ccc.notifyRegistrationCompleted();
- LOGGER.info("Registering with Cluster Controller {} complete", ccc);
}
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/EnsureAllCcTasksCompleted.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/EnsureAllCcTasksCompleted.java
index 156a5c9..30a2ea9 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/EnsureAllCcTasksCompleted.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/EnsureAllCcTasksCompleted.java
@@ -18,85 +18,79 @@
*/
package org.apache.hyracks.control.nc.work;
-import java.util.Arrays;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.hyracks.api.control.CcId;
+import org.apache.hyracks.api.util.ExceptionUtils;
import org.apache.hyracks.control.nc.NodeControllerService;
import org.apache.hyracks.control.nc.Task;
import org.apache.hyracks.util.ExitUtil;
import org.apache.hyracks.util.Span;
-import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
-@SuppressWarnings({ "squid:S1181", "squid:S1166" })
+@SuppressWarnings("squid:S1181")
public class EnsureAllCcTasksCompleted implements Runnable {
private static final Logger LOGGER = LogManager.getLogger();
private static final long TIMEOUT = TimeUnit.MINUTES.toMillis(2);
private final NodeControllerService ncs;
private final CcId ccId;
- private final Deque<Task> abortedTasks;
- private final Span span;
+ private final Deque<Task> runningTasks;
- public EnsureAllCcTasksCompleted(NodeControllerService ncs, CcId ccId,
Deque<Task> abortedTasks) {
+ public EnsureAllCcTasksCompleted(NodeControllerService ncs, CcId ccId,
Deque<Task> runningTasks) {
this.ncs = ncs;
this.ccId = ccId;
- this.abortedTasks = abortedTasks;
- span = Span.start(2, TimeUnit.MINUTES);
+ this.runningTasks = runningTasks;
}
@Override
public void run() {
try {
- LOGGER.log(Level.INFO, "Ensuring all tasks of {} have completed",
ccId);
- while (!span.elapsed()) {
- removeAborted();
- if (abortedTasks.isEmpty()) {
+ LOGGER.info("Ensuring all tasks of CC {} have completed", ccId);
+ final Span maxWaitTme = Span.start(2, TimeUnit.MINUTES);
+ while (!maxWaitTme.elapsed()) {
+ removeCompleted();
+ if (runningTasks.isEmpty()) {
break;
}
- LOGGER.log(Level.INFO, "{} tasks are still running",
abortedTasks.size());
- Thread.sleep(TimeUnit.SECONDS.toMillis(1)); // Check once a
second
+ LOGGER.info("{} tasks are still running", runningTasks.size());
+ TimeUnit.SECONDS.sleep(1); // Check once a second
}
- if (abortedTasks.isEmpty()) {
- LOGGER.log(Level.INFO, "All tasks of {} have completed,
Completing registration", ccId);
- // all tasks has completed
- ncs.getApplication().onRegisterNode(ccId);
+ if (runningTasks.isEmpty()) {
+ LOGGER.info("All tasks of {} have completed, Completing
registration", ccId);
+ ncs.notifyTasksCompleted(ccId);
} else {
- LOGGER.log(Level.ERROR,
- "Failed to abort all previous tasks associated with CC
{} after {}ms. Giving up", ccId,
- TIMEOUT);
- LOGGER.log(Level.ERROR, "{} tasks failed to complete within
timeout", abortedTasks.size());
- abortedTasks.forEach(task -> {
- List<Thread> pendingThreads = task.getPendingThreads();
- LOGGER.log(Level.ERROR, "task {} was stuck. Stuck thread
count = {}", task.getTaskAttemptId(),
- pendingThreads.size());
- pendingThreads.forEach(thread -> {
- LOGGER.log(Level.ERROR, "Stuck thread trace: {}",
Arrays.toString(thread.getStackTrace()));
- });
- });
+ LOGGER.error("{} tasks associated with CC {} failed to
complete after {}ms. Giving up",
+ runningTasks.size(), ccId, TIMEOUT);
+ logPendingTasks();
ExitUtil.halt(ExitUtil.NC_FAILED_TO_ABORT_ALL_PREVIOUS_TASKS);
}
} catch (Throwable th) {
- try {
- LOGGER.log(Level.ERROR, "Failed to abort all previous tasks
associated with CC {}", ccId, th);
- } catch (Throwable ignore) {
- // Ignore logging errors
- }
+ LOGGER.error("Failed to abort all previous tasks associated with
CC {}", ccId, th);
ExitUtil.halt(ExitUtil.NC_FAILED_TO_ABORT_ALL_PREVIOUS_TASKS);
}
}
- private void removeAborted() {
- int numTasks = abortedTasks.size();
+ private void removeCompleted() {
+ final int numTasks = runningTasks.size();
for (int i = 0; i < numTasks; i++) {
- Task task = abortedTasks.poll();
+ Task task = runningTasks.poll();
if (!task.isCompleted()) {
- abortedTasks.add(task);
+ runningTasks.add(task);
}
}
}
+
+ private void logPendingTasks() {
+ runningTasks.forEach(task -> {
+ final List<Thread> pendingThreads = task.getPendingThreads();
+ LOGGER.error("task {} was stuck. Stuck thread count = {}",
task.getTaskAttemptId(), pendingThreads.size());
+ for (Thread thread : pendingThreads) {
+ LOGGER.error("Stuck thread trace",
ExceptionUtils.fromThreadStack(thread));
+ }
+ });
+ }
}
--
To view, visit https://asterix-gerrit.ics.uci.edu/2525
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I4ff41f86a11b52cae894fe40ffa0353f2fb52138
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: release-0.9.4-pre-rc
Gerrit-Owner: Murtadha Hubail <[email protected]>