Michael Blow has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/2881

Change subject: [NO ISSUE][CLUS] ACK Heartbeats from NCs
......................................................................

[NO ISSUE][CLUS] ACK Heartbeats from NCs

On receiving a heartbeat from an NC, respond with an ACK.  In the event a
heartbeat is received from an unknown NC, attempt to respond with an
exception to give the NC a chance to handle it.  On receipt of an exception
on an ACK indicating the node is not known to the CC, force a reconnect.

Change-Id: I7be64fd8c550a697729c7fcf8783beae95043cd7
---
M 
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksException.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/AbstractHeartbeatWork.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/AbstractTaskLifecycleWork.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ApplicationMessageWork.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobletCleanupNotificationWork.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/NodeHeartbeatWork.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/NotifyDeployBinaryWork.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/ConfigManager.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/CcConnection.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/heartbeat/HeartbeatComputeTask.java
A 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/heartbeat/HeartbeatManager.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/heartbeat/HeartbeatTask.java
A 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/task/HeartbeatAckTask.java
22 files changed, 296 insertions(+), 82 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb 
refs/changes/81/2881/1

diff --git 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksException.java
 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksException.java
index 99747fa..69601cf 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksException.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksException.java
@@ -20,6 +20,7 @@
 
 import java.io.IOException;
 import java.io.Serializable;
+import java.util.Objects;
 
 import org.apache.hyracks.api.util.ErrorMessageUtil;
 
@@ -148,4 +149,14 @@
         }
         return msgCache;
     }
+
+    public boolean matches(String component, int errorCode) {
+        Objects.requireNonNull(component, "component");
+        return component.equals(this.component) && errorCode == this.errorCode;
+    }
+
+    @Override
+    public String toString() {
+        return getLocalizedMessage();
+    }
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java
index 7e5d22c..85aa8ae 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java
@@ -74,7 +74,8 @@
                 break;
             case NODE_HEARTBEAT:
                 CCNCFunctions.NodeHeartbeatFunction nhf = 
(CCNCFunctions.NodeHeartbeatFunction) fn;
-                ccs.getExecutor().execute(new NodeHeartbeatWork(ccs, 
nhf.getNodeId(), nhf.getHeartbeatData()));
+                ccs.getExecutor().execute(
+                        new NodeHeartbeatWork(ccs, nhf.getNodeId(), 
nhf.getHeartbeatData(), nhf.getNcAddress()));
                 break;
             case NOTIFY_JOBLET_CLEANUP:
                 CCNCFunctions.NotifyJobletCleanupFunction njcf = 
(CCNCFunctions.NotifyJobletCleanupFunction) fn;
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/AbstractHeartbeatWork.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/AbstractHeartbeatWork.java
index 8e7faff..401360c 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/AbstractHeartbeatWork.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/AbstractHeartbeatWork.java
@@ -27,9 +27,9 @@
 
 public abstract class AbstractHeartbeatWork extends SynchronizableWork {
 
-    private final ClusterControllerService ccs;
-    private final String nodeId;
-    private final HeartbeatData hbData;
+    protected final ClusterControllerService ccs;
+    protected final String nodeId;
+    protected final HeartbeatData hbData;
 
     public AbstractHeartbeatWork(ClusterControllerService ccs, String nodeId, 
HeartbeatData hbData) {
         this.ccs = ccs;
@@ -38,7 +38,7 @@
     }
 
     @Override
-    public void doRun() {
+    public void doRun() throws Exception {
         INodeManager nodeManager = ccs.getNodeManager();
         NodeControllerState state = nodeManager.getNodeControllerState(nodeId);
         if (state != null) {
@@ -51,6 +51,6 @@
         runWork();
     }
 
-    public abstract void runWork();
+    public abstract void runWork() throws Exception;
 
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/AbstractTaskLifecycleWork.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/AbstractTaskLifecycleWork.java
index d0c6567..864d0fd 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/AbstractTaskLifecycleWork.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/AbstractTaskLifecycleWork.java
@@ -50,7 +50,7 @@
     }
 
     @Override
-    public final void runWork() {
+    public final void runWork() throws Exception {
         IJobManager jobManager = ccs.getJobManager();
         JobRun run = jobManager.get(jobId);
         if (run != null) {
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ApplicationMessageWork.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ApplicationMessageWork.java
index 392046d..23c72e8 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ApplicationMessageWork.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ApplicationMessageWork.java
@@ -48,7 +48,7 @@
     }
 
     @Override
-    public void runWork() {
+    public void runWork() throws Exception {
         final ICCServiceContext ctx = ccs.getContext();
         try {
             final IMessage data = (IMessage) 
DeploymentUtils.deserialize(message, deploymentId, ctx);
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobletCleanupNotificationWork.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobletCleanupNotificationWork.java
index cc37f9c..46afba5 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobletCleanupNotificationWork.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobletCleanupNotificationWork.java
@@ -49,7 +49,7 @@
     }
 
     @Override
-    public void runWork() {
+    public void runWork() throws Exception {
         IJobManager jobManager = ccs.getJobManager();
         final JobRun run = jobManager.get(jobId);
         if (run == null) {
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/NodeHeartbeatWork.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/NodeHeartbeatWork.java
index 5c98035..b772ef9 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/NodeHeartbeatWork.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/NodeHeartbeatWork.java
@@ -18,19 +18,39 @@
  */
 package org.apache.hyracks.control.cc.work;
 
+import java.net.InetSocketAddress;
+
+import org.apache.hyracks.api.exceptions.ErrorCode;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.control.cc.ClusterControllerService;
+import org.apache.hyracks.control.cc.NodeControllerState;
+import org.apache.hyracks.control.cc.cluster.INodeManager;
 import org.apache.hyracks.control.common.heartbeat.HeartbeatData;
+import org.apache.hyracks.control.common.ipc.NodeControllerRemoteProxy;
 import org.apache.logging.log4j.Level;
 
 public class NodeHeartbeatWork extends AbstractHeartbeatWork {
 
-    public NodeHeartbeatWork(ClusterControllerService ccs, String nodeId, 
HeartbeatData hbData) {
+    private final InetSocketAddress ncAddress;
+
+    public NodeHeartbeatWork(ClusterControllerService ccs, String nodeId, 
HeartbeatData hbData,
+            InetSocketAddress ncAddress) {
         super(ccs, nodeId, hbData);
+        this.ncAddress = ncAddress;
     }
 
     @Override
-    public void runWork() {
-
+    public void runWork() throws Exception {
+        INodeManager nodeManager = ccs.getNodeManager();
+        final NodeControllerState ncState = 
nodeManager.getNodeControllerState(nodeId);
+        if (ncState != null) {
+            ncState.getNodeController().heartbeatAck(ccs.getCcId(), null);
+        } else {
+            // unregistered nc- let him know
+            NodeControllerRemoteProxy nc =
+                    new NodeControllerRemoteProxy(ccs.getCcId(), 
ccs.getClusterIPC().getReconnectingHandle(ncAddress));
+            nc.heartbeatAck(ccs.getCcId(), 
HyracksDataException.create(ErrorCode.NO_SUCH_NODE, nodeId));
+        }
     }
 
     @Override
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/NotifyDeployBinaryWork.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/NotifyDeployBinaryWork.java
index 62c19bb..4697440 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/NotifyDeployBinaryWork.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/NotifyDeployBinaryWork.java
@@ -47,7 +47,7 @@
     }
 
     @Override
-    public void runWork() {
+    public void runWork() throws Exception {
         // Triggered remotely by a NC to notify that the NC is deployed.
         DeploymentRun dRun = ccs.getDeploymentRun(deploymentId);
         dRun.notifyDeploymentStatus(nodeId, deploymentStatus);
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java
index 00693df..fe33bc9 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java
@@ -49,10 +49,16 @@
     @Override
     protected void doRun() throws Exception {
         String id = reg.getNodeId();
-        LOGGER.warn("Registering node: {}", id);
+        LOGGER.info("registering node: {}", id);
         NodeControllerRemoteProxy nc = new 
NodeControllerRemoteProxy(ccs.getCcId(),
                 
ccs.getClusterIPC().getReconnectingHandle(reg.getNodeControllerAddress()));
         INodeManager nodeManager = ccs.getNodeManager();
+        NodeParameters params = new NodeParameters();
+        params.setClusterControllerInfo(ccs.getClusterControllerInfo());
+        params.setDistributedState(ccs.getContext().getDistributedState());
+        
params.setHeartbeatPeriod(ccs.getCCConfig().getHeartbeatPeriodMillis());
+        params.setProfileDumpPeriod(ccs.getCCConfig().getProfileDumpPeriod());
+        params.setRegistrationId(registrationId);
         try {
             NodeControllerState state = new NodeControllerState(nc, reg);
             nodeManager.addNode(id, state);
@@ -61,21 +67,13 @@
             for (IOption option : cfg.getOptions()) {
                 ncConfiguration.put(option, cfg.get(option));
             }
-            LOGGER.warn("Registered node: {}", id);
-            NodeParameters params = new NodeParameters();
-            params.setClusterControllerInfo(ccs.getClusterControllerInfo());
-            params.setDistributedState(ccs.getContext().getDistributedState());
-            
params.setHeartbeatPeriod(ccs.getCCConfig().getHeartbeatPeriodMillis());
-            
params.setProfileDumpPeriod(ccs.getCCConfig().getProfileDumpPeriod());
-            params.setRegistrationId(registrationId);
-            LOGGER.warn("sending registration response to node {}", id);
+            LOGGER.info("registered node: {}", id);
             nc.sendRegistrationResult(params, null);
-            LOGGER.warn("notifying node {} joined", id);
             ccs.getContext().notifyNodeJoin(id, ncConfiguration);
         } catch (Exception e) {
-            LOGGER.error("Node {} registration failed", id, e);
+            LOGGER.error("node {} registration failed", id, e);
             nodeManager.removeNode(id);
-            nc.sendRegistrationResult(null, e);
+            nc.sendRegistrationResult(params, e);
         }
     }
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java
index fbaff55..c811169 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java
@@ -18,6 +18,7 @@
  */
 package org.apache.hyracks.control.common.base;
 
+import java.net.InetSocketAddress;
 import java.util.List;
 
 import org.apache.hyracks.api.comm.NetworkAddress;
@@ -54,7 +55,7 @@
 
     void notifyShutdown(String nodeId) throws Exception;
 
-    void nodeHeartbeat(String id, HeartbeatData hbData) throws Exception;
+    void nodeHeartbeat(String id, HeartbeatData hbData, InetSocketAddress 
ncAddress) throws Exception;
 
     void reportProfile(String id, List<JobProfile> profiles) throws Exception;
 
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java
index d7941f2..42a0d66 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java
@@ -29,6 +29,7 @@
 import org.apache.hyracks.api.dataflow.TaskAttemptId;
 import org.apache.hyracks.api.dataflow.connectors.IConnectorPolicy;
 import org.apache.hyracks.api.deployment.DeploymentId;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.DeployedJobSpecId;
 import org.apache.hyracks.api.job.JobFlag;
 import org.apache.hyracks.api.job.JobId;
@@ -93,4 +94,13 @@
      * @throws IPCException
      */
     void ping(CcId ccId) throws IPCException;
+
+    /**
+     * Delivers a response to a heartbeat delivered to this {@link CcId}
+     *
+     * @param ccId
+     * @param e
+     * @throws IPCException
+     */
+    void heartbeatAck(CcId ccId, HyracksDataException e) throws IPCException;
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/ConfigManager.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/ConfigManager.java
index b49dc20..1dae48c 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/ConfigManager.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/ConfigManager.java
@@ -176,7 +176,7 @@
     }
 
     public synchronized void ensureNode(String nodeId) {
-        LOGGER.debug("ensureNode: " + nodeId);
+        LOGGER.trace("+ensureNode: {}", nodeId);
         Map<IOption, Object> nodeDefinedMap =
                 nodeSpecificDefinedMap.computeIfAbsent(nodeId, 
this::createNodeSpecificMap);
         Map<IOption, Object> nodeDefaultMap =
@@ -186,14 +186,14 @@
     }
 
     public synchronized void forgetNode(String nodeId) {
-        LOGGER.debug("forgetNode: " + nodeId);
+        LOGGER.trace("+forgetNode: {}", nodeId);
         nodeSpecificDefinedMap.remove(nodeId);
         nodeSpecificDefaultMap.remove(nodeId);
         nodeEffectiveMaps.remove(nodeId);
     }
 
     private Map<IOption, Object> createNodeSpecificMap(String nodeId) {
-        LOGGER.debug("createNodeSpecificMap: " + nodeId);
+        LOGGER.trace("+createNodeSpecificMap: {}", nodeId);
         return Collections.synchronizedMap(new HashMap<>());
     }
 
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
index 2522ebe..1616343 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
@@ -24,8 +24,11 @@
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
 import java.io.OutputStream;
 import java.io.Serializable;
+import java.net.InetSocketAddress;
 import java.net.URL;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -47,6 +50,7 @@
 import org.apache.hyracks.api.dataflow.connectors.ConnectorPolicyFactory;
 import org.apache.hyracks.api.dataflow.connectors.IConnectorPolicy;
 import org.apache.hyracks.api.deployment.DeploymentId;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.DeployedJobSpecId;
 import org.apache.hyracks.api.job.JobFlag;
 import org.apache.hyracks.api.job.JobId;
@@ -81,6 +85,7 @@
         NOTIFY_TASK_COMPLETE,
         NOTIFY_TASK_FAILURE,
         NODE_HEARTBEAT,
+        NODE_HEARTBEAT_ACK,
         REPORT_PROFILE,
         REGISTER_PARTITION_PROVIDER,
         REGISTER_PARTITION_REQUEST,
@@ -383,10 +388,12 @@
 
         private final String nodeId;
         private final HeartbeatData hbData;
+        private final InetSocketAddress ncAddress;
 
-        public NodeHeartbeatFunction(String nodeId, HeartbeatData hbData) {
+        public NodeHeartbeatFunction(String nodeId, HeartbeatData hbData, 
InetSocketAddress ncAddress) {
             this.nodeId = nodeId;
             this.hbData = hbData;
+            this.ncAddress = ncAddress;
         }
 
         @Override
@@ -402,21 +409,27 @@
             return hbData;
         }
 
+        public InetSocketAddress getNcAddress() {
+            return ncAddress;
+        }
+
         public static Object deserialize(ByteBuffer buffer, int length) throws 
Exception {
             ByteArrayInputStream bais = new 
ByteArrayInputStream(buffer.array(), buffer.position(), length);
-            DataInputStream dis = new DataInputStream(bais);
+            ObjectInputStream dis = new ObjectInputStream(bais);
 
-            String nodeId = dis.readUTF();
             HeartbeatData hbData = new HeartbeatData();
             hbData.readFields(dis);
-            return new NodeHeartbeatFunction(nodeId, hbData);
+            String nodeId = dis.readUTF();
+            InetSocketAddress ncAddress = (InetSocketAddress) dis.readObject();
+            return new NodeHeartbeatFunction(nodeId, hbData, ncAddress);
         }
 
         public static void serialize(OutputStream out, Object object) throws 
Exception {
             NodeHeartbeatFunction fn = (NodeHeartbeatFunction) object;
-            DataOutputStream dos = new DataOutputStream(out);
-            dos.writeUTF(fn.nodeId);
+            ObjectOutputStream dos = new ObjectOutputStream(out);
             fn.hbData.write(dos);
+            dos.writeUTF(fn.nodeId);
+            dos.writeObject(fn.ncAddress);
         }
     }
 
@@ -1332,6 +1345,25 @@
         }
     }
 
+    public static class NodeHeartbeatAckFunction extends CCIdentifiedFunction {
+        private static final long serialVersionUID = 1L;
+        private final HyracksDataException exception;
+
+        public NodeHeartbeatAckFunction(CcId ccId, HyracksDataException e) {
+            super(ccId);
+            exception = e;
+        }
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.NODE_HEARTBEAT_ACK;
+        }
+
+        public HyracksDataException getException() {
+            return exception;
+        }
+    }
+
     public static class ShutdownRequestFunction extends CCIdentifiedFunction {
         private static final long serialVersionUID = 1L;
 
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
index 06904c2..13a08b2 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
@@ -18,6 +18,7 @@
  */
 package org.apache.hyracks.control.common.ipc;
 
+import java.net.InetSocketAddress;
 import java.util.List;
 
 import org.apache.hyracks.api.comm.NetworkAddress;
@@ -101,8 +102,8 @@
     }
 
     @Override
-    public void nodeHeartbeat(String id, HeartbeatData hbData) throws 
Exception {
-        NodeHeartbeatFunction fn = new NodeHeartbeatFunction(id, hbData);
+    public void nodeHeartbeat(String id, HeartbeatData hbData, 
InetSocketAddress ncAddress) throws Exception {
+        NodeHeartbeatFunction fn = new NodeHeartbeatFunction(id, hbData, 
ncAddress);
         ipcHandle.send(-1, fn, null);
     }
 
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
index dd10020..d32ee32 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
@@ -30,6 +30,7 @@
 import org.apache.hyracks.api.dataflow.TaskAttemptId;
 import org.apache.hyracks.api.dataflow.connectors.IConnectorPolicy;
 import org.apache.hyracks.api.deployment.DeploymentId;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.DeployedJobSpecId;
 import org.apache.hyracks.api.job.JobFlag;
 import org.apache.hyracks.api.job.JobId;
@@ -153,6 +154,11 @@
         ipcHandle.send(-1, new CCNCFunctions.PingFunction(ccId), null);
     }
 
+    @Override
+    public void heartbeatAck(CcId ccId, HyracksDataException e) throws 
IPCException {
+        ipcHandle.send(-1, new CCNCFunctions.NodeHeartbeatAckFunction(ccId, 
e), null);
+    }
+
     public InetSocketAddress getAddress() {
         return ipcHandle.getRemoteAddress();
     }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/CcConnection.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/CcConnection.java
index 77623c2..d1f7d5a 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/CcConnection.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/CcConnection.java
@@ -36,13 +36,15 @@
     private static final long REGISTRATION_RESPONSE_POLL_PERIOD = 
TimeUnit.SECONDS.toMillis(1);
 
     private final IClusterController ccs;
+    private final InetSocketAddress ccAddress;
     private boolean registrationPending;
     private boolean registrationCompleted;
     private Exception registrationException;
     private NodeParameters nodeParameters;
 
-    CcConnection(IClusterController ccs) {
+    CcConnection(IClusterController ccs, InetSocketAddress ccAddress) {
         this.ccs = ccs;
+        this.ccAddress = ccAddress;
     }
 
     @Override
@@ -86,19 +88,17 @@
         return nodeParameters;
     }
 
-    public synchronized void notifyConnectionRestored(NodeControllerService 
ncs, InetSocketAddress ccAddress)
-            throws InterruptedException {
-        if (registrationCompleted) {
-            registrationCompleted = false;
-            ncs.getExecutor().submit(() -> {
-                try {
-                    return ncs.registerNode(this, ccAddress);
-                } catch (Exception e) {
-                    LOGGER.log(Level.ERROR, "Failed registering with cc", e);
-                    throw new IllegalStateException(e);
-                }
-            });
-        }
+    public synchronized void forceReregister(NodeControllerService ncs) throws 
InterruptedException {
+        registrationCompleted = false;
+        ncs.getExecutor().submit(() -> {
+            try {
+                return ncs.registerNode(this);
+            } catch (Exception e) {
+                LOGGER.log(Level.ERROR, "Failed registering with cc", e);
+                throw new IllegalStateException(e);
+            }
+        });
+
         while (!registrationCompleted) {
             wait();
         }
@@ -108,4 +108,8 @@
         registrationCompleted = true;
         notifyAll();
     }
+
+    public InetSocketAddress getCcAddress() {
+        return ccAddress;
+    }
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java
index cdc16fa..3bc9710 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java
@@ -20,6 +20,8 @@
 
 import org.apache.hyracks.control.common.ipc.CCNCFunctions;
 import 
org.apache.hyracks.control.common.ipc.CCNCFunctions.StateDumpRequestFunction;
+import org.apache.hyracks.control.nc.task.HeartbeatAckTask;
+import org.apache.hyracks.control.nc.task.PingTask;
 import org.apache.hyracks.control.nc.task.ShutdownTask;
 import org.apache.hyracks.control.nc.task.ThreadDumpTask;
 import org.apache.hyracks.control.nc.work.AbortAllJobsWork;
@@ -28,7 +30,6 @@
 import org.apache.hyracks.control.nc.work.CleanupJobletWork;
 import org.apache.hyracks.control.nc.work.DeployBinaryWork;
 import org.apache.hyracks.control.nc.work.DeployJobSpecWork;
-import org.apache.hyracks.control.nc.task.PingTask;
 import org.apache.hyracks.control.nc.work.ReportPartitionAvailabilityWork;
 import org.apache.hyracks.control.nc.work.StartTasksWork;
 import org.apache.hyracks.control.nc.work.StateDumpWork;
@@ -139,6 +140,11 @@
                 ncs.getExecutor().submit(new PingTask(ncs, pcf.getCcId()));
                 return;
 
+            case NODE_HEARTBEAT_ACK:
+                final CCNCFunctions.NodeHeartbeatAckFunction nbaf = 
(CCNCFunctions.NodeHeartbeatAckFunction) fn;
+                ncs.getExecutor().submit(new HeartbeatAckTask(ncs, 
nbaf.getCcId(), nbaf.getException()));
+                return;
+
             default:
                 throw new IllegalArgumentException("Unknown function: " + 
fn.getFunctionId());
         }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
index a189ac5..653d6e0 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
@@ -59,7 +59,6 @@
 import org.apache.hyracks.api.result.IResultPartitionManager;
 import org.apache.hyracks.api.service.IControllerService;
 import org.apache.hyracks.api.util.CleanupUtils;
-import org.apache.hyracks.api.util.InvokeUtil;
 import org.apache.hyracks.control.common.NodeControllerData;
 import org.apache.hyracks.control.common.base.IClusterController;
 import org.apache.hyracks.control.common.config.ConfigManager;
@@ -75,7 +74,7 @@
 import org.apache.hyracks.control.common.work.WorkQueue;
 import org.apache.hyracks.control.nc.application.NCServiceContext;
 import org.apache.hyracks.control.nc.heartbeat.HeartbeatComputeTask;
-import org.apache.hyracks.control.nc.heartbeat.HeartbeatTask;
+import org.apache.hyracks.control.nc.heartbeat.HeartbeatManager;
 import org.apache.hyracks.control.nc.io.IOManager;
 import org.apache.hyracks.control.nc.net.MessagingNetworkManager;
 import org.apache.hyracks.control.nc.net.NetworkManager;
@@ -144,7 +143,7 @@
 
     private ExecutorService executor;
 
-    private Map<CcId, Thread> heartbeatThreads = new ConcurrentHashMap<>();
+    private Map<CcId, HeartbeatManager> heartbeatManagers = new 
ConcurrentHashMap<>();
 
     private Map<CcId, Timer> ccTimers = new ConcurrentHashMap<>();
 
@@ -348,7 +347,7 @@
                     // we need to re-register in case of NC -> CC connection 
reset
                     final CcConnection ccConnection = 
getCcConnection(ccAddressMap.get(ccAddress));
                     try {
-                        
ccConnection.notifyConnectionRestored(NodeControllerService.this, ccAddress);
+                        
ccConnection.forceReregister(NodeControllerService.this);
                     } catch (InterruptedException e) {
                         Thread.currentThread().interrupt();
                         throw new IPCException(e);
@@ -357,7 +356,7 @@
             };
             ClusterControllerRemoteProxy ccProxy = new 
ClusterControllerRemoteProxy(
                     ipc.getHandle(ccAddress, 
ncConfig.getClusterConnectRetries(), 1, ipcEventListener));
-            return registerNode(new CcConnection(ccProxy), ccAddress);
+            return registerNode(new CcConnection(ccProxy, ccAddress));
         }
     }
 
@@ -395,8 +394,10 @@
                         () -> String.valueOf(e));
             }
             getWorkQueue().scheduleAndSync(new AbortAllJobsWork(this, ccId));
-            Thread hbThread = heartbeatThreads.remove(ccId);
-            hbThread.interrupt();
+            HeartbeatManager hbMgr = heartbeatManagers.remove(ccId);
+            if (hbMgr != null) {
+                hbMgr.shutdown();
+            }
             Timer ccTimer = ccTimers.remove(ccId);
             if (ccTimer != null) {
                 ccTimer.cancel();
@@ -406,13 +407,13 @@
         }
     }
 
-    public CcId registerNode(CcConnection ccc, InetSocketAddress ccAddress) 
throws Exception {
+    public CcId registerNode(CcConnection ccc) throws Exception {
         LOGGER.info("Registering with Cluster Controller {}", ccc);
         int registrationId = nextRegistrationId.incrementAndGet();
         pendingRegistrations.put(registrationId, ccc);
         CcId ccId = ccc.registerNode(nodeRegistration, registrationId);
         ccMap.put(ccId, ccc);
-        ccAddressMap.put(ccAddress, ccId);
+        ccAddressMap.put(ccc.getCcAddress(), ccId);
         Serializable distributedState = 
ccc.getNodeParameters().getDistributedState();
         if (distributedState != null) {
             getDistributedState().put(ccId, distributedState);
@@ -420,15 +421,8 @@
         IClusterController ccs = ccc.getClusterControllerService();
         NodeParameters nodeParameters = ccc.getNodeParameters();
         // Start heartbeat generator.
-        if (!heartbeatThreads.containsKey(ccId)) {
-            Thread heartbeatThread = new Thread(
-                    new HeartbeatTask(getId(), hbTask.getHeartbeatData(), ccs, 
nodeParameters.getHeartbeatPeriod()),
-                    id + "-Heartbeat");
-            heartbeatThread.setPriority(Thread.MAX_PRIORITY);
-            heartbeatThread.setDaemon(true);
-            heartbeatThread.start();
-            heartbeatThreads.put(ccId, heartbeatThread);
-        }
+        heartbeatManagers.computeIfAbsent(ccId, newCcId -> 
HeartbeatManager.init(this, ccc, hbTask.getHeartbeatData(),
+                nodeRegistration.getNodeControllerAddress()));
         if (!ccTimers.containsKey(ccId) && 
nodeParameters.getProfileDumpPeriod() > 0) {
             Timer ccTimer = new Timer("Timer-" + ccId, true);
             // Schedule profile dump generator.
@@ -506,10 +500,7 @@
              * Stop heartbeats only after NC has stopped to avoid false node 
failure detection
              * on CC if an NC takes a long time to stop.
              */
-            heartbeatThreads.values().parallelStream().forEach(t -> {
-                t.interrupt();
-                InvokeUtil.doUninterruptibly(() -> t.join(1000));
-            });
+            
heartbeatManagers.values().parallelStream().forEach(HeartbeatManager::shutdown);
             synchronized (ccLock) {
                 ccMap.values().parallelStream().forEach(cc -> {
                     try {
@@ -673,6 +664,14 @@
         return application.getApplicationContext();
     }
 
+    /**
+     * Looks up the {@link HeartbeatManager} registered for the given CC.
+     *
+     * @param ccId
+     *            the id of the cluster controller
+     * @return the entry stored in {@code heartbeatManagers} under {@code ccId}, or {@code null} if the map
+     *         holds no entry for it
+     */
+    public HeartbeatManager getHeartbeatManager(CcId ccId) {
+        return heartbeatManagers.get(ccId);
+    }
+
+    /**
+     * @return the {@code nodeRegistration} held by this service
+     */
+    public NodeRegistration getNodeRegistration() {
+        return nodeRegistration;
+    }
+
     private class ProfileDumpTask extends TimerTask {
         private final IClusterController cc;
         private final CcId ccId;
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/heartbeat/HeartbeatComputeTask.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/heartbeat/HeartbeatComputeTask.java
index 81ceea5..cd051e3 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/heartbeat/HeartbeatComputeTask.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/heartbeat/HeartbeatComputeTask.java
@@ -33,7 +33,6 @@
 import org.apache.hyracks.control.nc.io.profiling.IOCounterFactory;
 import org.apache.hyracks.ipc.api.IPCPerformanceCounters;
 import org.apache.hyracks.net.protocols.muxdemux.MuxDemuxPerformanceCounters;
-import org.apache.logging.log4j.Level;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/heartbeat/HeartbeatManager.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/heartbeat/HeartbeatManager.java
new file mode 100644
index 0000000..0ea6399
--- /dev/null
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/heartbeat/HeartbeatManager.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.nc.heartbeat;
+
+import java.net.InetSocketAddress;
+
+import org.apache.hyracks.api.control.CcId;
+import org.apache.hyracks.api.exceptions.ErrorCode;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.control.common.heartbeat.HeartbeatData;
+import org.apache.hyracks.control.nc.CcConnection;
+import org.apache.hyracks.control.nc.NodeControllerService;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+/**
+ * Owns the heartbeat thread this NC runs for a single CC connection and handles the acks delivered for
+ * those heartbeats through {@link #notifyAck(HyracksDataException)}.
+ */
+public class HeartbeatManager {
+    private static final Logger LOGGER = LogManager.getLogger();
+    private final NodeControllerService ncs;
+    private final CcConnection ccc;
+    private final Thread hbThread;
+    private final CcId ccId;
+
+    /**
+     * Creates (but does not start) the heartbeat thread for the supplied CC connection.
+     *
+     * @param ncs
+     *            the node controller service; its id is used for the heartbeat task and the thread name
+     * @param ccc
+     *            the CC connection supplying the cluster controller proxy, heartbeat period and CC id
+     * @param hbData
+     *            the heartbeat data handed to the {@link HeartbeatTask}
+     * @param ncAddress
+     *            the NC address handed to the {@link HeartbeatTask}
+     */
+    private HeartbeatManager(NodeControllerService ncs, CcConnection ccc, HeartbeatData hbData,
+            InetSocketAddress ncAddress) {
+        this.ncs = ncs;
+        this.ccc = ccc;
+        hbThread = new Thread(new HeartbeatTask(ncs.getId(), hbData, ccc.getClusterControllerService(),
+                ccc.getNodeParameters().getHeartbeatPeriod(), ncAddress), ncs.getId() + "-Heartbeat");
+        hbThread.setPriority(Thread.MAX_PRIORITY);
+        hbThread.setDaemon(true);
+        ccId = ccc.getCcId();
+    }
+
+    /**
+     * Creates a manager for the supplied CC connection and starts its heartbeat thread.
+     *
+     * @return the started manager
+     */
+    public static HeartbeatManager init(NodeControllerService ncs, CcConnection ccc, HeartbeatData hbData,
+            InetSocketAddress ncAddress) {
+        HeartbeatManager hbMgr = new HeartbeatManager(ncs, ccc, hbData, ncAddress);
+        hbMgr.start();
+        return hbMgr;
+    }
+
+    /**
+     * Interrupts the heartbeat thread. This call does not wait for the thread to exit.
+     */
+    public void shutdown() {
+        hbThread.interrupt();
+    }
+
+    /**
+     * Starts the heartbeat thread.
+     */
+    public void start() {
+        hbThread.start();
+    }
+
+    /**
+     * Handles a heartbeat ack from the CC. When the ack carries a {@code NO_SUCH_NODE} error, a
+     * re-registration with the CC is forced; any other ack is only logged at debug level.
+     *
+     * @param exception
+     *            the exception carried by the ack, or {@code null} if the ack carried none
+     */
+    public void notifyAck(HyracksDataException exception) {
+        // TODO: we should also reregister in case of no ack
+        LOGGER.debug("ack rec'd from {} w/ exception: {}", ccId::toString, () -> String.valueOf(exception));
+        if (exception == null || !exception.matches(ErrorCode.HYRACKS, ErrorCode.NO_SUCH_NODE)) {
+            return;
+        }
+        LOGGER.info("{} indicates it does not recognize us; force a reconnect", ccId);
+        try {
+            ccc.forceReregister(ncs);
+        } catch (InterruptedException e) {
+            // restore the interrupt status so the calling thread can still observe the interruption
+            Thread.currentThread().interrupt();
+            LOGGER.warn("interrupted while attempting to reregister with {}", ccId, e);
+        } catch (Exception e) {
+            LOGGER.warn("ignoring exception attempting to reregister with {}", ccId, e);
+        }
+    }
+}
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/heartbeat/HeartbeatTask.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/heartbeat/HeartbeatTask.java
index 6d08fb1..4af7e4e 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/heartbeat/HeartbeatTask.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/heartbeat/HeartbeatTask.java
@@ -18,6 +18,7 @@
  */
 package org.apache.hyracks.control.nc.heartbeat;
 
+import java.net.InetSocketAddress;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 
@@ -36,12 +37,15 @@
     private final Semaphore delayBlock = new Semaphore(0);
     private final IClusterController cc;
     private final long heartbeatPeriodNanos;
+    private final InetSocketAddress ncAddress;
 
-    public HeartbeatTask(String ncId, HeartbeatData hbData, IClusterController 
cc, long heartbeatPeriod) {
+    public HeartbeatTask(String ncId, HeartbeatData hbData, IClusterController 
cc, long heartbeatPeriod,
+            InetSocketAddress ncAddress) {
         this.ncId = ncId;
         this.hbData = hbData;
         this.cc = cc;
         this.heartbeatPeriodNanos = 
TimeUnit.MILLISECONDS.toNanos(heartbeatPeriod);
+        this.ncAddress = ncAddress;
     }
 
     @Override
@@ -67,18 +71,15 @@
     private boolean execute() throws InterruptedException {
         try {
             synchronized (hbData) {
-                cc.nodeHeartbeat(ncId, hbData);
+                cc.nodeHeartbeat(ncId, hbData, ncAddress);
             }
             LOGGER.trace("Successfully sent heartbeat");
             return true;
         } catch (InterruptedException e) {
             throw e;
         } catch (Exception e) {
-            if (LOGGER.isDebugEnabled()) {
-                LOGGER.log(Level.DEBUG, "Exception sending heartbeat; will 
retry after 1s", e);
-            } else {
-                LOGGER.log(Level.ERROR, "Exception sending heartbeat; will 
retry after 1s: " + e.toString());
-            }
+            LOGGER.log(Level.DEBUG, "Exception sending heartbeat; will retry 
after 1s", e);
+            LOGGER.log(Level.WARN, "Exception sending heartbeat; will retry 
after 1s: " + e.toString());
             return false;
         }
     }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/task/HeartbeatAckTask.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/task/HeartbeatAckTask.java
new file mode 100644
index 0000000..f43c029
--- /dev/null
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/task/HeartbeatAckTask.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.nc.task;
+
+import org.apache.hyracks.api.control.CcId;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.control.nc.NodeControllerService;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+/**
+ * Delivers a heartbeat ack from a CC, together with the exception it carries (if any), to the heartbeat
+ * manager the NC holds for that CC.
+ */
+public class HeartbeatAckTask implements Runnable {
+
+    private static final Logger LOGGER = LogManager.getLogger();
+    private final NodeControllerService ncs;
+    private final CcId ccId;
+    private final HyracksDataException exception;
+
+    /**
+     * @param ncs
+     *            the node controller service used to look up the heartbeat manager
+     * @param ccId
+     *            the id of the CC the ack came from
+     * @param exception
+     *            the exception carried by the ack; passed through to the heartbeat manager as-is
+     */
+    public HeartbeatAckTask(NodeControllerService ncs, CcId ccId, HyracksDataException exception) {
+        this.ncs = ncs;
+        this.ccId = ccId;
+        this.exception = exception;
+    }
+
+    /**
+     * Notifies the heartbeat manager for {@code ccId} of the ack. If no manager is held for that CC the
+     * ack is dropped with a warning; any other failure is logged and not propagated.
+     */
+    @Override
+    public void run() {
+        try {
+            // fully qualified so that this file needs no additional import
+            final org.apache.hyracks.control.nc.heartbeat.HeartbeatManager hbMgr = ncs.getHeartbeatManager(ccId);
+            if (hbMgr == null) {
+                // the lookup is a plain map get, so it yields null when no manager is held for this CC
+                LOGGER.warn("ignoring heartbeat ack from {}; no heartbeat manager is registered for it", ccId);
+                return;
+            }
+            hbMgr.notifyAck(exception);
+        } catch (Exception e) {
+            LOGGER.info("failure processing heartbeat ack from {}", ccId, e);
+        }
+    }
+}
\ No newline at end of file

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/2881
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I7be64fd8c550a697729c7fcf8783beae95043cd7
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Michael Blow <mb...@apache.org>

Reply via email to