Murtadha Hubail has uploaded a new change for review.
https://asterix-gerrit.ics.uci.edu/2830
Change subject: [NO ISSUE][CLUS] Request NC Startup Tasks After CC Registration
......................................................................
[NO ISSUE][CLUS] Request NC Startup Tasks After CC Registration
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
- Add a new NodeJoinIntention message to notify the CC of
a node's intention to join. Upon receiving this message,
the CC requests the NC to perform pre-registration tasks
(e.g. abort all previous jobs).
- Start the web servers on first node registration to ensure
the node doesn't send a startup tasks request to the CC before
its web servers are up.
Change-Id: I7c58d006546f3ebca91333c2a4bc8ced68fdaf39
---
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
M
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/INCApplication.java
M
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java
M
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
A
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/NotifyNodeJoinIntentionWork.java
M
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java
M
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java
M
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
M
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
M
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
M
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/BaseNCApplication.java
M
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/CcConnection.java
M
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java
M
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
M
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortAllJobsWork.java
M
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/EnsureAllCcTasksCompleted.java
M
hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/TestNCApplication.java
17 files changed, 208 insertions(+), 49 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/30/2830/1
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
index 019f54d..a4819e8 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
@@ -83,6 +83,7 @@
private INcApplicationContext runtimeContext;
private String nodeId;
private boolean stopInitiated;
+ private boolean firstRegistration = true;
protected WebManager webManager;
@Override
@@ -211,20 +212,9 @@
}
@Override
- public void startupCompleted() throws Exception {
- // configure servlets after joining the cluster, so we can create
HyracksClientConnection
- configureServers();
- webManager.start();
- }
-
- @Override
- public synchronized void onRegisterNode(CcId ccId) throws Exception {
- final NodeControllerService ncs = (NodeControllerService)
ncServiceCtx.getControllerService();
- final NodeStatus currentStatus = ncs.getNodeStatus();
- final SystemState systemState = isPendingStartupTasks(currentStatus,
ncs.getPrimaryCcId(), ccId)
- ? getCurrentSystemState() : SystemState.HEALTHY;
- RegistrationTasksRequestMessage.send(ccId, (NodeControllerService)
ncServiceCtx.getControllerService(),
- currentStatus, systemState);
+ public synchronized void nodeRegistered(CcId ccId) throws Exception {
+ onNodeRegistered();
+ requestStartupTasks(ccId);
}
@Override
@@ -296,4 +286,26 @@
protected void configurePersistedResourceRegistry() {
ncServiceCtx.setPersistedResourceRegistry(new
PersistedResourceRegistry());
}
+
+ protected synchronized void onFirstRegistration() throws Exception {
+ // configure servlets after joining the cluster, so we can create
HyracksClientConnection
+ configureServers();
+ webManager.start();
+ }
+
+ protected synchronized void requestStartupTasks(CcId ccId) throws
Exception {
+ final NodeControllerService ncs = (NodeControllerService)
ncServiceCtx.getControllerService();
+ final NodeStatus currentStatus = ncs.getNodeStatus();
+ final SystemState systemState = isPendingStartupTasks(currentStatus,
ncs.getPrimaryCcId(), ccId)
+ ? getCurrentSystemState() : SystemState.HEALTHY;
+ RegistrationTasksRequestMessage.send(ccId, (NodeControllerService)
ncServiceCtx.getControllerService(),
+ currentStatus, systemState);
+ }
+
+ private synchronized void onNodeRegistered() throws Exception {
+ if (firstRegistration) {
+ onFirstRegistration();
+ }
+ firstRegistration = false;
+ }
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/INCApplication.java
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/INCApplication.java
index af6cb92..7dc1527 100644
---
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/INCApplication.java
+++
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/INCApplication.java
@@ -34,5 +34,5 @@
*/
IFileDeviceResolver getFileDeviceResolver();
- void onRegisterNode(CcId ccId) throws Exception;
+ void nodeRegistered(CcId ccId) throws Exception;
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java
index 84cb4bd..aa0d8af 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java
@@ -27,6 +27,7 @@
import org.apache.hyracks.control.cc.work.JobletCleanupNotificationWork;
import org.apache.hyracks.control.cc.work.NodeHeartbeatWork;
import org.apache.hyracks.control.cc.work.NotifyDeployBinaryWork;
+import org.apache.hyracks.control.cc.work.NotifyNodeJoinIntentionWork;
import org.apache.hyracks.control.cc.work.NotifyShutdownWork;
import org.apache.hyracks.control.cc.work.NotifyStateDumpResponse;
import org.apache.hyracks.control.cc.work.NotifyThreadDumpResponse;
@@ -161,6 +162,11 @@
ccs.getWorkQueue()
.schedule(new NotifyThreadDumpResponse(ccs,
tdrf.getRequestId(), tdrf.getThreadDumpJSON()));
break;
+ case NODE_JOIN_INTENTION:
+ CCNCFunctions.NodeJoinIntentionFunction njif =
(CCNCFunctions.NodeJoinIntentionFunction) fn;
+ ccs.getWorkQueue().schedule(new
NotifyNodeJoinIntentionWork(ccs, njif.getNodeId(),
+ njif.getNodeAddress(), njif.getRegistrationId()));
+ break;
default:
LOGGER.warn("Unknown function: " + fn.getFunctionId());
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
index 31f989b..971d401 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
@@ -104,11 +104,6 @@
LOGGER.warn("Node with name " + nodeId + " has already registered;
failing the node then re-registering.");
failNode(nodeId);
}
- try {
- ncState.getNodeController().abortJobs(ccs.getCcId());
- } catch (IPCException e) {
- throw HyracksDataException.create(e);
- }
LOGGER.warn("adding node to registry");
nodeRegistry.put(nodeId, ncState);
// Updates the IP address to node names map.
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/NotifyNodeJoinIntentionWork.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/NotifyNodeJoinIntentionWork.java
new file mode 100644
index 0000000..f7fce4b
--- /dev/null
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/NotifyNodeJoinIntentionWork.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.cc.work;
+
+import java.net.InetSocketAddress;
+
+import org.apache.hyracks.control.cc.ClusterControllerService;
+import org.apache.hyracks.control.common.ipc.NodeControllerRemoteProxy;
+import org.apache.hyracks.control.common.work.SynchronizableWork;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+public class NotifyNodeJoinIntentionWork extends SynchronizableWork {
+ private static final Logger LOGGER = LogManager.getLogger();
+
+ private final ClusterControllerService ccs;
+ private final String nodeId;
+ private final InetSocketAddress nodeAddress;
+ private final int registrationId;
+
+ public NotifyNodeJoinIntentionWork(ClusterControllerService ccs, String
nodeId, InetSocketAddress nodeAddress,
+ int registrationId) {
+ this.ccs = ccs;
+ this.nodeId = nodeId;
+ this.nodeAddress = nodeAddress;
+ this.registrationId = registrationId;
+ }
+
+ @Override
+ protected void doRun() throws Exception {
+ LOGGER.warn("Received intention to join the cluster from node {}",
nodeId);
+ NodeControllerRemoteProxy nc =
+ new NodeControllerRemoteProxy(ccs.getCcId(),
ccs.getClusterIPC().getReconnectingHandle(nodeAddress));
+ nc.abortJobs(ccs.getCcId(), registrationId);
+ LOGGER.warn("Instructed node {} to abort all existing jobs", nodeId);
+ }
+}
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java
index fc0154e..360861b 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java
@@ -18,6 +18,7 @@
*/
package org.apache.hyracks.control.common.base;
+import java.net.InetSocketAddress;
import java.util.List;
import org.apache.hyracks.api.comm.NetworkAddress;
@@ -72,4 +73,6 @@
void getNodeControllerInfos() throws Exception;
void notifyThreadDump(String nodeId, String requestId, String
threadDumpJSON) throws Exception;
+
+ void notifyNodeJoinIntention(String nodeId, InetSocketAddress nodeAddress,
int registrationId) throws Exception;
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java
index fa835f4..bc55d04 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java
@@ -72,9 +72,10 @@
* started by cluster controller with id {@code ccId}
*
* @param ccId
+ * @param registrationId
* @throws IPCException
*/
- void abortJobs(CcId ccId) throws IPCException;
+ void abortJobs(CcId ccId, int registrationId) throws IPCException;
/**
* Sends node registration result to this {@link INodeController}.
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
index ce4578d..5f44b5d 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
@@ -26,6 +26,7 @@
import java.io.IOException;
import java.io.OutputStream;
import java.io.Serializable;
+import java.net.InetSocketAddress;
import java.net.URL;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@@ -75,6 +76,7 @@
private static final int FID_CODE_SIZE = 1;
public enum FunctionId {
+ NODE_JOIN_INTENTION,
REGISTER_NODE,
UNREGISTER_NODE,
NOTIFY_JOBLET_CLEANUP,
@@ -188,6 +190,37 @@
public NodeRegistration getNodeRegistration() {
return reg;
+ }
+
+ public int getRegistrationId() {
+ return registrationId;
+ }
+ }
+
+ public static class NodeJoinIntentionFunction extends Function {
+ private static final long serialVersionUID = 1L;
+
+ private final InetSocketAddress nodeAddress;
+ private final String nodeId;
+ private final int registrationId;
+
+ public NodeJoinIntentionFunction(String nodeId, InetSocketAddress
nodeAddress, int registrationId) {
+ this.nodeId = nodeId;
+ this.nodeAddress = nodeAddress;
+ this.registrationId = registrationId;
+ }
+
+ @Override
+ public FunctionId getFunctionId() {
+ return FunctionId.NODE_JOIN_INTENTION;
+ }
+
+ public InetSocketAddress getNodeAddress() {
+ return nodeAddress;
+ }
+
+ public String getNodeId() {
+ return nodeId;
}
public int getRegistrationId() {
@@ -691,9 +724,11 @@
public static class AbortCCJobsFunction extends Function {
private static final long serialVersionUID = 1L;
private final CcId ccId;
+ private final int registrationId;
- public AbortCCJobsFunction(CcId ccId) {
+ public AbortCCJobsFunction(CcId ccId, int registrationId) {
this.ccId = ccId;
+ this.registrationId = registrationId;
}
@Override
@@ -704,6 +739,10 @@
public CcId getCcId() {
return ccId;
}
+
+ public int getRegistrationId() {
+ return registrationId;
+ }
}
public static class DeployJobSpecFunction extends CCIdentifiedFunction {
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
index 8e2ec22..1de4ee2 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
@@ -18,6 +18,9 @@
*/
package org.apache.hyracks.control.common.ipc;
+import static
org.apache.hyracks.control.common.ipc.CCNCFunctions.NodeJoinIntentionFunction;
+
+import java.net.InetSocketAddress;
import java.util.List;
import org.apache.hyracks.api.comm.NetworkAddress;
@@ -63,6 +66,13 @@
}
@Override
+ public void notifyNodeJoinIntention(String nodeId, InetSocketAddress
nodeAddress, int registrationId)
+ throws Exception {
+ NodeJoinIntentionFunction fn = new NodeJoinIntentionFunction(nodeId,
nodeAddress, registrationId);
+ ipcHandle.send(-1, fn, null);
+ }
+
+ @Override
public void registerNode(NodeRegistration reg, int registrationId) throws
Exception {
RegisterNodeFunction fn = new RegisterNodeFunction(reg,
registrationId);
ipcHandle.send(-1, fn, null);
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
index 8242bdc..30b0dd9 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
@@ -139,8 +139,8 @@
}
@Override
- public void abortJobs(CcId ccId) throws IPCException {
- ipcHandle.send(-1, new CCNCFunctions.AbortCCJobsFunction(ccId), null);
+ public void abortJobs(CcId ccId, int registrationId) throws IPCException {
+ ipcHandle.send(-1, new CCNCFunctions.AbortCCJobsFunction(ccId,
registrationId), null);
}
@Override
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/BaseNCApplication.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/BaseNCApplication.java
index ea16032..5c4fabf 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/BaseNCApplication.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/BaseNCApplication.java
@@ -60,7 +60,7 @@
}
@Override
- public void onRegisterNode(CcId ccId) throws Exception {
+ public void nodeRegistered(CcId ccId) throws Exception {
// no-op
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/CcConnection.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/CcConnection.java
index 77623c2..f4ef61e 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/CcConnection.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/CcConnection.java
@@ -38,6 +38,7 @@
private final IClusterController ccs;
private boolean registrationPending;
private boolean registrationCompleted;
+ private boolean tasksCompleted;
private Exception registrationException;
private NodeParameters nodeParameters;
@@ -63,18 +64,9 @@
public synchronized CcId registerNode(NodeRegistration nodeRegistration,
int registrationId) throws Exception {
registrationPending = true;
- ccs.registerNode(nodeRegistration, registrationId);
- try {
- InvokeUtil.runWithTimeout(() -> {
- this.wait(REGISTRATION_RESPONSE_POLL_PERIOD); // NOSONAR while
loop in timeout call
- }, () -> !registrationPending, 1, TimeUnit.MINUTES);
- } catch (Exception e) {
- registrationException = e;
- }
- if (registrationException != null) {
- LOGGER.fatal("Registering with {} failed with exception", this,
registrationException);
- ExitUtil.halt(ExitUtil.EC_NODE_REGISTRATION_FAILURE);
- }
+ tasksCompleted = false;
+ notifyNodeJoinIntention(nodeRegistration, registrationId);
+ completeNodeRegistration(nodeRegistration, registrationId);
return getCcId();
}
@@ -108,4 +100,42 @@
registrationCompleted = true;
notifyAll();
}
+
+ public synchronized void notifyTasksCompleted() {
+ tasksCompleted = true;
+ notifyAll();
+ }
+
+ private synchronized void notifyNodeJoinIntention(NodeRegistration
nodeRegistration, int registrationId)
+ throws Exception {
+ ccs.notifyNodeJoinIntention(nodeRegistration.getNodeId(),
nodeRegistration.getNodeControllerAddress(),
+ registrationId);
+ try {
+ InvokeUtil.runWithTimeout(() -> {
+ this.wait(REGISTRATION_RESPONSE_POLL_PERIOD); // NOSONAR while
loop in timeout call
+ }, () -> tasksCompleted, 2, TimeUnit.MINUTES);
+ } catch (Exception e) {
+ registrationException = e;
+ }
+ if (registrationException != null) {
+ LOGGER.fatal("Registering join intention with {} failed with
exception", this, registrationException);
+ ExitUtil.halt(ExitUtil.EC_NODE_REGISTRATION_FAILURE);
+ }
+ }
+
+ private synchronized void completeNodeRegistration(NodeRegistration
nodeRegistration, int registrationId)
+ throws Exception {
+ ccs.registerNode(nodeRegistration, registrationId);
+ try {
+ InvokeUtil.runWithTimeout(() -> {
+ this.wait(REGISTRATION_RESPONSE_POLL_PERIOD); // NOSONAR while
loop in timeout call
+ }, () -> !registrationPending, 1, TimeUnit.MINUTES);
+ } catch (Exception e) {
+ registrationException = e;
+ }
+ if (registrationException != null) {
+ LOGGER.fatal("Registering with {} failed with exception", this,
registrationException);
+ ExitUtil.halt(ExitUtil.EC_NODE_REGISTRATION_FAILURE);
+ }
+ }
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java
index 08cd5d8..893716d 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java
@@ -73,7 +73,7 @@
return;
case ABORT_ALL_JOBS:
CCNCFunctions.AbortCCJobsFunction aajf =
(CCNCFunctions.AbortCCJobsFunction) fn;
- ncs.getWorkQueue().schedule(new AbortAllJobsWork(ncs,
aajf.getCcId()));
+ ncs.getWorkQueue().schedule(new AbortAllJobsWork(ncs,
aajf.getCcId(), aajf.getRegistrationId()));
return;
case CLEANUP_JOBLET:
CCNCFunctions.CleanupJobletFunction cjf =
(CCNCFunctions.CleanupJobletFunction) fn;
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
index 169e5ea..4112a13 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
@@ -294,10 +294,9 @@
messagingNetManager.start();
}
initNodeControllerState();
+ workQueue.start();
hbTask = new HeartbeatComputeTask(this);
primaryCcId = addCc(new
InetSocketAddress(ncConfig.getClusterAddress(), ncConfig.getClusterPort()));
-
- workQueue.start();
// Schedule heartbeat data updates
timer.schedule(hbTask, HEARTBEAT_REFRESH_MILLIS,
HEARTBEAT_REFRESH_MILLIS);
@@ -394,7 +393,7 @@
LOGGER.warn("ignoring exception trying to gracefully
unregister cc {}: ", () -> ccId,
() -> String.valueOf(e));
}
- getWorkQueue().scheduleAndSync(new AbortAllJobsWork(this, ccId));
+ workQueue.scheduleAndSync(new AbortAllJobsWork(this, ccId, -1));
Thread hbThread = heartbeatThreads.remove(ccId);
hbThread.interrupt();
Timer ccTimer = ccTimers.remove(ccId);
@@ -436,6 +435,7 @@
ccTimers.put(ccId, ccTimer);
}
ccc.notifyRegistrationCompleted();
+ application.nodeRegistered(ccId);
LOGGER.info("Registering with Cluster Controller {} completed", ccc);
return ccId;
}
@@ -653,9 +653,14 @@
return messagingNetManager;
}
- public void notifyTasksCompleted(CcId ccId) throws Exception {
- partitionManager.jobsCompleted(ccId);
- application.onRegisterNode(ccId);
+ public void notifyTasksCompleted(CcId ccId, int registrationId) {
+ if (pendingRegistrations.containsKey(registrationId)) {
+ partitionManager.jobsCompleted(ccId);
+ final CcConnection ccConnection =
pendingRegistrations.get(registrationId);
+ ccConnection.notifyTasksCompleted();
+ } else {
+ LOGGER.warn("Ignoring tasks completed notification for CC {} with
registration id", ccId, registrationId);
+ }
}
private static INCApplication getApplication(NCConfig config)
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortAllJobsWork.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortAllJobsWork.java
index b11dada..d2c62e1 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortAllJobsWork.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortAllJobsWork.java
@@ -39,10 +39,12 @@
private static final Logger LOGGER = LogManager.getLogger();
private final NodeControllerService ncs;
private final CcId ccId;
+ private final int registrationId;
- public AbortAllJobsWork(NodeControllerService ncs, CcId ccId) {
+ public AbortAllJobsWork(NodeControllerService ncs, CcId ccId, int
registrationId) {
this.ncs = ncs;
this.ccId = ccId;
+ this.registrationId = registrationId;
}
@Override
@@ -67,6 +69,6 @@
}
ncs.getWorkQueue().schedule(new CleanupJobletWork(ncs, jobId,
JobStatus.FAILURE));
});
- ncs.getExecutor().submit(new EnsureAllCcTasksCompleted(ncs, ccId,
abortedTasks));
+ ncs.getExecutor().submit(new EnsureAllCcTasksCompleted(ncs, ccId,
registrationId, abortedTasks));
}
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/EnsureAllCcTasksCompleted.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/EnsureAllCcTasksCompleted.java
index 0f36c80..3235f9c 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/EnsureAllCcTasksCompleted.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/EnsureAllCcTasksCompleted.java
@@ -39,11 +39,14 @@
private final NodeControllerService ncs;
private final CcId ccId;
private final Deque<Task> runningTasks;
+ private final int registrationId;
- public EnsureAllCcTasksCompleted(NodeControllerService ncs, CcId ccId,
Deque<Task> runningTasks) {
+ public EnsureAllCcTasksCompleted(NodeControllerService ncs, CcId ccId, int
registrationId,
+ Deque<Task> runningTasks) {
this.ncs = ncs;
this.ccId = ccId;
this.runningTasks = runningTasks;
+ this.registrationId = registrationId;
}
@Override
@@ -61,7 +64,7 @@
}
if (runningTasks.isEmpty()) {
LOGGER.info("All tasks of CC {} have completed", ccId);
- ncs.notifyTasksCompleted(ccId);
+ ncs.notifyTasksCompleted(ccId, registrationId);
} else {
LOGGER.error("{} tasks associated with CC {} failed to
complete after {}ms. Giving up",
runningTasks.size(), ccId, TIMEOUT);
diff --git
a/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/TestNCApplication.java
b/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/TestNCApplication.java
index 15248e7..c4170dd 100644
---
a/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/TestNCApplication.java
+++
b/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/TestNCApplication.java
@@ -46,7 +46,7 @@
}
@Override
- public void onRegisterNode(CcId ccs) throws Exception {
+ public void nodeRegistered(CcId ccs) throws Exception {
// No-op
}
--
To view, visit https://asterix-gerrit.ics.uci.edu/2830
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I7c58d006546f3ebca91333c2a4bc8ced68fdaf39
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Murtadha Hubail <[email protected]>