This is an automated email from the ASF dual-hosted git repository. mblow pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/asterixdb.git
commit 33eea75a950ba3b82612d938d0b892d116db3f5d Author: Murtadha Hubail <mhub...@apache.org> AuthorDate: Thu Feb 11 13:36:26 2021 +0300 [NO ISSUE][REP] Notify nodes on replica failure - user model changes: no - storage format changes: no - interface changes: yes Details: - Whenever a node fails, notify remaining active nodes that the failed node's replica has failed. Change-Id: I12551bd543cd4b664101e8f4d4f44f3124de3d54 Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/9944 Integration-Tests: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Tested-by: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Reviewed-by: Murtadha Hubail <mhub...@apache.org> Reviewed-by: Ali Alsuliman <ali.al.solai...@gmail.com> --- .../app/replication/NcLifecycleCoordinator.java | 24 +++++++++- .../replication/INcLifecycleCoordinator.java | 5 +- .../messaging/ReplicaFailedMessage.java | 54 ++++++++++++++++++++++ .../asterix/runtime/utils/ClusterStateManager.java | 23 +++++++-- 4 files changed, 100 insertions(+), 6 deletions(-) diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java index 7576e0d..d22e9fc 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java @@ -18,6 +18,9 @@ */ package org.apache.asterix.app.replication; +import static org.apache.hyracks.api.exceptions.ErrorCode.NODE_FAILED; + +import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -49,6 +52,7 @@ import org.apache.asterix.common.replication.INCLifecycleMessage; import org.apache.asterix.common.replication.INcLifecycleCoordinator; import org.apache.asterix.common.transactions.IRecoveryManager.SystemState; import org.apache.asterix.metadata.MetadataManager; +import org.apache.asterix.replication.messaging.ReplicaFailedMessage; import org.apache.hyracks.api.application.ICCServiceContext; import org.apache.hyracks.api.client.NodeStatus; import org.apache.hyracks.api.control.IGatekeeper; @@ -80,12 +84,15 @@ public class NcLifecycleCoordinator implements INcLifecycleCoordinator { } @Override - public void notifyNodeFailure(String nodeId) throws HyracksDataException { + public void notifyNodeFailure(String nodeId, InetSocketAddress replicaAddress) throws HyracksDataException { pendingStartupCompletionNodes.remove(nodeId); clusterManager.updateNodeState(nodeId, false, null); if (nodeId.equals(metadataNodeId)) { clusterManager.updateMetadataNode(metadataNodeId, false); } + if (replicaAddress != null) { + notifyFailedReplica(clusterManager, nodeId, replicaAddress); + } clusterManager.refreshState(); } @@ -229,4 +236,19 @@ public class NcLifecycleCoordinator implements INcLifecycleCoordinator { throw HyracksDataException.create(e); } } + + private void notifyFailedReplica(IClusterStateManager clusterManager, String nodeID, + InetSocketAddress replicaAddress) { + LOGGER.info("notify replica failure of nodeId {} at {}", nodeID, replicaAddress); + Set<String> ncs = clusterManager.getParticipantNodes(true); + ReplicaFailedMessage message = + new ReplicaFailedMessage(replicaAddress, HyracksDataException.create(NODE_FAILED, nodeID)); + for (String nodeId : ncs) { + try { + messageBroker.sendApplicationMessageToNC(message, nodeId); + } catch (Exception e) { + LOGGER.info("failed to notify replica failure to node {}", nodeID); + } + } + } } diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/INcLifecycleCoordinator.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/INcLifecycleCoordinator.java index 1a7c3c8..9a3b125 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/INcLifecycleCoordinator.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/INcLifecycleCoordinator.java @@ -18,6 +18,8 @@ */ package org.apache.asterix.common.replication; +import java.net.InetSocketAddress; + import org.apache.asterix.common.cluster.IClusterStateManager; import org.apache.hyracks.api.exceptions.HyracksDataException; @@ -35,9 +37,10 @@ public interface INcLifecycleCoordinator { * Defines the logic of a {@link INcLifecycleCoordinator} when a node leaves the cluster. * * @param nodeId + * @param replicaAddress * @throws HyracksDataException */ - void notifyNodeFailure(String nodeId) throws HyracksDataException; + void notifyNodeFailure(String nodeId, InetSocketAddress replicaAddress) throws HyracksDataException; /** * Binds the coordinator to {@code cluserManager}. diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicaFailedMessage.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicaFailedMessage.java new file mode 100644 index 0000000..2a39eaf --- /dev/null +++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicaFailedMessage.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.replication.messaging; + +import java.net.InetSocketAddress; + +import org.apache.asterix.common.api.INcApplicationContext; +import org.apache.asterix.common.messaging.api.INcAddressedMessage; +import org.apache.asterix.replication.api.ReplicationDestination; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.util.NetworkUtil; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +public class ReplicaFailedMessage implements INcAddressedMessage { + + private static final long serialVersionUID = 1L; + private static final Logger LOGGER = LogManager.getLogger(); + private final InetSocketAddress replicaAddress; + private final Exception failure; + + public ReplicaFailedMessage(InetSocketAddress replicaAddress, Exception failure) { + this.replicaAddress = replicaAddress; + this.failure = failure; + } + + @Override + public void handle(INcApplicationContext appCtx) throws HyracksDataException, InterruptedException { + LOGGER.info("replica at {} failed", replicaAddress); + ReplicationDestination dest = ReplicationDestination.at(NetworkUtil.ensureUnresolved(replicaAddress)); + appCtx.getReplicationManager().notifyFailure(dest, failure); + } + + @Override + public String toString() { + return ReplicaFailedMessage.class.getSimpleName(); + } +} diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java index 2c05c53..bf7ca34 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java @@ -18,6 +18,7 @@ */ package org.apache.asterix.runtime.utils; +import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -49,6 +50,7 @@ import org.apache.hyracks.control.cc.ClusterControllerService; import org.apache.hyracks.control.common.application.ConfigManagerApplicationConfig; import org.apache.hyracks.control.common.config.ConfigManager; import org.apache.hyracks.control.common.controllers.NCConfig; +import org.apache.hyracks.util.NetworkUtil; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -91,13 +93,13 @@ public class ClusterStateManager implements IClusterStateManager { @Override public synchronized void notifyNodeFailure(String nodeId) throws HyracksException { - if (LOGGER.isInfoEnabled()) { - LOGGER.info("Removing configuration parameters for node id " + nodeId); - } + LOGGER.info("Removing configuration parameters for node id {}", nodeId); failedNodes.add(nodeId); + // before removing the node config, get its replica location + InetSocketAddress replicaAddress = getReplicaLocation(this, nodeId); ncConfigMap.remove(nodeId); pendingRemoval.remove(nodeId); - lifecycleCoordinator.notifyNodeFailure(nodeId); + lifecycleCoordinator.notifyNodeFailure(nodeId, replicaAddress); } @Override @@ -496,4 +498,17 @@ public class ClusterStateManager implements IClusterStateManager { }); } + private static InetSocketAddress getReplicaLocation(IClusterStateManager csm, String nodeId) { + final Map<IOption, Object> ncConfig = csm.getActiveNcConfiguration().get(nodeId); + if (ncConfig == null) { + return null; + } + Object destIP = ncConfig.get(NCConfig.Option.REPLICATION_PUBLIC_ADDRESS); + Object destPort = ncConfig.get(NCConfig.Option.REPLICATION_PUBLIC_PORT); + if (destIP == null || destPort == null) { + return null; + } + String replicaLocation = NetworkUtil.toHostPort(String.valueOf(destIP), String.valueOf(destPort)); + return NetworkUtil.parseInetSocketAddress(replicaLocation); + } }