Repository: asterixdb
Updated Branches:
  refs/heads/master b3a2cb21b -> 13860fb52


[NO ISSUE][CLUS] ACK Heartbeats from NCs

On a heartbeat (HB) from an NC, the CC responds with an ACK.  If a
heartbeat is received from an unknown NC, the CC attempts to respond
with an exception to give the NC a chance to handle it.  On receipt of an
ACK carrying an exception indicating the node is not known to the CC,
the NC forces a reconnect.

Change-Id: I7be64fd8c550a697729c7fcf8783beae95043cd7
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2881
Tested-by: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhub...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/13860fb5
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/13860fb5
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/13860fb5

Branch: refs/heads/master
Commit: 13860fb525e4f64d670b12dc0e5ecc596d1d69e6
Parents: b3a2cb2
Author: Michael Blow <mb...@apache.org>
Authored: Sat Aug 11 17:52:40 2018 -0700
Committer: Michael Blow <mb...@apache.org>
Committed: Sat Aug 11 20:34:32 2018 -0700

----------------------------------------------------------------------
 .../api/exceptions/HyracksException.java        | 11 +++
 .../control/cc/ClusterControllerIPCI.java       |  3 +-
 .../control/cc/work/AbstractHeartbeatWork.java  | 10 +--
 .../cc/work/AbstractTaskLifecycleWork.java      |  4 -
 .../control/cc/work/ApplicationMessageWork.java |  4 -
 .../cc/work/JobletCleanupNotificationWork.java  |  4 -
 .../control/cc/work/NodeHeartbeatWork.java      | 26 ++++++-
 .../control/cc/work/NotifyDeployBinaryWork.java |  4 -
 .../control/cc/work/RegisterNodeWork.java       | 22 +++---
 .../control/common/base/IClusterController.java |  3 +-
 .../control/common/base/INodeController.java    | 10 +++
 .../control/common/config/ConfigManager.java    |  6 +-
 .../control/common/ipc/CCNCFunctions.java       | 44 +++++++++--
 .../ipc/ClusterControllerRemoteProxy.java       |  5 +-
 .../common/ipc/NodeControllerRemoteProxy.java   |  6 ++
 .../apache/hyracks/control/nc/CcConnection.java | 32 ++++----
 .../hyracks/control/nc/NodeControllerIPCI.java  |  8 +-
 .../control/nc/NodeControllerService.java       | 43 ++++++-----
 .../nc/heartbeat/HeartbeatComputeTask.java      |  1 -
 .../control/nc/heartbeat/HeartbeatManager.java  | 77 ++++++++++++++++++++
 .../control/nc/heartbeat/HeartbeatTask.java     | 15 ++--
 .../control/nc/task/HeartbeatAckTask.java       | 48 ++++++++++++
 22 files changed, 292 insertions(+), 94 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/13860fb5/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksException.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksException.java
 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksException.java
index 99747fa..69601cf 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksException.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksException.java
@@ -20,6 +20,7 @@ package org.apache.hyracks.api.exceptions;
 
 import java.io.IOException;
 import java.io.Serializable;
+import java.util.Objects;
 
 import org.apache.hyracks.api.util.ErrorMessageUtil;
 
@@ -148,4 +149,14 @@ public class HyracksException extends IOException 
implements IFormattedException
         }
         return msgCache;
     }
+
+    public boolean matches(String component, int errorCode) {
+        Objects.requireNonNull(component, "component");
+        return component.equals(this.component) && errorCode == this.errorCode;
+    }
+
+    @Override
+    public String toString() {
+        return getLocalizedMessage();
+    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/13860fb5/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java
index 7e5d22c..85aa8ae 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java
@@ -74,7 +74,8 @@ class ClusterControllerIPCI implements IIPCI {
                 break;
             case NODE_HEARTBEAT:
                 CCNCFunctions.NodeHeartbeatFunction nhf = 
(CCNCFunctions.NodeHeartbeatFunction) fn;
-                ccs.getExecutor().execute(new NodeHeartbeatWork(ccs, 
nhf.getNodeId(), nhf.getHeartbeatData()));
+                ccs.getExecutor().execute(
+                        new NodeHeartbeatWork(ccs, nhf.getNodeId(), 
nhf.getHeartbeatData(), nhf.getNcAddress()));
                 break;
             case NOTIFY_JOBLET_CLEANUP:
                 CCNCFunctions.NotifyJobletCleanupFunction njcf = 
(CCNCFunctions.NotifyJobletCleanupFunction) fn;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/13860fb5/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/AbstractHeartbeatWork.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/AbstractHeartbeatWork.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/AbstractHeartbeatWork.java
index 8e7faff..401360c 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/AbstractHeartbeatWork.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/AbstractHeartbeatWork.java
@@ -27,9 +27,9 @@ import 
org.apache.hyracks.control.common.work.SynchronizableWork;
 
 public abstract class AbstractHeartbeatWork extends SynchronizableWork {
 
-    private final ClusterControllerService ccs;
-    private final String nodeId;
-    private final HeartbeatData hbData;
+    protected final ClusterControllerService ccs;
+    protected final String nodeId;
+    protected final HeartbeatData hbData;
 
     public AbstractHeartbeatWork(ClusterControllerService ccs, String nodeId, 
HeartbeatData hbData) {
         this.ccs = ccs;
@@ -38,7 +38,7 @@ public abstract class AbstractHeartbeatWork extends 
SynchronizableWork {
     }
 
     @Override
-    public void doRun() {
+    public void doRun() throws Exception {
         INodeManager nodeManager = ccs.getNodeManager();
         NodeControllerState state = nodeManager.getNodeControllerState(nodeId);
         if (state != null) {
@@ -51,6 +51,6 @@ public abstract class AbstractHeartbeatWork extends 
SynchronizableWork {
         runWork();
     }
 
-    public abstract void runWork();
+    public abstract void runWork() throws Exception;
 
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/13860fb5/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/AbstractTaskLifecycleWork.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/AbstractTaskLifecycleWork.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/AbstractTaskLifecycleWork.java
index d0c6567..b9053ff 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/AbstractTaskLifecycleWork.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/AbstractTaskLifecycleWork.java
@@ -36,17 +36,13 @@ import org.apache.hyracks.control.cc.job.TaskCluster;
 import org.apache.hyracks.control.cc.job.TaskClusterAttempt;
 
 public abstract class AbstractTaskLifecycleWork extends AbstractHeartbeatWork {
-    protected final ClusterControllerService ccs;
     protected final JobId jobId;
     protected final TaskAttemptId taId;
-    protected final String nodeId;
 
     public AbstractTaskLifecycleWork(ClusterControllerService ccs, JobId 
jobId, TaskAttemptId taId, String nodeId) {
         super(ccs, nodeId, null);
-        this.ccs = ccs;
         this.jobId = jobId;
         this.taId = taId;
-        this.nodeId = nodeId;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/13860fb5/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ApplicationMessageWork.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ApplicationMessageWork.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ApplicationMessageWork.java
index 392046d..f2aa1f4 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ApplicationMessageWork.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ApplicationMessageWork.java
@@ -35,15 +35,11 @@ public class ApplicationMessageWork extends 
AbstractHeartbeatWork {
     private static final Logger LOGGER = LogManager.getLogger();
     private byte[] message;
     private DeploymentId deploymentId;
-    private String nodeId;
-    private ClusterControllerService ccs;
 
     public ApplicationMessageWork(ClusterControllerService ccs, byte[] 
message, DeploymentId deploymentId,
             String nodeId) {
         super(ccs, nodeId, null);
-        this.ccs = ccs;
         this.deploymentId = deploymentId;
-        this.nodeId = nodeId;
         this.message = message;
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/13860fb5/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobletCleanupNotificationWork.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobletCleanupNotificationWork.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobletCleanupNotificationWork.java
index cc37f9c..727793b 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobletCleanupNotificationWork.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobletCleanupNotificationWork.java
@@ -37,15 +37,11 @@ import org.apache.logging.log4j.Logger;
 public class JobletCleanupNotificationWork extends AbstractHeartbeatWork {
     private static final Logger LOGGER = LogManager.getLogger();
 
-    private ClusterControllerService ccs;
     private JobId jobId;
-    private String nodeId;
 
     public JobletCleanupNotificationWork(ClusterControllerService ccs, JobId 
jobId, String nodeId) {
         super(ccs, nodeId, null);
-        this.ccs = ccs;
         this.jobId = jobId;
-        this.nodeId = nodeId;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/13860fb5/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/NodeHeartbeatWork.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/NodeHeartbeatWork.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/NodeHeartbeatWork.java
index 5c98035..b772ef9 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/NodeHeartbeatWork.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/NodeHeartbeatWork.java
@@ -18,19 +18,39 @@
  */
 package org.apache.hyracks.control.cc.work;
 
+import java.net.InetSocketAddress;
+
+import org.apache.hyracks.api.exceptions.ErrorCode;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.control.cc.ClusterControllerService;
+import org.apache.hyracks.control.cc.NodeControllerState;
+import org.apache.hyracks.control.cc.cluster.INodeManager;
 import org.apache.hyracks.control.common.heartbeat.HeartbeatData;
+import org.apache.hyracks.control.common.ipc.NodeControllerRemoteProxy;
 import org.apache.logging.log4j.Level;
 
 public class NodeHeartbeatWork extends AbstractHeartbeatWork {
 
-    public NodeHeartbeatWork(ClusterControllerService ccs, String nodeId, 
HeartbeatData hbData) {
+    private final InetSocketAddress ncAddress;
+
+    public NodeHeartbeatWork(ClusterControllerService ccs, String nodeId, 
HeartbeatData hbData,
+            InetSocketAddress ncAddress) {
         super(ccs, nodeId, hbData);
+        this.ncAddress = ncAddress;
     }
 
     @Override
-    public void runWork() {
-
+    public void runWork() throws Exception {
+        INodeManager nodeManager = ccs.getNodeManager();
+        final NodeControllerState ncState = 
nodeManager.getNodeControllerState(nodeId);
+        if (ncState != null) {
+            ncState.getNodeController().heartbeatAck(ccs.getCcId(), null);
+        } else {
+            // unregistered nc- let him know
+            NodeControllerRemoteProxy nc =
+                    new NodeControllerRemoteProxy(ccs.getCcId(), 
ccs.getClusterIPC().getReconnectingHandle(ncAddress));
+            nc.heartbeatAck(ccs.getCcId(), 
HyracksDataException.create(ErrorCode.NO_SUCH_NODE, nodeId));
+        }
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/13860fb5/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/NotifyDeployBinaryWork.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/NotifyDeployBinaryWork.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/NotifyDeployBinaryWork.java
index 62c19bb..80aae39 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/NotifyDeployBinaryWork.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/NotifyDeployBinaryWork.java
@@ -31,16 +31,12 @@ import 
org.apache.hyracks.control.common.deployment.DeploymentStatus;
  */
 public class NotifyDeployBinaryWork extends AbstractHeartbeatWork {
 
-    private final ClusterControllerService ccs;
-    private final String nodeId;
     private final DeploymentId deploymentId;
     private DeploymentStatus deploymentStatus;
 
     public NotifyDeployBinaryWork(ClusterControllerService ccs, DeploymentId 
deploymentId, String nodeId,
             DeploymentStatus deploymentStatus) {
         super(ccs, nodeId, null);
-        this.ccs = ccs;
-        this.nodeId = nodeId;
         this.deploymentId = deploymentId;
         this.deploymentStatus = deploymentStatus;
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/13860fb5/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java
index 00693df..fe33bc9 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java
@@ -49,10 +49,16 @@ public class RegisterNodeWork extends SynchronizableWork {
     @Override
     protected void doRun() throws Exception {
         String id = reg.getNodeId();
-        LOGGER.warn("Registering node: {}", id);
+        LOGGER.info("registering node: {}", id);
         NodeControllerRemoteProxy nc = new 
NodeControllerRemoteProxy(ccs.getCcId(),
                 
ccs.getClusterIPC().getReconnectingHandle(reg.getNodeControllerAddress()));
         INodeManager nodeManager = ccs.getNodeManager();
+        NodeParameters params = new NodeParameters();
+        params.setClusterControllerInfo(ccs.getClusterControllerInfo());
+        params.setDistributedState(ccs.getContext().getDistributedState());
+        
params.setHeartbeatPeriod(ccs.getCCConfig().getHeartbeatPeriodMillis());
+        params.setProfileDumpPeriod(ccs.getCCConfig().getProfileDumpPeriod());
+        params.setRegistrationId(registrationId);
         try {
             NodeControllerState state = new NodeControllerState(nc, reg);
             nodeManager.addNode(id, state);
@@ -61,21 +67,13 @@ public class RegisterNodeWork extends SynchronizableWork {
             for (IOption option : cfg.getOptions()) {
                 ncConfiguration.put(option, cfg.get(option));
             }
-            LOGGER.warn("Registered node: {}", id);
-            NodeParameters params = new NodeParameters();
-            params.setClusterControllerInfo(ccs.getClusterControllerInfo());
-            params.setDistributedState(ccs.getContext().getDistributedState());
-            
params.setHeartbeatPeriod(ccs.getCCConfig().getHeartbeatPeriodMillis());
-            
params.setProfileDumpPeriod(ccs.getCCConfig().getProfileDumpPeriod());
-            params.setRegistrationId(registrationId);
-            LOGGER.warn("sending registration response to node {}", id);
+            LOGGER.info("registered node: {}", id);
             nc.sendRegistrationResult(params, null);
-            LOGGER.warn("notifying node {} joined", id);
             ccs.getContext().notifyNodeJoin(id, ncConfiguration);
         } catch (Exception e) {
-            LOGGER.error("Node {} registration failed", id, e);
+            LOGGER.error("node {} registration failed", id, e);
             nodeManager.removeNode(id);
-            nc.sendRegistrationResult(null, e);
+            nc.sendRegistrationResult(params, e);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/13860fb5/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java
index fbaff55..c811169 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java
@@ -18,6 +18,7 @@
  */
 package org.apache.hyracks.control.common.base;
 
+import java.net.InetSocketAddress;
 import java.util.List;
 
 import org.apache.hyracks.api.comm.NetworkAddress;
@@ -54,7 +55,7 @@ public interface IClusterController {
 
     void notifyShutdown(String nodeId) throws Exception;
 
-    void nodeHeartbeat(String id, HeartbeatData hbData) throws Exception;
+    void nodeHeartbeat(String id, HeartbeatData hbData, InetSocketAddress 
ncAddress) throws Exception;
 
     void reportProfile(String id, List<JobProfile> profiles) throws Exception;
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/13860fb5/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java
index d7941f2..42a0d66 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java
@@ -29,6 +29,7 @@ import org.apache.hyracks.api.dataflow.ConnectorDescriptorId;
 import org.apache.hyracks.api.dataflow.TaskAttemptId;
 import org.apache.hyracks.api.dataflow.connectors.IConnectorPolicy;
 import org.apache.hyracks.api.deployment.DeploymentId;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.DeployedJobSpecId;
 import org.apache.hyracks.api.job.JobFlag;
 import org.apache.hyracks.api.job.JobId;
@@ -93,4 +94,13 @@ public interface INodeController {
      * @throws IPCException
      */
     void ping(CcId ccId) throws IPCException;
+
+    /**
+     * Delivers a response to a heartbeat delivered to this {@link CcId}
+     *
+     * @param ccId
+     * @param e
+     * @throws IPCException
+     */
+    void heartbeatAck(CcId ccId, HyracksDataException e) throws IPCException;
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/13860fb5/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/ConfigManager.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/ConfigManager.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/ConfigManager.java
index b49dc20..1dae48c 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/ConfigManager.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/ConfigManager.java
@@ -176,7 +176,7 @@ public class ConfigManager implements IConfigManager, 
Serializable {
     }
 
     public synchronized void ensureNode(String nodeId) {
-        LOGGER.debug("ensureNode: " + nodeId);
+        LOGGER.trace("+ensureNode: {}", nodeId);
         Map<IOption, Object> nodeDefinedMap =
                 nodeSpecificDefinedMap.computeIfAbsent(nodeId, 
this::createNodeSpecificMap);
         Map<IOption, Object> nodeDefaultMap =
@@ -186,14 +186,14 @@ public class ConfigManager implements IConfigManager, 
Serializable {
     }
 
     public synchronized void forgetNode(String nodeId) {
-        LOGGER.debug("forgetNode: " + nodeId);
+        LOGGER.trace("+forgetNode: {}", nodeId);
         nodeSpecificDefinedMap.remove(nodeId);
         nodeSpecificDefaultMap.remove(nodeId);
         nodeEffectiveMaps.remove(nodeId);
     }
 
     private Map<IOption, Object> createNodeSpecificMap(String nodeId) {
-        LOGGER.debug("createNodeSpecificMap: " + nodeId);
+        LOGGER.trace("+createNodeSpecificMap: {}", nodeId);
         return Collections.synchronizedMap(new HashMap<>());
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/13860fb5/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
index 2522ebe..1616343 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
@@ -24,8 +24,11 @@ import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
 import java.io.OutputStream;
 import java.io.Serializable;
+import java.net.InetSocketAddress;
 import java.net.URL;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -47,6 +50,7 @@ import org.apache.hyracks.api.dataflow.TaskId;
 import org.apache.hyracks.api.dataflow.connectors.ConnectorPolicyFactory;
 import org.apache.hyracks.api.dataflow.connectors.IConnectorPolicy;
 import org.apache.hyracks.api.deployment.DeploymentId;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.DeployedJobSpecId;
 import org.apache.hyracks.api.job.JobFlag;
 import org.apache.hyracks.api.job.JobId;
@@ -81,6 +85,7 @@ public class CCNCFunctions {
         NOTIFY_TASK_COMPLETE,
         NOTIFY_TASK_FAILURE,
         NODE_HEARTBEAT,
+        NODE_HEARTBEAT_ACK,
         REPORT_PROFILE,
         REGISTER_PARTITION_PROVIDER,
         REGISTER_PARTITION_REQUEST,
@@ -383,10 +388,12 @@ public class CCNCFunctions {
 
         private final String nodeId;
         private final HeartbeatData hbData;
+        private final InetSocketAddress ncAddress;
 
-        public NodeHeartbeatFunction(String nodeId, HeartbeatData hbData) {
+        public NodeHeartbeatFunction(String nodeId, HeartbeatData hbData, 
InetSocketAddress ncAddress) {
             this.nodeId = nodeId;
             this.hbData = hbData;
+            this.ncAddress = ncAddress;
         }
 
         @Override
@@ -402,21 +409,27 @@ public class CCNCFunctions {
             return hbData;
         }
 
+        public InetSocketAddress getNcAddress() {
+            return ncAddress;
+        }
+
         public static Object deserialize(ByteBuffer buffer, int length) throws 
Exception {
             ByteArrayInputStream bais = new 
ByteArrayInputStream(buffer.array(), buffer.position(), length);
-            DataInputStream dis = new DataInputStream(bais);
+            ObjectInputStream dis = new ObjectInputStream(bais);
 
-            String nodeId = dis.readUTF();
             HeartbeatData hbData = new HeartbeatData();
             hbData.readFields(dis);
-            return new NodeHeartbeatFunction(nodeId, hbData);
+            String nodeId = dis.readUTF();
+            InetSocketAddress ncAddress = (InetSocketAddress) dis.readObject();
+            return new NodeHeartbeatFunction(nodeId, hbData, ncAddress);
         }
 
         public static void serialize(OutputStream out, Object object) throws 
Exception {
             NodeHeartbeatFunction fn = (NodeHeartbeatFunction) object;
-            DataOutputStream dos = new DataOutputStream(out);
-            dos.writeUTF(fn.nodeId);
+            ObjectOutputStream dos = new ObjectOutputStream(out);
             fn.hbData.write(dos);
+            dos.writeUTF(fn.nodeId);
+            dos.writeObject(fn.ncAddress);
         }
     }
 
@@ -1332,6 +1345,25 @@ public class CCNCFunctions {
         }
     }
 
+    public static class NodeHeartbeatAckFunction extends CCIdentifiedFunction {
+        private static final long serialVersionUID = 1L;
+        private final HyracksDataException exception;
+
+        public NodeHeartbeatAckFunction(CcId ccId, HyracksDataException e) {
+            super(ccId);
+            exception = e;
+        }
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.NODE_HEARTBEAT_ACK;
+        }
+
+        public HyracksDataException getException() {
+            return exception;
+        }
+    }
+
     public static class ShutdownRequestFunction extends CCIdentifiedFunction {
         private static final long serialVersionUID = 1L;
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/13860fb5/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
index 06904c2..13a08b2 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
@@ -18,6 +18,7 @@
  */
 package org.apache.hyracks.control.common.ipc;
 
+import java.net.InetSocketAddress;
 import java.util.List;
 
 import org.apache.hyracks.api.comm.NetworkAddress;
@@ -101,8 +102,8 @@ public class ClusterControllerRemoteProxy implements 
IClusterController {
     }
 
     @Override
-    public void nodeHeartbeat(String id, HeartbeatData hbData) throws 
Exception {
-        NodeHeartbeatFunction fn = new NodeHeartbeatFunction(id, hbData);
+    public void nodeHeartbeat(String id, HeartbeatData hbData, 
InetSocketAddress ncAddress) throws Exception {
+        NodeHeartbeatFunction fn = new NodeHeartbeatFunction(id, hbData, 
ncAddress);
         ipcHandle.send(-1, fn, null);
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/13860fb5/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
index dd10020..d32ee32 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
@@ -30,6 +30,7 @@ import org.apache.hyracks.api.dataflow.ConnectorDescriptorId;
 import org.apache.hyracks.api.dataflow.TaskAttemptId;
 import org.apache.hyracks.api.dataflow.connectors.IConnectorPolicy;
 import org.apache.hyracks.api.deployment.DeploymentId;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.DeployedJobSpecId;
 import org.apache.hyracks.api.job.JobFlag;
 import org.apache.hyracks.api.job.JobId;
@@ -153,6 +154,11 @@ public class NodeControllerRemoteProxy implements 
INodeController {
         ipcHandle.send(-1, new CCNCFunctions.PingFunction(ccId), null);
     }
 
+    @Override
+    public void heartbeatAck(CcId ccId, HyracksDataException e) throws 
IPCException {
+        ipcHandle.send(-1, new CCNCFunctions.NodeHeartbeatAckFunction(ccId, 
e), null);
+    }
+
     public InetSocketAddress getAddress() {
         return ipcHandle.getRemoteAddress();
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/13860fb5/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/CcConnection.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/CcConnection.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/CcConnection.java
index 77623c2..d1f7d5a 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/CcConnection.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/CcConnection.java
@@ -36,13 +36,15 @@ public class CcConnection {
     private static final long REGISTRATION_RESPONSE_POLL_PERIOD = 
TimeUnit.SECONDS.toMillis(1);
 
     private final IClusterController ccs;
+    private final InetSocketAddress ccAddress;
     private boolean registrationPending;
     private boolean registrationCompleted;
     private Exception registrationException;
     private NodeParameters nodeParameters;
 
-    CcConnection(IClusterController ccs) {
+    CcConnection(IClusterController ccs, InetSocketAddress ccAddress) {
         this.ccs = ccs;
+        this.ccAddress = ccAddress;
     }
 
     @Override
@@ -86,19 +88,17 @@ public class CcConnection {
         return nodeParameters;
     }
 
-    public synchronized void notifyConnectionRestored(NodeControllerService 
ncs, InetSocketAddress ccAddress)
-            throws InterruptedException {
-        if (registrationCompleted) {
-            registrationCompleted = false;
-            ncs.getExecutor().submit(() -> {
-                try {
-                    return ncs.registerNode(this, ccAddress);
-                } catch (Exception e) {
-                    LOGGER.log(Level.ERROR, "Failed registering with cc", e);
-                    throw new IllegalStateException(e);
-                }
-            });
-        }
+    public synchronized void forceReregister(NodeControllerService ncs) throws 
InterruptedException {
+        registrationCompleted = false;
+        ncs.getExecutor().submit(() -> {
+            try {
+                return ncs.registerNode(this);
+            } catch (Exception e) {
+                LOGGER.log(Level.ERROR, "Failed registering with cc", e);
+                throw new IllegalStateException(e);
+            }
+        });
+
         while (!registrationCompleted) {
             wait();
         }
@@ -108,4 +108,8 @@ public class CcConnection {
         registrationCompleted = true;
         notifyAll();
     }
+
+    public InetSocketAddress getCcAddress() {
+        return ccAddress;
+    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/13860fb5/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java
index cdc16fa..3bc9710 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java
@@ -20,6 +20,8 @@ package org.apache.hyracks.control.nc;
 
 import org.apache.hyracks.control.common.ipc.CCNCFunctions;
 import 
org.apache.hyracks.control.common.ipc.CCNCFunctions.StateDumpRequestFunction;
+import org.apache.hyracks.control.nc.task.HeartbeatAckTask;
+import org.apache.hyracks.control.nc.task.PingTask;
 import org.apache.hyracks.control.nc.task.ShutdownTask;
 import org.apache.hyracks.control.nc.task.ThreadDumpTask;
 import org.apache.hyracks.control.nc.work.AbortAllJobsWork;
@@ -28,7 +30,6 @@ import 
org.apache.hyracks.control.nc.work.ApplicationMessageWork;
 import org.apache.hyracks.control.nc.work.CleanupJobletWork;
 import org.apache.hyracks.control.nc.work.DeployBinaryWork;
 import org.apache.hyracks.control.nc.work.DeployJobSpecWork;
-import org.apache.hyracks.control.nc.task.PingTask;
 import org.apache.hyracks.control.nc.work.ReportPartitionAvailabilityWork;
 import org.apache.hyracks.control.nc.work.StartTasksWork;
 import org.apache.hyracks.control.nc.work.StateDumpWork;
@@ -139,6 +140,11 @@ final class NodeControllerIPCI implements IIPCI {
                 ncs.getExecutor().submit(new PingTask(ncs, pcf.getCcId()));
                 return;
 
+            case NODE_HEARTBEAT_ACK:
+                final CCNCFunctions.NodeHeartbeatAckFunction nbaf = 
(CCNCFunctions.NodeHeartbeatAckFunction) fn;
+                ncs.getExecutor().submit(new HeartbeatAckTask(ncs, 
nbaf.getCcId(), nbaf.getException()));
+                return;
+
             default:
                 throw new IllegalArgumentException("Unknown function: " + 
fn.getFunctionId());
         }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/13860fb5/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
index a189ac5..653d6e0 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
@@ -59,7 +59,6 @@ import 
org.apache.hyracks.api.lifecycle.LifeCycleComponentManager;
 import org.apache.hyracks.api.result.IResultPartitionManager;
 import org.apache.hyracks.api.service.IControllerService;
 import org.apache.hyracks.api.util.CleanupUtils;
-import org.apache.hyracks.api.util.InvokeUtil;
 import org.apache.hyracks.control.common.NodeControllerData;
 import org.apache.hyracks.control.common.base.IClusterController;
 import org.apache.hyracks.control.common.config.ConfigManager;
@@ -75,7 +74,7 @@ import org.apache.hyracks.control.common.work.FutureValue;
 import org.apache.hyracks.control.common.work.WorkQueue;
 import org.apache.hyracks.control.nc.application.NCServiceContext;
 import org.apache.hyracks.control.nc.heartbeat.HeartbeatComputeTask;
-import org.apache.hyracks.control.nc.heartbeat.HeartbeatTask;
+import org.apache.hyracks.control.nc.heartbeat.HeartbeatManager;
 import org.apache.hyracks.control.nc.io.IOManager;
 import org.apache.hyracks.control.nc.net.MessagingNetworkManager;
 import org.apache.hyracks.control.nc.net.NetworkManager;
@@ -144,7 +143,7 @@ public class NodeControllerService implements 
IControllerService {
 
     private ExecutorService executor;
 
-    private Map<CcId, Thread> heartbeatThreads = new ConcurrentHashMap<>();
+    private Map<CcId, HeartbeatManager> heartbeatManagers = new 
ConcurrentHashMap<>();
 
     private Map<CcId, Timer> ccTimers = new ConcurrentHashMap<>();
 
@@ -348,7 +347,7 @@ public class NodeControllerService implements 
IControllerService {
                     // we need to re-register in case of NC -> CC connection 
reset
                     final CcConnection ccConnection = 
getCcConnection(ccAddressMap.get(ccAddress));
                     try {
-                        
ccConnection.notifyConnectionRestored(NodeControllerService.this, ccAddress);
+                        
ccConnection.forceReregister(NodeControllerService.this);
                     } catch (InterruptedException e) {
                         Thread.currentThread().interrupt();
                         throw new IPCException(e);
@@ -357,7 +356,7 @@ public class NodeControllerService implements 
IControllerService {
             };
             ClusterControllerRemoteProxy ccProxy = new 
ClusterControllerRemoteProxy(
                     ipc.getHandle(ccAddress, 
ncConfig.getClusterConnectRetries(), 1, ipcEventListener));
-            return registerNode(new CcConnection(ccProxy), ccAddress);
+            return registerNode(new CcConnection(ccProxy, ccAddress));
         }
     }
 
@@ -395,8 +394,10 @@ public class NodeControllerService implements 
IControllerService {
                         () -> String.valueOf(e));
             }
             getWorkQueue().scheduleAndSync(new AbortAllJobsWork(this, ccId));
-            Thread hbThread = heartbeatThreads.remove(ccId);
-            hbThread.interrupt();
+            HeartbeatManager hbMgr = heartbeatManagers.remove(ccId);
+            if (hbMgr != null) {
+                hbMgr.shutdown();
+            }
             Timer ccTimer = ccTimers.remove(ccId);
             if (ccTimer != null) {
                 ccTimer.cancel();
@@ -406,13 +407,13 @@ public class NodeControllerService implements 
IControllerService {
         }
     }
 
-    public CcId registerNode(CcConnection ccc, InetSocketAddress ccAddress) 
throws Exception {
+    public CcId registerNode(CcConnection ccc) throws Exception {
         LOGGER.info("Registering with Cluster Controller {}", ccc);
         int registrationId = nextRegistrationId.incrementAndGet();
         pendingRegistrations.put(registrationId, ccc);
         CcId ccId = ccc.registerNode(nodeRegistration, registrationId);
         ccMap.put(ccId, ccc);
-        ccAddressMap.put(ccAddress, ccId);
+        ccAddressMap.put(ccc.getCcAddress(), ccId);
         Serializable distributedState = 
ccc.getNodeParameters().getDistributedState();
         if (distributedState != null) {
             getDistributedState().put(ccId, distributedState);
@@ -420,15 +421,8 @@ public class NodeControllerService implements 
IControllerService {
         IClusterController ccs = ccc.getClusterControllerService();
         NodeParameters nodeParameters = ccc.getNodeParameters();
         // Start heartbeat generator.
-        if (!heartbeatThreads.containsKey(ccId)) {
-            Thread heartbeatThread = new Thread(
-                    new HeartbeatTask(getId(), hbTask.getHeartbeatData(), ccs, 
nodeParameters.getHeartbeatPeriod()),
-                    id + "-Heartbeat");
-            heartbeatThread.setPriority(Thread.MAX_PRIORITY);
-            heartbeatThread.setDaemon(true);
-            heartbeatThread.start();
-            heartbeatThreads.put(ccId, heartbeatThread);
-        }
+        heartbeatManagers.computeIfAbsent(ccId, newCcId -> 
HeartbeatManager.init(this, ccc, hbTask.getHeartbeatData(),
+                nodeRegistration.getNodeControllerAddress()));
         if (!ccTimers.containsKey(ccId) && 
nodeParameters.getProfileDumpPeriod() > 0) {
             Timer ccTimer = new Timer("Timer-" + ccId, true);
             // Schedule profile dump generator.
@@ -506,10 +500,7 @@ public class NodeControllerService implements 
IControllerService {
              * Stop heartbeats only after NC has stopped to avoid false node 
failure detection
              * on CC if an NC takes a long time to stop.
              */
-            heartbeatThreads.values().parallelStream().forEach(t -> {
-                t.interrupt();
-                InvokeUtil.doUninterruptibly(() -> t.join(1000));
-            });
+            
heartbeatManagers.values().parallelStream().forEach(HeartbeatManager::shutdown);
             synchronized (ccLock) {
                 ccMap.values().parallelStream().forEach(cc -> {
                     try {
@@ -673,6 +664,14 @@ public class NodeControllerService implements 
IControllerService {
         return application.getApplicationContext();
     }
 
+    public HeartbeatManager getHeartbeatManager(CcId ccId) {
+        return heartbeatManagers.get(ccId);
+    }
+
+    public NodeRegistration getNodeRegistration() {
+        return nodeRegistration;
+    }
+
     private class ProfileDumpTask extends TimerTask {
         private final IClusterController cc;
         private final CcId ccId;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/13860fb5/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/heartbeat/HeartbeatComputeTask.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/heartbeat/HeartbeatComputeTask.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/heartbeat/HeartbeatComputeTask.java
index 81ceea5..cd051e3 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/heartbeat/HeartbeatComputeTask.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/heartbeat/HeartbeatComputeTask.java
@@ -33,7 +33,6 @@ import org.apache.hyracks.control.nc.io.profiling.IIOCounter;
 import org.apache.hyracks.control.nc.io.profiling.IOCounterFactory;
 import org.apache.hyracks.ipc.api.IPCPerformanceCounters;
 import org.apache.hyracks.net.protocols.muxdemux.MuxDemuxPerformanceCounters;
-import org.apache.logging.log4j.Level;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/13860fb5/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/heartbeat/HeartbeatManager.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/heartbeat/HeartbeatManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/heartbeat/HeartbeatManager.java
new file mode 100644
index 0000000..0ea6399
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/heartbeat/HeartbeatManager.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.nc.heartbeat;
+
+import java.net.InetSocketAddress;
+
+import org.apache.hyracks.api.control.CcId;
+import org.apache.hyracks.api.exceptions.ErrorCode;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.control.common.heartbeat.HeartbeatData;
+import org.apache.hyracks.control.nc.CcConnection;
+import org.apache.hyracks.control.nc.NodeControllerService;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+/**
+ * Owns the heartbeat thread an NC runs towards a single CC and reacts to the heartbeat
+ * acknowledgements received from that CC.
+ */
+public class HeartbeatManager {
+    private static final Logger LOGGER = LogManager.getLogger();
+    private final NodeControllerService ncs;
+    private final CcConnection ccc;
+    private final Thread hbThread;
+    private final CcId ccId;
+
+    private HeartbeatManager(NodeControllerService ncs, CcConnection ccc, HeartbeatData hbData,
+            InetSocketAddress ncAddress) {
+        this.ncs = ncs;
+        this.ccc = ccc;
+        // daemon thread at maximum priority; it is only created here, start() launches it
+        hbThread = new Thread(new HeartbeatTask(ncs.getId(), hbData, ccc.getClusterControllerService(),
+                ccc.getNodeParameters().getHeartbeatPeriod(), ncAddress), ncs.getId() + "-Heartbeat");
+        hbThread.setPriority(Thread.MAX_PRIORITY);
+        hbThread.setDaemon(true);
+        ccId = ccc.getCcId();
+    }
+
+    /**
+     * Creates a heartbeat manager for the supplied CC connection and starts its heartbeat thread.
+     *
+     * @param ncs the node controller service on whose behalf heartbeats are sent
+     * @param ccc the connection to the CC that receives the heartbeats
+     * @param hbData the heartbeat data passed to the heartbeat task
+     * @param ncAddress the NC address passed to the heartbeat task
+     * @return the manager, with its heartbeat thread already started
+     */
+    public static HeartbeatManager init(NodeControllerService ncs, CcConnection ccc, HeartbeatData hbData,
+            InetSocketAddress ncAddress) {
+        HeartbeatManager hbMgr = new HeartbeatManager(ncs, ccc, hbData, ncAddress);
+        hbMgr.start();
+        return hbMgr;
+    }
+
+    /** Interrupts the heartbeat thread; does not wait for it to exit. */
+    public void shutdown() {
+        hbThread.interrupt();
+    }
+
+    /** Starts the heartbeat thread. */
+    public void start() {
+        hbThread.start();
+    }
+
+    /**
+     * Processes a heartbeat acknowledgement from the CC. When the ack carries a
+     * {@code NO_SUCH_NODE} error, a re-registration with the CC is forced; any other ack is only logged.
+     *
+     * @param exception the exception carried by the ack, or {@code null} if there is none
+     */
+    public void notifyAck(HyracksDataException exception) {
+        // TODO: we should also reregister in case of no ack
+        LOGGER.debug("ack rec'd from {} w/ exception: {}", ccId::toString, () -> String.valueOf(exception));
+        if (exception != null && exception.matches(ErrorCode.HYRACKS, ErrorCode.NO_SUCH_NODE)) {
+            LOGGER.info("{} indicates it does not recognize us; force a reconnect", ccId);
+            try {
+                ccc.forceReregister(ncs);
+            } catch (InterruptedException e) {
+                // forceReregister blocks in wait(); restore the interrupt status instead of swallowing it
+                Thread.currentThread().interrupt();
+                LOGGER.warn("interrupted attempting to reregister with {}", ccId, e);
+            } catch (Exception e) {
+                LOGGER.warn("ignoring exception attempting to reregister with {}", ccId, e);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/13860fb5/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/heartbeat/HeartbeatTask.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/heartbeat/HeartbeatTask.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/heartbeat/HeartbeatTask.java
index 6d08fb1..4af7e4e 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/heartbeat/HeartbeatTask.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/heartbeat/HeartbeatTask.java
@@ -18,6 +18,7 @@
  */
 package org.apache.hyracks.control.nc.heartbeat;
 
+import java.net.InetSocketAddress;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 
@@ -36,12 +37,15 @@ public class HeartbeatTask implements Runnable {
     private final Semaphore delayBlock = new Semaphore(0);
     private final IClusterController cc;
     private final long heartbeatPeriodNanos;
+    private final InetSocketAddress ncAddress;
 
-    public HeartbeatTask(String ncId, HeartbeatData hbData, IClusterController 
cc, long heartbeatPeriod) {
+    public HeartbeatTask(String ncId, HeartbeatData hbData, IClusterController 
cc, long heartbeatPeriod,
+            InetSocketAddress ncAddress) {
         this.ncId = ncId;
         this.hbData = hbData;
         this.cc = cc;
         this.heartbeatPeriodNanos = 
TimeUnit.MILLISECONDS.toNanos(heartbeatPeriod);
+        this.ncAddress = ncAddress;
     }
 
     @Override
@@ -67,18 +71,15 @@ public class HeartbeatTask implements Runnable {
     private boolean execute() throws InterruptedException {
         try {
             synchronized (hbData) {
-                cc.nodeHeartbeat(ncId, hbData);
+                cc.nodeHeartbeat(ncId, hbData, ncAddress);
             }
             LOGGER.trace("Successfully sent heartbeat");
             return true;
         } catch (InterruptedException e) {
             throw e;
         } catch (Exception e) {
-            if (LOGGER.isDebugEnabled()) {
-                LOGGER.log(Level.DEBUG, "Exception sending heartbeat; will 
retry after 1s", e);
-            } else {
-                LOGGER.log(Level.ERROR, "Exception sending heartbeat; will 
retry after 1s: " + e.toString());
-            }
+            LOGGER.log(Level.DEBUG, "Exception sending heartbeat; will retry 
after 1s", e);
+            LOGGER.log(Level.WARN, "Exception sending heartbeat; will retry 
after 1s: " + e.toString());
             return false;
         }
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/13860fb5/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/task/HeartbeatAckTask.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/task/HeartbeatAckTask.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/task/HeartbeatAckTask.java
new file mode 100644
index 0000000..f43c029
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/task/HeartbeatAckTask.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.nc.task;
+
+import org.apache.hyracks.api.control.CcId;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.control.nc.NodeControllerService;
+import org.apache.hyracks.control.nc.heartbeat.HeartbeatManager;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+/**
+ * Executor task that hands a heartbeat acknowledgement received from a CC to the heartbeat manager
+ * registered for that CC.
+ */
+public class HeartbeatAckTask implements Runnable {
+
+    private static final Logger LOGGER = LogManager.getLogger();
+    private final NodeControllerService ncs;
+    private final CcId ccId;
+    private final HyracksDataException exception;
+
+    /**
+     * @param ncs the node controller service that received the ack
+     * @param ccId the id of the CC that sent the ack
+     * @param exception the exception carried by the ack, may be {@code null}
+     */
+    public HeartbeatAckTask(NodeControllerService ncs, CcId ccId, HyracksDataException exception) {
+        this.ncs = ncs;
+        this.ccId = ccId;
+        this.exception = exception;
+    }
+
+    @Override
+    public void run() {
+        try {
+            final HeartbeatManager hbMgr = ncs.getHeartbeatManager(ccId);
+            if (hbMgr == null) {
+                // no manager is registered for this CC; report it plainly instead of relying on an NPE
+                LOGGER.warn("ignoring heartbeat ack from {}: no heartbeat manager registered", ccId);
+                return;
+            }
+            hbMgr.notifyAck(exception);
+        } catch (Exception e) {
+            LOGGER.info("failure processing heartbeat ack from {}", ccId, e);
+        }
+    }
+}
\ No newline at end of file

Reply via email to