This is an automated email from the ASF dual-hosted git repository.

mblow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git

commit 33eea75a950ba3b82612d938d0b892d116db3f5d
Author: Murtadha Hubail <mhub...@apache.org>
AuthorDate: Thu Feb 11 13:36:26 2021 +0300

    [NO ISSUE][REP] Notify nodes on replica failure
    
    - user model changes: no
    - storage format changes: no
    - interface changes: yes
    
    Details:
    
    - Whenever a node fails, notify remaining active nodes
      that the failed node's replica has failed.
    
    Change-Id: I12551bd543cd4b664101e8f4d4f44f3124de3d54
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/9944
    Integration-Tests: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
    Tested-by: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
    Reviewed-by: Murtadha Hubail <mhub...@apache.org>
    Reviewed-by: Ali Alsuliman <ali.al.solai...@gmail.com>
---
 .../app/replication/NcLifecycleCoordinator.java    | 24 +++++++++-
 .../replication/INcLifecycleCoordinator.java       |  5 +-
 .../messaging/ReplicaFailedMessage.java            | 54 ++++++++++++++++++++++
 .../asterix/runtime/utils/ClusterStateManager.java | 23 +++++++--
 4 files changed, 100 insertions(+), 6 deletions(-)

diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java
index 7576e0d..d22e9fc 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java
@@ -18,6 +18,9 @@
  */
 package org.apache.asterix.app.replication;
 
+import static org.apache.hyracks.api.exceptions.ErrorCode.NODE_FAILED;
+
+import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -49,6 +52,7 @@ import 
org.apache.asterix.common.replication.INCLifecycleMessage;
 import org.apache.asterix.common.replication.INcLifecycleCoordinator;
 import org.apache.asterix.common.transactions.IRecoveryManager.SystemState;
 import org.apache.asterix.metadata.MetadataManager;
+import org.apache.asterix.replication.messaging.ReplicaFailedMessage;
 import org.apache.hyracks.api.application.ICCServiceContext;
 import org.apache.hyracks.api.client.NodeStatus;
 import org.apache.hyracks.api.control.IGatekeeper;
@@ -80,12 +84,15 @@ public class NcLifecycleCoordinator implements 
INcLifecycleCoordinator {
     }
 
     @Override
-    public void notifyNodeFailure(String nodeId) throws HyracksDataException {
+    public void notifyNodeFailure(String nodeId, InetSocketAddress 
replicaAddress) throws HyracksDataException {
         pendingStartupCompletionNodes.remove(nodeId);
         clusterManager.updateNodeState(nodeId, false, null);
         if (nodeId.equals(metadataNodeId)) {
             clusterManager.updateMetadataNode(metadataNodeId, false);
         }
+        if (replicaAddress != null) { // may be null, e.g. when the node's replication address/port is unknown
+            notifyFailedReplica(clusterManager, nodeId, replicaAddress); // broadcast to the remaining participant NCs
+        }
         clusterManager.refreshState();
     }
 
@@ -229,4 +236,25 @@ public class NcLifecycleCoordinator implements INcLifecycleCoordinator {
             throw HyracksDataException.create(e);
         }
     }
+
+    /**
+     * Notifies every node returned by {@code clusterManager.getParticipantNodes(true)} that the replica of
+     * {@code failedNodeId} located at {@code replicaAddress} has failed. Delivery is best-effort: failing to reach
+     * one NC is logged together with its cause and does not prevent notifying the remaining NCs.
+     */
+    private void notifyFailedReplica(IClusterStateManager clusterManager, String failedNodeId,
+            InetSocketAddress replicaAddress) {
+        LOGGER.info("notify replica failure of nodeId {} at {}", failedNodeId, replicaAddress);
+        Set<String> ncs = clusterManager.getParticipantNodes(true);
+        ReplicaFailedMessage message =
+                new ReplicaFailedMessage(replicaAddress, HyracksDataException.create(NODE_FAILED, failedNodeId));
+        for (String ncId : ncs) {
+            try {
+                messageBroker.sendApplicationMessageToNC(message, ncId);
+            } catch (Exception e) {
+                // report the NC we could not reach (not the failed node) and keep the cause for diagnosis
+                LOGGER.warn("failed to notify replica failure to node {}", ncId, e);
+            }
+        }
+    }
 }
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/INcLifecycleCoordinator.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/INcLifecycleCoordinator.java
index 1a7c3c8..9a3b125 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/INcLifecycleCoordinator.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/INcLifecycleCoordinator.java
@@ -18,6 +18,8 @@
  */
 package org.apache.asterix.common.replication;
 
+import java.net.InetSocketAddress;
+
 import org.apache.asterix.common.cluster.IClusterStateManager;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 
@@ -35,9 +37,10 @@ public interface INcLifecycleCoordinator {
      * Defines the logic of a {@link INcLifecycleCoordinator} when a node 
leaves the cluster.
      *
      * @param nodeId
+     * @param replicaAddress the failed node's replication address, or {@code null} if it is not known
      * @throws HyracksDataException
      */
-    void notifyNodeFailure(String nodeId) throws HyracksDataException;
+    void notifyNodeFailure(String nodeId, InetSocketAddress replicaAddress) 
throws HyracksDataException;
 
     /**
      * Binds the coordinator to {@code cluserManager}.
diff --git 
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicaFailedMessage.java
 
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicaFailedMessage.java
new file mode 100644
index 0000000..2a39eaf
--- /dev/null
+++ 
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicaFailedMessage.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.replication.messaging;
+
+import java.net.InetSocketAddress;
+
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.messaging.api.INcAddressedMessage;
+import org.apache.asterix.replication.api.ReplicationDestination;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.util.NetworkUtil;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+public class ReplicaFailedMessage implements INcAddressedMessage { // sent to NCs: the replica at replicaAddress failed
+
+    private static final long serialVersionUID = 1L;
+    private static final Logger LOGGER = LogManager.getLogger();
+    private final InetSocketAddress replicaAddress; // address of the failed replica
+    private final Exception failure; // cause handed to the local replication manager's notifyFailure
+
+    public ReplicaFailedMessage(InetSocketAddress replicaAddress, Exception 
failure) {
+        this.replicaAddress = replicaAddress;
+        this.failure = failure;
+    }
+
+    @Override
+    public void handle(INcApplicationContext appCtx) throws 
HyracksDataException, InterruptedException {
+        LOGGER.info("replica at {} failed", replicaAddress);
+        ReplicationDestination dest = 
ReplicationDestination.at(NetworkUtil.ensureUnresolved(replicaAddress));
+        appCtx.getReplicationManager().notifyFailure(dest, failure);
+    }
+
+    @Override
+    public String toString() {
+        return ReplicaFailedMessage.class.getSimpleName();
+    }
+}
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
index 2c05c53..bf7ca34 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
@@ -18,6 +18,7 @@
  */
 package org.apache.asterix.runtime.utils;
 
+import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -49,6 +50,7 @@ import org.apache.hyracks.control.cc.ClusterControllerService;
 import 
org.apache.hyracks.control.common.application.ConfigManagerApplicationConfig;
 import org.apache.hyracks.control.common.config.ConfigManager;
 import org.apache.hyracks.control.common.controllers.NCConfig;
+import org.apache.hyracks.util.NetworkUtil;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -91,13 +93,13 @@ public class ClusterStateManager implements 
IClusterStateManager {
 
     @Override
     public synchronized void notifyNodeFailure(String nodeId) throws 
HyracksException {
-        if (LOGGER.isInfoEnabled()) {
-            LOGGER.info("Removing configuration parameters for node id " + 
nodeId);
-        }
+        LOGGER.info("Removing configuration parameters for node id {}", 
nodeId);
         failedNodes.add(nodeId);
+        // resolve the replica address now, before ncConfigMap.remove() below discards the node's config
+        InetSocketAddress replicaAddress = getReplicaLocation(this, nodeId);
         ncConfigMap.remove(nodeId);
         pendingRemoval.remove(nodeId);
-        lifecycleCoordinator.notifyNodeFailure(nodeId);
+        lifecycleCoordinator.notifyNodeFailure(nodeId, replicaAddress); // replicaAddress may be null
     }
 
     @Override
@@ -496,4 +498,17 @@ public class ClusterStateManager implements 
IClusterStateManager {
         });
     }
 
+    private static InetSocketAddress getReplicaLocation(IClusterStateManager 
csm, String nodeId) { // null if the replication address cannot be determined
+        final Map<IOption, Object> ncConfig = 
csm.getActiveNcConfiguration().get(nodeId);
+        if (ncConfig == null) { // no active configuration recorded for nodeId
+            return null;
+        }
+        Object destIP = 
ncConfig.get(NCConfig.Option.REPLICATION_PUBLIC_ADDRESS);
+        Object destPort = 
ncConfig.get(NCConfig.Option.REPLICATION_PUBLIC_PORT);
+        if (destIP == null || destPort == null) { // replication public address or port not set
+            return null;
+        }
+        String replicaLocation = 
NetworkUtil.toHostPort(String.valueOf(destIP), String.valueOf(destPort));
+        return NetworkUtil.parseInetSocketAddress(replicaLocation);
+    }
 }

Reply via email to