Murtadha Hubail has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/2830

Change subject: [NO ISSUE][CLUS] Request NC Startup Tasks After CC Registration
......................................................................

[NO ISSUE][CLUS] Request NC Startup Tasks After CC Registration

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
- Add a new NodeJoinIntention message to notify the CC of a
  node's intention to join. Upon receiving this message,
  the CC requests the NC to perform pre-registration tasks
  (e.g. abort all previous jobs).
- Start the web servers on the node's first registration to
  ensure the node doesn't send its startup tasks request to
  the CC before its web servers are up.

Change-Id: I7c58d006546f3ebca91333c2a4bc8ced68fdaf39
---
M asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/INCApplication.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
A hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/NotifyNodeJoinIntentionWork.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/BaseNCApplication.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/CcConnection.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortAllJobsWork.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/EnsureAllCcTasksCompleted.java
M hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/TestNCApplication.java
17 files changed, 208 insertions(+), 49 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/30/2830/1

diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
index 019f54d..a4819e8 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
@@ -83,6 +83,7 @@
     private INcApplicationContext runtimeContext;
     private String nodeId;
     private boolean stopInitiated;
+    private boolean firstRegistration = true;
     protected WebManager webManager;
 
     @Override
@@ -211,20 +212,9 @@
     }
 
     @Override
-    public void startupCompleted() throws Exception {
-        // configure servlets after joining the cluster, so we can create 
HyracksClientConnection
-        configureServers();
-        webManager.start();
-    }
-
-    @Override
-    public synchronized void onRegisterNode(CcId ccId) throws Exception {
-        final NodeControllerService ncs = (NodeControllerService) 
ncServiceCtx.getControllerService();
-        final NodeStatus currentStatus = ncs.getNodeStatus();
-        final SystemState systemState = isPendingStartupTasks(currentStatus, 
ncs.getPrimaryCcId(), ccId)
-                ? getCurrentSystemState() : SystemState.HEALTHY;
-        RegistrationTasksRequestMessage.send(ccId, (NodeControllerService) 
ncServiceCtx.getControllerService(),
-                currentStatus, systemState);
+    public synchronized void nodeRegistered(CcId ccId) throws Exception {
+        onNodeRegistered();
+        requestStartupTasks(ccId);
     }
 
     @Override
@@ -296,4 +286,26 @@
     protected void configurePersistedResourceRegistry() {
         ncServiceCtx.setPersistedResourceRegistry(new 
PersistedResourceRegistry());
     }
+
+    protected synchronized void onFirstRegistration() throws Exception {
+        // configure servlets after joining the cluster, so we can create 
HyracksClientConnection
+        configureServers();
+        webManager.start();
+    }
+
+    protected synchronized void requestStartupTasks(CcId ccId) throws 
Exception {
+        final NodeControllerService ncs = (NodeControllerService) 
ncServiceCtx.getControllerService();
+        final NodeStatus currentStatus = ncs.getNodeStatus();
+        final SystemState systemState = isPendingStartupTasks(currentStatus, 
ncs.getPrimaryCcId(), ccId)
+                ? getCurrentSystemState() : SystemState.HEALTHY;
+        RegistrationTasksRequestMessage.send(ccId, (NodeControllerService) 
ncServiceCtx.getControllerService(),
+                currentStatus, systemState);
+    }
+
+    private synchronized void onNodeRegistered() throws Exception {
+        if (firstRegistration) {
+            onFirstRegistration();
+        }
+        firstRegistration = false;
+    }
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/INCApplication.java
 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/INCApplication.java
index af6cb92..7dc1527 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/INCApplication.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/INCApplication.java
@@ -34,5 +34,5 @@
      */
     IFileDeviceResolver getFileDeviceResolver();
 
-    void onRegisterNode(CcId ccId) throws Exception;
+    void nodeRegistered(CcId ccId) throws Exception;
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java
index 84cb4bd..aa0d8af 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java
@@ -27,6 +27,7 @@
 import org.apache.hyracks.control.cc.work.JobletCleanupNotificationWork;
 import org.apache.hyracks.control.cc.work.NodeHeartbeatWork;
 import org.apache.hyracks.control.cc.work.NotifyDeployBinaryWork;
+import org.apache.hyracks.control.cc.work.NotifyNodeJoinIntentionWork;
 import org.apache.hyracks.control.cc.work.NotifyShutdownWork;
 import org.apache.hyracks.control.cc.work.NotifyStateDumpResponse;
 import org.apache.hyracks.control.cc.work.NotifyThreadDumpResponse;
@@ -161,6 +162,11 @@
                 ccs.getWorkQueue()
                         .schedule(new NotifyThreadDumpResponse(ccs, 
tdrf.getRequestId(), tdrf.getThreadDumpJSON()));
                 break;
+            case NODE_JOIN_INTENTION:
+                CCNCFunctions.NodeJoinIntentionFunction njif = 
(CCNCFunctions.NodeJoinIntentionFunction) fn;
+                ccs.getWorkQueue().schedule(new 
NotifyNodeJoinIntentionWork(ccs, njif.getNodeId(),
+                        njif.getNodeAddress(), njif.getRegistrationId()));
+                break;
             default:
                 LOGGER.warn("Unknown function: " + fn.getFunctionId());
         }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
index 31f989b..971d401 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
@@ -104,11 +104,6 @@
             LOGGER.warn("Node with name " + nodeId + " has already registered; 
failing the node then re-registering.");
             failNode(nodeId);
         }
-        try {
-            ncState.getNodeController().abortJobs(ccs.getCcId());
-        } catch (IPCException e) {
-            throw HyracksDataException.create(e);
-        }
         LOGGER.warn("adding node to registry");
         nodeRegistry.put(nodeId, ncState);
         // Updates the IP address to node names map.
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/NotifyNodeJoinIntentionWork.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/NotifyNodeJoinIntentionWork.java
new file mode 100644
index 0000000..f7fce4b
--- /dev/null
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/NotifyNodeJoinIntentionWork.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.cc.work;
+
+import java.net.InetSocketAddress;
+
+import org.apache.hyracks.control.cc.ClusterControllerService;
+import org.apache.hyracks.control.common.ipc.NodeControllerRemoteProxy;
+import org.apache.hyracks.control.common.work.SynchronizableWork;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+public class NotifyNodeJoinIntentionWork extends SynchronizableWork {
+    private static final Logger LOGGER = LogManager.getLogger();
+
+    private final ClusterControllerService ccs;
+    private final String nodeId;
+    private final InetSocketAddress nodeAddress;
+    private final int registrationId;
+
+    public NotifyNodeJoinIntentionWork(ClusterControllerService ccs, String 
nodeId, InetSocketAddress nodeAddress,
+            int registrationId) {
+        this.ccs = ccs;
+        this.nodeId = nodeId;
+        this.nodeAddress = nodeAddress;
+        this.registrationId = registrationId;
+    }
+
+    @Override
+    protected void doRun() throws Exception {
+        LOGGER.warn("Received intention to join the cluster from node {}", 
nodeId);
+        NodeControllerRemoteProxy nc =
+                new NodeControllerRemoteProxy(ccs.getCcId(), 
ccs.getClusterIPC().getReconnectingHandle(nodeAddress));
+        nc.abortJobs(ccs.getCcId(), registrationId);
+        LOGGER.warn("Instructed node {} to abort all existing jobs", nodeId);
+    }
+}
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java
index fc0154e..360861b 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java
@@ -18,6 +18,7 @@
  */
 package org.apache.hyracks.control.common.base;
 
+import java.net.InetSocketAddress;
 import java.util.List;
 
 import org.apache.hyracks.api.comm.NetworkAddress;
@@ -72,4 +73,6 @@
     void getNodeControllerInfos() throws Exception;
 
     void notifyThreadDump(String nodeId, String requestId, String 
threadDumpJSON) throws Exception;
+
+    void notifyNodeJoinIntention(String nodeId, InetSocketAddress nodeAddress, 
int registrationId) throws Exception;
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java
index fa835f4..bc55d04 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java
@@ -72,9 +72,10 @@
      * started by cluster controller with id {@code ccId}
      *
      * @param ccId
+     * @param registrationId
      * @throws IPCException
      */
-    void abortJobs(CcId ccId) throws IPCException;
+    void abortJobs(CcId ccId, int registrationId) throws IPCException;
 
     /**
      * Sends node registration result to this {@link INodeController}.
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
index ce4578d..5f44b5d 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
@@ -26,6 +26,7 @@
 import java.io.IOException;
 import java.io.OutputStream;
 import java.io.Serializable;
+import java.net.InetSocketAddress;
 import java.net.URL;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -75,6 +76,7 @@
     private static final int FID_CODE_SIZE = 1;
 
     public enum FunctionId {
+        NODE_JOIN_INTENTION,
         REGISTER_NODE,
         UNREGISTER_NODE,
         NOTIFY_JOBLET_CLEANUP,
@@ -188,6 +190,37 @@
 
         public NodeRegistration getNodeRegistration() {
             return reg;
+        }
+
+        public int getRegistrationId() {
+            return registrationId;
+        }
+    }
+
+    public static class NodeJoinIntentionFunction extends Function {
+        private static final long serialVersionUID = 1L;
+
+        private final InetSocketAddress nodeAddress;
+        private final String nodeId;
+        private final int registrationId;
+
+        public NodeJoinIntentionFunction(String nodeId, InetSocketAddress 
nodeAddress, int registrationId) {
+            this.nodeId = nodeId;
+            this.nodeAddress = nodeAddress;
+            this.registrationId = registrationId;
+        }
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.NODE_JOIN_INTENTION;
+        }
+
+        public InetSocketAddress getNodeAddress() {
+            return nodeAddress;
+        }
+
+        public String getNodeId() {
+            return nodeId;
         }
 
         public int getRegistrationId() {
@@ -691,9 +724,11 @@
     public static class AbortCCJobsFunction extends Function {
         private static final long serialVersionUID = 1L;
         private final CcId ccId;
+        private final int registrationId;
 
-        public AbortCCJobsFunction(CcId ccId) {
+        public AbortCCJobsFunction(CcId ccId, int registrationId) {
             this.ccId = ccId;
+            this.registrationId = registrationId;
         }
 
         @Override
@@ -704,6 +739,10 @@
         public CcId getCcId() {
             return ccId;
         }
+
+        public int getRegistrationId() {
+            return registrationId;
+        }
     }
 
     public static class DeployJobSpecFunction extends CCIdentifiedFunction {
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
index 8e2ec22..1de4ee2 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
@@ -18,6 +18,9 @@
  */
 package org.apache.hyracks.control.common.ipc;
 
+import static 
org.apache.hyracks.control.common.ipc.CCNCFunctions.NodeJoinIntentionFunction;
+
+import java.net.InetSocketAddress;
 import java.util.List;
 
 import org.apache.hyracks.api.comm.NetworkAddress;
@@ -63,6 +66,13 @@
     }
 
     @Override
+    public void notifyNodeJoinIntention(String nodeId, InetSocketAddress 
nodeAddress, int registrationId)
+            throws Exception {
+        NodeJoinIntentionFunction fn = new NodeJoinIntentionFunction(nodeId, 
nodeAddress, registrationId);
+        ipcHandle.send(-1, fn, null);
+    }
+
+    @Override
     public void registerNode(NodeRegistration reg, int registrationId) throws 
Exception {
         RegisterNodeFunction fn = new RegisterNodeFunction(reg, 
registrationId);
         ipcHandle.send(-1, fn, null);
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
index 8242bdc..30b0dd9 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
@@ -139,8 +139,8 @@
     }
 
     @Override
-    public void abortJobs(CcId ccId) throws IPCException {
-        ipcHandle.send(-1, new CCNCFunctions.AbortCCJobsFunction(ccId), null);
+    public void abortJobs(CcId ccId, int registrationId) throws IPCException {
+        ipcHandle.send(-1, new CCNCFunctions.AbortCCJobsFunction(ccId, 
registrationId), null);
     }
 
     @Override
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/BaseNCApplication.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/BaseNCApplication.java
index ea16032..5c4fabf 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/BaseNCApplication.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/BaseNCApplication.java
@@ -60,7 +60,7 @@
     }
 
     @Override
-    public void onRegisterNode(CcId ccId) throws Exception {
+    public void nodeRegistered(CcId ccId) throws Exception {
         // no-op
     }
 
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/CcConnection.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/CcConnection.java
index 77623c2..f4ef61e 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/CcConnection.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/CcConnection.java
@@ -38,6 +38,7 @@
     private final IClusterController ccs;
     private boolean registrationPending;
     private boolean registrationCompleted;
+    private boolean tasksCompleted;
     private Exception registrationException;
     private NodeParameters nodeParameters;
 
@@ -63,18 +64,9 @@
 
     public synchronized CcId registerNode(NodeRegistration nodeRegistration, 
int registrationId) throws Exception {
         registrationPending = true;
-        ccs.registerNode(nodeRegistration, registrationId);
-        try {
-            InvokeUtil.runWithTimeout(() -> {
-                this.wait(REGISTRATION_RESPONSE_POLL_PERIOD); // NOSONAR while 
loop in timeout call
-            }, () -> !registrationPending, 1, TimeUnit.MINUTES);
-        } catch (Exception e) {
-            registrationException = e;
-        }
-        if (registrationException != null) {
-            LOGGER.fatal("Registering with {} failed with exception", this, 
registrationException);
-            ExitUtil.halt(ExitUtil.EC_NODE_REGISTRATION_FAILURE);
-        }
+        tasksCompleted = false;
+        notifyNodeJoinIntention(nodeRegistration, registrationId);
+        completeNodeRegistration(nodeRegistration, registrationId);
         return getCcId();
     }
 
@@ -108,4 +100,42 @@
         registrationCompleted = true;
         notifyAll();
     }
+
+    public synchronized void notifyTasksCompleted() {
+        tasksCompleted = true;
+        notifyAll();
+    }
+
+    private synchronized void notifyNodeJoinIntention(NodeRegistration 
nodeRegistration, int registrationId)
+            throws Exception {
+        ccs.notifyNodeJoinIntention(nodeRegistration.getNodeId(), 
nodeRegistration.getNodeControllerAddress(),
+                registrationId);
+        try {
+            InvokeUtil.runWithTimeout(() -> {
+                this.wait(REGISTRATION_RESPONSE_POLL_PERIOD); // NOSONAR while 
loop in timeout call
+            }, () -> tasksCompleted, 2, TimeUnit.MINUTES);
+        } catch (Exception e) {
+            registrationException = e;
+        }
+        if (registrationException != null) {
+            LOGGER.fatal("Registering join intention with {} failed with 
exception", this, registrationException);
+            ExitUtil.halt(ExitUtil.EC_NODE_REGISTRATION_FAILURE);
+        }
+    }
+
+    private synchronized void completeNodeRegistration(NodeRegistration 
nodeRegistration, int registrationId)
+            throws Exception {
+        ccs.registerNode(nodeRegistration, registrationId);
+        try {
+            InvokeUtil.runWithTimeout(() -> {
+                this.wait(REGISTRATION_RESPONSE_POLL_PERIOD); // NOSONAR while 
loop in timeout call
+            }, () -> !registrationPending, 1, TimeUnit.MINUTES);
+        } catch (Exception e) {
+            registrationException = e;
+        }
+        if (registrationException != null) {
+            LOGGER.fatal("Registering with {} failed with exception", this, 
registrationException);
+            ExitUtil.halt(ExitUtil.EC_NODE_REGISTRATION_FAILURE);
+        }
+    }
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java
index 08cd5d8..893716d 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java
@@ -73,7 +73,7 @@
                 return;
             case ABORT_ALL_JOBS:
                 CCNCFunctions.AbortCCJobsFunction aajf = 
(CCNCFunctions.AbortCCJobsFunction) fn;
-                ncs.getWorkQueue().schedule(new AbortAllJobsWork(ncs, 
aajf.getCcId()));
+                ncs.getWorkQueue().schedule(new AbortAllJobsWork(ncs, 
aajf.getCcId(), aajf.getRegistrationId()));
                 return;
             case CLEANUP_JOBLET:
                 CCNCFunctions.CleanupJobletFunction cjf = 
(CCNCFunctions.CleanupJobletFunction) fn;
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
index 169e5ea..4112a13 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
@@ -294,10 +294,9 @@
             messagingNetManager.start();
         }
         initNodeControllerState();
+        workQueue.start();
         hbTask = new HeartbeatComputeTask(this);
         primaryCcId = addCc(new 
InetSocketAddress(ncConfig.getClusterAddress(), ncConfig.getClusterPort()));
-
-        workQueue.start();
 
         // Schedule heartbeat data updates
         timer.schedule(hbTask, HEARTBEAT_REFRESH_MILLIS, 
HEARTBEAT_REFRESH_MILLIS);
@@ -394,7 +393,7 @@
                 LOGGER.warn("ignoring exception trying to gracefully 
unregister cc {}: ", () -> ccId,
                         () -> String.valueOf(e));
             }
-            getWorkQueue().scheduleAndSync(new AbortAllJobsWork(this, ccId));
+            workQueue.scheduleAndSync(new AbortAllJobsWork(this, ccId, -1));
             Thread hbThread = heartbeatThreads.remove(ccId);
             hbThread.interrupt();
             Timer ccTimer = ccTimers.remove(ccId);
@@ -436,6 +435,7 @@
             ccTimers.put(ccId, ccTimer);
         }
         ccc.notifyRegistrationCompleted();
+        application.nodeRegistered(ccId);
         LOGGER.info("Registering with Cluster Controller {} completed", ccc);
         return ccId;
     }
@@ -653,9 +653,14 @@
         return messagingNetManager;
     }
 
-    public void notifyTasksCompleted(CcId ccId) throws Exception {
-        partitionManager.jobsCompleted(ccId);
-        application.onRegisterNode(ccId);
+    public void notifyTasksCompleted(CcId ccId, int registrationId) {
+        final CcConnection ccConnection = 
pendingRegistrations.get(registrationId);
+        if (ccConnection != null) {
+            partitionManager.jobsCompleted(ccId);
+            ccConnection.notifyTasksCompleted();
+        } else {
+            LOGGER.warn("Ignoring tasks completed notification for CC {} with 
registration id {}", ccId, registrationId);
+        }
     }
 
     private static INCApplication getApplication(NCConfig config)
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortAllJobsWork.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortAllJobsWork.java
index b11dada..d2c62e1 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortAllJobsWork.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortAllJobsWork.java
@@ -39,10 +39,12 @@
     private static final Logger LOGGER = LogManager.getLogger();
     private final NodeControllerService ncs;
     private final CcId ccId;
+    private final int registrationId;
 
-    public AbortAllJobsWork(NodeControllerService ncs, CcId ccId) {
+    public AbortAllJobsWork(NodeControllerService ncs, CcId ccId, int 
registrationId) {
         this.ncs = ncs;
         this.ccId = ccId;
+        this.registrationId = registrationId;
     }
 
     @Override
@@ -67,6 +69,6 @@
             }
             ncs.getWorkQueue().schedule(new CleanupJobletWork(ncs, jobId, 
JobStatus.FAILURE));
         });
-        ncs.getExecutor().submit(new EnsureAllCcTasksCompleted(ncs, ccId, 
abortedTasks));
+        ncs.getExecutor().submit(new EnsureAllCcTasksCompleted(ncs, ccId, 
registrationId, abortedTasks));
     }
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/EnsureAllCcTasksCompleted.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/EnsureAllCcTasksCompleted.java
index 0f36c80..3235f9c 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/EnsureAllCcTasksCompleted.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/EnsureAllCcTasksCompleted.java
@@ -39,11 +39,14 @@
     private final NodeControllerService ncs;
     private final CcId ccId;
     private final Deque<Task> runningTasks;
+    private final int registrationId;
 
-    public EnsureAllCcTasksCompleted(NodeControllerService ncs, CcId ccId, 
Deque<Task> runningTasks) {
+    public EnsureAllCcTasksCompleted(NodeControllerService ncs, CcId ccId, int 
registrationId,
+            Deque<Task> runningTasks) {
         this.ncs = ncs;
         this.ccId = ccId;
         this.runningTasks = runningTasks;
+        this.registrationId = registrationId;
     }
 
     @Override
@@ -61,7 +64,7 @@
             }
             if (runningTasks.isEmpty()) {
                 LOGGER.info("All tasks of CC {} have completed", ccId);
-                ncs.notifyTasksCompleted(ccId);
+                ncs.notifyTasksCompleted(ccId, registrationId);
             } else {
                 LOGGER.error("{} tasks associated with CC {} failed to 
complete after {}ms. Giving up",
                         runningTasks.size(), ccId, TIMEOUT);
diff --git 
a/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/TestNCApplication.java
 
b/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/TestNCApplication.java
index 15248e7..c4170dd 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/TestNCApplication.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/TestNCApplication.java
@@ -46,7 +46,7 @@
     }
 
     @Override
-    public void onRegisterNode(CcId ccs) throws Exception {
+    public void nodeRegistered(CcId ccs) throws Exception {
         // No-op
     }
 

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/2830
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I7c58d006546f3ebca91333c2a4bc8ced68fdaf39
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Murtadha Hubail <[email protected]>

Reply via email to