Michael Blow has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/2439

Change subject: [NO ISSUE] Decouple heartbeat data calculation from transmission
......................................................................

[NO ISSUE] Decouple heartbeat data calculation from transmission

- also, generate native JSON objects instead of POJOs for node config & stats
- fix illegal regex on Windows for node config

Change-Id: Iccb02350b56328ba1adbca97a1cb5efeb9d9ad14
---
M 
asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/DiagnosticsApiServlet.java
M 
asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NodeControllerDetailsApiServlet.java
M 
asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java
A 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/NodeControllerHandle.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/INodeManager.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/partitions/PartitionUtils.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/AbstractHeartbeatWork.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/CliDeployBinaryWork.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/CliUnDeployBinaryWork.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ClusterShutdownWork.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DeployJobSpecWork.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GatherStateDumpsWork.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetNodeDetailsJSONWork.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetNodeSummariesJSONWork.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetThreadDumpWork.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobletCleanupNotificationWork.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/UndeployJobSpecWork.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/cluster/NodeManagerTest.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/job/JobManagerTest.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/pom.xml
R 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/NodeControllerState.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NodeRegistration.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/CcConnection.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
M 
hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/JSONUtil.java
33 files changed, 391 insertions(+), 260 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb 
refs/changes/39/2439/1

diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/DiagnosticsApiServlet.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/DiagnosticsApiServlet.java
index 0f08a22..992b85b 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/DiagnosticsApiServlet.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/DiagnosticsApiServlet.java
@@ -91,7 +91,9 @@
             ncDataMap.put(nc, getNcDiagnosticFutures(nc));
         }
         ObjectNode result = OBJECT_MAPPER.createObjectNode();
-        result.putPOJO("cc", resolveFutures(ccFutureData));
+        if (!ccFutureData.isEmpty()) {
+            result.putPOJO("cc", resolveFutures(ccFutureData));
+        }
         List<Map<String, ?>> ncList = new ArrayList<>();
         for (Map.Entry<String, Map<String, Future<JsonNode>>> entry : 
ncDataMap.entrySet()) {
             final Map<String, JsonNode> ncMap = 
resolveFutures(entry.getValue());
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NodeControllerDetailsApiServlet.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NodeControllerDetailsApiServlet.java
index 8ca0947..8cb8cb6 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NodeControllerDetailsApiServlet.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NodeControllerDetailsApiServlet.java
@@ -41,7 +41,6 @@
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.node.ArrayNode;
 import com.fasterxml.jackson.databind.node.ObjectNode;
-
 import io.netty.handler.codec.http.HttpResponseStatus;
 
 public class NodeControllerDetailsApiServlet extends ClusterApiServlet {
@@ -142,7 +141,10 @@
 
     protected ObjectNode processNodeStats(IHyracksClientConnection hcc, String 
node) throws Exception {
         final String details = checkNullDetail(node, 
hcc.getNodeDetailsJSON(node, true, false));
-        ObjectNode json = (ObjectNode) OBJECT_MAPPER.readTree(details);
+        return processNodeDetailsJSON((ObjectNode) 
OBJECT_MAPPER.readTree(details));
+    }
+
+    protected ObjectNode processNodeDetailsJSON(ObjectNode json) {
         int index = json.get("rrd-ptr").asInt() - 1;
         json.remove("rrd-ptr");
 
@@ -150,7 +152,6 @@
         for (Iterator<String> iter = json.fieldNames(); iter.hasNext();) {
             keys.add(iter.next());
         }
-
         final ArrayNode gcNames = (ArrayNode) json.get("gc-names");
         final ArrayNode gcCollectionTimes = (ArrayNode) 
json.get("gc-collection-times");
         final ArrayNode gcCollectionCounts = (ArrayNode) 
json.get("gc-collection-counts");
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java
index 5f42d7a..f23c117 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java
@@ -38,7 +38,7 @@
 import org.apache.hyracks.api.messages.IMessage;
 import org.apache.hyracks.api.util.JavaSerializationUtils;
 import org.apache.hyracks.control.cc.ClusterControllerService;
-import org.apache.hyracks.control.cc.NodeControllerState;
+import org.apache.hyracks.control.cc.NodeControllerHandle;
 import org.apache.hyracks.control.cc.cluster.INodeManager;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -69,7 +69,7 @@
     @Override
     public void sendApplicationMessageToNC(INcAddressedMessage msg, String 
nodeId) throws Exception {
         INodeManager nodeManager = ccs.getNodeManager();
-        NodeControllerState state = nodeManager.getNodeControllerState(nodeId);
+        NodeControllerHandle state = 
nodeManager.getNodeControllerState(nodeId);
         if (msg instanceof ICcIdentifiedMessage) {
             ((ICcIdentifiedMessage) msg).setCcId(ccs.getCcId());
         }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java
index 06c92dd..84cb4bd 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java
@@ -65,7 +65,8 @@
         switch (fn.getFunctionId()) {
             case REGISTER_NODE:
                 CCNCFunctions.RegisterNodeFunction rnf = 
(CCNCFunctions.RegisterNodeFunction) fn;
-                ccs.getWorkQueue().schedule(new RegisterNodeWork(ccs, 
rnf.getNodeRegistration()));
+                ccs.getWorkQueue()
+                        .schedule(new RegisterNodeWork(ccs, 
rnf.getNodeRegistration(), rnf.getRegistrationId()));
                 break;
             case UNREGISTER_NODE:
                 CCNCFunctions.UnregisterNodeFunction unf = 
(CCNCFunctions.UnregisterNodeFunction) fn;
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/NodeControllerHandle.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/NodeControllerHandle.java
new file mode 100644
index 0000000..eb78e6b
--- /dev/null
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/NodeControllerHandle.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.cc;
+
+import org.apache.hyracks.control.common.NodeControllerState;
+import org.apache.hyracks.control.common.controllers.NodeRegistration;
+import org.apache.hyracks.control.common.ipc.NodeControllerRemoteProxy;
+
+public class NodeControllerHandle extends NodeControllerState {
+
+    private final NodeControllerRemoteProxy nodeController;
+
+    public NodeControllerHandle(NodeControllerRemoteProxy nodeController, 
NodeRegistration reg) {
+        super(reg);
+        this.nodeController = nodeController;
+    }
+
+    public NodeControllerRemoteProxy getNodeController() {
+        return nodeController;
+    }
+}
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/INodeManager.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/INodeManager.java
index eb54cc3..52d9ed3 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/INodeManager.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/INodeManager.java
@@ -28,7 +28,7 @@
 import org.apache.hyracks.api.client.NodeControllerInfo;
 import org.apache.hyracks.api.exceptions.HyracksException;
 import org.apache.hyracks.api.job.JobId;
-import org.apache.hyracks.control.cc.NodeControllerState;
+import org.apache.hyracks.control.cc.NodeControllerHandle;
 import org.apache.hyracks.util.annotations.Idempotent;
 
 /**
@@ -41,7 +41,7 @@
      */
     @FunctionalInterface
     interface NodeFunction {
-        void apply(String nodeId, NodeControllerState ncState);
+        void apply(String nodeId, NodeControllerHandle ncState);
     }
 
     /**
@@ -60,7 +60,7 @@
     /**
      * @return all node controller states.
      */
-    Collection<NodeControllerState> getAllNodeControllerStates();
+    Collection<NodeControllerHandle> getAllNodeControllerStates();
 
     /**
      * @return the map that maps a IP addresses to a set of node names.
@@ -88,7 +88,7 @@
      *            a given node id.
      * @return the corresponding node controller state.
      */
-    NodeControllerState getNodeControllerState(String nodeId);
+    NodeControllerHandle getNodeControllerState(String nodeId);
 
     /**
      * Adds one node into the cluster.
@@ -100,7 +100,7 @@
      * @throws HyracksException
      *             when the node has already been added or the IP address 
given in the node state is not valid.
      */
-    void addNode(String nodeId, NodeControllerState ncState) throws 
HyracksException;
+    void addNode(String nodeId, NodeControllerHandle ncState) throws 
HyracksException;
 
     /**
      * Removes one node from the cluster.  This method is idempotent.
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
index 8246cad..1c002b7 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
@@ -40,7 +40,7 @@
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.resource.NodeCapacity;
 import org.apache.hyracks.control.cc.ClusterControllerService;
-import org.apache.hyracks.control.cc.NodeControllerState;
+import org.apache.hyracks.control.cc.NodeControllerHandle;
 import org.apache.hyracks.control.cc.job.IJobManager;
 import org.apache.hyracks.control.cc.job.JobRun;
 import org.apache.hyracks.control.cc.scheduler.IResourceManager;
@@ -60,7 +60,7 @@
     private final ClusterControllerService ccs;
     private final CCConfig ccConfig;
     private final IResourceManager resourceManager;
-    private final Map<String, NodeControllerState> nodeRegistry;
+    private final Map<String, NodeControllerHandle> nodeRegistry;
     private final Map<InetAddress, Set<String>> ipAddressNodeNameMap;
     private final int nodeCoresMultiplier;
 
@@ -84,17 +84,17 @@
     }
 
     @Override
-    public Collection<NodeControllerState> getAllNodeControllerStates() {
+    public Collection<NodeControllerHandle> getAllNodeControllerStates() {
         return Collections.unmodifiableCollection(nodeRegistry.values());
     }
 
     @Override
-    public NodeControllerState getNodeControllerState(String nodeId) {
+    public NodeControllerHandle getNodeControllerState(String nodeId) {
         return nodeRegistry.get(nodeId);
     }
 
     @Override
-    public void addNode(String nodeId, NodeControllerState ncState) throws 
HyracksException {
+    public void addNode(String nodeId, NodeControllerHandle ncState) throws 
HyracksException {
         LOGGER.warn("addNode(" + nodeId + ") called");
         if (nodeId == null || ncState == null) {
             throw HyracksException.create(ErrorCode.INVALID_INPUT_PARAMETER);
@@ -132,7 +132,7 @@
     @Override
     @Idempotent
     public void removeNode(String nodeId) throws HyracksException {
-        NodeControllerState ncState = nodeRegistry.remove(nodeId);
+        NodeControllerHandle ncState = nodeRegistry.remove(nodeId);
         if (ncState == null) {
             LOGGER.warn("request to remove unknown node {}; ignoring", nodeId);
         } else {
@@ -155,13 +155,13 @@
     public Pair<Collection<String>, Collection<JobId>> removeDeadNodes() 
throws HyracksException {
         Set<String> deadNodes = new HashSet<>();
         Set<JobId> affectedJobIds = new HashSet<>();
-        Iterator<Map.Entry<String, NodeControllerState>> nodeIterator = 
nodeRegistry.entrySet().iterator();
+        Iterator<Map.Entry<String, NodeControllerHandle>> nodeIterator = 
nodeRegistry.entrySet().iterator();
         long deadNodeNanosThreshold =
                 TimeUnit.MILLISECONDS.toNanos(ccConfig.getHeartbeatMaxMisses() 
* ccConfig.getHeartbeatPeriodMillis());
         while (nodeIterator.hasNext()) {
-            Map.Entry<String, NodeControllerState> entry = nodeIterator.next();
+            Map.Entry<String, NodeControllerHandle> entry = 
nodeIterator.next();
             String nodeId = entry.getKey();
-            NodeControllerState state = entry.getValue();
+            NodeControllerHandle state = entry.getValue();
             final long nanosSinceLastHeartbeat = 
state.nanosSinceLastHeartbeat();
             if (nanosSinceLastHeartbeat >= deadNodeNanosThreshold) {
                 ensureNodeFailure(nodeId, state);
@@ -179,7 +179,7 @@
     }
 
     public void failNode(String nodeId) throws HyracksException {
-        NodeControllerState state = nodeRegistry.get(nodeId);
+        NodeControllerHandle state = nodeRegistry.get(nodeId);
         Set<JobId> affectedJobIds = state.getActiveJobIds();
         // Removes the node from node map.
         nodeRegistry.remove(nodeId);
@@ -204,7 +204,7 @@
         nodeRegistry.forEach(nodeFunction::apply);
     }
 
-    private void removeNodeFromIpAddressMap(String nodeId, NodeControllerState 
ncState) throws HyracksException {
+    private void removeNodeFromIpAddressMap(String nodeId, 
NodeControllerHandle ncState) throws HyracksException {
         InetAddress ipAddress = getIpAddress(ncState);
         Set<String> nodes = ipAddressNodeNameMap.get(ipAddress);
         if (nodes != null) {
@@ -216,7 +216,7 @@
         }
     }
 
-    private InetAddress getIpAddress(NodeControllerState ncState) throws 
HyracksException {
+    private InetAddress getIpAddress(NodeControllerHandle ncState) throws 
HyracksException {
         String ipAddress = ncState.getNCConfig().getDataPublicAddress();
         try {
             return InetAddress.getByName(ipAddress);
@@ -229,7 +229,7 @@
         return new NodeCapacity(nodeCapacity.getMemoryByteSize(), 
nodeCapacity.getCores() * nodeCoresMultiplier);
     }
 
-    private void ensureNodeFailure(String nodeId, NodeControllerState state) {
+    private void ensureNodeFailure(String nodeId, NodeControllerHandle state) {
         try {
             LOGGER.info("Requesting node {} to shutdown to ensure failure", 
nodeId);
             state.getNodeController().shutdown(false);
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java
index ac06344..c720675 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java
@@ -51,7 +51,7 @@
 import org.apache.hyracks.api.partitions.PartitionId;
 import org.apache.hyracks.api.util.JavaSerializationUtils;
 import org.apache.hyracks.control.cc.ClusterControllerService;
-import org.apache.hyracks.control.cc.NodeControllerState;
+import org.apache.hyracks.control.cc.NodeControllerHandle;
 import org.apache.hyracks.control.cc.cluster.INodeManager;
 import org.apache.hyracks.control.cc.job.ActivityClusterPlan;
 import org.apache.hyracks.control.cc.job.IJobManager;
@@ -508,7 +508,7 @@
             for (Map.Entry<String, List<TaskAttemptDescriptor>> entry : 
taskAttemptMap.entrySet()) {
                 String nodeId = entry.getKey();
                 final List<TaskAttemptDescriptor> taskDescriptors = 
entry.getValue();
-                final NodeControllerState node = 
nodeManager.getNodeControllerState(nodeId);
+                final NodeControllerHandle node = 
nodeManager.getNodeControllerState(nodeId);
                 if (node != null) {
                     node.getActiveJobIds().add(jobRun.getJobId());
                     boolean changed = 
jobRun.getParticipatingNodeIds().add(nodeId);
@@ -563,7 +563,7 @@
         LOGGER.info("Abort map for job: " + jobId + ": " + 
abortTaskAttemptMap);
         INodeManager nodeManager = ccs.getNodeManager();
         abortTaskAttemptMap.forEach((key, abortTaskAttempts) -> {
-            final NodeControllerState node = 
nodeManager.getNodeControllerState(key);
+            final NodeControllerHandle node = 
nodeManager.getNodeControllerState(key);
             if (node != null) {
                 if (LOGGER.isInfoEnabled()) {
                     LOGGER.info("Aborting: " + abortTaskAttempts + " at " + 
key);
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
index b728e73..64815ac 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
@@ -38,7 +38,7 @@
 import org.apache.hyracks.api.job.JobStatus;
 import org.apache.hyracks.api.job.resource.IJobCapacityController;
 import org.apache.hyracks.control.cc.ClusterControllerService;
-import org.apache.hyracks.control.cc.NodeControllerState;
+import org.apache.hyracks.control.cc.NodeControllerHandle;
 import org.apache.hyracks.control.cc.application.CCServiceContext;
 import org.apache.hyracks.control.cc.cluster.INodeManager;
 import org.apache.hyracks.control.cc.scheduler.FIFOJobQueue;
@@ -172,7 +172,7 @@
             INodeManager nodeManager = ccs.getNodeManager();
             Set<String> toDelete = new HashSet<>();
             for (String n : targetNodes) {
-                NodeControllerState ncs = 
nodeManager.getNodeControllerState(n);
+                NodeControllerHandle ncs = 
nodeManager.getNodeControllerState(n);
                 try {
                     if (ncs == null) {
                         toDelete.add(n);
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/partitions/PartitionUtils.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/partitions/PartitionUtils.java
index 65851ef..56e0c65 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/partitions/PartitionUtils.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/partitions/PartitionUtils.java
@@ -23,7 +23,7 @@
 import org.apache.hyracks.api.comm.NetworkAddress;
 import org.apache.hyracks.api.partitions.PartitionId;
 import org.apache.hyracks.control.cc.ClusterControllerService;
-import org.apache.hyracks.control.cc.NodeControllerState;
+import org.apache.hyracks.control.cc.NodeControllerHandle;
 import org.apache.hyracks.control.cc.cluster.INodeManager;
 import org.apache.hyracks.control.common.base.INodeController;
 import org.apache.hyracks.control.common.job.PartitionDescriptor;
@@ -36,8 +36,8 @@
         PartitionRequest req = match.getRight();
 
         INodeManager nodeManager = ccs.getNodeManager();
-        NodeControllerState producerNCS = 
nodeManager.getNodeControllerState(desc.getNodeId());
-        NodeControllerState requestorNCS = 
nodeManager.getNodeControllerState(req.getNodeId());
+        NodeControllerHandle producerNCS = 
nodeManager.getNodeControllerState(desc.getNodeId());
+        NodeControllerHandle requestorNCS = 
nodeManager.getNodeControllerState(req.getNodeId());
         final NetworkAddress dataport = producerNCS.getDataPort();
         final INodeController requestorNC = requestorNCS.getNodeController();
         requestorNC.reportPartitionAvailability(pid, dataport);
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/AbstractHeartbeatWork.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/AbstractHeartbeatWork.java
index 8e7faff..88b5f34 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/AbstractHeartbeatWork.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/AbstractHeartbeatWork.java
@@ -20,7 +20,7 @@
 package org.apache.hyracks.control.cc.work;
 
 import org.apache.hyracks.control.cc.ClusterControllerService;
-import org.apache.hyracks.control.cc.NodeControllerState;
+import org.apache.hyracks.control.cc.NodeControllerHandle;
 import org.apache.hyracks.control.cc.cluster.INodeManager;
 import org.apache.hyracks.control.common.heartbeat.HeartbeatData;
 import org.apache.hyracks.control.common.work.SynchronizableWork;
@@ -40,7 +40,7 @@
     @Override
     public void doRun() {
         INodeManager nodeManager = ccs.getNodeManager();
-        NodeControllerState state = nodeManager.getNodeControllerState(nodeId);
+        NodeControllerHandle state = 
nodeManager.getNodeControllerState(nodeId);
         if (state != null) {
             if (hbData != null) {
                 state.notifyHeartbeat(hbData);
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/CliDeployBinaryWork.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/CliDeployBinaryWork.java
index 4962607..a5a68fe 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/CliDeployBinaryWork.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/CliDeployBinaryWork.java
@@ -26,7 +26,7 @@
 
 import org.apache.hyracks.api.deployment.DeploymentId;
 import org.apache.hyracks.control.cc.ClusterControllerService;
-import org.apache.hyracks.control.cc.NodeControllerState;
+import org.apache.hyracks.control.cc.NodeControllerHandle;
 import org.apache.hyracks.control.cc.cluster.INodeManager;
 import org.apache.hyracks.control.common.deployment.DeploymentRun;
 import org.apache.hyracks.control.common.deployment.DeploymentUtils;
@@ -81,7 +81,7 @@
             /***
              * deploy binaries to each node controller
              */
-            for (NodeControllerState ncs : 
nodeManager.getAllNodeControllerStates()) {
+            for (NodeControllerHandle ncs : 
nodeManager.getAllNodeControllerStates()) {
                 ncs.getNodeController().deployBinary(deploymentId, binaryURLs);
             }
 
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/CliUnDeployBinaryWork.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/CliUnDeployBinaryWork.java
index 638c27d..54c5ed5 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/CliUnDeployBinaryWork.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/CliUnDeployBinaryWork.java
@@ -24,7 +24,7 @@
 
 import org.apache.hyracks.api.deployment.DeploymentId;
 import org.apache.hyracks.control.cc.ClusterControllerService;
-import org.apache.hyracks.control.cc.NodeControllerState;
+import org.apache.hyracks.control.cc.NodeControllerHandle;
 import org.apache.hyracks.control.cc.cluster.INodeManager;
 import org.apache.hyracks.control.common.deployment.DeploymentRun;
 import org.apache.hyracks.control.common.deployment.DeploymentUtils;
@@ -77,7 +77,7 @@
             /***
              * deploy binaries to each node controller
              */
-            for (NodeControllerState ncs : 
nodeManager.getAllNodeControllerStates()) {
+            for (NodeControllerHandle ncs : 
nodeManager.getAllNodeControllerStates()) {
                 ncs.getNodeController().undeployBinary(deploymentId);
             }
 
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ClusterShutdownWork.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ClusterShutdownWork.java
index a7c3c2f..c6ff95f 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ClusterShutdownWork.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ClusterShutdownWork.java
@@ -22,7 +22,7 @@
 import java.util.Collection;
 
 import org.apache.hyracks.control.cc.ClusterControllerService;
-import org.apache.hyracks.control.cc.NodeControllerState;
+import org.apache.hyracks.control.cc.NodeControllerHandle;
 import org.apache.hyracks.control.cc.cluster.INodeManager;
 import org.apache.hyracks.control.common.shutdown.ShutdownRun;
 import org.apache.hyracks.control.common.work.IResultCallback;
@@ -92,7 +92,7 @@
         }
     }
 
-    protected void shutdownNode(String nodeId, NodeControllerState ncState) {
+    protected void shutdownNode(String nodeId, NodeControllerHandle ncState) {
         try {
             LOGGER.info("Notifying NC " + nodeId + " to shutdown...");
             ncState.getNodeController().shutdown(terminateNCService);
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DeployJobSpecWork.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DeployJobSpecWork.java
index c51f3c5..0789d42 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DeployJobSpecWork.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DeployJobSpecWork.java
@@ -27,7 +27,7 @@
 import org.apache.hyracks.api.job.JobFlag;
 import org.apache.hyracks.api.util.JavaSerializationUtils;
 import org.apache.hyracks.control.cc.ClusterControllerService;
-import org.apache.hyracks.control.cc.NodeControllerState;
+import org.apache.hyracks.control.cc.NodeControllerHandle;
 import org.apache.hyracks.control.cc.application.CCServiceContext;
 import org.apache.hyracks.control.cc.cluster.INodeManager;
 import org.apache.hyracks.control.common.deployment.DeploymentUtils;
@@ -64,7 +64,7 @@
             byte[] acgBytes = JavaSerializationUtils.serialize(acg);
 
             INodeManager nodeManager = ccs.getNodeManager();
-            for (NodeControllerState node : 
nodeManager.getAllNodeControllerStates()) {
+            for (NodeControllerHandle node : 
nodeManager.getAllNodeControllerStates()) {
                 node.getNodeController().deployJobSpec(deployedJobSpecId, 
acgBytes);
             }
             callback.setValue(deployedJobSpecId);
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GatherStateDumpsWork.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GatherStateDumpsWork.java
index d827eba..fd5f8bf 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GatherStateDumpsWork.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GatherStateDumpsWork.java
@@ -20,14 +20,13 @@
 package org.apache.hyracks.control.cc.work;
 
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.UUID;
 
 import org.apache.hyracks.control.cc.ClusterControllerService;
-import org.apache.hyracks.control.cc.NodeControllerState;
+import org.apache.hyracks.control.cc.NodeControllerHandle;
 import org.apache.hyracks.control.cc.cluster.INodeManager;
 import org.apache.hyracks.control.common.work.SynchronizableWork;
 
@@ -48,7 +47,7 @@
         Collection<String> nodeIds = new HashSet<>();
         nodeIds.addAll(nodeManager.getAllNodeIds());
         sdr.setNCs(nodeIds);
-        for (NodeControllerState ncs : 
nodeManager.getAllNodeControllerStates()) {
+        for (NodeControllerHandle ncs : 
nodeManager.getAllNodeControllerStates()) {
             ncs.getNodeController().dumpState(sdr.stateDumpId);
         }
     }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetNodeDetailsJSONWork.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetNodeDetailsJSONWork.java
index 517f56f..aec03a9 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetNodeDetailsJSONWork.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetNodeDetailsJSONWork.java
@@ -34,7 +34,7 @@
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import org.apache.hyracks.api.config.Section;
-import org.apache.hyracks.control.cc.NodeControllerState;
+import org.apache.hyracks.control.cc.NodeControllerHandle;
 import org.apache.hyracks.control.cc.cluster.INodeManager;
 import org.apache.hyracks.control.common.config.ConfigUtils;
 import org.apache.hyracks.control.common.controllers.CCConfig;
@@ -81,7 +81,7 @@
                 detail.putPOJO("app.args", ccConfig.getAppArgs());
             }
         } else {
-            NodeControllerState ncs = 
nodeManager.getNodeControllerState(nodeId);
+            NodeControllerHandle ncs = 
nodeManager.getNodeControllerState(nodeId);
             if (ncs != null) {
                 detail = ncs.toDetailedJSON(includeStats, includeConfig);
                 if (includeConfig) {
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetNodeSummariesJSONWork.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetNodeSummariesJSONWork.java
index b78e817..334dfe3 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetNodeSummariesJSONWork.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetNodeSummariesJSONWork.java
@@ -18,7 +18,7 @@
  */
 package org.apache.hyracks.control.cc.work;
 
-import org.apache.hyracks.control.cc.NodeControllerState;
+import org.apache.hyracks.control.cc.NodeControllerHandle;
 import org.apache.hyracks.control.cc.cluster.INodeManager;
 import org.apache.hyracks.control.common.work.SynchronizableWork;
 
@@ -37,7 +37,7 @@
     protected void doRun() throws Exception {
         ObjectMapper om = new ObjectMapper();
         summaries = om.createArrayNode();
-        for (NodeControllerState ncs : 
nodeManager.getAllNodeControllerStates()) {
+        for (NodeControllerHandle ncs : 
nodeManager.getAllNodeControllerStates()) {
             summaries.add(ncs.toSummaryJSON());
         }
     }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetThreadDumpWork.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetThreadDumpWork.java
index b064e52..f364cb2 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetThreadDumpWork.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetThreadDumpWork.java
@@ -24,7 +24,7 @@
 import java.util.concurrent.TimeoutException;
 
 import org.apache.hyracks.control.cc.ClusterControllerService;
-import org.apache.hyracks.control.cc.NodeControllerState;
+import org.apache.hyracks.control.cc.NodeControllerHandle;
 import org.apache.hyracks.control.cc.cluster.INodeManager;
 import org.apache.hyracks.util.ThreadDumpUtil;
 import org.apache.hyracks.control.common.work.AbstractWork;
@@ -61,7 +61,7 @@
             }
         } else {
             INodeManager nodeManager = ccs.getNodeManager();
-            final NodeControllerState ncState = 
nodeManager.getNodeControllerState(nodeId);
+            final NodeControllerHandle ncState = 
nodeManager.getNodeControllerState(nodeId);
             if (ncState == null) {
                 // bad node id, reply with null immediately
                 callback.setValue(null);
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobletCleanupNotificationWork.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobletCleanupNotificationWork.java
index cc37f9c..d0bdc41 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobletCleanupNotificationWork.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobletCleanupNotificationWork.java
@@ -26,7 +26,7 @@
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobStatus;
 import org.apache.hyracks.control.cc.ClusterControllerService;
-import org.apache.hyracks.control.cc.NodeControllerState;
+import org.apache.hyracks.control.cc.NodeControllerHandle;
 import org.apache.hyracks.control.cc.cluster.INodeManager;
 import org.apache.hyracks.control.cc.job.IJobManager;
 import org.apache.hyracks.control.cc.job.JobRun;
@@ -63,7 +63,7 @@
             return;
         }
         INodeManager nodeManager = ccs.getNodeManager();
-        NodeControllerState ncs = nodeManager.getNodeControllerState(nodeId);
+        NodeControllerHandle ncs = nodeManager.getNodeControllerState(nodeId);
         if (ncs != null) {
             ncs.getActiveJobIds().remove(jobId);
         }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java
index 96f5f1b..1697d46 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java
@@ -24,7 +24,7 @@
 import org.apache.hyracks.api.config.IApplicationConfig;
 import org.apache.hyracks.api.config.IOption;
 import org.apache.hyracks.control.cc.ClusterControllerService;
-import org.apache.hyracks.control.cc.NodeControllerState;
+import org.apache.hyracks.control.cc.NodeControllerHandle;
 import org.apache.hyracks.control.cc.cluster.INodeManager;
 import org.apache.hyracks.control.common.controllers.NodeParameters;
 import org.apache.hyracks.control.common.controllers.NodeRegistration;
@@ -41,10 +41,12 @@
 
     private final ClusterControllerService ccs;
     private final NodeRegistration reg;
+    private final int registrationId;
 
-    public RegisterNodeWork(ClusterControllerService ccs, NodeRegistration 
reg) {
+    public RegisterNodeWork(ClusterControllerService ccs, NodeRegistration 
reg, int registrationId) {
         this.ccs = ccs;
         this.reg = reg;
+        this.registrationId = registrationId;
     }
 
     @Override
@@ -58,7 +60,7 @@
             LOGGER.log(Level.WARN, "Registering INodeController: id = " + id);
             NodeControllerRemoteProxy nc = new 
NodeControllerRemoteProxy(ccs.getCcId(),
                     
ccs.getClusterIPC().getReconnectingHandle(reg.getNodeControllerAddress()));
-            NodeControllerState state = new NodeControllerState(nc, reg);
+            NodeControllerHandle state = new NodeControllerHandle(nc, reg);
             INodeManager nodeManager = ccs.getNodeManager();
             nodeManager.addNode(id, state);
             IApplicationConfig cfg = 
state.getNCConfig().getConfigManager().getNodeEffectiveConfig(id);
@@ -71,7 +73,7 @@
             params.setDistributedState(ccs.getContext().getDistributedState());
             
params.setHeartbeatPeriod(ccs.getCCConfig().getHeartbeatPeriodMillis());
             
params.setProfileDumpPeriod(ccs.getCCConfig().getProfileDumpPeriod());
-            params.setRegistrationId(reg.getRegistrationId());
+            params.setRegistrationId(registrationId);
             result = new CCNCFunctions.NodeRegistrationResult(params, null);
         } catch (Exception e) {
             LOGGER.log(Level.WARN, "Node registration failed", e);
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/UndeployJobSpecWork.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/UndeployJobSpecWork.java
index 143c8c1..58b5462 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/UndeployJobSpecWork.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/UndeployJobSpecWork.java
@@ -20,7 +20,7 @@
 
 import org.apache.hyracks.api.job.DeployedJobSpecId;
 import org.apache.hyracks.control.cc.ClusterControllerService;
-import org.apache.hyracks.control.cc.NodeControllerState;
+import org.apache.hyracks.control.cc.NodeControllerHandle;
 import org.apache.hyracks.control.cc.cluster.INodeManager;
 import org.apache.hyracks.control.common.work.IResultCallback;
 import org.apache.hyracks.control.common.work.SynchronizableWork;
@@ -42,7 +42,7 @@
         try {
             
ccs.getDeployedJobSpecStore().removeDeployedJobSpecDescriptor(deployedJobSpecId);
             INodeManager nodeManager = ccs.getNodeManager();
-            for (NodeControllerState node : 
nodeManager.getAllNodeControllerStates()) {
+            for (NodeControllerHandle node : 
nodeManager.getAllNodeControllerStates()) {
                 node.getNodeController().undeployJobSpec(deployedJobSpecId);
             }
             callback.setValue(deployedJobSpecId);
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/cluster/NodeManagerTest.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/cluster/NodeManagerTest.java
index 931e436..58cb6d0 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/cluster/NodeManagerTest.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/cluster/NodeManagerTest.java
@@ -28,7 +28,7 @@
 import org.apache.hyracks.api.job.resource.IReadOnlyClusterCapacity;
 import org.apache.hyracks.api.job.resource.NodeCapacity;
 import org.apache.hyracks.control.cc.ClusterControllerService;
-import org.apache.hyracks.control.cc.NodeControllerState;
+import org.apache.hyracks.control.cc.NodeControllerHandle;
 import org.apache.hyracks.control.cc.scheduler.IResourceManager;
 import org.apache.hyracks.control.cc.scheduler.ResourceManager;
 import org.apache.hyracks.control.common.controllers.CCConfig;
@@ -55,8 +55,8 @@
         final int coresMultiplier = 1;
         ccConfig.setCoresMultiplier(coresMultiplier);
         INodeManager nodeManager = new NodeManager(mockCcs(), ccConfig, 
resourceManager);
-        NodeControllerState ncState1 = mockNodeControllerState(NODE1, false);
-        NodeControllerState ncState2 = mockNodeControllerState(NODE2, false);
+        NodeControllerHandle ncState1 = mockNodeControllerState(NODE1, false);
+        NodeControllerHandle ncState2 = mockNodeControllerState(NODE2, false);
 
         // Verifies states after adding nodes.
         nodeManager.addNode(NODE1, ncState1);
@@ -83,8 +83,8 @@
         final int coresMultiplier = 3;
         ccConfig.setCoresMultiplier(coresMultiplier);
         INodeManager nodeManager = new NodeManager(mockCcs(), ccConfig, 
resourceManager);
-        NodeControllerState ncState1 = mockNodeControllerState(NODE1, false);
-        NodeControllerState ncState2 = mockNodeControllerState(NODE2, false);
+        NodeControllerHandle ncState1 = mockNodeControllerState(NODE1, false);
+        NodeControllerHandle ncState2 = mockNodeControllerState(NODE2, false);
 
         // verify state after adding two nodes
         nodeManager.addNode(NODE1, ncState1);
@@ -112,7 +112,7 @@
     public void testException() throws HyracksException, IPCException {
         IResourceManager resourceManager = new ResourceManager();
         INodeManager nodeManager = new NodeManager(mockCcs(), makeCCConfig(), 
resourceManager);
-        NodeControllerState ncState1 = mockNodeControllerState(NODE1, true);
+        NodeControllerHandle ncState1 = mockNodeControllerState(NODE1, true);
 
         boolean invalidNetworkAddress = false;
         // Verifies states after a failure during adding nodes.
@@ -161,8 +161,8 @@
         return ccConfig;
     }
 
-    private NodeControllerState mockNodeControllerState(String nodeId, boolean 
invalidIpAddr) {
-        NodeControllerState ncState = mock(NodeControllerState.class);
+    private NodeControllerHandle mockNodeControllerState(String nodeId, 
boolean invalidIpAddr) {
+        NodeControllerHandle ncState = mock(NodeControllerHandle.class);
         NodeControllerRemoteProxy ncProxy = 
Mockito.mock(NodeControllerRemoteProxy.class);
         String ipAddr = invalidIpAddr ? "255.255.255:255" : "127.0.0.2";
         NetworkAddress dataAddr = new NetworkAddress(ipAddr, 1001);
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/job/JobManagerTest.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/job/JobManagerTest.java
index 251aed8..85c544e 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/job/JobManagerTest.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/job/JobManagerTest.java
@@ -41,7 +41,7 @@
 import org.apache.hyracks.api.job.JobStatus;
 import org.apache.hyracks.api.job.resource.IJobCapacityController;
 import org.apache.hyracks.control.cc.ClusterControllerService;
-import org.apache.hyracks.control.cc.NodeControllerState;
+import org.apache.hyracks.control.cc.NodeControllerHandle;
 import org.apache.hyracks.control.cc.application.CCServiceContext;
 import org.apache.hyracks.control.cc.cluster.INodeManager;
 import org.apache.hyracks.control.cc.cluster.NodeManager;
@@ -292,7 +292,7 @@
 
     private INodeManager mockNodeManager() {
         INodeManager nodeManager = mock(NodeManager.class);
-        NodeControllerState ncState = mock(NodeControllerState.class);
+        NodeControllerHandle ncState = mock(NodeControllerHandle.class);
         NodeControllerRemoteProxy nodeController = 
mock(NodeControllerRemoteProxy.class);
         when(nodeManager.getNodeControllerState(any())).thenReturn(ncState);
         when(ncState.getNodeController()).thenReturn(nodeController);
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/pom.xml 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/pom.xml
index bdb2aed..e13e23b 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/pom.xml
@@ -79,5 +79,9 @@
       <groupId>org.apache.logging.log4j</groupId>
       <artifactId>log4j-api</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-lang3</artifactId>
+    </dependency>
   </dependencies>
 </project>
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/NodeControllerState.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/NodeControllerState.java
similarity index 71%
rename from 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/NodeControllerState.java
rename to 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/NodeControllerState.java
index 415ca81..a429d0a 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/NodeControllerState.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/NodeControllerState.java
@@ -16,7 +16,9 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.hyracks.control.cc;
+package org.apache.hyracks.control.common;
+
+import static org.apache.hyracks.util.JSONUtil.put;
 
 import java.io.File;
 import java.util.Date;
@@ -25,6 +27,7 @@
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hyracks.api.comm.NetworkAddress;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.resource.NodeCapacity;
@@ -32,16 +35,13 @@
 import org.apache.hyracks.control.common.controllers.NodeRegistration;
 import org.apache.hyracks.control.common.heartbeat.HeartbeatData;
 import org.apache.hyracks.control.common.heartbeat.HeartbeatSchema;
-import 
org.apache.hyracks.control.common.heartbeat.HeartbeatSchema.GarbageCollectorInfo;
-import org.apache.hyracks.control.common.ipc.NodeControllerRemoteProxy;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 
 public class NodeControllerState {
-    private static final int RRD_SIZE = 720;
 
-    private final NodeControllerRemoteProxy nodeController;
+    private static final int RRD_SIZE = 720;
 
     private final NCConfig ncConfig;
 
@@ -145,8 +145,7 @@
 
     private NodeCapacity capacity;
 
-    public NodeControllerState(NodeControllerRemoteProxy nodeController, 
NodeRegistration reg) {
-        this.nodeController = nodeController;
+    public NodeControllerState(NodeRegistration reg) {
         ncConfig = reg.getNCConfig();
         dataPort = reg.getDataPort();
         datasetPort = reg.getDatasetPort();
@@ -181,7 +180,7 @@
         threadCount = new int[RRD_SIZE];
         peakThreadCount = new int[RRD_SIZE];
         systemLoadAverage = new double[RRD_SIZE];
-        GarbageCollectorInfo[] gcInfos = hbSchema.getGarbageCollectorInfos();
+        HeartbeatSchema.GarbageCollectorInfo[] gcInfos = 
hbSchema.getGarbageCollectorInfos();
         int gcN = gcInfos.length;
         gcNames = new String[gcN];
         for (int i = 0; i < gcN; ++i) {
@@ -254,10 +253,6 @@
         return System.nanoTime() - lastHeartbeatNanoTime;
     }
 
-    public NodeControllerRemoteProxy getNodeController() {
-        return nodeController;
-    }
-
     public NCConfig getNCConfig() {
         return ncConfig;
     }
@@ -285,9 +280,9 @@
     public synchronized ObjectNode toSummaryJSON() {
         ObjectMapper om = new ObjectMapper();
         ObjectNode o = om.createObjectNode();
-        o.put("node-id", ncConfig.getNodeId());
-        o.put("heap-used", heapUsedSize[(rrdPtr + RRD_SIZE - 1) % RRD_SIZE]);
-        o.put("system-load-average", systemLoadAverage[(rrdPtr + RRD_SIZE - 1) 
% RRD_SIZE]);
+        put(o, "node-id", ncConfig.getNodeId());
+        put(o, "heap-used", heapUsedSize[(rrdPtr + RRD_SIZE - 1) % RRD_SIZE]);
+        put(o, "system-load-average", systemLoadAverage[(rrdPtr + RRD_SIZE - 
1) % RRD_SIZE]);
 
         return o;
     }
@@ -296,57 +291,58 @@
         ObjectMapper om = new ObjectMapper();
         ObjectNode o = om.createObjectNode();
 
-        o.put("node-id", ncConfig.getNodeId());
+        put(o, "node-id", ncConfig.getNodeId());
 
         if (includeConfig) {
-            o.put("os-name", osName);
-            o.put("arch", arch);
-            o.put("os-version", osVersion);
-            o.put("num-processors", nProcessors);
-            o.put("vm-name", vmName);
-            o.put("vm-version", vmVersion);
-            o.put("vm-vendor", vmVendor);
-            o.putPOJO("classpath", classpath.split(File.pathSeparator));
-            o.putPOJO("library-path", libraryPath.split(File.pathSeparator));
-            o.putPOJO("boot-classpath", 
bootClasspath.split(File.pathSeparator));
-            o.putPOJO("input-arguments", inputArguments);
-            o.putPOJO("system-properties", systemProperties);
-            o.put("pid", pid);
+            put(o, "os-name", osName);
+            put(o, "arch", arch);
+            put(o, "os-version", osVersion);
+            put(o, "num-processors", nProcessors);
+            put(o, "vm-name", vmName);
+            put(o, "vm-version", vmVersion);
+            put(o, "vm-vendor", vmVendor);
+            put(o, "classpath", StringUtils.split(classpath, 
File.pathSeparatorChar));
+            put(o, "library-path", StringUtils.split(libraryPath, 
File.pathSeparatorChar));
+            put(o, "boot-classpath", StringUtils.split(bootClasspath, 
File.pathSeparatorChar));
+            put(o, "input-arguments", inputArguments);
+            put(o, "input-arguments", inputArguments);
+            put(o, "system-properties", systemProperties);
+            put(o, "pid", pid);
         }
         if (includeStats) {
             o.putPOJO("date", new Date());
-            o.put("rrd-ptr", rrdPtr);
-            o.putPOJO("heartbeat-times", hbTime);
-            o.putPOJO("heap-init-sizes", heapInitSize);
-            o.putPOJO("heap-used-sizes", heapUsedSize);
-            o.putPOJO("heap-committed-sizes", heapCommittedSize);
-            o.putPOJO("heap-max-sizes", heapMaxSize);
-            o.putPOJO("nonheap-init-sizes", nonheapInitSize);
-            o.putPOJO("nonheap-used-sizes", nonheapUsedSize);
-            o.putPOJO("nonheap-committed-sizes", nonheapCommittedSize);
-            o.putPOJO("nonheap-max-sizes", nonheapMaxSize);
-            o.putPOJO("application-memory-budget", 
capacity.getMemoryByteSize());
-            o.putPOJO("application-cpu-core-budget", capacity.getCores());
-            o.putPOJO("thread-counts", threadCount);
-            o.putPOJO("peak-thread-counts", peakThreadCount);
-            o.putPOJO("system-load-averages", systemLoadAverage);
-            o.putPOJO("gc-names", gcNames);
-            o.putPOJO("gc-collection-counts", gcCollectionCounts);
-            o.putPOJO("gc-collection-times", gcCollectionTimes);
-            o.putPOJO("net-payload-bytes-read", netPayloadBytesRead);
-            o.putPOJO("net-payload-bytes-written", netPayloadBytesWritten);
-            o.putPOJO("net-signaling-bytes-read", netSignalingBytesRead);
-            o.putPOJO("net-signaling-bytes-written", netSignalingBytesWritten);
-            o.putPOJO("dataset-net-payload-bytes-read", 
datasetNetPayloadBytesRead);
-            o.putPOJO("dataset-net-payload-bytes-written", 
datasetNetPayloadBytesWritten);
-            o.putPOJO("dataset-net-signaling-bytes-read", 
datasetNetSignalingBytesRead);
-            o.putPOJO("dataset-net-signaling-bytes-written", 
datasetNetSignalingBytesWritten);
-            o.putPOJO("ipc-messages-sent", ipcMessagesSent);
-            o.putPOJO("ipc-message-bytes-sent", ipcMessageBytesSent);
-            o.putPOJO("ipc-messages-received", ipcMessagesReceived);
-            o.putPOJO("ipc-message-bytes-received", ipcMessageBytesReceived);
-            o.putPOJO("disk-reads", diskReads);
-            o.putPOJO("disk-writes", diskWrites);
+            put(o, "rrd-ptr", rrdPtr);
+            put(o, "heartbeat-times", hbTime);
+            put(o, "heap-init-sizes", heapInitSize);
+            put(o, "heap-used-sizes", heapUsedSize);
+            put(o, "heap-committed-sizes", heapCommittedSize);
+            put(o, "heap-max-sizes", heapMaxSize);
+            put(o, "nonheap-init-sizes", nonheapInitSize);
+            put(o, "nonheap-used-sizes", nonheapUsedSize);
+            put(o, "nonheap-committed-sizes", nonheapCommittedSize);
+            put(o, "nonheap-max-sizes", nonheapMaxSize);
+            put(o, "application-memory-budget", capacity.getMemoryByteSize());
+            put(o, "application-cpu-core-budget", capacity.getCores());
+            put(o, "thread-counts", threadCount);
+            put(o, "peak-thread-counts", peakThreadCount);
+            put(o, "system-load-averages", systemLoadAverage);
+            put(o, "gc-names", gcNames);
+            put(o, "gc-collection-counts", gcCollectionCounts);
+            put(o, "gc-collection-times", gcCollectionTimes);
+            put(o, "net-payload-bytes-read", netPayloadBytesRead);
+            put(o, "net-payload-bytes-written", netPayloadBytesWritten);
+            put(o, "net-signaling-bytes-read", netSignalingBytesRead);
+            put(o, "net-signaling-bytes-written", netSignalingBytesWritten);
+            put(o, "dataset-net-payload-bytes-read", 
datasetNetPayloadBytesRead);
+            put(o, "dataset-net-payload-bytes-written", 
datasetNetPayloadBytesWritten);
+            put(o, "dataset-net-signaling-bytes-read", 
datasetNetSignalingBytesRead);
+            put(o, "dataset-net-signaling-bytes-written", 
datasetNetSignalingBytesWritten);
+            put(o, "ipc-messages-sent", ipcMessagesSent);
+            put(o, "ipc-message-bytes-sent", ipcMessageBytesSent);
+            put(o, "ipc-messages-received", ipcMessagesReceived);
+            put(o, "ipc-message-bytes-received", ipcMessageBytesReceived);
+            put(o, "disk-reads", diskReads);
+            put(o, "disk-writes", diskWrites);
         }
 
         return o;
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java
index 5e3c3d4..6230f1d 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java
@@ -35,7 +35,7 @@
 import org.apache.hyracks.control.common.job.profiling.om.TaskProfile;
 
 public interface IClusterController {
-    void registerNode(NodeRegistration reg) throws Exception;
+    void registerNode(NodeRegistration reg, int registrationId) throws 
Exception;
 
     void unregisterNode(String nodeId) throws Exception;
 
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NodeRegistration.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NodeRegistration.java
index a87c30a..76f5ad8 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NodeRegistration.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NodeRegistration.java
@@ -22,7 +22,6 @@
 import java.net.InetSocketAddress;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.hyracks.api.comm.NetworkAddress;
 import org.apache.hyracks.api.job.resource.NodeCapacity;
@@ -73,10 +72,6 @@
 
     private final NodeCapacity capacity;
 
-    private final int registrationId;
-
-    private static final AtomicInteger nextRegistrationId = new 
AtomicInteger();
-
     public NodeRegistration(InetSocketAddress ncAddress, String nodeId, 
NCConfig ncConfig, NetworkAddress dataPort,
             NetworkAddress datasetPort, String osName, String arch, String 
osVersion, int nProcessors, String vmName,
             String vmVersion, String vmVendor, String classpath, String 
libraryPath, String bootClasspath,
@@ -103,7 +98,6 @@
         this.messagingPort = messagingPort;
         this.capacity = capacity;
         this.pid = pid;
-        this.registrationId = nextRegistrationId.getAndIncrement();
     }
 
     public InetSocketAddress getNodeControllerAddress() {
@@ -188,9 +182,5 @@
 
     public int getPid() {
         return pid;
-    }
-
-    public int getRegistrationId() {
-        return registrationId;
     }
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
index 3d505f3..c10c8981 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
@@ -174,9 +174,11 @@
         private static final long serialVersionUID = 1L;
 
         private final NodeRegistration reg;
+        private final int registrationId;
 
-        public RegisterNodeFunction(NodeRegistration reg) {
+        public RegisterNodeFunction(NodeRegistration reg, int registrationId) {
             this.reg = reg;
+            this.registrationId = registrationId;
         }
 
         @Override
@@ -187,6 +189,10 @@
         public NodeRegistration getNodeRegistration() {
             return reg;
         }
+
+        public int getRegistrationId() {
+            return registrationId;
+        }
     }
 
     public static class UnregisterNodeFunction extends Function {
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
index 027316e..bf35e6b 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
@@ -63,8 +63,8 @@
     }
 
     @Override
-    public void registerNode(NodeRegistration reg) throws Exception {
-        RegisterNodeFunction fn = new RegisterNodeFunction(reg);
+    public void registerNode(NodeRegistration reg, int registrationId) throws 
Exception {
+        RegisterNodeFunction fn = new RegisterNodeFunction(reg, 
registrationId);
         ipcHandle.send(-1, fn, null);
     }
 
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/CcConnection.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/CcConnection.java
index 63fffb4..1c6c98e 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/CcConnection.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/CcConnection.java
@@ -54,9 +54,9 @@
         notifyAll();
     }
 
-    public synchronized CcId registerNode(NodeRegistration nodeRegistration) 
throws Exception {
+    public synchronized CcId registerNode(NodeRegistration nodeRegistration, 
int registrationId) throws Exception {
         registrationPending = true;
-        ccs.registerNode(nodeRegistration);
+        ccs.registerNode(nodeRegistration, registrationId);
         while (registrationPending) {
             wait();
         }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
index 0b7254d..f2a29ef 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
@@ -42,6 +42,7 @@
 import java.util.concurrent.Executors;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.commons.lang3.mutable.Mutable;
@@ -65,6 +66,7 @@
 import org.apache.hyracks.api.service.IControllerService;
 import org.apache.hyracks.api.util.CleanupUtils;
 import org.apache.hyracks.api.util.InvokeUtil;
+import org.apache.hyracks.control.common.NodeControllerState;
 import org.apache.hyracks.control.common.base.IClusterController;
 import org.apache.hyracks.control.common.config.ConfigManager;
 import org.apache.hyracks.control.common.context.ServerContext;
@@ -111,6 +113,7 @@
 
     private static final double MEMORY_FUDGE_FACTOR = 0.8;
     private static final long ONE_SECOND_NANOS = TimeUnit.SECONDS.toNanos(1);
+    private static final int HEARTBEAT_REFRESH_MILLIS = 60000;
 
     private final NCConfig ncConfig;
 
@@ -185,7 +188,16 @@
     private final ConfigManager configManager;
 
     private final Map<CcId, AtomicLong> maxJobIds = new ConcurrentHashMap<>();
+
     private NodeStatus status = NodeStatus.BOOTING;
+
+    private NodeRegistration nodeRegistration;
+
+    private NodeControllerState nodeControllerState;
+
+    private final HeartbeatData hbData;
+
+    private static final AtomicInteger nextRegistrationId = new 
AtomicInteger();
 
     static {
         ExitUtil.init();
@@ -239,6 +251,10 @@
             CleanupUtils.close(ioManager, th);
             throw th;
         }
+
+        hbData = new HeartbeatData();
+        hbData.gcCollectionCounts = new long[gcMXBeans.size()];
+        hbData.gcCollectionTimes = new long[gcMXBeans.size()];
     }
 
     public IOManager getIoManager() {
@@ -308,6 +324,11 @@
         if (messagingNetManager != null) {
             messagingNetManager.start();
         }
+        initNodeControllerState();
+
+        // Schedule heartbeat computation task before registering with the cc. 
 Note, the compute task ctor does the
+        // initial computation, so we schedule ongoing computations with an 
initial delay
+        timer.schedule(new HeartbeatComputeTask(), HEARTBEAT_REFRESH_MILLIS, 
HEARTBEAT_REFRESH_MILLIS);
 
         this.primaryCcId = addCc(new 
InetSocketAddress(ncConfig.getClusterAddress(), ncConfig.getClusterPort()));
 
@@ -318,6 +339,34 @@
 
         LOGGER.log(Level.INFO, "Started NodeControllerService");
         application.startupCompleted();
+    }
+
+    private void initNodeControllerState() {
+        // Use "public" versions of network addresses and ports, if defined
+        InetSocketAddress ncAddress;
+        if (ncConfig.getClusterPublicPort() == 0) {
+            ncAddress = ipc.getSocketAddress();
+        } else {
+            ncAddress = new 
InetSocketAddress(ncConfig.getClusterPublicAddress(), 
ncConfig.getClusterPublicPort());
+        }
+        HeartbeatSchema.GarbageCollectorInfo[] gcInfos = new 
HeartbeatSchema.GarbageCollectorInfo[gcMXBeans.size()];
+        for (int i = 0; i < gcInfos.length; ++i) {
+            gcInfos[i] = new 
HeartbeatSchema.GarbageCollectorInfo(gcMXBeans.get(i).getName());
+        }
+        HeartbeatSchema hbSchema = new HeartbeatSchema(gcInfos);
+
+        NetworkAddress datasetAddress = 
datasetNetworkManager.getPublicNetworkAddress();
+        NetworkAddress netAddress = netManager.getPublicNetworkAddress();
+        NetworkAddress messagingAddress =
+                messagingNetManager != null ? 
messagingNetManager.getPublicNetworkAddress() : null;
+        nodeRegistration = new NodeRegistration(ncAddress, id, ncConfig, 
netAddress, datasetAddress, osMXBean.getName(),
+                osMXBean.getArch(), osMXBean.getVersion(), 
osMXBean.getAvailableProcessors(), runtimeMXBean.getVmName(),
+                runtimeMXBean.getVmVersion(), runtimeMXBean.getVmVendor(), 
runtimeMXBean.getClassPath(),
+                runtimeMXBean.getLibraryPath(), 
runtimeMXBean.getBootClassPath(), runtimeMXBean.getInputArguments(),
+                runtimeMXBean.getSystemProperties(), hbSchema, 
messagingAddress, application.getCapacity(),
+                PidHelper.getPid());
+
+        nodeControllerState = new NodeControllerState(nodeRegistration);
     }
 
     public CcId addCc(InetSocketAddress ccAddress) throws Exception {
@@ -395,31 +444,10 @@
 
     protected CcId registerNode(CcConnection ccc, InetSocketAddress ccAddress) 
throws Exception {
         LOGGER.info("Registering with Cluster Controller {}", ccc);
-        HeartbeatSchema.GarbageCollectorInfo[] gcInfos = new 
HeartbeatSchema.GarbageCollectorInfo[gcMXBeans.size()];
-        for (int i = 0; i < gcInfos.length; ++i) {
-            gcInfos[i] = new 
HeartbeatSchema.GarbageCollectorInfo(gcMXBeans.get(i).getName());
-        }
-        HeartbeatSchema hbSchema = new HeartbeatSchema(gcInfos);
-        // Use "public" versions of network addresses and ports, if defined
-        InetSocketAddress ncAddress;
-        if (ncConfig.getClusterPublicPort() == 0) {
-            ncAddress = ipc.getSocketAddress();
-        } else {
-            ncAddress = new 
InetSocketAddress(ncConfig.getClusterPublicAddress(), 
ncConfig.getClusterPublicPort());
-        }
-        NetworkAddress datasetAddress = 
datasetNetworkManager.getPublicNetworkAddress();
-        NetworkAddress netAddress = netManager.getPublicNetworkAddress();
-        NetworkAddress messagingAddress =
-                messagingNetManager != null ? 
messagingNetManager.getPublicNetworkAddress() : null;
-        NodeRegistration nodeRegistration = new NodeRegistration(ncAddress, 
id, ncConfig, netAddress, datasetAddress,
-                osMXBean.getName(), osMXBean.getArch(), osMXBean.getVersion(), 
osMXBean.getAvailableProcessors(),
-                runtimeMXBean.getVmName(), runtimeMXBean.getVmVersion(), 
runtimeMXBean.getVmVendor(),
-                runtimeMXBean.getClassPath(), runtimeMXBean.getLibraryPath(), 
runtimeMXBean.getBootClassPath(),
-                runtimeMXBean.getInputArguments(), 
runtimeMXBean.getSystemProperties(), hbSchema, messagingAddress,
-                application.getCapacity(), PidHelper.getPid());
 
-        pendingRegistrations.put(nodeRegistration.getRegistrationId(), ccc);
-        CcId ccId = ccc.registerNode(nodeRegistration);
+        int registrationId = nextRegistrationId.incrementAndGet();
+        pendingRegistrations.put(registrationId, ccc);
+        CcId ccId = ccc.registerNode(nodeRegistration, registrationId);
         ccMap.put(ccId, ccc);
         ccAddressMap.put(ccAddress, ccId);
         Serializable distributedState = 
ccc.getNodeParameters().getDistributedState();
@@ -642,19 +670,80 @@
         this.status = status;
     }
 
+    public NodeRegistration getNodeRegistration() {
+        return nodeRegistration;
+    }
+
+    public NodeControllerState getNodeControllerState() {
+        return nodeControllerState;
+    }
+
+    private class HeartbeatComputeTask extends TimerTask {
+
+        private HeartbeatComputeTask() {
+            run();
+        }
+
+        @Override
+        public void run() {
+            synchronized (hbData) {
+                MemoryUsage heapUsage = memoryMXBean.getHeapMemoryUsage();
+                hbData.heapInitSize = heapUsage.getInit();
+                hbData.heapUsedSize = heapUsage.getUsed();
+                hbData.heapCommittedSize = heapUsage.getCommitted();
+                hbData.heapMaxSize = heapUsage.getMax();
+                MemoryUsage nonheapUsage = 
memoryMXBean.getNonHeapMemoryUsage();
+                hbData.nonheapInitSize = nonheapUsage.getInit();
+                hbData.nonheapUsedSize = nonheapUsage.getUsed();
+                hbData.nonheapCommittedSize = nonheapUsage.getCommitted();
+                hbData.nonheapMaxSize = nonheapUsage.getMax();
+                hbData.threadCount = threadMXBean.getThreadCount();
+                hbData.peakThreadCount = threadMXBean.getPeakThreadCount();
+                hbData.totalStartedThreadCount = 
threadMXBean.getTotalStartedThreadCount();
+                hbData.systemLoadAverage = osMXBean.getSystemLoadAverage();
+                int gcN = gcMXBeans.size();
+                for (int i = 0; i < gcN; ++i) {
+                    GarbageCollectorMXBean gcMXBean = gcMXBeans.get(i);
+                    hbData.gcCollectionCounts[i] = 
gcMXBean.getCollectionCount();
+                    hbData.gcCollectionTimes[i] = gcMXBean.getCollectionTime();
+                }
+
+                MuxDemuxPerformanceCounters netPC = 
netManager.getPerformanceCounters();
+                hbData.netPayloadBytesRead = netPC.getPayloadBytesRead();
+                hbData.netPayloadBytesWritten = netPC.getPayloadBytesWritten();
+                hbData.netSignalingBytesRead = netPC.getSignalingBytesRead();
+                hbData.netSignalingBytesWritten = 
netPC.getSignalingBytesWritten();
+
+                MuxDemuxPerformanceCounters datasetNetPC = 
datasetNetworkManager.getPerformanceCounters();
+                hbData.datasetNetPayloadBytesRead = 
datasetNetPC.getPayloadBytesRead();
+                hbData.datasetNetPayloadBytesWritten = 
datasetNetPC.getPayloadBytesWritten();
+                hbData.datasetNetSignalingBytesRead = 
datasetNetPC.getSignalingBytesRead();
+                hbData.datasetNetSignalingBytesWritten = 
datasetNetPC.getSignalingBytesWritten();
+
+                IPCPerformanceCounters ipcPC = ipc.getPerformanceCounters();
+                hbData.ipcMessagesSent = ipcPC.getMessageSentCount();
+                hbData.ipcMessageBytesSent = ipcPC.getMessageBytesSent();
+                hbData.ipcMessagesReceived = ipcPC.getMessageReceivedCount();
+                hbData.ipcMessageBytesReceived = 
ipcPC.getMessageBytesReceived();
+
+                hbData.diskReads = ioCounter.getReads();
+                hbData.diskWrites = ioCounter.getWrites();
+                hbData.numCores = Runtime.getRuntime().availableProcessors();
+
+                nodeControllerState.notifyHeartbeat(hbData);
+            }
+            LOGGER.log(Level.DEBUG, "Successfully refreshed heartbeat data");
+        }
+    }
+
     private class HeartbeatTask implements Runnable {
         private final Semaphore delayBlock = new Semaphore(0);
         private final IClusterController cc;
         private final long heartbeatPeriodNanos;
 
-        private final HeartbeatData hbData;
-
         HeartbeatTask(IClusterController cc, long heartbeatPeriod) {
             this.cc = cc;
             this.heartbeatPeriodNanos = 
TimeUnit.MILLISECONDS.toNanos(heartbeatPeriod);
-            hbData = new HeartbeatData();
-            hbData.gcCollectionCounts = new long[gcMXBeans.size()];
-            hbData.gcCollectionTimes = new long[gcMXBeans.size()];
         }
 
         @Override
@@ -663,69 +752,25 @@
                 try {
                     long nextFireNanoTime = System.nanoTime() + 
heartbeatPeriodNanos;
                     final boolean success = execute();
-                    sleepUntilNextFire(success ? nextFireNanoTime - 
System.nanoTime() : ONE_SECOND_NANOS);
+                    long delayNanos = success ? nextFireNanoTime - 
System.nanoTime() : ONE_SECOND_NANOS;
+                    if (delayNanos > 0) {
+                        delayBlock.tryAcquire(delayNanos, 
TimeUnit.NANOSECONDS); //NOSONAR - ignore result of tryAcquire
+                    } else {
+                        LOGGER.warn("After sending heartbeat, next one is 
already late by "
+                                + TimeUnit.NANOSECONDS.toMillis(-delayNanos) + 
"ms; sending without delay");
+                    }
                 } catch (InterruptedException e) { // NOSONAR
                     break;
                 }
             }
-            LOGGER.log(Level.INFO, "Heartbeat thread interrupted; shutting 
down");
-        }
-
-        private void sleepUntilNextFire(long delayNanos) throws 
InterruptedException {
-            if (delayNanos > 0) {
-                delayBlock.tryAcquire(delayNanos, TimeUnit.NANOSECONDS); 
//NOSONAR - ignore result of tryAcquire
-            } else {
-                LOGGER.warn("After sending heartbeat, next one is already late 
by "
-                        + TimeUnit.NANOSECONDS.toMillis(-delayNanos) + "ms; 
sending without delay");
-            }
+            LOGGER.log(Level.INFO, "Heartbeat task interrupted; shutting 
down");
         }
 
         private boolean execute() throws InterruptedException {
-            MemoryUsage heapUsage = memoryMXBean.getHeapMemoryUsage();
-            hbData.heapInitSize = heapUsage.getInit();
-            hbData.heapUsedSize = heapUsage.getUsed();
-            hbData.heapCommittedSize = heapUsage.getCommitted();
-            hbData.heapMaxSize = heapUsage.getMax();
-            MemoryUsage nonheapUsage = memoryMXBean.getNonHeapMemoryUsage();
-            hbData.nonheapInitSize = nonheapUsage.getInit();
-            hbData.nonheapUsedSize = nonheapUsage.getUsed();
-            hbData.nonheapCommittedSize = nonheapUsage.getCommitted();
-            hbData.nonheapMaxSize = nonheapUsage.getMax();
-            hbData.threadCount = threadMXBean.getThreadCount();
-            hbData.peakThreadCount = threadMXBean.getPeakThreadCount();
-            hbData.totalStartedThreadCount = 
threadMXBean.getTotalStartedThreadCount();
-            hbData.systemLoadAverage = osMXBean.getSystemLoadAverage();
-            int gcN = gcMXBeans.size();
-            for (int i = 0; i < gcN; ++i) {
-                GarbageCollectorMXBean gcMXBean = gcMXBeans.get(i);
-                hbData.gcCollectionCounts[i] = gcMXBean.getCollectionCount();
-                hbData.gcCollectionTimes[i] = gcMXBean.getCollectionTime();
-            }
-
-            MuxDemuxPerformanceCounters netPC = 
netManager.getPerformanceCounters();
-            hbData.netPayloadBytesRead = netPC.getPayloadBytesRead();
-            hbData.netPayloadBytesWritten = netPC.getPayloadBytesWritten();
-            hbData.netSignalingBytesRead = netPC.getSignalingBytesRead();
-            hbData.netSignalingBytesWritten = netPC.getSignalingBytesWritten();
-
-            MuxDemuxPerformanceCounters datasetNetPC = 
datasetNetworkManager.getPerformanceCounters();
-            hbData.datasetNetPayloadBytesRead = 
datasetNetPC.getPayloadBytesRead();
-            hbData.datasetNetPayloadBytesWritten = 
datasetNetPC.getPayloadBytesWritten();
-            hbData.datasetNetSignalingBytesRead = 
datasetNetPC.getSignalingBytesRead();
-            hbData.datasetNetSignalingBytesWritten = 
datasetNetPC.getSignalingBytesWritten();
-
-            IPCPerformanceCounters ipcPC = ipc.getPerformanceCounters();
-            hbData.ipcMessagesSent = ipcPC.getMessageSentCount();
-            hbData.ipcMessageBytesSent = ipcPC.getMessageBytesSent();
-            hbData.ipcMessagesReceived = ipcPC.getMessageReceivedCount();
-            hbData.ipcMessageBytesReceived = ipcPC.getMessageBytesReceived();
-
-            hbData.diskReads = ioCounter.getReads();
-            hbData.diskWrites = ioCounter.getWrites();
-            hbData.numCores = Runtime.getRuntime().availableProcessors();
-
             try {
-                cc.nodeHeartbeat(id, hbData);
+                synchronized (hbData) {
+                    cc.nodeHeartbeat(id, hbData);
+                }
                 LOGGER.log(Level.DEBUG, "Successfully sent heartbeat");
                 return true;
             } catch (InterruptedException e) {
diff --git 
a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/JSONUtil.java
 
b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/JSONUtil.java
index 158ab66..6085c1c 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/JSONUtil.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/JSONUtil.java
@@ -20,9 +20,13 @@
 
 import java.io.IOException;
 import java.io.Writer;
-import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.stream.DoubleStream;
+import java.util.stream.IntStream;
+import java.util.stream.LongStream;
+import java.util.stream.Stream;
 
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -33,6 +37,7 @@
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.ObjectWriter;
 import com.fasterxml.jackson.databind.SerializationFeature;
+import com.fasterxml.jackson.databind.node.ObjectNode;
 
 public class JSONUtil {
 
@@ -84,47 +89,46 @@
         throw new UnsupportedOperationException(o.getClass().getSimpleName());
     }
 
-    private static StringBuilder appendObj(StringBuilder builder, JsonNode 
jobj, int indent) {
-        StringBuilder sb = builder.append("{\n");
+    private static StringBuilder appendObj(final StringBuilder sb, final 
JsonNode outer, final int indent) {
+        sb.append("{\n");
         boolean first = true;
-        for (Iterator<JsonNode> it = jobj.iterator(); it.hasNext();) {
-            final String key = it.next().asText();
+        for (JsonNode inner : outer) {
+            final String key = inner.asText();
             if (first) {
                 first = false;
             } else {
-                sb = sb.append(",\n");
+                sb.append(",\n");
             }
-            sb = indent(sb, indent + 1);
-            sb = quote(sb, key);
-            sb = sb.append(": ");
-            if (jobj.get(key).isArray()) {
-                sb = appendAry(sb, jobj.get(key), indent + 1);
-            } else if (jobj.get(key).isObject()) {
-                sb = appendObj(sb, jobj.get(key), indent + 1);
-            } else {
-                sb = appendOrd(sb, jobj.get(key), indent + 1);
-            }
+            indent(sb, indent + 1);
+            quote(sb, key);
+            sb.append(": ");
+            appendVal(sb, outer.get(key), indent);
         }
-        sb = sb.append("\n");
+        sb.append("\n");
         return indent(sb, indent).append("}");
     }
 
-    private static StringBuilder appendAry(StringBuilder builder, JsonNode 
jarr, int indent) {
-        StringBuilder sb = builder.append("[\n");
+    private static StringBuilder appendVal(final StringBuilder sb, final 
JsonNode value, final int indent) {
+        if (value.isArray()) {
+            appendAry(sb, value, indent + 1);
+        } else if (value.isObject()) {
+            appendObj(sb, value, indent + 1);
+        } else {
+            appendOrd(sb, value, indent + 1);
+        }
+        return sb;
+    }
+
+    private static StringBuilder appendAry(final StringBuilder sb, JsonNode 
jarr, int indent) {
+        sb.append("[\n");
         for (int i = 0; i < jarr.size(); ++i) {
             if (i > 0) {
-                sb = sb.append(",\n");
+                sb.append(",\n");
             }
-            sb = indent(sb, indent + 1);
-            if (jarr.get(i).isArray()) {
-                sb = appendAry(sb, jarr.get(i), indent + 1);
-            } else if (jarr.get(i).isObject()) {
-                sb = appendObj(sb, jarr.get(i), indent + 1);
-            } else {
-                sb = appendOrd(sb, jarr.get(i), indent + 1);
-            }
+            indent(sb, indent + 1);
+            appendVal(sb, jarr.get(i), indent);
         }
-        sb = sb.append("\n");
+        sb.append("\n");
         return indent(sb, indent).append("]");
     }
 
@@ -228,4 +232,48 @@
         aString.append(" }");
         return aString.toString();
     }
+
+    public static void put(ObjectNode o, String name, int value) {
+        o.put(name, value);
+    }
+
+    public static void put(ObjectNode o, String name, String value) {
+        o.put(name, value);
+    }
+
+    public static void put(ObjectNode o, String name, long value) {
+        o.put(name, value);
+    }
+
+    public static void put(ObjectNode o, String name, double value) {
+        o.put(name, value);
+    }
+
+    public static void put(ObjectNode o, String name, long[] elements) {
+        LongStream.of(elements).forEachOrdered(o.putArray(name)::add);
+    }
+
+    public static void put(ObjectNode o, String name, long[][] elements) {
+        Stream.of(elements).forEachOrdered(o.putArray(name)::addPOJO);
+    }
+
+    public static void put(ObjectNode o, String name, int[] elements) {
+        IntStream.of(elements).forEachOrdered(o.putArray(name)::add);
+    }
+
+    public static void put(ObjectNode o, String name, double[] elements) {
+        DoubleStream.of(elements).forEachOrdered(o.putArray(name)::add);
+    }
+
+    public static void put(ObjectNode o, String name, Map<String, String> map) 
{
+        map.forEach(o.putObject(name)::put);
+    }
+
+    public static void put(ObjectNode o, String name, String[] elements) {
+        Stream.of(elements).forEachOrdered(o.putArray(name)::add);
+    }
+
+    public static void put(ObjectNode o, String name, List<String> elements) {
+        elements.forEach(o.putArray(name)::add);
+    }
 }

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/2439
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: Iccb02350b56328ba1adbca97a1cb5efeb9d9ad14
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Michael Blow <[email protected]>

Reply via email to