Murtadha Hubail has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/2390

Change subject: [ASTERIXDB-2284][CLUS] Ensure Node Failure on Heartbeat Miss
......................................................................

[ASTERIXDB-2284][CLUS] Ensure Node Failure on Heartbeat Miss

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
- Request a node that has exceeded its allowed heartbeat misses
  to shut down, to ensure that it has actually failed.
- Ensure thread safety of lastHeartbeatNanoTime in
  NodeControllerState.

Change-Id: I121f85fd858484377a9d888d18c3069c239f00fc
---
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/NodeControllerState.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
2 files changed, 22 insertions(+), 14 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb 
refs/changes/90/2390/1

diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/NodeControllerState.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/NodeControllerState.java
index 06af01f..415ca81 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/NodeControllerState.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/NodeControllerState.java
@@ -141,7 +141,7 @@
 
     private int rrdPtr;
 
-    private long lastHeartbeatNanoTime;
+    private volatile long lastHeartbeatNanoTime;
 
     private NodeCapacity capacity;
 
@@ -252,10 +252,6 @@
 
     public long nanosSinceLastHeartbeat() {
         return System.nanoTime() - lastHeartbeatNanoTime;
-    }
-
-    public long getLastHeartbeatNanoTime() {
-        return lastHeartbeatNanoTime;
     }
 
     public NodeControllerRemoteProxy getNodeController() {
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
index 98cf67a..8f73864 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
@@ -48,9 +48,11 @@
 import org.apache.hyracks.control.common.ipc.CCNCFunctions.AbortCCJobsFunction;
 import org.apache.hyracks.ipc.api.IIPCHandle;
 import org.apache.hyracks.ipc.exceptions.IPCException;
+import org.apache.hyracks.util.annotations.NotThreadSafe;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
+@NotThreadSafe
 public class NodeManager implements INodeManager {
     private static final Logger LOGGER = LogManager.getLogger();
 
@@ -99,7 +101,7 @@
         // Updates the node registry.
         if (nodeRegistry.containsKey(nodeId)) {
             LOGGER.warn("Node with name " + nodeId + " has already registered; 
failing the node then re-registering.");
-            removeDeadNode(nodeId);
+            failNonDeadNode(nodeId);
         } else {
             try {
                 // TODO(mblow): it seems we should close IPC handles when 
we're done with them (like here)
@@ -155,22 +157,23 @@
             Map.Entry<String, NodeControllerState> entry = nodeIterator.next();
             String nodeId = entry.getKey();
             NodeControllerState state = entry.getValue();
-            if (state.nanosSinceLastHeartbeat() >= deadNodeNanosThreshold) {
+            final long nanosSinceLastHeartbeat = 
state.nanosSinceLastHeartbeat();
+            if (nanosSinceLastHeartbeat >= deadNodeNanosThreshold) {
+                ensureNodeFailure(nodeId, state);
                 deadNodes.add(nodeId);
                 affectedJobIds.addAll(state.getActiveJobIds());
-                // Removes the node from node map.
                 nodeIterator.remove();
-                // Removes the node from IP map.
                 removeNodeFromIpAddressMap(nodeId, state);
-                // Updates the cluster capacity.
                 resourceManager.update(nodeId, new NodeCapacity(0L, 0));
-                LOGGER.info(entry.getKey() + " considered dead");
+                LOGGER.info("{} considered dead. Last heartbeat received {}ms 
ago. Max miss period: {}ms", nodeId,
+                        TimeUnit.NANOSECONDS.toMillis(nanosSinceLastHeartbeat),
+                        TimeUnit.NANOSECONDS.toMillis(deadNodeNanosThreshold));
             }
         }
         return Pair.of(deadNodes, affectedJobIds);
     }
 
-    public void removeDeadNode(String nodeId) throws HyracksException {
+    private void failNonDeadNode(String nodeId) throws HyracksException {
         NodeControllerState state = nodeRegistry.get(nodeId);
         Set<JobId> affectedJobIds = state.getActiveJobIds();
         // Removes the node from node map.
@@ -196,7 +199,6 @@
         nodeRegistry.forEach(nodeFunction::apply);
     }
 
-    // Removes the entry of the node in <code>ipAddressNodeNameMap</code>.
     private void removeNodeFromIpAddressMap(String nodeId, NodeControllerState 
ncState) throws HyracksException {
         InetAddress ipAddress = getIpAddress(ncState);
         Set<String> nodes = ipAddressNodeNameMap.get(ipAddress);
@@ -209,7 +211,6 @@
         }
     }
 
-    // Retrieves the IP address for a given node.
     private InetAddress getIpAddress(NodeControllerState ncState) throws 
HyracksException {
         String ipAddress = ncState.getNCConfig().getDataPublicAddress();
         try {
@@ -222,4 +223,15 @@
     private NodeCapacity getAdjustedNodeCapacity(NodeCapacity nodeCapacity) {
         return new NodeCapacity(nodeCapacity.getMemoryByteSize(), 
nodeCapacity.getCores() * nodeCoresMultiplier);
     }
+
+    private void ensureNodeFailure(String nodeId, NodeControllerState state) {
+        try {
+            LOGGER.info("Requesting node {} to shutdown to ensure failure", 
nodeId);
+            state.getNodeController().shutdown(false);
+            LOGGER.info("Request to shutdown failed node {} succeeded. false 
positive heartbeat miss indication",
+                    nodeId);
+        } catch (Exception ignore) {
+            LOGGER.debug(() -> "Ignoring failure on ensuring node " + nodeId + 
" has failed", ignore);
+        }
+    }
 }

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/2390
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I121f85fd858484377a9d888d18c3069c239f00fc
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Murtadha Hubail <mhub...@apache.org>

Reply via email to