Murtadha Hubail has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/1405

Change subject: PREVIEW: Introduce Strategy Based Replication
......................................................................

PREVIEW: Introduce Strategy Based Replication

This is just preview code to discuss the design.
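
The gist: per-record and per-file replication decisions move behind
IReplicationStrategy, and reactions to node join/failure move behind
IFaultToleranceStrategy. A rough sketch of the intended call sites
(illustrative wiring only; in this preview the strategy is still
instantiated directly in ReplicationProperties):

    IReplicationStrategy strategy = replicationProperties.getReplicationStrategy();
    // an NC starts its replication service only if it participates in the strategy
    if (strategy.isParticipant(nodeId)) {
        runtimeContext.getReplicationChannel().start();
    }
    // the log manager ships only the log records the strategy matches
    if (strategy.isMatch(logRecord)) {
        replicationManager.replicateLog(logRecord);
    }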

Change-Id: I1d1012f5541ce786f127866efefb9f3db434fedd
---
M asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/ClusterAPIServlet.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
M asterixdb/asterix-app/src/main/resources/cluster.xml
A asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ClusterProperties.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ReplicationProperties.java
A asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/AbstractReplicationStrategy.java
A asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/ChainedDeclusteringReplicationStrategy.java
A asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IFaultToleranceStrategy.java
A asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationStrategy.java
A asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/MetadataNodeFaultToleranceStrategy.java
A asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/MetadataOnlyReplicationStrategy.java
A asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/NoFaultToleranceStrategy.java
A asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/NoReplicationStrategy.java
M asterixdb/asterix-installer/src/test/resources/integrationts/replication/testsuite.xml
M asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
M asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java
M asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/RemoteRecoveryManager.java
A asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/replication/AutoFaultToleranceStrategy.java
M asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/util/ClusterStateManager.java
M asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java
M asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionSubsystem.java
24 files changed, 1,328 insertions(+), 640 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/05/1405/1

diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
index bdce0ca..e66b62c 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
@@ -30,8 +30,8 @@
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
-import org.apache.asterix.common.config.PropertiesAccessor;
 import org.apache.asterix.common.config.GlobalConfig;
+import org.apache.asterix.common.config.PropertiesAccessor;
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.hyracks.bootstrap.CCApplicationEntryPoint;
 import org.apache.asterix.hyracks.bootstrap.NCApplicationEntryPoint;
@@ -259,7 +259,7 @@
         });
         System.setProperty(GlobalConfig.CONFIG_FILE_PROPERTY, "asterix-build-configuration.xml");
 
-        init(cleanupOnStart);
+        init(true);
         while (true) {
             Thread.sleep(10000);
         }
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/ClusterAPIServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/ClusterAPIServlet.java
index 02be70b..89b25d7 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/ClusterAPIServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/ClusterAPIServlet.java
@@ -67,7 +67,7 @@
         response.setContentType("application/json");
         response.setCharacterEncoding("utf-8");
         PrintWriter responseWriter = response.getWriter();
-        JSONObject json;
+        JSONObject json = null;
 
         try {
             switch (request.getPathInfo() == null ? "" : request.getPathInfo()) {
@@ -80,11 +80,19 @@
                 case "/summary":
                     json = getClusterStateSummaryJSON();
                     break;
+                case "/kill":
+                    ClusterStateManager.INSTANCE.removeNCConfiguration("asterix_nc1");
+                    break;
+                case "/join":
+                    ClusterStateManager.INSTANCE.addNCConfiguration("asterix_nc1", null);
+                    break;
                 default:
                     throw new IllegalArgumentException();
             }
             response.setStatus(HttpServletResponse.SC_OK);
-            responseWriter.write(json.toString(4));
+            if (json != null) {
+                responseWriter.write(json.toString(4));
+            }
        } catch (IllegalArgumentException e) { // NOSONAR - exception not logged or rethrown
             response.sendError(HttpServletResponse.SC_NOT_FOUND);
         } catch (Exception e) {
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
index b1ca062..aecf235 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
@@ -29,26 +29,26 @@
 
 import org.apache.asterix.active.ActiveManager;
 import org.apache.asterix.api.common.AppRuntimeContextProviderForRecovery;
-import org.apache.asterix.common.api.ThreadExecutor;
 import org.apache.asterix.common.api.IAppRuntimeContext;
 import org.apache.asterix.common.api.IDatasetLifecycleManager;
+import org.apache.asterix.common.api.ThreadExecutor;
 import org.apache.asterix.common.cluster.ClusterPartition;
-import org.apache.asterix.common.config.BuildProperties;
-import org.apache.asterix.common.config.CompilerProperties;
 import org.apache.asterix.common.config.AsterixExtension;
+import org.apache.asterix.common.config.BuildProperties;
+import org.apache.asterix.common.config.ClusterProperties;
+import org.apache.asterix.common.config.CompilerProperties;
 import org.apache.asterix.common.config.ExtensionProperties;
 import org.apache.asterix.common.config.ExternalProperties;
 import org.apache.asterix.common.config.FeedProperties;
+import org.apache.asterix.common.config.IPropertiesProvider;
+import org.apache.asterix.common.config.MessagingProperties;
 import org.apache.asterix.common.config.MetadataProperties;
 import org.apache.asterix.common.config.PropertiesAccessor;
 import org.apache.asterix.common.config.ReplicationProperties;
 import org.apache.asterix.common.config.StorageProperties;
 import org.apache.asterix.common.config.TransactionProperties;
-import org.apache.asterix.common.config.ClusterProperties;
-import org.apache.asterix.common.config.IPropertiesProvider;
-import org.apache.asterix.common.config.MessagingProperties;
-import org.apache.asterix.common.context.FileMapManager;
 import org.apache.asterix.common.context.DatasetLifecycleManager;
+import org.apache.asterix.common.context.FileMapManager;
 import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.library.ILibraryManager;
@@ -138,12 +138,11 @@
     private final NCExtensionManager ncExtensionManager;
 
     public NCAppRuntimeContext(INCApplicationContext ncApplicationContext, List<AsterixExtension> extensions)
-            throws AsterixException, InstantiationException, IllegalAccessException,
-            ClassNotFoundException, IOException {
+            throws AsterixException, InstantiationException, IllegalAccessException, ClassNotFoundException,
+            IOException {
         List<AsterixExtension> allExtensions = new ArrayList<>();
         this.ncApplicationContext = ncApplicationContext;
-        PropertiesAccessor propertiesAccessor =
-                PropertiesAccessor.getInstance(ncApplicationContext.getAppConfig());
+        PropertiesAccessor propertiesAccessor = PropertiesAccessor.getInstance(ncApplicationContext.getAppConfig());
         compilerProperties = new CompilerProperties(propertiesAccessor);
         externalProperties = new ExternalProperties(propertiesAccessor);
         metadataProperties = new MetadataProperties(propertiesAccessor);
@@ -180,15 +179,13 @@
 
         metadataMergePolicyFactory = new PrefixMergePolicyFactory();
 
-        ILocalResourceRepositoryFactory persistentLocalResourceRepositoryFactory =
-                new PersistentLocalResourceRepositoryFactory(
-                        ioManager, ncApplicationContext.getNodeId(), metadataProperties);
+        ILocalResourceRepositoryFactory persistentLocalResourceRepositoryFactory = new PersistentLocalResourceRepositoryFactory(
+                ioManager, ncApplicationContext.getNodeId(), metadataProperties);
 
         localResourceRepository = (PersistentLocalResourceRepository) persistentLocalResourceRepositoryFactory
                 .createRepository();
 
-        IAppRuntimeContextProvider asterixAppRuntimeContextProvider =
-                new AppRuntimeContextProviderForRecovery(this);
+        IAppRuntimeContextProvider asterixAppRuntimeContextProvider = new AppRuntimeContextProviderForRecovery(this);
         txnSubsystem = new TransactionSubsystem(ncApplicationContext.getNodeId(), asterixAppRuntimeContextProvider,
                 txnProperties);
 
@@ -208,6 +205,7 @@
         activeManager = new ActiveManager(threadExecutor, ncApplicationContext.getNodeId(),
                 feedProperties.getMemoryComponentGlobalBudget(), compilerProperties.getFrameSize());
 
+        //TODO: make this participant-based instead of a flag
         if (ClusterProperties.INSTANCE.isReplicationEnabled()) {
             String nodeId = ncApplicationContext.getNodeId();
 
@@ -227,10 +225,8 @@
             * add the partitions that will be replicated in this node as inactive partitions
             */
            //get nodes which replicate to this node
-            Set<String> replicationClients = replicationProperties.getNodeReplicationClients(nodeId);
-            //remove the node itself
-            replicationClients.remove(nodeId);
-            for (String clientId : replicationClients) {
+            Set<String> remoteReplicas = replicationProperties.getRemoteReplicasIds(nodeId);
+            for (String clientId : remoteReplicas) {
                //get the partitions of each client
                ClusterPartition[] clientPartitions = metadataProperties.getNodePartitions().get(clientId);
                for (ClusterPartition partition : clientPartitions) {
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
index 8998c6b..7039982 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
@@ -25,18 +25,21 @@
 import java.util.Map;
 import java.util.logging.Level;
 import java.util.logging.Logger;
+import java.util.stream.Collectors;
 
 import org.apache.asterix.app.external.ExternalLibraryUtils;
 import org.apache.asterix.app.nc.NCAppRuntimeContext;
 import org.apache.asterix.common.api.AsterixThreadFactory;
 import org.apache.asterix.common.api.IAppRuntimeContext;
 import org.apache.asterix.common.config.AsterixExtension;
-import org.apache.asterix.common.config.MetadataProperties;
-import org.apache.asterix.common.config.TransactionProperties;
 import org.apache.asterix.common.config.ClusterProperties;
 import org.apache.asterix.common.config.IPropertiesProvider;
 import org.apache.asterix.common.config.MessagingProperties;
+import org.apache.asterix.common.config.MetadataProperties;
+import org.apache.asterix.common.config.ReplicationProperties;
+import org.apache.asterix.common.config.TransactionProperties;
 import org.apache.asterix.common.replication.IRemoteRecoveryManager;
+import org.apache.asterix.common.replication.Replica;
 import org.apache.asterix.common.transactions.ICheckpointManager;
 import org.apache.asterix.common.transactions.IRecoveryManager;
 import org.apache.asterix.common.transactions.IRecoveryManager.SystemState;
@@ -62,13 +65,11 @@
 public class NCApplicationEntryPoint implements INCApplicationEntryPoint {
     private static final Logger LOGGER = Logger.getLogger(NCApplicationEntryPoint.class.getName());
 
-    @Option(name = "-initial-run",
-            usage = "A flag indicating if it's the first time the NC is started (default: false)", required = false)
+    @Option(name = "-initial-run", usage = "A flag indicating if it's the first time the NC is started (default: false)", required = false)
     public boolean initialRun = false;
 
-    @Option(name = "-virtual-NC",
-            usage = "A flag indicating if this NC is running on virtual cluster " + "(default: false)",
-            required = false)
+    @Option(name = "-virtual-NC", usage = "A flag indicating if this NC is running on virtual cluster "
+            + "(default: false)", required = false)
     public boolean virtualNC = false;
 
     private INCApplicationContext ncApplicationContext = null;
@@ -91,8 +92,8 @@
             parser.printUsage(System.err);
             throw e;
         }
-        ncAppCtx.setThreadFactory(new AsterixThreadFactory(ncAppCtx.getThreadFactory(),
-                ncAppCtx.getLifeCycleComponentManager()));
+        ncAppCtx.setThreadFactory(
+                new AsterixThreadFactory(ncAppCtx.getThreadFactory(), ncAppCtx.getLifeCycleComponentManager()));
         ncApplicationContext = ncAppCtx;
         nodeId = ncApplicationContext.getNodeId();
         if (LOGGER.isLoggable(Level.INFO)) {
@@ -102,12 +103,11 @@
         final NodeControllerService controllerService = (NodeControllerService) ncAppCtx.getControllerService();
 
         if (System.getProperty("java.rmi.server.hostname") == null) {
-            System.setProperty("java.rmi.server.hostname", (controllerService)
-                    .getConfiguration().clusterNetPublicIPAddress);
+            System.setProperty("java.rmi.server.hostname",
+                    (controllerService).getConfiguration().clusterNetPublicIPAddress);
         }
         runtimeContext = new NCAppRuntimeContext(ncApplicationContext, getExtensions());
-        MetadataProperties metadataProperties = ((IPropertiesProvider) runtimeContext)
-                .getMetadataProperties();
+        MetadataProperties metadataProperties = ((IPropertiesProvider) runtimeContext).getMetadataProperties();
         if (!metadataProperties.getNodeNames().contains(ncApplicationContext.getNodeId())) {
             if (LOGGER.isLoggable(Level.INFO)) {
                 LOGGER.info("Substitute node joining : " + ncApplicationContext.getNodeId());
@@ -116,8 +116,7 @@
         }
         runtimeContext.initialize(initialRun);
         ncApplicationContext.setApplicationObject(runtimeContext);
-        MessagingProperties messagingProperties = ((IPropertiesProvider) runtimeContext)
-                .getMessagingProperties();
+        MessagingProperties messagingProperties = ((IPropertiesProvider) runtimeContext).getMessagingProperties();
         messageBroker = new NCMessageBroker(controllerService, messagingProperties);
         ncApplicationContext.setMessageBroker(messageBroker);
         MessagingChannelInterfaceFactory interfaceFactory = new MessagingChannelInterfaceFactory(
@@ -169,14 +168,22 @@
     }
 
     private void startReplicationService() throws InterruptedException {
+        ReplicationProperties replicationProperties = ((IPropertiesProvider) runtimeContext).getReplicationProperties();
         //Open replication channel
-        runtimeContext.getReplicationChannel().start();
+        if (replicationProperties.getReplicationStrategy().isParticipant(nodeId)) {
+            if (nodeId.equals("asterix_nc2")) {
+                System.out.println("Started on " + nodeId + ": Remote Primary Replicas: "
+                        + replicationProperties.getReplicationStrategy().getRemotePrimaryReplicas(nodeId).stream()
+                                .map(Replica::getId).collect(Collectors.toSet()));
+            }
+            runtimeContext.getReplicationChannel().start();
 
-        //Check the state of remote replicas
-        runtimeContext.getReplicationManager().initializeReplicasState();
+            //Check the state of remote replicas
+            runtimeContext.getReplicationManager().initializeReplicasState();
 
-        //Start replication after the state of remote replicas has been initialized.
-        runtimeContext.getReplicationManager().startReplicationThreads();
+            //Start replication after the state of remote replicas has been initialized.
+            runtimeContext.getReplicationManager().startReplicationThreads();
+        }
     }
 
     @Override
@@ -204,8 +211,7 @@
     public void notifyStartupComplete() throws Exception {
         //Send max resource id on this NC to the CC
         ReportMaxResourceIdMessage.send((NodeControllerService) ncApplicationContext.getControllerService());
-        MetadataProperties metadataProperties = ((IPropertiesProvider) runtimeContext)
-                .getMetadataProperties();
+        MetadataProperties metadataProperties = ((IPropertiesProvider) runtimeContext).getMetadataProperties();
         if (initialRun || systemState == SystemState.NEW_UNIVERSE) {
             if (LOGGER.isLoggable(Level.INFO)) {
                 LOGGER.info("System state: " + SystemState.NEW_UNIVERSE);
@@ -214,9 +220,8 @@
                 LOGGER.info("Root Metadata Store: " + metadataProperties.getStores().get(nodeId)[0]);
             }
 
-            PersistentLocalResourceRepository localResourceRepository =
-                    (PersistentLocalResourceRepository) runtimeContext
-                            .getLocalResourceRepository();
+            PersistentLocalResourceRepository localResourceRepository = (PersistentLocalResourceRepository) runtimeContext
+                    .getLocalResourceRepository();
             localResourceRepository.initializeNewUniverse(ClusterProperties.INSTANCE.getStorageDirectoryName());
         }
 
@@ -281,8 +286,7 @@
     }
 
     private void updateOnNodeJoin() {
-        MetadataProperties metadataProperties = ((IPropertiesProvider) runtimeContext)
-                .getMetadataProperties();
+        MetadataProperties metadataProperties = ((IPropertiesProvider) runtimeContext).getMetadataProperties();
         if (!metadataProperties.getNodeNames().contains(nodeId)) {
             metadataProperties.getNodeNames().add(nodeId);
             Cluster cluster = ClusterProperties.INSTANCE.getCluster();
@@ -290,8 +294,7 @@
                 throw new IllegalStateException("No cluster configuration found for this instance");
             }
             String asterixInstanceName = metadataProperties.getInstanceName();
-            TransactionProperties txnProperties = ((IPropertiesProvider) runtimeContext)
-                    .getTransactionProperties();
+            TransactionProperties txnProperties = ((IPropertiesProvider) runtimeContext).getTransactionProperties();
             Node self = null;
             List<Node> nodes;
             if (cluster.getSubstituteNodes() != null) {
diff --git a/asterixdb/asterix-app/src/main/resources/cluster.xml b/asterixdb/asterix-app/src/main/resources/cluster.xml
index 8f0b694..3173b57 100644
--- a/asterixdb/asterix-app/src/main/resources/cluster.xml
+++ b/asterixdb/asterix-app/src/main/resources/cluster.xml
@@ -21,10 +21,10 @@
   <store>storage</store>
 
   <data_replication>
-    <enabled>false</enabled>
+    <enabled>true</enabled>
     <replication_port>2016</replication_port>
     <replication_factor>2</replication_factor>
-    <auto_failover>false</auto_failover>
+    <auto_failover>true</auto_failover>
     <replication_time_out>30</replication_time_out>
   </data_replication>
 
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
new file mode 100644
index 0000000..fb98e1d
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.cluster;
+
+import java.util.Map;
+
+import org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public interface IClusterStateManager {
+
+    /**
+     * @return The current cluster state.
+     */
+    ClusterState getState();
+
+    /**
+     * Updates the cluster state based on the state of all cluster partitions and the metadata node.
+     */
+    void updateClusterState() throws HyracksDataException;
+
+    /**
+     * Forces the cluster state into {@code state}.
+     */
+    void forceIntoState(ClusterState state);
+
+    /**
+     * Updates all partitions of {@code nodeId} based on the {@code active} flag.
+     * @param nodeId
+     * @param active
+     * @throws HyracksDataException
+     */
+    void updateNodePartitions(String nodeId, boolean active) throws HyracksDataException;
+
+    /**
+     * Updates the active node and active state of the cluster partition with id {@code partitionNum}.
+     */
+    void updateClusterPartition(Integer partitionNum, String activeNode, boolean active);
+
+    /**
+     * Updates the metadata node id and its state.
+     */
+    void updateMetadataNode(String nodeId, boolean active);
+
+    /**
+     * @return a map of node id to NC configuration for all active nodes.
+     */
+    Map<String, Map<String, String>> getActiveNcConfiguration();
+
+    /**
+     * @return The current metadata node id.
+     */
+    String getCurrentMetadataNodeId();
+
+    ClusterPartition[] getNodePartitions(String nodeId);
+
+    ClusterPartition[] getClusterPartitions();
+
+}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ClusterProperties.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ClusterProperties.java
index 81c5a6d..85b24f6 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ClusterProperties.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ClusterProperties.java
@@ -25,6 +25,7 @@
 import javax.xml.bind.Unmarshaller;
 
 import org.apache.asterix.event.schema.cluster.Cluster;
+import org.apache.commons.lang3.StringUtils;
 
 public class ClusterProperties {
 
@@ -32,7 +33,7 @@
 
     private static final String CLUSTER_CONFIGURATION_FILE = "cluster.xml";
     private static final String DEFAULT_STORAGE_DIR_NAME = "storage";
-
+    private static String NODE_NAME_PREFIX;
     private final Cluster cluster;
 
     private ClusterProperties() {
@@ -42,11 +43,13 @@
                 JAXBContext ctx = JAXBContext.newInstance(Cluster.class);
                 Unmarshaller unmarshaller = ctx.createUnmarshaller();
                 cluster = (Cluster) unmarshaller.unmarshal(is);
+                NODE_NAME_PREFIX = cluster.getInstanceName() + "_";
             } catch (JAXBException e) {
                 throw new IllegalStateException("Failed to read configuration file " + CLUSTER_CONFIGURATION_FILE, e);
             }
         } else {
             cluster = null;
+            NODE_NAME_PREFIX = StringUtils.EMPTY;
         }
     }
 
@@ -72,4 +75,11 @@
     public boolean isAutoFailoverEnabled() {
         return isReplicationEnabled() && cluster.getDataReplication().isAutoFailover();
     }
+
+    public String getNodeFullName(String nodeId) {
+        if (nodeId.startsWith(NODE_NAME_PREFIX)) {
+            return nodeId;
+        }
+        return NODE_NAME_PREFIX + nodeId;
+    }
 }
\ No newline at end of file
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ReplicationProperties.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ReplicationProperties.java
index 164a525..78c5114 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ReplicationProperties.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ReplicationProperties.java
@@ -18,11 +18,12 @@
  */
 package org.apache.asterix.common.config;
 
-import java.util.HashSet;
 import java.util.Set;
-import java.util.logging.Level;
 import java.util.logging.Logger;
+import java.util.stream.Collectors;
 
+import org.apache.asterix.common.replication.ChainedDeclusteringReplicationStrategy;
+import org.apache.asterix.common.replication.IReplicationStrategy;
 import org.apache.asterix.common.replication.Replica;
 import org.apache.asterix.event.schema.cluster.Cluster;
 import org.apache.asterix.event.schema.cluster.Node;
@@ -32,7 +33,6 @@
 public class ReplicationProperties extends AbstractProperties {
 
     private static final Logger LOGGER = Logger.getLogger(ReplicationProperties.class.getName());
-
 
     private static final int REPLICATION_DATAPORT_DEFAULT = 2000;
 
@@ -44,8 +44,7 @@
     private static final String REPLICATION_TIMEOUT_KEY = "replication.timeout";
     private static final int REPLICATION_TIME_OUT_DEFAULT = 15;
 
-    private static final String REPLICATION_MAX_REMOTE_RECOVERY_ATTEMPTS_KEY =
-            "replication.max.remote.recovery.attempts";
+    private static final String REPLICATION_MAX_REMOTE_RECOVERY_ATTEMPTS_KEY = "replication.max.remote.recovery.attempts";
     private static final int MAX_REMOTE_RECOVERY_ATTEMPTS = 5;
 
     private static final String NODE_IP_ADDRESS_DEFAULT = "127.0.0.1";
@@ -60,20 +59,14 @@
     private static final int REPLICATION_LOG_BUFFER_PAGE_SIZE_DEFAULT = StorageUtil.getSizeInBytes(128,
             StorageUnit.KILOBYTE);
 
-    private final String nodeNamePrefix;
     private final Cluster cluster;
 
     public ReplicationProperties(PropertiesAccessor accessor) {
         super(accessor);
         this.cluster = ClusterProperties.INSTANCE.getCluster();
-
-        if (cluster != null) {
-            nodeNamePrefix = cluster.getInstanceName() + "_";
-        } else {
-            nodeNamePrefix = "";
-        }
     }
 
+    //TODO remove this unused method
     @PropertyKey(REPLICATION_ENABLED_KEY)
     public boolean isReplicationEnabled() {
         return ClusterProperties.INSTANCE.isReplicationEnabled();
@@ -83,7 +76,7 @@
         if (cluster != null) {
             for (int i = 0; i < cluster.getNode().size(); i++) {
                 Node node = cluster.getNode().get(i);
-                if (getRealCluserNodeID(node.getId()).equals(nodeId)) {
+                if (ClusterProperties.INSTANCE.getNodeFullName(node.getId()).equals(nodeId)) {
                     return node.getClusterIp();
                 }
             }
@@ -95,7 +88,7 @@
         if (cluster != null && cluster.getDataReplication() != null) {
             for (int i = 0; i < cluster.getNode().size(); i++) {
                 Node node = cluster.getNode().get(i);
-                if (getRealCluserNodeID(node.getId()).equals(nodeId)) {
+                if (ClusterProperties.INSTANCE.getNodeFullName(node.getId()).equals(nodeId)) {
                     return node.getReplicationPort() != null ? node.getReplicationPort().intValue()
                             : cluster.getDataReplication().getReplicationPort().intValue();
                 }
@@ -105,52 +98,13 @@
     }
 
     public Set<Replica> getRemoteReplicas(String nodeId) {
-        Set<Replica> remoteReplicas = new HashSet<>();;
-
-        int numberOfRemoteReplicas = getReplicationFactor() - 1;
-        //Using chained-declustering
-        if (cluster != null) {
-            int nodeIndex = -1;
-            //find the node index in the cluster config
-            for (int i = 0; i < cluster.getNode().size(); i++) {
-                Node node = cluster.getNode().get(i);
-                if (getRealCluserNodeID(node.getId()).equals(nodeId)) {
-                    nodeIndex = i;
-                    break;
-                }
-            }
-
-            if (nodeIndex == -1) {
-                LOGGER.log(Level.WARNING,
-                        "Could not find node " + getRealCluserNodeID(nodeId) + " in cluster configurations");
-                return null;
-            }
-
-            //find nodes to the right of this node
-            for (int i = nodeIndex + 1; i < cluster.getNode().size(); i++) {
-                remoteReplicas.add(getReplicaByNodeIndex(i));
-                if (remoteReplicas.size() == numberOfRemoteReplicas) {
-                    break;
-                }
-            }
-
-            //if not all remote replicas have been found, start from the beginning
-            if (remoteReplicas.size() != numberOfRemoteReplicas) {
-                for (int i = 0; i < cluster.getNode().size(); i++) {
-                    remoteReplicas.add(getReplicaByNodeIndex(i));
-                    if (remoteReplicas.size() == numberOfRemoteReplicas) {
-                        break;
-                    }
-                }
-            }
-        }
-        return remoteReplicas;
+        return getReplicationStrategy().getRemoteReplicas(nodeId);
     }
 
     private Replica getReplicaByNodeIndex(int nodeIndex) {
         Node node = cluster.getNode().get(nodeIndex);
         Node replicaNode = new Node();
-        replicaNode.setId(getRealCluserNodeID(node.getId()));
+        replicaNode.setId(ClusterProperties.INSTANCE.getNodeFullName(node.getId()));
         replicaNode.setClusterIp(node.getClusterIp());
         return new Replica(replicaNode);
     }
@@ -161,7 +115,7 @@
             for (int i = 0; i < cluster.getNode().size(); i++) {
                 Node node = cluster.getNode().get(i);
 
-                if (getRealCluserNodeID(node.getId()).equals(nodeId)) {
+                if (ClusterProperties.INSTANCE.getNodeFullName(node.getId()).equals(nodeId)) {
                     nodeIndex = i;
                     break;
                 }
@@ -176,25 +130,15 @@
     }
 
     public Set<String> getRemoteReplicasIds(String nodeId) {
-        Set<String> remoteReplicasIds = new HashSet<>();
-        Set<Replica> remoteReplicas = getRemoteReplicas(nodeId);
-
-        for (Replica replica : remoteReplicas) {
-            remoteReplicasIds.add(replica.getId());
-        }
-
-        return remoteReplicasIds;
-    }
-
-    public String getRealCluserNodeID(String nodeId) {
-        return nodeNamePrefix + nodeId;
+        return getReplicationStrategy().getRemoteReplicas(nodeId).stream().map(Replica::getId)
+                .collect(Collectors.toSet());
     }
 
     public Set<String> getNodeReplicasIds(String nodeId) {
-        Set<String> replicaIds = new HashSet<>();
-        replicaIds.add(nodeId);
-        replicaIds.addAll(getRemoteReplicasIds(nodeId));
-        return replicaIds;
+        Set<String> remoteReplicasIds = getRemoteReplicasIds(nodeId);
+        // This includes the node itself
+        remoteReplicasIds.add(nodeId);
+        return remoteReplicasIds;
     }
 
     @PropertyKey(REPLICATION_FACTOR_KEY)
@@ -214,49 +158,6 @@
             return cluster.getDataReplication().getReplicationTimeOut().intValue();
         }
         return REPLICATION_TIME_OUT_DEFAULT;
-    }
-
-    /**
-     * @param nodeId
-     * @return The set of nodes which replicate to this node, including the node itself
-     */
-    public Set<String> getNodeReplicationClients(String nodeId) {
-        Set<String> clientReplicas = new HashSet<>();
-        clientReplicas.add(nodeId);
-
-        int clientsCount = getReplicationFactor();
-
-        //Using chained-declustering backwards
-        if (cluster != null) {
-            int nodeIndex = -1;
-            //find the node index in the cluster config
-            for (int i = 0; i < cluster.getNode().size(); i++) {
-                Node node = cluster.getNode().get(i);
-                if (getRealCluserNodeID(node.getId()).equals(nodeId)) {
-                    nodeIndex = i;
-                    break;
-                }
-            }
-
-            //find nodes to the left of this node
-            for (int i = nodeIndex - 1; i >= 0; i--) {
-                clientReplicas.add(getReplicaByNodeIndex(i).getId());
-                if (clientReplicas.size() == clientsCount) {
-                    break;
-                }
-            }
-
-            //if not all client replicas have been found, start from the end
-            if (clientReplicas.size() != clientsCount) {
-                for (int i = cluster.getNode().size() - 1; i >= 0; i--) {
-                    clientReplicas.add(getReplicaByNodeIndex(i).getId());
-                    if (clientReplicas.size() == clientsCount) {
-                        break;
-                    }
-                }
-            }
-        }
-        return clientReplicas;
     }
 
     @PropertyKey(REPLICATION_MAX_REMOTE_RECOVERY_ATTEMPTS_KEY)
@@ -281,4 +182,8 @@
         return accessor.getProperty(REPLICATION_LOG_BATCH_SIZE_KEY, 
REPLICATION_LOG_BATCH_SIZE_DEFAULT,
                 PropertyInterpreters.getIntegerBytePropertyInterpreter());
     }
+
+    public IReplicationStrategy getReplicationStrategy() {
+        return new 
ChainedDeclusteringReplicationStrategy(getReplicationFactor());
+    }
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/AbstractReplicationStrategy.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/AbstractReplicationStrategy.java
new file mode 100644
index 0000000..5bc4585
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/AbstractReplicationStrategy.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.replication;
+
+import org.apache.asterix.common.config.ClusterProperties;
+import org.apache.asterix.event.schema.cluster.Cluster;
+import org.apache.asterix.event.schema.cluster.Node;
+
+public abstract class AbstractReplicationStrategy implements IReplicationStrategy {
+
+    public Replica getReplicaByNodeIndex(int nodeIndex) {
+        Cluster cluster = ClusterProperties.INSTANCE.getCluster();
+        Node node = cluster.getNode().get(nodeIndex);
+        Node replicaNode = new Node();
+        replicaNode.setId(ClusterProperties.INSTANCE.getNodeFullName(node.getId()));
+        replicaNode.setClusterIp(node.getClusterIp());
+        return new Replica(replicaNode);
+    }
+
+    @Override
+    public boolean isParticipant(String nodeId) {
+        return !getRemoteReplicas(nodeId).isEmpty() || !getRemotePrimaryReplicas(nodeId).isEmpty();
+    }
+}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/ChainedDeclusteringReplicationStrategy.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/ChainedDeclusteringReplicationStrategy.java
new file mode 100644
index 0000000..bd6f842
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/ChainedDeclusteringReplicationStrategy.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.replication;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.common.config.ClusterProperties;
+import org.apache.asterix.common.transactions.ILogRecord;
+import org.apache.asterix.common.transactions.LogSource;
+import org.apache.asterix.common.transactions.LogType;
+import org.apache.asterix.event.schema.cluster.Cluster;
+import org.apache.asterix.event.schema.cluster.Node;
+
+public class ChainedDeclusteringReplicationStrategy extends AbstractReplicationStrategy {
+
+    private static final Logger LOGGER = Logger.getLogger(ChainedDeclusteringReplicationStrategy.class.getName());
+    private final int replicationFactor;
+
+    public ChainedDeclusteringReplicationStrategy(int replicationFactor) {
+        this.replicationFactor = replicationFactor;
+    }
+
+    @Override
+    public boolean isMatch(ILogRecord logRecord) {
+        return logRecord.getLogSource() == LogSource.LOCAL && logRecord.getLogType() != LogType.WAIT;
+    }
+
+    @Override
+    public boolean isMatch(String filePath) {
+        return true;
+    }
+
+    @Override
+    public Set<Replica> getRemoteReplicas(String nodeId) {
+        Set<Replica> remoteReplicas = new HashSet<>();
+        Cluster cluster = ClusterProperties.INSTANCE.getCluster();
+        int numberOfRemoteReplicas = replicationFactor - 1;
+
+        //Using chained-declustering
+        int nodeIndex = -1;
+        String fullNodeID = ClusterProperties.INSTANCE.getNodeFullName(nodeId);
+        //find the node index in the cluster config
+        for (int i = 0; i < cluster.getNode().size(); i++) {
+            Node node = cluster.getNode().get(i);
+            if (ClusterProperties.INSTANCE.getNodeFullName(node.getId()).equals(fullNodeID)) {
+                nodeIndex = i;
+                break;
+            }
+        }
+
+        if (nodeIndex == -1) {
+            LOGGER.log(Level.WARNING, "Could not find node " + fullNodeID + " in cluster configurations");
+            return Collections.emptySet();
+        }
+
+        //find nodes to the right of this node, wrapping around to the beginning if needed
+        for (int i = nodeIndex + 1; i < nodeIndex + cluster.getNode().size(); i++) {
+            remoteReplicas.add(getReplicaByNodeIndex(i % cluster.getNode().size()));
+            if (remoteReplicas.size() == numberOfRemoteReplicas) {
+                break;
+            }
+        }
+
+        return remoteReplicas;
+    }
+
+    @Override
+    public Set<Replica> getRemotePrimaryReplicas(String nodeId) {
+        Set<Replica> clientReplicas = new HashSet<>();
+        Cluster cluster = ClusterProperties.INSTANCE.getCluster();
+        final int remotePrimaryReplicasCount = replicationFactor - 1;
+
+        //Using chained-declustering backwards
+        int nodeIndex = -1;
+        String fullNodeID = ClusterProperties.INSTANCE.getNodeFullName(nodeId);
+        //find the node index in the cluster config
+        for (int i = 0; i < cluster.getNode().size(); i++) {
+            Node node = cluster.getNode().get(i);
+            if (ClusterProperties.INSTANCE.getNodeFullName(node.getId()).equals(fullNodeID)) {
+                nodeIndex = i;
+                break;
+            }
+        }
+
+        if (nodeIndex == -1) {
+            LOGGER.log(Level.WARNING, "Could not find node " + fullNodeID + " in cluster configurations");
+            return Collections.emptySet();
+        }
+
+        //find nodes to the left of this node, wrapping around to the end if needed
+        //Math.floorMod keeps the index non-negative when i is negative
+        for (int i = nodeIndex - 1; i > nodeIndex - cluster.getNode().size(); i--) {
+            clientReplicas.add(getReplicaByNodeIndex(Math.floorMod(i, cluster.getNode().size())));
+            if (clientReplicas.size() == remotePrimaryReplicasCount) {
+                break;
+            }
+        }
+
+        return clientReplicas;
+    }
+}
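
To make the chaining concrete, a worked example (hypothetical 4-node cluster,
not part of this change): with nodes [nc1, nc2, nc3, nc4] and a replication
factor of 3, each node replicates to the replicationFactor - 1 = 2 nodes to
its right (wrapping around), so its remote primary replicas are the 2 nodes
to its left:

    node   getRemoteReplicas   getRemotePrimaryReplicas
    nc1    {nc2, nc3}          {nc3, nc4}
    nc2    {nc3, nc4}          {nc4, nc1}
    nc3    {nc4, nc1}          {nc1, nc2}
    nc4    {nc1, nc2}          {nc2, nc3}
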
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IFaultToleranceStrategy.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IFaultToleranceStrategy.java
new file mode 100644
index 0000000..1d03016
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IFaultToleranceStrategy.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.replication;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public interface IFaultToleranceStrategy {
+
+    /**
+     * Defines the logic of an {@link IFaultToleranceStrategy} when a node joins the cluster.
+     * @param nodeId
+     * @throws HyracksDataException
+     */
+    void notifyNodeJoin(String nodeId) throws HyracksDataException;
+
+    /**
+     * Defines the logic of an {@link IFaultToleranceStrategy} when a node leaves the cluster.
+     * @param nodeId
+     * @throws HyracksDataException
+     */
+    void notifyNodeFailure(String nodeId) throws HyracksDataException;
+}
\ No newline at end of file
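
For discussion, a minimal sketch of what an implementation might do with these
callbacks, driving the IClusterStateManager interface added above (this class
and its wiring are hypothetical, not part of the change):

    package org.apache.asterix.common.replication;

    import org.apache.asterix.common.cluster.IClusterStateManager;
    import org.apache.hyracks.api.exceptions.HyracksDataException;

    public class ExampleFaultToleranceStrategy implements IFaultToleranceStrategy {

        private final IClusterStateManager clusterStateManager;

        public ExampleFaultToleranceStrategy(IClusterStateManager clusterStateManager) {
            this.clusterStateManager = clusterStateManager;
        }

        @Override
        public void notifyNodeJoin(String nodeId) throws HyracksDataException {
            // activate the joining node's partitions, then recompute the cluster state
            clusterStateManager.updateNodePartitions(nodeId, true);
            clusterStateManager.updateClusterState();
        }

        @Override
        public void notifyNodeFailure(String nodeId) throws HyracksDataException {
            // deactivate the failed node's partitions; demote it if it was the metadata node
            clusterStateManager.updateNodePartitions(nodeId, false);
            if (nodeId.equals(clusterStateManager.getCurrentMetadataNodeId())) {
                clusterStateManager.updateMetadataNode(nodeId, false);
            }
            clusterStateManager.updateClusterState();
        }
    }
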
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationStrategy.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationStrategy.java
new file mode 100644
index 0000000..0bafed6
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationStrategy.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.replication;
+
+import java.util.Set;
+
+import org.apache.asterix.common.transactions.ILogRecord;
+
+public interface IReplicationStrategy {
+
+    /**
+     * @param logRecord
+     * @return true, if {@code logRecord} should be replicated. Otherwise false.
+     */
+    boolean isMatch(ILogRecord logRecord);
+
+    /**
+     * @param filePath
+     * @return true, if the file at {@code filePath} should be replicated. Otherwise false.
+     */
+    boolean isMatch(String filePath);
+
+    /**
+     * @param nodeId
+     * @return The set of nodes that replicate data on {@code nodeId}.
+     */
+    Set<Replica> getRemotePrimaryReplicas(String nodeId);
+
+    /**
+     * @param nodeId
+     * @return The set of nodes that {@code nodeId} replicates data to.
+     */
+    Set<Replica> getRemoteReplicas(String nodeId);
+
+    /**
+     * @param nodeId
+     * @return true, if {@code nodeId} has any remote primary replica or remote replica. Otherwise false.
+     */
+    boolean isParticipant(String nodeId);
+}
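
To exercise the abstraction, a hypothetical strategy that mirrors every node
to a single designated backup node (the first node in cluster.xml). Purely
illustrative and not part of this change; it reuses getReplicaByNodeIndex()
and getNodeFullName() from this patch:

    package org.apache.asterix.common.replication;

    import java.util.Collections;
    import java.util.HashSet;
    import java.util.Set;

    import org.apache.asterix.common.config.ClusterProperties;
    import org.apache.asterix.common.transactions.ILogRecord;
    import org.apache.asterix.common.transactions.LogSource;
    import org.apache.asterix.common.transactions.LogType;

    public class SingleBackupReplicationStrategy extends AbstractReplicationStrategy {

        @Override
        public boolean isMatch(ILogRecord logRecord) {
            // replicate all local, non-WAIT log records
            return logRecord.getLogSource() == LogSource.LOCAL && logRecord.getLogType() != LogType.WAIT;
        }

        @Override
        public boolean isMatch(String filePath) {
            return true;
        }

        @Override
        public Set<Replica> getRemoteReplicas(String nodeId) {
            Replica backup = getReplicaByNodeIndex(0);
            // the backup node itself has nowhere to replicate to
            if (backup.getId().equals(ClusterProperties.INSTANCE.getNodeFullName(nodeId))) {
                return Collections.emptySet();
            }
            return Collections.singleton(backup);
        }

        @Override
        public Set<Replica> getRemotePrimaryReplicas(String nodeId) {
            Replica backup = getReplicaByNodeIndex(0);
            // every other node is a remote primary replica of the backup node
            if (!backup.getId().equals(ClusterProperties.INSTANCE.getNodeFullName(nodeId))) {
                return Collections.emptySet();
            }
            Set<Replica> primaries = new HashSet<>();
            for (int i = 1; i < ClusterProperties.INSTANCE.getCluster().getNode().size(); i++) {
                primaries.add(getReplicaByNodeIndex(i));
            }
            return primaries;
        }
    }
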
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/MetadataNodeFaultToleranceStrategy.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/MetadataNodeFaultToleranceStrategy.java
new file mode 100644
index 0000000..60c30cf
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/MetadataNodeFaultToleranceStrategy.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.replication;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class MetadataNodeFaultToleranceStrategy implements IFaultToleranceStrategy {
+
+    @Override
+    public void notifyNodeJoin(String nodeId) throws HyracksDataException {
+        // TODO Auto-generated method stub
+
+    }
+
+    @Override
+    public void notifyNodeFailure(String nodeId) throws HyracksDataException {
+        // TODO Auto-generated method stub
+
+    }
+
+}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/MetadataOnlyReplicationStrategy.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/MetadataOnlyReplicationStrategy.java
new file mode 100644
index 0000000..2606f1e
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/MetadataOnlyReplicationStrategy.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.replication;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.logging.Logger;
+
+import org.apache.asterix.common.metadata.MetadataIndexImmutableProperties;
+import org.apache.asterix.common.transactions.ILogRecord;
+import org.apache.asterix.common.transactions.LogSource;
+import org.apache.asterix.common.transactions.LogType;
+
+public class MetadataOnlyReplicationStrategy extends AbstractReplicationStrategy {
+
+    private static final Logger LOGGER = Logger.getLogger(MetadataOnlyReplicationStrategy.class.getName());
+
+    @Override
+    public boolean isMatch(ILogRecord logRecord) {
+        boolean match = logRecord.getLogSource() == LogSource.LOCAL && logRecord.getLogType() != LogType.WAIT;
+        if (match) {
+            switch (logRecord.getLogType()) {
+                case LogType.ENTITY_COMMIT:
+                case LogType.UPSERT_ENTITY_COMMIT:
+                case LogType.UPDATE:
+                case LogType.FLUSH:
+                    return logRecord.getDatasetId() < MetadataIndexImmutableProperties.FIRST_AVAILABLE_USER_DATASET_ID;
+                default:
+                    return false;
+            }
+        }
+        return false;
+    }
+
+    @Override
+    public boolean isMatch(String filePath) {
+        return filePath.contains("/Metadata");
+    }
+
+    @Override
+    public Set<Replica> getRemoteReplicas(String nodeId) {
+        Set<Replica> remoteReplicas = new HashSet<>();
+        if (nodeId.equals("asterix_nc1")) {
+            remoteReplicas.add(getReplicaByNodeIndex(1));
+        }
+        return remoteReplicas;
+    }
+
+    @Override
+    public Set<Replica> getRemotePrimaryReplicas(String nodeId) {
+        Set<Replica> remoteReplicas = new HashSet<>();
+        if (nodeId.equals("asterix_nc2")) {
+            remoteReplicas.add(getReplicaByNodeIndex(0));
+        }
+        return remoteReplicas;
+    }
+}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/NoFaultToleranceStrategy.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/NoFaultToleranceStrategy.java
new file mode 100644
index 0000000..df29144
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/NoFaultToleranceStrategy.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.replication;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class NoFaultToleranceStrategy implements IFaultToleranceStrategy {
+
+    @Override
+    public void notifyNodeJoin(String nodeId) throws HyracksDataException {
+        // TODO Auto-generated method stub
+
+    }
+
+    @Override
+    public void notifyNodeFailure(String nodeId) throws HyracksDataException {
+        // TODO Auto-generated method stub
+
+    }
+}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/NoReplicationStrategy.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/NoReplicationStrategy.java
new file mode 100644
index 0000000..40caf3d
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/NoReplicationStrategy.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.replication;
+
+import java.util.Collections;
+import java.util.Set;
+
+import org.apache.asterix.common.transactions.ILogRecord;
+
+public class NoReplicationStrategy implements IReplicationStrategy {
+
+    @Override
+    public boolean isMatch(ILogRecord logRecord) {
+        return false;
+    }
+
+    @Override
+    public boolean isMatch(String filePath) {
+        return false;
+    }
+
+    @Override
+    public boolean isParticipant(String nodeId) {
+        return false;
+    }
+
+    @Override
+    public Set<Replica> getRemotePrimaryReplicas(String nodeId) {
+        return Collections.emptySet();
+    }
+
+    @Override
+    public Set<Replica> getRemoteReplicas(String node) {
+        return Collections.emptySet();
+    }
+}
diff --git a/asterixdb/asterix-installer/src/test/resources/integrationts/replication/testsuite.xml b/asterixdb/asterix-installer/src/test/resources/integrationts/replication/testsuite.xml
index 0f6b528..9de0f04 100644
--- a/asterixdb/asterix-installer/src/test/resources/integrationts/replication/testsuite.xml
+++ b/asterixdb/asterix-installer/src/test/resources/integrationts/replication/testsuite.xml
@@ -17,7 +17,7 @@
  ! under the License.
  !-->
 <test-suite xmlns="urn:xml.testframework.asterix.apache.org" ResultOffsetPath="results" QueryOffsetPath="queries" QueryFileExtension=".aql">
-  <test-group name="failover">
+<!--   <test-group name="failover">
     <test-case FilePath="failover">
       <compilation-unit name="bulkload">
         <output-dir compare="Text">bulkload</output-dir>
@@ -33,7 +33,7 @@
         <output-dir compare="Text">metadata_node</output-dir>
       </compilation-unit>
     </test-case>
-  </test-group>
+  </test-group> -->
   <test-group name="failback">
     <test-case FilePath="failback">
       <compilation-unit name="node_failback">
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
index 529a660..368622c 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
@@ -44,8 +44,8 @@
 
 import org.apache.asterix.common.api.IDatasetLifecycleManager;
 import org.apache.asterix.common.cluster.ClusterPartition;
-import org.apache.asterix.common.config.ReplicationProperties;
 import org.apache.asterix.common.config.IPropertiesProvider;
+import org.apache.asterix.common.config.ReplicationProperties;
 import org.apache.asterix.common.context.IndexInfo;
 import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback;
@@ -122,7 +122,7 @@
         Map<String, ClusterPartition[]> nodePartitions =
                 ((IPropertiesProvider) 
asterixAppRuntimeContextProvider.getAppContext()).getMetadataProperties()
                         .getNodePartitions();
-        Set<String> nodeReplicationClients = 
replicationProperties.getNodeReplicationClients(nodeId);
+        Set<String> nodeReplicationClients = 
replicationProperties.getNodeReplicasIds(nodeId);
         List<Integer> clientsPartitions = new ArrayList<>();
         for (String clientId : nodeReplicationClients) {
             for (ClusterPartition clusterPartition : 
nodePartitions.get(clientId)) {
diff --git 
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java
 
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java
index dc4a93a..59b1d60 100644
--- 
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java
+++ 
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java
@@ -23,6 +23,7 @@
 import java.io.IOException;
 import java.io.InputStreamReader;
 import java.io.OutputStream;
+import java.io.PrintWriter;
 import java.io.RandomAccessFile;
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
@@ -50,18 +51,20 @@
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.logging.Level;
 import java.util.logging.Logger;
+import java.util.stream.Collectors;
 
 import org.apache.asterix.common.cluster.ClusterPartition;
-import org.apache.asterix.common.config.ReplicationProperties;
 import org.apache.asterix.common.config.ClusterProperties;
 import org.apache.asterix.common.config.IPropertiesProvider;
+import org.apache.asterix.common.config.ReplicationProperties;
 import org.apache.asterix.common.dataflow.LSMIndexUtil;
-import org.apache.asterix.common.replication.ReplicationJob;
 import org.apache.asterix.common.replication.IReplicaResourcesManager;
 import org.apache.asterix.common.replication.IReplicationManager;
+import org.apache.asterix.common.replication.IReplicationStrategy;
 import org.apache.asterix.common.replication.Replica;
 import org.apache.asterix.common.replication.Replica.ReplicaState;
 import org.apache.asterix.common.replication.ReplicaEvent;
+import org.apache.asterix.common.replication.ReplicationJob;
 import org.apache.asterix.common.transactions.IAppRuntimeContextProvider;
 import org.apache.asterix.common.transactions.ILogManager;
 import org.apache.asterix.common.transactions.ILogRecord;
@@ -103,7 +106,7 @@
     private final Map<Integer, Set<String>> jobCommitAcks;
     private final Map<Integer, ILogRecord> replicationJobsPendingAcks;
     private ByteBuffer dataBuffer;
-
+    private static final boolean DEBUG = false;
     private final LinkedBlockingQueue<IReplicationJob> replicationJobsQ;
     private final LinkedBlockingQueue<ReplicaEvent> replicaEventsQ;
 
@@ -124,8 +127,8 @@
     private ReplicationJobsProccessor replicationJobsProcessor;
     private final ReplicasEventsMonitor replicationMonitor;
     //dummy job used to stop ReplicationJobsProccessor thread.
-    private static final IReplicationJob REPLICATION_JOB_POISON_PILL = new 
ReplicationJob(
-            ReplicationJobType.METADATA, ReplicationOperation.REPLICATE, 
ReplicationExecutionType.ASYNC, null);
+    private static final IReplicationJob REPLICATION_JOB_POISON_PILL = new 
ReplicationJob(ReplicationJobType.METADATA,
+            ReplicationOperation.REPLICATE, ReplicationExecutionType.ASYNC, 
null);
     //used to identify the correct IP address when the node has multiple 
network interfaces
     private String hostIPAddressFirstOctet = null;
 
@@ -136,6 +139,7 @@
     private Future<? extends Object> txnLogReplicatorTask;
     private SocketChannel[] logsRepSockets;
     private final ByteBuffer txnLogsBatchSizeBuffer = 
ByteBuffer.allocate(Integer.BYTES);
+    private final IReplicationStrategy replicationStrategy;
 
     //TODO this class needs to be refactored by moving its private classes to 
separate files
     //and possibly using MessageBroker to send/receive remote replicas events.
@@ -144,35 +148,36 @@
             IAppRuntimeContextProvider asterixAppRuntimeContextProvider) {
         this.nodeId = nodeId;
         this.replicationProperties = replicationProperties;
+        replicationStrategy = replicationProperties.getReplicationStrategy();
         this.replicaResourcesManager = (ReplicaResourcesManager) 
remoteResoucesManager;
         this.asterixAppRuntimeContextProvider = 
asterixAppRuntimeContextProvider;
-        this.hostIPAddressFirstOctet = 
replicationProperties.getReplicaIPAddress(nodeId).substring(0, 3);
         this.logManager = logManager;
+        this.hostIPAddressFirstOctet = 
replicationProperties.getReplicaIPAddress(nodeId).substring(0, 3);
+        replicas = new HashMap<>();
         replicationJobsQ = new LinkedBlockingQueue<>();
         replicaEventsQ = new LinkedBlockingQueue<>();
         terminateJobsReplication = new AtomicBoolean(false);
         jobsReplicationSuspended = new AtomicBoolean(true);
         replicationSuspended = new AtomicBoolean(true);
-        replicas = new HashMap<>();
         jobCommitAcks = new ConcurrentHashMap<>();
         replicationJobsPendingAcks = new ConcurrentHashMap<>();
         shuttingDownReplicaIds = new HashSet<>();
         dataBuffer = ByteBuffer.allocate(INITIAL_BUFFER_SIZE);
+        replicationMonitor = new ReplicasEventsMonitor();
+        //add list of replicas from configurations (To be read from another 
source e.g. Zookeeper)
+        Set<Replica> replicaNodes = replicationStrategy.getRemoteReplicas(nodeId);
 
         //Used as async listeners from replicas
         replicationListenerThreads = Executors.newCachedThreadPool();
         replicationJobsProcessor = new ReplicationJobsProccessor();
-        replicationMonitor = new ReplicasEventsMonitor();
 
         Map<String, ClusterPartition[]> nodePartitions = 
((IPropertiesProvider) asterixAppRuntimeContextProvider
                 .getAppContext()).getMetadataProperties().getNodePartitions();
-        //add list of replicas from configurations (To be read from another 
source e.g. Zookeeper)
-        Set<Replica> replicaNodes = 
replicationProperties.getRemoteReplicas(nodeId);
         replica2PartitionsMap = new HashMap<>(replicaNodes.size());
         for (Replica replica : replicaNodes) {
-            replicas.put(replica.getNode().getId(), replica);
+            
replicas.put(ClusterProperties.INSTANCE.getNodeFullName(replica.getNode().getId()),
 replica);
             //for each remote replica, get the list of replication clients
-            Set<String> nodeReplicationClients = 
replicationProperties.getNodeReplicationClients(replica.getId());
+            Set<String> nodeReplicationClients = 
replicationProperties.getNodeReplicasIds(replica.getId());
             //get the partitions of each client
             List<Integer> clientPartitions = new ArrayList<>();
             for (String clientId : nodeReplicationClients) {
@@ -278,8 +283,13 @@
             //all of the job's files belong to a single storage partition.
             //get any of them to determine the partition from the file path.
             String jobFile = job.getJobFiles().iterator().next();
+            if (!replicationStrategy.isMatch(jobFile)) {
+                return;
+            }
+
             int jobPartitionId = 
PersistentLocalResourceRepository.getResourcePartition(jobFile);
 
+            // TODO apply policy here
             ByteBuffer responseBuffer = null;
             LSMIndexFileProperties asterixFileProperties = new 
LSMIndexFileProperties();
             if (requestBuffer == null) {
@@ -906,25 +916,129 @@
      */
     @Override
     public void stop(boolean dumpState, OutputStream ouputStream) throws 
IOException {
-        //stop replication thread afters all jobs/logs have been processed
-        suspendReplication(false);
-        //send shutdown event to remote replicas
-        sendShutdownNotifiction();
-        //wait until all shutdown events come from all remote replicas
-        synchronized (shuttingDownReplicaIds) {
-            while 
(!shuttingDownReplicaIds.containsAll(getActiveReplicasIds())) {
-                try {
-                    shuttingDownReplicaIds.wait(1000);
-                } catch (InterruptedException e) {
-                    Thread.currentThread().interrupt();
+        if (DEBUG) {
+            try (PrintWriter writer = new PrintWriter(nodeId + 
"_rep_manager.txt", "UTF-8")) {
+                writer.println("suspending Replication shutdown");
+                writer.flush();
+                //stop replication thread afters all jobs/logs have been 
processed
+                suspendReplication(false);
+
+                writer.println("Replication suspendied");
+                writer.flush();
+
+                /**
+                 * If this node has any remote replicas, it needs to inform 
them
+                 * that it is shutting down.
+                 */
+                if (!replicationStrategy.getRemoteReplicas(nodeId).isEmpty()) {
+                    writer.println(" sending shutdown to ");
+                    printSet(writer, 
replicationStrategy.getRemoteReplicas(nodeId));
+                    //send shutdown event to remote replicas
+                    sendShutdownNotifiction();
+                } else {
+                    writer.println(nodeId + " no need to send shutdown...");
+                }
+                writer.flush();
+
+                /**
+                 * If this node has any remote primary replicas, then it needs 
to wait
+                 * until all of them send the shutdown notification.
+                 */
+                // Active replicas
+                Set<String> activeReplicasIds = getActiveReplicasIds();
+                Set<String> remotePrimaryReplicasIds = 
replicationStrategy.getRemotePrimaryReplicas(nodeId).stream()
+                        .map(Replica::getId).collect(Collectors.toSet());
+
+                // find active remote primary replicas
+                Set<String> activeRemotePrimaryReplicas = new HashSet<>();
+                for (String replicaId : remotePrimaryReplicasIds) {
+                    if (activeReplicasIds.contains(replicaId)) {
+                        activeRemotePrimaryReplicas.add(replicaId);
+                    }
+                }
+                if (!activeRemotePrimaryReplicas.isEmpty()) {
+                    writer.println("Waiting for shutdown from: " + 
activeRemotePrimaryReplicas.toString());
+                    writer.flush();
+                    //wait until all shutdown events come from all remote 
primary replicas
+                    synchronized (shuttingDownReplicaIds) {
+                        while 
(!shuttingDownReplicaIds.containsAll(activeRemotePrimaryReplicas)) {
+                            try {
+                                shuttingDownReplicaIds.wait();
+                                writer.println("Current notify: " + 
shuttingDownReplicaIds.toString());
+                                writer.flush();
+                            } catch (InterruptedException e) {
+                                Thread.currentThread().interrupt();
+                            }
+                        }
+                    }
+                } else {
+                    writer.println("No need to wait for shutdown 
notification");
+                    writer.flush();
+                }
+
+                LOGGER.log(Level.INFO, "Got shutdown notification from all 
remote replicas");
+                writer.println("Got shutdown notification from all remote 
replicas");
+                //close replication channel
+                
asterixAppRuntimeContextProvider.getAppContext().getReplicationChannel().close();
+
+                LOGGER.log(Level.INFO, "Replication manager stopped.");
+            }
+        } else {
+            //stop replication thread after all jobs/logs have been processed
+            suspendReplication(false);
+
+            /**
+             * If this node has any remote replicas, it needs to inform them
+             * that it is shutting down.
+             */
+            if (!replicationStrategy.getRemoteReplicas(nodeId).isEmpty()) {
+                //send shutdown event to remote replicas
+                sendShutdownNotifiction();
+            }
+
+            /**
+             * If this node has any remote primary replicas, then it needs to 
wait
+             * until all of them send the shutdown notification.
+             */
+            // Active replicas
+            Set<String> activeReplicasIds = getActiveReplicasIds();
+            Set<String> remotePrimaryReplicasIds = 
replicationStrategy.getRemotePrimaryReplicas(nodeId).stream()
+                    .map(Replica::getId).collect(Collectors.toSet());
+
+            // find active remote primary replicas
+            Set<String> activeRemotePrimaryReplicas = new HashSet<>();
+            for (String replicaId : remotePrimaryReplicasIds) {
+                if (activeReplicasIds.contains(replicaId)) {
+                    activeRemotePrimaryReplicas.add(replicaId);
                 }
             }
-        }
-        LOGGER.log(Level.INFO, "Got shutdown notification from all remote 
replicas");
-        //close replication channel
-        
asterixAppRuntimeContextProvider.getAppContext().getReplicationChannel().close();
+            if (!activeRemotePrimaryReplicas.isEmpty()) {
+                //wait until all shutdown events come from all remote primary 
replicas
+                synchronized (shuttingDownReplicaIds) {
+                    while 
(!shuttingDownReplicaIds.containsAll(activeRemotePrimaryReplicas)) {
+                        try {
+                            shuttingDownReplicaIds.wait();
+                        } catch (InterruptedException e) {
+                            Thread.currentThread().interrupt();
+                        }
+                    }
+                }
+            }
 
-        LOGGER.log(Level.INFO, "Replication manager stopped.");
+            LOGGER.log(Level.INFO, "Got shutdown notification from all remote 
replicas");
+            //close replication channel
+            
asterixAppRuntimeContextProvider.getAppContext().getReplicationChannel().close();
+
+            LOGGER.log(Level.INFO, "Replication manager stopped.");
+        }
+    }
+
+    private void printSet(PrintWriter writer, Set<Replica> replicas) {
+        replicas.forEach(replica -> writer.println(replica.getId()));
+        writer.flush();
     }
 
     @Override
@@ -996,6 +1110,9 @@
     public void requestFlushLaggingReplicaIndexes(long 
nonSharpCheckpointTargetLSN) throws IOException {
         long startLSN = logManager.getAppendLSN();
         Set<String> replicaIds = getActiveReplicasIds();
+        if (replicaIds.isEmpty()) {
+            return;
+        }
         ByteBuffer requestBuffer = ByteBuffer.allocate(INITIAL_BUFFER_SIZE);
         for (String replicaId : replicaIds) {
             //1. identify replica indexes with LSN less than 
nonSharpCheckpointTargetLSN.
@@ -1201,6 +1318,7 @@
         }
 
         public void handleReplicaFailure(String replicaId) throws 
InterruptedException {
+            LOGGER.log(Level.WARNING, "Failed replica: " + replicaId);
             Replica replica = replicas.get(replicaId);
 
             if (replica.getState() == ReplicaState.DEAD) {
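
The DEBUG and non-DEBUG branches of stop() above implement the same shutdown
handshake twice. As a reading aid only, a condensed sketch of that handshake
using names from this patch (the consolidated helper itself is a suggestion,
not part of the change):

    private void shutdownHandshake() throws IOException {
        // drain pending replication jobs/logs, then stop the processor thread
        suspendReplication(false);
        // if any remote node replicates our data, tell it we are shutting down
        if (!replicationStrategy.getRemoteReplicas(nodeId).isEmpty()) {
            sendShutdownNotifiction();
        }
        // wait until every active remote primary replica has notified us
        Set<String> waitFor = replicationStrategy.getRemotePrimaryReplicas(nodeId)
                .stream().map(Replica::getId)
                .filter(getActiveReplicasIds()::contains)
                .collect(Collectors.toSet());
        synchronized (shuttingDownReplicaIds) {
            while (!shuttingDownReplicaIds.containsAll(waitFor)) {
                try {
                    shuttingDownReplicaIds.wait();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }
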
diff --git 
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/RemoteRecoveryManager.java
 
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/RemoteRecoveryManager.java
index 0dcdc7b..3dc0214 100644
--- 
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/RemoteRecoveryManager.java
+++ 
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/RemoteRecoveryManager.java
@@ -31,9 +31,9 @@
 import org.apache.asterix.common.api.IAppRuntimeContext;
 import org.apache.asterix.common.api.IDatasetLifecycleManager;
 import org.apache.asterix.common.cluster.ClusterPartition;
-import org.apache.asterix.common.config.ReplicationProperties;
 import org.apache.asterix.common.config.ClusterProperties;
 import org.apache.asterix.common.config.IPropertiesProvider;
+import org.apache.asterix.common.config.ReplicationProperties;
 import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.replication.IRemoteRecoveryManager;
 import org.apache.asterix.common.replication.IReplicationManager;
@@ -61,11 +61,11 @@
         //1. identify which replicas reside in this node
         String localNodeId = runtimeContext.getTransactionSubsystem().getId();
 
-        Set<String> nodes = 
replicationProperties.getNodeReplicationClients(localNodeId);
-
+        Set<String> nodes = 
replicationProperties.getNodeReplicasIds(localNodeId);
+        // TODO: add the node itself
         Map<String, Set<String>> recoveryCandidates = new HashMap<>();
         Map<String, Integer> candidatesScore = new HashMap<>();
 
         //2. identify which nodes has backup per lost node data
         for (String node : nodes) {
             Set<String> locations = 
replicationProperties.getNodeReplicasIds(node);
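
The two maps built above (recoveryCandidates and candidatesScore) suggest a
score-based choice of a recovery source per lost node. The selection itself is
outside this hunk, so the following helper is an assumption about the intent,
not the patch's code: a greedy pick of the highest-scoring candidate.

    // hypothetical helper: prefer the candidate that covers the most nodes
    private static String pickBestCandidate(Map<String, Integer> candidatesScore) {
        return candidatesScore.entrySet().stream()
                .max(Map.Entry.comparingByValue())
                .map(Map.Entry::getKey)
                .orElse(null);
    }
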
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/replication/AutoFaultToleranceStrategy.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/replication/AutoFaultToleranceStrategy.java
new file mode 100644
index 0000000..2e740b0
--- /dev/null
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/replication/AutoFaultToleranceStrategy.java
@@ -0,0 +1,452 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.replication;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import java.util.stream.Collectors;
+
+import org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
+import org.apache.asterix.common.cluster.ClusterPartition;
+import org.apache.asterix.common.cluster.IClusterStateManager;
+import org.apache.asterix.common.config.ReplicationProperties;
+import org.apache.asterix.common.messaging.api.ICCMessageBroker;
+import org.apache.asterix.common.replication.IFaultToleranceStrategy;
+import org.apache.asterix.common.replication.IReplicationStrategy;
+import org.apache.asterix.common.replication.Replica;
+import org.apache.asterix.runtime.message.CompleteFailbackRequestMessage;
+import org.apache.asterix.runtime.message.CompleteFailbackResponseMessage;
+import org.apache.asterix.runtime.message.NodeFailbackPlan;
+import org.apache.asterix.runtime.message.NodeFailbackPlan.FailbackPlanState;
+import 
org.apache.asterix.runtime.message.PreparePartitionsFailbackRequestMessage;
+import 
org.apache.asterix.runtime.message.PreparePartitionsFailbackResponseMessage;
+import org.apache.asterix.runtime.message.ReplicaEventMessage;
+import org.apache.asterix.runtime.message.TakeoverMetadataNodeRequestMessage;
+import org.apache.asterix.runtime.message.TakeoverMetadataNodeResponseMessage;
+import org.apache.asterix.runtime.message.TakeoverPartitionsRequestMessage;
+import org.apache.asterix.runtime.message.TakeoverPartitionsResponseMessage;
+import org.apache.asterix.runtime.util.AppContextInfo;
+import 
org.apache.hyracks.api.application.IClusterLifecycleListener.ClusterEventType;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class AutoFaultToleranceStrategy implements IFaultToleranceStrategy {
+
+    private static final Logger LOGGER = 
Logger.getLogger(AutoFaultToleranceStrategy.class.getName());
+    private long clusterRequestId = 0;
+
+    private Set<String> failedNodes = new HashSet<>();
+    private LinkedList<NodeFailbackPlan> pendingProcessingFailbackPlans = new 
LinkedList<>();
+    private Map<Long, NodeFailbackPlan> planId2FailbackPlanMap = new 
HashMap<>();
+    private static final String CLUSTER_NET_IP_ADDRESS_KEY = 
"cluster-net-ip-address";
+    private Map<Long, TakeoverPartitionsRequestMessage> 
pendingTakeoverRequests = new HashMap<>();
+
+    private final IClusterStateManager clusterManager;
+    private final ICCMessageBroker messageBroker;
+    private final IReplicationStrategy replicationStrategy;
+    private String currentMetadataNode;
+    private boolean metadataNodeActive;
+
+    public AutoFaultToleranceStrategy(IReplicationStrategy 
replicationStrategy, IClusterStateManager clusterManager,
+            ICCMessageBroker messageBroker) {
+        this.clusterManager = clusterManager;
+        this.messageBroker = messageBroker;
+        this.replicationStrategy = replicationStrategy;
+        currentMetadataNode = clusterManager.getCurrentMetadataNodeId();
+        metadataNodeActive = false;
+    }
+
+    @Override
+    public void notifyNodeJoin(String nodeId) throws HyracksDataException {
+        if (failedNodes.contains(nodeId)) {
+            prepareFailbackPlan(nodeId);
+            return;
+        }
+        //a node completed local or remote recovery and rejoined
+        failedNodes.remove(nodeId);
+
+        if (nodeId.equals(currentMetadataNode)) {
+            currentMetadataNode = nodeId;
+            metadataNodeActive = true;
+            clusterManager.updateMetadataNode(currentMetadataNode, 
metadataNodeActive);
+        }
+        clusterManager.updateNodePartitions(nodeId, true);
+        validateClusterState();
+    }
+
+    @Override
+    public void notifyNodeFailure(String nodeId) throws HyracksDataException {
+        //if this node was waiting for failback and failed before it completed
+        if (failedNodes.contains(nodeId)) {
+            notifyFailbackPlansNodeFailure(nodeId);
+            revertFailedFailbackPlanEffects();
+        } else {
+            //an active node failed
+            failedNodes.add(nodeId);
+            clusterManager.updateNodePartitions(nodeId, false);
+            if (nodeId.equals(currentMetadataNode)) {
+                metadataNodeActive = false;
+                clusterManager.updateMetadataNode(nodeId, metadataNodeActive);
+            }
+            validateClusterState();
+            notifyImpactedReplicas(nodeId, ClusterEventType.NODE_FAILURE);
+            notifyFailbackPlansNodeFailure(nodeId);
+            requestPartitionsTakeover(nodeId);
+        }
+    }
+
+    private synchronized void notifyImpactedReplicas(String nodeId, 
ClusterEventType event) {
+        List<String> remoteReplicas = 
replicationStrategy.getRemotePrimaryReplicas(nodeId).stream().map(Replica::getId)
+                .collect(Collectors.toList());
+        String nodeIdAddress = "";
+        Map<String, Map<String, String>> activeNcConfiguration = 
clusterManager.getActiveNcConfiguration();
+        //in case the node joined with a new IP address, we need to send it to 
the other replicas
+        if (event == ClusterEventType.NODE_JOIN) {
+            nodeIdAddress = 
activeNcConfiguration.get(nodeId).get(CLUSTER_NET_IP_ADDRESS_KEY);
+        }
+
+        ReplicaEventMessage msg = new ReplicaEventMessage(nodeId, 
nodeIdAddress, event);
+        for (String replica : remoteReplicas) {
+            //if the remote replica is alive, send the event
+            if (activeNcConfiguration.containsKey(replica)) {
+                try {
+                    messageBroker.sendApplicationMessageToNC(msg, replica);
+                } catch (Exception e) {
+                    LOGGER.log(Level.WARNING, "Failed sending an application 
message to an NC", e);
+                }
+            }
+        }
+    }
+
+    private synchronized void notifyFailbackPlansNodeFailure(String nodeId) {
+        for (NodeFailbackPlan plan : planId2FailbackPlanMap.values()) {
+            plan.notifyNodeFailure(nodeId);
+        }
+    }
+
+    private synchronized void revertFailedFailbackPlanEffects() {
+        Iterator<NodeFailbackPlan> iterator = 
planId2FailbackPlanMap.values().iterator();
+        while (iterator.hasNext()) {
+            NodeFailbackPlan plan = iterator.next();
+            if (plan.getState() == FailbackPlanState.PENDING_ROLLBACK) {
+                //TODO if the failing back node is still active, notify it to 
construct a new plan for it
+                iterator.remove();
+
+                //reassign the partitions that were supposed to be failed back 
to an active replica
+                requestPartitionsTakeover(plan.getNodeId());
+            }
+        }
+    }
+
+    private synchronized void requestPartitionsTakeover(String failedNodeId) {
+        //replica -> list of partitions to takeover
+        Map<String, List<Integer>> partitionRecoveryPlan = new HashMap<>();
+        ReplicationProperties replicationProperties = 
AppContextInfo.INSTANCE.getReplicationProperties();
+
+        //collect the partitions of the failed NC
+        List<ClusterPartition> lostPartitions = 
getNodeAssignedPartitions(failedNodeId);
+        if (!lostPartitions.isEmpty()) {
+            for (ClusterPartition partition : lostPartitions) {
+                //find replicas for this partition
+                Set<String> partitionReplicas = 
replicationProperties.getNodeReplicasIds(partition.getNodeId());
+                //find a replica that is still active
+                for (String replica : partitionReplicas) {
+                    //TODO (mhubail) currently this assigns the partition to 
the first found active replica.
+                    //It needs to be modified to consider load balancing.
+                    if (addActiveReplica(replica, partition, 
partitionRecoveryPlan)) {
+                        break;
+                    }
+                }
+            }
+
+            if (partitionRecoveryPlan.isEmpty()) {
+                //no active replicas were found for the failed node
+                LOGGER.severe("Could not find active replicas for the 
partitions " + lostPartitions);
+                return;
+            } else {
+                LOGGER.info("Partitions to recover: " + lostPartitions);
+            }
+            //For each replica, send a request to takeover the assigned 
partitions
+            for (Entry<String, List<Integer>> entry : 
partitionRecoveryPlan.entrySet()) {
+                String replica = entry.getKey();
+                Integer[] partitionsToTakeover = entry.getValue().toArray(new 
Integer[entry.getValue().size()]);
+                long requestId = clusterRequestId++;
+                TakeoverPartitionsRequestMessage takeoverRequest = new 
TakeoverPartitionsRequestMessage(requestId,
+                        replica, partitionsToTakeover);
+                pendingTakeoverRequests.put(requestId, takeoverRequest);
+                try {
+                    messageBroker.sendApplicationMessageToNC(takeoverRequest, 
replica);
+                } catch (Exception e) {
+                    /**
+                     * if we fail to send the request, it means the NC we 
tried to send the request to
+                     * has failed. When the failure notification arrives, we 
will send any pending request
+                     * that belongs to the failed NC to a different active 
replica.
+                     */
+                    LOGGER.log(Level.WARNING, "Failed to send takeover 
request: " + takeoverRequest, e);
+                }
+            }
+        }
+    }
+
+    private boolean addActiveReplica(String replica, ClusterPartition 
partition,
+            Map<String, List<Integer>> partitionRecoveryPlan) {
+        Map<String, Map<String, String>> activeNcConfiguration = 
clusterManager.getActiveNcConfiguration();
+        if (activeNcConfiguration.containsKey(replica) && 
!failedNodes.contains(replica)) {
+            if (!partitionRecoveryPlan.containsKey(replica)) {
+                List<Integer> replicaPartitions = new ArrayList<>();
+                replicaPartitions.add(partition.getPartitionId());
+                partitionRecoveryPlan.put(replica, replicaPartitions);
+            } else {
+                
partitionRecoveryPlan.get(replica).add(partition.getPartitionId());
+            }
+            return true;
+        }
+        return false;
+    }
+
+    private synchronized void prepareFailbackPlan(String failingBackNodeId) {
+        NodeFailbackPlan plan = NodeFailbackPlan.createPlan(failingBackNodeId);
+        pendingProcessingFailbackPlans.add(plan);
+        planId2FailbackPlanMap.put(plan.getPlanId(), plan);
+
+        //get all partitions this node requires to resync
+        ReplicationProperties replicationProperties = 
AppContextInfo.INSTANCE.getReplicationProperties();
+        Set<String> nodeReplicas = 
replicationProperties.getNodeReplicasIds(failingBackNodeId);
+        for (String replicaId : nodeReplicas) {
+            ClusterPartition[] nodePartitions = 
clusterManager.getNodePartitions(replicaId);
+            for (ClusterPartition partition : nodePartitions) {
+                plan.addParticipant(partition.getActiveNodeId());
+                /**
+                 * if the partition's original node is the returning node,
+                 * add it to the list of the partitions which will be failed 
back
+                 */
+                if (partition.getNodeId().equals(failingBackNodeId)) {
+                    plan.addPartitionToFailback(partition.getPartitionId(), 
partition.getActiveNodeId());
+                }
+            }
+        }
+
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("Prepared Failback plan: " + plan.toString());
+        }
+
+        processPendingFailbackPlans();
+    }
+
+    private synchronized void processPendingFailbackPlans() {
+        ClusterState state = clusterManager.getState();
+        /**
+         * if the cluster state is not ACTIVE, then failbacks should not be 
processed
+         * since some partitions are not active
+         */
+        if (state == ClusterState.ACTIVE) {
+            while (!pendingProcessingFailbackPlans.isEmpty()) {
+                //take the first pending failback plan
+                NodeFailbackPlan plan = pendingProcessingFailbackPlans.pop();
+                /**
+                 * A plan at this stage will be in one of two states:
+                 * 1. PREPARING -> the participants were selected but we 
haven't sent any request.
+                 * 2. PENDING_ROLLBACK -> a participant failed before we sent 
any requests
+                 */
+                if (plan.getState() == FailbackPlanState.PREPARING) {
+                    //set the partitions that will be failed back as inactive
+                    String failbackNode = plan.getNodeId();
+                    for (Integer partitionId : plan.getPartitionsToFailback()) 
{
+                        //partition expected to be returned to the failing 
back node
+                        clusterManager.updateClusterPartition(partitionId, 
failbackNode, false);
+                    }
+
+                    /**
+                     * if the returning node is the original metadata node,
+                     * then the metadata node will change after the failback 
completes
+                     */
+                    String originalMetadataNode = 
AppContextInfo.INSTANCE.getMetadataProperties().getMetadataNodeName();
+                    if (originalMetadataNode.equals(failbackNode)) {
+                        
plan.setNodeToReleaseMetadataManager(currentMetadataNode);
+                        currentMetadataNode = "";
+                        metadataNodeActive = false;
+                        clusterManager.updateMetadataNode(currentMetadataNode, 
metadataNodeActive);
+                    }
+
+                    //force new jobs to wait
+                    clusterManager.forceIntoState(ClusterState.REBALANCING);
+                    handleFailbackRequests(plan, messageBroker);
+                    /**
+                     * wait until the current plan is completed before 
processing the next plan.
+                     * when the current one completes or is reverted, the 
cluster state will be
+                     * ACTIVE again, and the next failback plan (if any) will 
be processed.
+                     */
+                    break;
+                } else if (plan.getState() == 
FailbackPlanState.PENDING_ROLLBACK) {
+                    //this plan failed before sending any requests -> nothing 
to rollback
+                    planId2FailbackPlanMap.remove(plan.getPlanId());
+                }
+            }
+        }
+    }
+
+    private void handleFailbackRequests(NodeFailbackPlan plan, 
ICCMessageBroker messageBroker) {
+        //send requests to other nodes to complete on-going jobs and prepare 
partitions for failback
+        for (PreparePartitionsFailbackRequestMessage request : 
plan.getPlanFailbackRequests()) {
+            try {
+                messageBroker.sendApplicationMessageToNC(request, 
request.getNodeID());
+                plan.addPendingRequest(request);
+            } catch (Exception e) {
+                LOGGER.log(Level.WARNING, "Failed to send failback request to: 
" + request.getNodeID(), e);
+                plan.notifyNodeFailure(request.getNodeID());
+                revertFailedFailbackPlanEffects();
+                break;
+            }
+        }
+    }
+
+    public synchronized List<ClusterPartition> 
getNodeAssignedPartitions(String nodeId) {
+        List<ClusterPartition> nodePartitions = new ArrayList<>();
+        ClusterPartition[] clusterPartitons = 
clusterManager.getClusterPartitons();
+        Map<Integer, ClusterPartition> clusterPartitionsMap = new HashMap<>();
+        for (ClusterPartition partition : clusterPartitons) {
+            clusterPartitionsMap.put(partition.getPartitionId(), partition);
+        }
+        for (ClusterPartition partition : clusterPartitons) {
+            if (partition.getActiveNodeId().equals(nodeId)) {
+                nodePartitions.add(partition);
+            }
+        }
+        /**
+         * if there is any pending takeover request this node was supposed to 
handle,
+         * it needs to be sent to a different replica
+         */
+        List<Long> failedTakeoverRequests = new ArrayList<>();
+        for (TakeoverPartitionsRequestMessage request : 
pendingTakeoverRequests.values()) {
+            if (request.getNodeId().equals(nodeId)) {
+                for (Integer partitionId : request.getPartitions()) {
+                    nodePartitions.add(clusterPartitionsMap.get(partitionId));
+                }
+                failedTakeoverRequests.add(request.getRequestId());
+            }
+        }
+
+        //remove failed requests
+        for (Long requestId : failedTakeoverRequests) {
+            pendingTakeoverRequests.remove(requestId);
+        }
+        return nodePartitions;
+    }
+
+    public synchronized void 
processPartitionTakeoverResponse(TakeoverPartitionsResponseMessage response)
+            throws HyracksDataException {
+        for (Integer partitionId : response.getPartitions()) {
+            clusterManager.updateClusterPartition(partitionId, 
response.getNodeId(), true);
+        }
+        pendingTakeoverRequests.remove(response.getRequestId());
+        validateClusterState();
+    }
+
+    public synchronized void 
processMetadataNodeTakeoverResponse(TakeoverMetadataNodeResponseMessage 
response)
+            throws HyracksDataException {
+        currentMetadataNode = response.getNodeId();
+        metadataNodeActive = true;
+        clusterManager.updateMetadataNode(currentMetadataNode, 
metadataNodeActive);
+        validateClusterState();
+    }
+
+    private void validateClusterState() throws HyracksDataException {
+        clusterManager.updateClusterState();
+        ClusterState newState = clusterManager.getState();
+        // PENDING: all partitions are active but the metadata node is not
+        if (newState == ClusterState.PENDING) {
+            requestMetadataNodeTakeover();
+        } else if (newState == ClusterState.ACTIVE) {
+            processPendingFailbackPlans();
+        }
+    }
+
+    public synchronized void 
processPreparePartitionsFailbackResponse(PreparePartitionsFailbackResponseMessage
 msg) {
+        NodeFailbackPlan plan = planId2FailbackPlanMap.get(msg.getPlanId());
+        plan.markRequestCompleted(msg.getRequestId());
+        /**
+         * A plan at this stage will be in one of three states:
+         * 1. PENDING_PARTICIPANT_REPONSE -> one or more responses are still 
expected (wait).
+         * 2. PENDING_COMPLETION -> all responses received (time to send 
completion request).
+         * 3. PENDING_ROLLBACK -> the plan failed and we just received the 
final pending response (revert).
+         */
+        if (plan.getState() == FailbackPlanState.PENDING_COMPLETION) {
+            CompleteFailbackRequestMessage request = 
plan.getCompleteFailbackRequestMessage();
+
+            //send complete resync and takeover partitions to the failing back 
node
+            try {
+                messageBroker.sendApplicationMessageToNC(request, 
request.getNodeId());
+            } catch (Exception e) {
+                LOGGER.log(Level.WARNING, "Failed to send complete failback 
request to: " + request.getNodeId(), e);
+                notifyFailbackPlansNodeFailure(request.getNodeId());
+                revertFailedFailbackPlanEffects();
+            }
+        } else if (plan.getState() == FailbackPlanState.PENDING_ROLLBACK) {
+            revertFailedFailbackPlanEffects();
+        }
+    }
+
+    public synchronized void 
processCompleteFailbackResponse(CompleteFailbackResponseMessage response)
+            throws HyracksDataException {
+        /**
+         * the failback plan completed successfully:
+         * Remove all references to it.
+         * Remove the failing back node from the failed nodes list.
+         * Notify its replicas to reconnect to it.
+         * Set the failing back node partitions as active.
+         */
+        NodeFailbackPlan plan = 
planId2FailbackPlanMap.remove(response.getPlanId());
+        String nodeId = plan.getNodeId();
+        failedNodes.remove(nodeId);
+        //notify impacted replicas they can reconnect to this node
+        notifyImpactedReplicas(nodeId, ClusterEventType.NODE_JOIN);
+        clusterManager.updateNodePartitions(nodeId, true);
+        validateClusterState();
+    }
+
+    private synchronized void requestMetadataNodeTakeover() {
+        //need a new node to takeover metadata node
+        ClusterPartition metadataPartition = 
AppContextInfo.INSTANCE.getMetadataProperties().getMetadataPartition();
+        //request the metadataPartition node to register itself as the 
metadata node
+        TakeoverMetadataNodeRequestMessage takeoverRequest = new 
TakeoverMetadataNodeRequestMessage();
+        try {
+            messageBroker.sendApplicationMessageToNC(takeoverRequest, 
metadataPartition.getActiveNodeId());
+        } catch (Exception e) {
+            /**
+             * if we fail to send the request, it means the NC we tried to 
send the request to
+             * has failed. When the failure notification arrives, a new NC 
will be assigned to
+             * the metadata partition and a new metadata node takeover request 
will be sent to it.
+             */
+            LOGGER.log(Level.WARNING,
+                    "Failed to send metadata node takeover request to: " + 
metadataPartition.getActiveNodeId(), e);
+        }
+    }
+}
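
ClusterStateManager (next file) currently hard-codes this strategy with a TODO
to obtain it from configuration or a factory. One possible shape for that
factory, sketched as a suggestion only; the FaultToleranceStrategyFactory
class and the strategy names "auto"/"none" are assumptions, not part of this
patch:

    public final class FaultToleranceStrategyFactory {

        private FaultToleranceStrategyFactory() {
        }

        // hypothetical: map a configured strategy name to an instance
        public static IFaultToleranceStrategy create(String name,
                IReplicationStrategy replicationStrategy,
                IClusterStateManager clusterManager,
                ICCMessageBroker messageBroker) {
            switch (name) {
                case "auto":
                    return new AutoFaultToleranceStrategy(replicationStrategy,
                            clusterManager, messageBroker);
                case "none":
                default:
                    return new NoFaultToleranceStrategy();
            }
        }
    }
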
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/util/ClusterStateManager.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/util/ClusterStateManager.java
index 3ba1965..4d8006c 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/util/ClusterStateManager.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/util/ClusterStateManager.java
@@ -19,13 +19,11 @@
 package org.apache.asterix.runtime.util;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
 import java.util.Set;
 import java.util.SortedMap;
 import java.util.logging.Level;
@@ -33,24 +31,19 @@
 
 import org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
 import org.apache.asterix.common.cluster.ClusterPartition;
-import org.apache.asterix.common.config.ReplicationProperties;
+import org.apache.asterix.common.cluster.IClusterStateManager;
 import org.apache.asterix.common.config.ClusterProperties;
 import org.apache.asterix.common.messaging.api.ICCMessageBroker;
+import org.apache.asterix.common.replication.IFaultToleranceStrategy;
+import org.apache.asterix.common.replication.IReplicationStrategy;
 import org.apache.asterix.event.schema.cluster.Cluster;
 import org.apache.asterix.event.schema.cluster.Node;
-import org.apache.asterix.runtime.message.CompleteFailbackRequestMessage;
 import org.apache.asterix.runtime.message.CompleteFailbackResponseMessage;
-import org.apache.asterix.runtime.message.NodeFailbackPlan;
-import org.apache.asterix.runtime.message.NodeFailbackPlan.FailbackPlanState;
-import 
org.apache.asterix.runtime.message.PreparePartitionsFailbackRequestMessage;
 import 
org.apache.asterix.runtime.message.PreparePartitionsFailbackResponseMessage;
-import org.apache.asterix.runtime.message.ReplicaEventMessage;
-import org.apache.asterix.runtime.message.TakeoverMetadataNodeRequestMessage;
 import org.apache.asterix.runtime.message.TakeoverMetadataNodeResponseMessage;
-import org.apache.asterix.runtime.message.TakeoverPartitionsRequestMessage;
 import org.apache.asterix.runtime.message.TakeoverPartitionsResponseMessage;
+import org.apache.asterix.runtime.replication.AutoFaultToleranceStrategy;
 import 
org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
-import 
org.apache.hyracks.api.application.IClusterLifecycleListener.ClusterEventType;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.exceptions.HyracksException;
 import org.json.JSONException;
@@ -60,7 +53,7 @@
  * A holder class for properties related to the Asterix cluster.
  */
 
-public class ClusterStateManager {
+public class ClusterStateManager implements IClusterStateManager {
     /*
      * TODO: currently after instance restarts we require all nodes to join 
again,
      * otherwise the cluster wont be ACTIVE. we may overcome this by storing 
the cluster state before the instance
@@ -69,9 +62,9 @@
 
     private static final Logger LOGGER = 
Logger.getLogger(ClusterStateManager.class.getName());
     public static final ClusterStateManager INSTANCE = new 
ClusterStateManager();
-    private static final String CLUSTER_NET_IP_ADDRESS_KEY = 
"cluster-net-ip-address";
     private static final String IO_DEVICES = "iodevices";
-    private Map<String, Map<String, String>> activeNcConfiguration = new 
HashMap<>();
+    private final Map<String, Map<String, String>> activeNcConfiguration = new 
HashMap<>();
+    private Map<String, Map<String, String>> activeNcConfigurationBackup = new 
HashMap<>();
 
     private final Cluster cluster;
     private ClusterState state = ClusterState.UNUSABLE;
@@ -82,32 +75,25 @@
 
     private Map<String, ClusterPartition[]> node2PartitionsMap = null;
     private SortedMap<Integer, ClusterPartition> clusterPartitions = null;
-    private Map<Long, TakeoverPartitionsRequestMessage> 
pendingTakeoverRequests = null;
 
-    private long clusterRequestId = 0;
     private String currentMetadataNode = null;
     private boolean metadataNodeActive = false;
-    private boolean autoFailover = false;
-    private boolean replicationEnabled = false;
     private Set<String> failedNodes = new HashSet<>();
-    private LinkedList<NodeFailbackPlan> pendingProcessingFailbackPlans;
-    private Map<Long, NodeFailbackPlan> planId2FailbackPlanMap;
+    private IReplicationStrategy replicationStrategy;
+    private ICCMessageBroker messageBroker;
+    private IFaultToleranceStrategy ftStrategy;
 
     private ClusterStateManager() {
         cluster = ClusterProperties.INSTANCE.getCluster();
         // if this is the CC process
-        if (AppContextInfo.INSTANCE.initialized()
-                && AppContextInfo.INSTANCE.getCCApplicationContext() != null) {
+        if (AppContextInfo.INSTANCE.initialized() && 
AppContextInfo.INSTANCE.getCCApplicationContext() != null) {
             node2PartitionsMap = 
AppContextInfo.INSTANCE.getMetadataProperties().getNodePartitions();
             clusterPartitions = 
AppContextInfo.INSTANCE.getMetadataProperties().getClusterPartitions();
             currentMetadataNode = 
AppContextInfo.INSTANCE.getMetadataProperties().getMetadataNodeName();
-            replicationEnabled = 
ClusterProperties.INSTANCE.isReplicationEnabled();
-            autoFailover = ClusterProperties.INSTANCE.isAutoFailoverEnabled();
-            if (autoFailover) {
-                pendingTakeoverRequests = new HashMap<>();
-                pendingProcessingFailbackPlans = new LinkedList<>();
-                planId2FailbackPlanMap = new HashMap<>();
-            }
+            replicationStrategy = 
AppContextInfo.INSTANCE.getReplicationProperties().getReplicationStrategy();
+            messageBroker = (ICCMessageBroker) 
AppContextInfo.INSTANCE.getCCApplicationContext().getMessageBroker();
+            // TODO get from config or factory
+            ftStrategy = new AutoFaultToleranceStrategy(replicationStrategy, 
this, messageBroker);
         }
     }
 
@@ -115,30 +101,8 @@
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info("Removing configuration parameters for node id " + 
nodeId);
         }
-        activeNcConfiguration.remove(nodeId);
-
-        //if this node was waiting for failback and failed before it completed
-        if (failedNodes.contains(nodeId)) {
-            if (autoFailover) {
-                notifyFailbackPlansNodeFailure(nodeId);
-                revertFailedFailbackPlanEffects();
-            }
-        } else {
-            //an active node failed
-            failedNodes.add(nodeId);
-            if (nodeId.equals(currentMetadataNode)) {
-                metadataNodeActive = false;
-                LOGGER.info("Metadata node is now inactive");
-            }
-            updateNodePartitions(nodeId, false);
-            if (replicationEnabled) {
-                notifyImpactedReplicas(nodeId, ClusterEventType.NODE_FAILURE);
-                if (autoFailover) {
-                    notifyFailbackPlansNodeFailure(nodeId);
-                    requestPartitionsTakeover(nodeId);
-                }
-            }
-        }
+        activeNcConfigurationBackup.put(nodeId, 
activeNcConfiguration.remove(nodeId));
+        ftStrategy.notifyNodeFailure(nodeId);
     }
 
     public synchronized void addNCConfiguration(String nodeId, Map<String, 
String> configuration)
@@ -146,47 +110,53 @@
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info("Registering configuration parameters for node id " + 
nodeId);
         }
+        if (configuration == null) {
+            configuration = activeNcConfigurationBackup.get(nodeId);
+        }
         activeNcConfiguration.put(nodeId, configuration);
-
-        //a node trying to come back after failure
-        if (failedNodes.contains(nodeId)) {
-            if (autoFailover) {
-                prepareFailbackPlan(nodeId);
-                return;
-            } else {
-                //a node completed local or remote recovery and rejoined
-                failedNodes.remove(nodeId);
-                if (replicationEnabled) {
-                    //notify other replica to reconnect to this node
-                    notifyImpactedReplicas(nodeId, ClusterEventType.NODE_JOIN);
-                }
-            }
-        }
-
-        if (nodeId.equals(currentMetadataNode)) {
-            metadataNodeActive = true;
-            LOGGER.info("Metadata node is now active");
-        }
-        updateNodePartitions(nodeId, true);
+        ftStrategy.notifyNodeJoin(nodeId);
     }
 
-    private synchronized void updateNodePartitions(String nodeId, boolean 
added) throws HyracksDataException {
+    @Override
+    public synchronized void forceIntoState(ClusterState state) {
+        this.state = state;
+    }
+
+    @Override
+    public void updateMetadataNode(String nodeId, boolean active) {
+        currentMetadataNode = nodeId;
+        metadataNodeActive = active;
+        if (active) {
+            LOGGER.info(String.format("Metadata node %s is now active", 
currentMetadataNode));
+        }
+    }
+
+    @Override
+    public synchronized void updateNodePartitions(String nodeId, boolean 
active) throws HyracksDataException {
         ClusterPartition[] nodePartitions = node2PartitionsMap.get(nodeId);
         // if this isn't a storage node, it will not have cluster partitions
         if (nodePartitions != null) {
             for (ClusterPartition p : nodePartitions) {
-                // set the active node for this node's partitions
-                p.setActive(added);
-                if (added) {
-                    p.setActiveNodeId(nodeId);
-                }
+                updateClusterPartition(p.getPartitionId(), nodeId, active);
             }
-            resetClusterPartitionConstraint();
-            updateClusterState();
         }
     }
 
-    private synchronized void updateClusterState() throws HyracksDataException 
{
+    @Override
+    public synchronized void updateClusterPartition(Integer partitionNum, 
String activeNode, boolean active) {
+        ClusterPartition clusterPartition = 
clusterPartitions.get(partitionNum);
+        if (clusterPartition != null) {
+            // set the active node for this node's partitions
+            clusterPartition.setActive(active);
+            if (active) {
+                clusterPartition.setActiveNodeId(activeNode);
+            }
+        }
+    }
+
+    @Override
+    public synchronized void updateClusterState() throws HyracksDataException {
+        resetClusterPartitionConstraint();
         for (ClusterPartition p : clusterPartitions.values()) {
             if (!p.isActive()) {
                 state = ClusterState.UNUSABLE;
@@ -194,20 +164,17 @@
                 return;
             }
         }
+
+        state = ClusterState.PENDING;
+        LOGGER.info("Cluster is now " + state);
+
         // if all storage partitions are active as well as the metadata node, 
then the cluster is active
         if (metadataNodeActive) {
-            state = ClusterState.PENDING;
-            LOGGER.info("Cluster is now " + state);
             AppContextInfo.INSTANCE.getMetadataBootstrap().init();
             state = ClusterState.ACTIVE;
             LOGGER.info("Cluster is now " + state);
             // start global recovery
             
AppContextInfo.INSTANCE.getGlobalRecoveryManager().startGlobalRecovery();
-            if (autoFailover && !pendingProcessingFailbackPlans.isEmpty()) {
-                processPendingFailbackPlans();
-            }
-        } else {
-            requestMetadataNodeTakeover();
         }
     }
 
@@ -230,6 +197,7 @@
         return ncConfig.get(IO_DEVICES).split(",");
     }
 
+    @Override
     public ClusterState getState() {
         return state;
     }
@@ -285,6 +253,7 @@
         return 
AppContextInfo.INSTANCE.getMetadataProperties().getNodeNames().size();
     }
 
+    @Override
     public synchronized ClusterPartition[] getNodePartitions(String nodeId) {
         return node2PartitionsMap.get(nodeId);
     }
@@ -296,337 +265,13 @@
         return 0;
     }
 
+    @Override
     public synchronized ClusterPartition[] getClusterPartitons() {
         ArrayList<ClusterPartition> partitons = new ArrayList<>();
         for (ClusterPartition partition : clusterPartitions.values()) {
             partitons.add(partition);
         }
         return partitons.toArray(new ClusterPartition[] {});
-    }
-
-    private synchronized void requestPartitionsTakeover(String failedNodeId) {
-        //replica -> list of partitions to take over
-        Map<String, List<Integer>> partitionRecoveryPlan = new HashMap<>();
-        ReplicationProperties replicationProperties = AppContextInfo.INSTANCE.getReplicationProperties();
-
-        //collect the partitions of the failed NC
-        List<ClusterPartition> lostPartitions = getNodeAssignedPartitions(failedNodeId);
-        if (!lostPartitions.isEmpty()) {
-            for (ClusterPartition partition : lostPartitions) {
-                //find replicas for this partition
-                Set<String> partitionReplicas = replicationProperties.getNodeReplicasIds(partition.getNodeId());
-                //find a replica that is still active
-                for (String replica : partitionReplicas) {
-                    //TODO (mhubail) currently this assigns the partition to the first found active replica.
-                    //It needs to be modified to consider load balancing.
-                    if (addActiveReplica(replica, partition, partitionRecoveryPlan)) {
-                        break;
-                    }
-                }
-            }
-
-            if (partitionRecoveryPlan.size() == 0) {
-                //no active replicas were found for the failed node
-                LOGGER.severe("Could not find active replicas for the partitions " + lostPartitions);
-                return;
-            } else {
-                LOGGER.info("Partitions to recover: " + lostPartitions);
-            }
-            ICCMessageBroker messageBroker = (ICCMessageBroker) AppContextInfo.INSTANCE.getCCApplicationContext()
-                    .getMessageBroker();
-            //For each replica, send a request to take over the assigned partitions
-            for (Entry<String, List<Integer>> entry : partitionRecoveryPlan.entrySet()) {
-                String replica = entry.getKey();
-                Integer[] partitionsToTakeover = entry.getValue().toArray(new Integer[entry.getValue().size()]);
-                long requestId = clusterRequestId++;
-                TakeoverPartitionsRequestMessage takeoverRequest = new TakeoverPartitionsRequestMessage(requestId,
-                        replica, partitionsToTakeover);
-                pendingTakeoverRequests.put(requestId, takeoverRequest);
-                try {
-                    messageBroker.sendApplicationMessageToNC(takeoverRequest, replica);
-                } catch (Exception e) {
-                    /**
-                     * if we fail to send the request, it means the NC we tried to send the request to
-                     * has failed. When the failure notification arrives, we will send any pending request
-                     * that belongs to the failed NC to a different active replica.
-                     */
-                    LOGGER.log(Level.WARNING, "Failed to send takeover request: " + takeoverRequest, e);
-                }
-            }
-        }
-    }
-
-    private boolean addActiveReplica(String replica, ClusterPartition partition,
-            Map<String, List<Integer>> partitionRecoveryPlan) {
-        if (activeNcConfiguration.containsKey(replica) && !failedNodes.contains(replica)) {
-            if (!partitionRecoveryPlan.containsKey(replica)) {
-                List<Integer> replicaPartitions = new ArrayList<>();
-                replicaPartitions.add(partition.getPartitionId());
-                partitionRecoveryPlan.put(replica, replicaPartitions);
-            } else {
-                partitionRecoveryPlan.get(replica).add(partition.getPartitionId());
-            }
-            return true;
-        }
-        return false;
-    }
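
A small aside on addActiveReplica above: assuming Java 8 is available, the put-or-append branching can collapse into Map.computeIfAbsent. A minimal sketch of the equivalent helper, shown only for discussion (same names and behavior as the removed code):

    private boolean addActiveReplica(String replica, ClusterPartition partition,
            Map<String, List<Integer>> partitionRecoveryPlan) {
        if (activeNcConfiguration.containsKey(replica) && !failedNodes.contains(replica)) {
            // create the replica's partition list on first use, then append to it
            partitionRecoveryPlan.computeIfAbsent(replica, k -> new ArrayList<>()).add(partition.getPartitionId());
            return true;
        }
        return false;
    }
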
-
-    private synchronized List<ClusterPartition> getNodeAssignedPartitions(String nodeId) {
-        List<ClusterPartition> nodePartitions = new ArrayList<>();
-        for (ClusterPartition partition : clusterPartitions.values()) {
-            if (partition.getActiveNodeId().equals(nodeId)) {
-                nodePartitions.add(partition);
-            }
-        }
-        /**
-         * if there is any pending takeover request that this node was supposed to handle,
-         * it needs to be sent to a different replica
-         */
-        List<Long> failedTakeoverRequests = new ArrayList<>();
-        for (TakeoverPartitionsRequestMessage request : pendingTakeoverRequests.values()) {
-            if (request.getNodeId().equals(nodeId)) {
-                for (Integer partitionId : request.getPartitions()) {
-                    nodePartitions.add(clusterPartitions.get(partitionId));
-                }
-                failedTakeoverRequests.add(request.getRequestId());
-            }
-        }
-
-        //remove failed requests
-        for (Long requestId : failedTakeoverRequests) {
-            pendingTakeoverRequests.remove(requestId);
-        }
-        return nodePartitions;
-    }
-
-    private synchronized void requestMetadataNodeTakeover() {
-        //need a new node to take over the metadata node
-        ClusterPartition metadataPartiton = AppContextInfo.INSTANCE.getMetadataProperties()
-                .getMetadataPartition();
-        //request the metadata partition's node to register itself as the metadata node
-        TakeoverMetadataNodeRequestMessage takeoverRequest = new TakeoverMetadataNodeRequestMessage();
-        ICCMessageBroker messageBroker = (ICCMessageBroker) AppContextInfo.INSTANCE.getCCApplicationContext()
-                .getMessageBroker();
-        try {
-            messageBroker.sendApplicationMessageToNC(takeoverRequest, metadataPartiton.getActiveNodeId());
-        } catch (Exception e) {
-            /**
-             * if we fail to send the request, it means the NC we tried to send the request to
-             * has failed. When the failure notification arrives, a new NC will be assigned to
-             * the metadata partition and a new metadata node takeover request will be sent to it.
-             */
-            LOGGER.log(Level.WARNING,
-                    "Failed to send metadata node takeover request to: " + metadataPartiton.getActiveNodeId(), e);
-        }
-    }
-
-    public synchronized void processPartitionTakeoverResponse(TakeoverPartitionsResponseMessage response)
-            throws HyracksDataException {
-        for (Integer partitonId : response.getPartitions()) {
-            ClusterPartition partition = clusterPartitions.get(partitonId);
-            partition.setActive(true);
-            partition.setActiveNodeId(response.getNodeId());
-        }
-        pendingTakeoverRequests.remove(response.getRequestId());
-        resetClusterPartitionConstraint();
-        updateClusterState();
-    }
-
-    public synchronized void processMetadataNodeTakeoverResponse(TakeoverMetadataNodeResponseMessage response)
-            throws HyracksDataException {
-        currentMetadataNode = response.getNodeId();
-        metadataNodeActive = true;
-        LOGGER.info("Current metadata node: " + currentMetadataNode);
-        updateClusterState();
-    }
-
-    private synchronized void prepareFailbackPlan(String failingBackNodeId) {
-        NodeFailbackPlan plan = NodeFailbackPlan.createPlan(failingBackNodeId);
-        pendingProcessingFailbackPlans.add(plan);
-        planId2FailbackPlanMap.put(plan.getPlanId(), plan);
-
-        //get all partitions this node requires to resync
-        ReplicationProperties replicationProperties = AppContextInfo.INSTANCE.getReplicationProperties();
-        Set<String> nodeReplicas = replicationProperties.getNodeReplicationClients(failingBackNodeId);
-        for (String replicaId : nodeReplicas) {
-            ClusterPartition[] nodePartitions = node2PartitionsMap.get(replicaId);
-            for (ClusterPartition partition : nodePartitions) {
-                plan.addParticipant(partition.getActiveNodeId());
-                /**
-                 * if the partition's original node is the returning node,
-                 * add it to the list of the partitions which will be failed back
-                 */
-                if (partition.getNodeId().equals(failingBackNodeId)) {
-                    plan.addPartitionToFailback(partition.getPartitionId(), partition.getActiveNodeId());
-                }
-            }
-        }
-
-        if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info("Prepared Failback plan: " + plan.toString());
-        }
-
-        processPendingFailbackPlans();
-    }
-
-    private synchronized void processPendingFailbackPlans() {
-        /**
-         * if the cluster state is not ACTIVE, then failbacks should not be processed
-         * since some partitions are not active
-         */
-        if (state == ClusterState.ACTIVE) {
-            while (!pendingProcessingFailbackPlans.isEmpty()) {
-                //take the first pending failback plan
-                NodeFailbackPlan plan = pendingProcessingFailbackPlans.pop();
-                /**
-                 * A plan at this stage will be in one of two states:
-                 * 1. PREPARING -> the participants were selected but we haven't sent any request.
-                 * 2. PENDING_ROLLBACK -> a participant failed before we sent any requests.
-                 */
-                if (plan.getState() == FailbackPlanState.PREPARING) {
-                    //set the partitions that will be failed back as inactive
-                    String failbackNode = plan.getNodeId();
-                    for (Integer partitionId : plan.getPartitionsToFailback()) {
-                        ClusterPartition clusterPartition = clusterPartitions.get(partitionId);
-                        clusterPartition.setActive(false);
-                        //partition expected to be returned to the failing back node
-                        clusterPartition.setActiveNodeId(failbackNode);
-                    }
-
-                    /**
-                     * if the returning node is the original metadata node,
-                     * then the metadata node will change after the failback completes
-                     */
-                    String originalMetadataNode = AppContextInfo.INSTANCE.getMetadataProperties()
-                            .getMetadataNodeName();
-                    if (originalMetadataNode.equals(failbackNode)) {
-                        plan.setNodeToReleaseMetadataManager(currentMetadataNode);
-                        currentMetadataNode = "";
-                        metadataNodeActive = false;
-                    }
-
-                    //force new jobs to wait
-                    state = ClusterState.REBALANCING;
-                    ICCMessageBroker messageBroker = (ICCMessageBroker) AppContextInfo.INSTANCE
-                            .getCCApplicationContext().getMessageBroker();
-                    handleFailbackRequests(plan, messageBroker);
-                    /**
-                     * wait until the current plan is completed before processing the next plan.
-                     * when the current one completes or is reverted, the cluster state will be
-                     * ACTIVE again, and the next failback plan (if any) will be processed.
-                     */
-                    break;
-                } else if (plan.getState() == FailbackPlanState.PENDING_ROLLBACK) {
-                    //this plan failed before sending any requests -> nothing to rollback
-                    planId2FailbackPlanMap.remove(plan.getPlanId());
-                }
-            }
-        }
-    }
-
-    private void handleFailbackRequests(NodeFailbackPlan plan, ICCMessageBroker messageBroker) {
-        //send requests to other nodes to complete on-going jobs and prepare partitions for failback
-        for (PreparePartitionsFailbackRequestMessage request : plan.getPlanFailbackRequests()) {
-            try {
-                messageBroker.sendApplicationMessageToNC(request, request.getNodeID());
-                plan.addPendingRequest(request);
-            } catch (Exception e) {
-                LOGGER.log(Level.WARNING, "Failed to send failback request to: " + request.getNodeID(), e);
-                plan.notifyNodeFailure(request.getNodeID());
-                revertFailedFailbackPlanEffects();
-                break;
-            }
-        }
-    }
-
-    public synchronized void processPreparePartitionsFailbackResponse(PreparePartitionsFailbackResponseMessage msg) {
-        NodeFailbackPlan plan = planId2FailbackPlanMap.get(msg.getPlanId());
-        plan.markRequestCompleted(msg.getRequestId());
-        /**
-         * A plan at this stage will be in one of three states:
-         * 1. PENDING_PARTICIPANT_REPONSE -> one or more responses are still expected (wait).
-         * 2. PENDING_COMPLETION -> all responses received (time to send completion request).
-         * 3. PENDING_ROLLBACK -> the plan failed and we just received the final pending response (revert).
-         */
-        if (plan.getState() == FailbackPlanState.PENDING_COMPLETION) {
-            CompleteFailbackRequestMessage request = plan.getCompleteFailbackRequestMessage();
-
-            //send complete resync and takeover partitions to the failing back node
-            ICCMessageBroker messageBroker = (ICCMessageBroker) AppContextInfo.INSTANCE.getCCApplicationContext()
-                    .getMessageBroker();
-            try {
-                messageBroker.sendApplicationMessageToNC(request, request.getNodeId());
-            } catch (Exception e) {
-                LOGGER.log(Level.WARNING, "Failed to send complete failback request to: " + request.getNodeId(), e);
-                notifyFailbackPlansNodeFailure(request.getNodeId());
-                revertFailedFailbackPlanEffects();
-            }
-        } else if (plan.getState() == FailbackPlanState.PENDING_ROLLBACK) {
-            revertFailedFailbackPlanEffects();
-        }
-    }
-
-    public synchronized void processCompleteFailbackResponse(CompleteFailbackResponseMessage response)
-            throws HyracksDataException {
-        /**
-         * the failback plan completed successfully:
-         * Remove all references to it.
-         * Remove the failing back node from the failed nodes list.
-         * Notify its replicas to reconnect to it.
-         * Set the failing back node partitions as active.
-         */
-        NodeFailbackPlan plan = planId2FailbackPlanMap.remove(response.getPlanId());
-        String nodeId = plan.getNodeId();
-        failedNodes.remove(nodeId);
-        //notify impacted replicas they can reconnect to this node
-        notifyImpactedReplicas(nodeId, ClusterEventType.NODE_JOIN);
-        updateNodePartitions(nodeId, true);
-    }
-
-    private synchronized void notifyImpactedReplicas(String nodeId, ClusterEventType event) {
-        ReplicationProperties replicationProperties = AppContextInfo.INSTANCE.getReplicationProperties();
-        Set<String> remoteReplicas = replicationProperties.getRemoteReplicasIds(nodeId);
-        String nodeIdAddress = "";
-        //in case the node joined with a new IP address, we need to send it to the other replicas
-        if (event == ClusterEventType.NODE_JOIN) {
-            nodeIdAddress = activeNcConfiguration.get(nodeId).get(CLUSTER_NET_IP_ADDRESS_KEY);
-        }
-
-        ReplicaEventMessage msg = new ReplicaEventMessage(nodeId, nodeIdAddress, event);
-        ICCMessageBroker messageBroker = (ICCMessageBroker) AppContextInfo.INSTANCE.getCCApplicationContext()
-                .getMessageBroker();
-        for (String replica : remoteReplicas) {
-            //if the remote replica is alive, send the event
-            if (activeNcConfiguration.containsKey(replica)) {
-                try {
-                    messageBroker.sendApplicationMessageToNC(msg, replica);
-                } catch (Exception e) {
-                    LOGGER.log(Level.WARNING, "Failed sending an application message to an NC", e);
-                }
-            }
-        }
-    }
-
-    private synchronized void revertFailedFailbackPlanEffects() {
-        Iterator<NodeFailbackPlan> iterator = planId2FailbackPlanMap.values().iterator();
-        while (iterator.hasNext()) {
-            NodeFailbackPlan plan = iterator.next();
-            if (plan.getState() == FailbackPlanState.PENDING_ROLLBACK) {
-                //TODO if the failing back node is still active, notify it to construct a new plan for it
-                iterator.remove();
-
-                //reassign the partitions that were supposed to be failed back to an active replica
-                requestPartitionsTakeover(plan.getNodeId());
-            }
-        }
-    }
-
-    private synchronized void notifyFailbackPlansNodeFailure(String nodeId) {
-        Iterator<NodeFailbackPlan> iterator = planId2FailbackPlanMap.values().iterator();
-        while (iterator.hasNext()) {
-            NodeFailbackPlan plan = iterator.next();
-            plan.notifyNodeFailure(nodeId);
-        }
     }
 
     public synchronized boolean isMetadataNodeActive() {
@@ -654,9 +299,7 @@
                 }
             }
             nodeJSON.put("state", failedNodes.contains(entry.getKey()) ? 
"FAILED"
-                    : allActive ? "ACTIVE"
-                    : anyActive ? "PARTIALLY_ACTIVE"
-                    : "INACTIVE");
+                    : allActive ? "ACTIVE" : anyActive ? "PARTIALLY_ACTIVE" : 
"INACTIVE");
             nodeJSON.put("partitions", partitions);
             stateDescription.accumulate("ncs", nodeJSON);
         }
@@ -670,4 +313,35 @@
         stateDescription.put("partitions", clusterPartitions);
         return stateDescription;
     }
+
+    @Override
+    public Map<String, Map<String, String>> getActiveNcConfiguration() {
+        return Collections.unmodifiableMap(activeNcConfiguration);
+    }
+
+    @Override
+    public String getCurrentMetadataNodeId() {
+        return currentMetadataNode;
+    }
+
+    //TODO messaging should be handled by FaultToleranceManager
+    public synchronized void processCompleteFailbackResponse(CompleteFailbackResponseMessage msg)
+            throws HyracksDataException {
+        ((AutoFaultToleranceStrategy) ftStrategy).processCompleteFailbackResponse(msg);
+    }
+
+    public synchronized void processPreparePartitionsFailbackResponse(PreparePartitionsFailbackResponseMessage msg) {
+        ((AutoFaultToleranceStrategy) ftStrategy).processPreparePartitionsFailbackResponse(msg);
+    }
+
+    public synchronized void processMetadataNodeTakeoverResponse(TakeoverMetadataNodeResponseMessage msg)
+            throws HyracksDataException {
+        ((AutoFaultToleranceStrategy) ftStrategy).processMetadataNodeTakeoverResponse(msg);
+    }
+
+    public void processPartitionTakeoverResponse(TakeoverPartitionsResponseMessage msg) throws HyracksDataException {
+        ((AutoFaultToleranceStrategy) ftStrategy).processPartitionTakeoverResponse(msg);
+    }
+
 }
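
One note on the delegation block above: each method still casts ftStrategy to AutoFaultToleranceStrategy, and the TODO says messaging should move to the fault-tolerance layer. A possible direction, sketched purely for discussion (the process method and the IFaultToleranceMessage marker are assumptions, not part of this patch; the real IFaultToleranceStrategy added by this change may look different):

    import org.apache.hyracks.api.exceptions.HyracksDataException;

    // Hypothetical marker for CC/NC fault-tolerance messages such as
    // TakeoverPartitionsResponseMessage; sketch only.
    interface IFaultToleranceMessage {
    }

    // Hypothetical reshaping of IFaultToleranceStrategy: one dispatch entry
    // point so ClusterStateManager needs no strategy-specific casts.
    public interface IFaultToleranceStrategy {
        void process(IFaultToleranceMessage message) throws HyracksDataException;
    }
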
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java
index c81225f..03a3364 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java
@@ -20,6 +20,7 @@
 
 import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.replication.IReplicationManager;
+import org.apache.asterix.common.replication.IReplicationStrategy;
 import org.apache.asterix.common.transactions.ILogRecord;
 import org.apache.asterix.common.transactions.ITransactionContext;
 import org.apache.asterix.common.transactions.ITransactionManager;
@@ -30,15 +31,16 @@
 public class LogManagerWithReplication extends LogManager {
 
     private IReplicationManager replicationManager;
+    private final IReplicationStrategy replicationStrategy;
 
-    public LogManagerWithReplication(TransactionSubsystem txnSubsystem) {
+    public LogManagerWithReplication(TransactionSubsystem txnSubsystem, IReplicationStrategy replicationPolicy) {
         super(txnSubsystem);
+        this.replicationStrategy = replicationPolicy;
     }
 
     @Override
     public void log(ILogRecord logRecord) throws ACIDException {
-        //only locally generated logs should be replicated
-        logRecord.setReplicated(logRecord.getLogSource() == LogSource.LOCAL && logRecord.getLogType() != LogType.WAIT);
+        logRecord.setReplicated(replicationStrategy.isMatch(logRecord));
 
         //Remote flush logs do not need to be flushed separately since they may not trigger local flush
         if (logRecord.getLogType() == LogType.FLUSH && logRecord.getLogSource() == LogSource.LOCAL) {
@@ -74,7 +76,8 @@
                     }
 
                     //wait for job Commit/Abort ACK from replicas
-                    if (logRecord.getLogType() == LogType.JOB_COMMIT || logRecord.getLogType() == LogType.ABORT) {
+                    if (logRecord.isReplicated() && (logRecord.getLogType() == LogType.JOB_COMMIT
+                            || logRecord.getLogType() == LogType.ABORT)) {
                         while (!replicationManager.hasBeenReplicated(logRecord)) {
                             try {
                                 logRecord.wait();
                             try {
                                 logRecord.wait();
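
For reference while reviewing the isMatch change above: a strategy reproducing the rule the deleted line hard-coded (replicate only locally generated, non-WAIT records) would look roughly as follows. The sketch assumes isMatch(ILogRecord) is the only method such a strategy must override; the actual IReplicationStrategy in this change may declare more:

    import org.apache.asterix.common.replication.IReplicationStrategy;
    import org.apache.asterix.common.transactions.ILogRecord;
    import org.apache.asterix.common.transactions.LogSource;
    import org.apache.asterix.common.transactions.LogType;

    // Sketch: preserves the pre-patch behavior under the new strategy abstraction.
    public class LocalLogsReplicationStrategy implements IReplicationStrategy {
        @Override
        public boolean isMatch(ILogRecord logRecord) {
            // replicate only locally generated logs, and never WAIT records
            return logRecord.getLogSource() == LogSource.LOCAL && logRecord.getLogType() != LogType.WAIT;
        }
    }
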
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionSubsystem.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionSubsystem.java
index 09183fe..a3060e0 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionSubsystem.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionSubsystem.java
@@ -22,11 +22,11 @@
 import java.util.concurrent.Future;
 import java.util.logging.Logger;
 
-import org.apache.asterix.common.config.ClusterProperties;
 import org.apache.asterix.common.config.IPropertiesProvider;
 import org.apache.asterix.common.config.ReplicationProperties;
 import org.apache.asterix.common.config.TransactionProperties;
 import org.apache.asterix.common.exceptions.ACIDException;
+import org.apache.asterix.common.replication.IReplicationStrategy;
 import org.apache.asterix.common.transactions.Checkpoint;
 import org.apache.asterix.common.transactions.CheckpointProperties;
 import org.apache.asterix.common.transactions.IAppRuntimeContextProvider;
@@ -70,7 +70,11 @@
         this.txnProperties = txnProperties;
         this.transactionManager = new TransactionManager(this);
         this.lockManager = new ConcurrentLockManager(txnProperties.getLockManagerShrinkTimer());
-        final boolean replicationEnabled = ClusterProperties.INSTANCE.isReplicationEnabled();
+        ReplicationProperties asterixReplicationProperties = ((IPropertiesProvider) asterixAppRuntimeContextProvider
+                .getAppContext()).getReplicationProperties();
+        IReplicationStrategy replicationStrategy = asterixReplicationProperties.getReplicationStrategy();
+        final boolean replicationEnabled = replicationStrategy.isParticipant(id);
+
         final CheckpointProperties checkpointProperties = new CheckpointProperties(txnProperties, id);
         checkpointManager = CheckpointManagerFactory.create(this, checkpointProperties, replicationEnabled);
         final Checkpoint latestCheckpoint = checkpointManager.getLatest();
@@ -80,14 +84,8 @@
                             latestCheckpoint.getStorageVersion(), StorageConstants.VERSION));
         }
 
-        ReplicationProperties asterixReplicationProperties = null;
-        if (asterixAppRuntimeContextProvider != null) {
-            asterixReplicationProperties = ((IPropertiesProvider) asterixAppRuntimeContextProvider
-                    .getAppContext()).getReplicationProperties();
-        }
-
-        if (asterixReplicationProperties != null && replicationEnabled) {
-            this.logManager = new LogManagerWithReplication(this);
+        if (replicationEnabled) {
+            this.logManager = new LogManagerWithReplication(this, replicationStrategy);
         } else {
             this.logManager = new LogManager(this);
         }
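
To make the new wiring concrete: replicationEnabled is now decided per node by IReplicationStrategy.isParticipant(id), and each log record is then filtered by isMatch. Under a no-replication strategy both predicates return false, so every node falls through to the plain LogManager. A minimal sketch, assuming the interface exposes exactly these two predicates (the NoReplicationStrategy actually added by this change may differ):

    import org.apache.asterix.common.replication.IReplicationStrategy;
    import org.apache.asterix.common.transactions.ILogRecord;

    // Sketch of the degenerate strategy: nothing is replicated, no node participates.
    public class NoReplicationStrategySketch implements IReplicationStrategy {
        @Override
        public boolean isMatch(ILogRecord logRecord) {
            return false; // no log record is ever marked for replication
        }

        @Override
        public boolean isParticipant(String nodeId) {
            return false; // TransactionSubsystem will therefore construct a plain LogManager
        }
    }
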

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/1405
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I1d1012f5541ce786f127866efefb9f3db434fedd
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Murtadha Hubail <[email protected]>
