Repository: asterixdb
Updated Branches:
  refs/heads/master f5efea594 -> 1d5412dc5


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1d5412dc/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/util/AsterixAppContextInfo.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/util/AsterixAppContextInfo.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/util/AsterixAppContextInfo.java
index 9fdeebb..471d3d3 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/util/AsterixAppContextInfo.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/util/AsterixAppContextInfo.java
@@ -103,8 +103,7 @@ public class AsterixAppContextInfo implements IAsterixApplicationContextInfo, IA
         INSTANCE.txnProperties = new AsterixTransactionProperties(propertiesAccessor);
         INSTANCE.feedProperties = new AsterixFeedProperties(propertiesAccessor);
         INSTANCE.extensionProperties = new AsterixExtensionProperties(propertiesAccessor);
-        INSTANCE.replicationProperties = new AsterixReplicationProperties(propertiesAccessor,
-                AsterixClusterProperties.INSTANCE.getCluster());
+        INSTANCE.replicationProperties = new AsterixReplicationProperties(propertiesAccessor);
         INSTANCE.hcc = hcc;
         INSTANCE.buildProperties = new AsterixBuildProperties(propertiesAccessor);
         INSTANCE.messagingProperties = new MessagingProperties(propertiesAccessor);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1d5412dc/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/util/AsterixClusterProperties.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/util/AsterixClusterProperties.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/util/AsterixClusterProperties.java
deleted file mode 100644
index d9a55ad..0000000
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/util/AsterixClusterProperties.java
+++ /dev/null
@@ -1,685 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.runtime.util;
-
-import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.SortedMap;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import javax.xml.bind.JAXBContext;
-import javax.xml.bind.JAXBException;
-import javax.xml.bind.Unmarshaller;
-
-import org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
-import org.apache.asterix.common.cluster.ClusterPartition;
-import org.apache.asterix.common.config.AsterixReplicationProperties;
-import org.apache.asterix.common.messaging.api.ICCMessageBroker;
-import org.apache.asterix.event.schema.cluster.Cluster;
-import org.apache.asterix.event.schema.cluster.Node;
-import org.apache.asterix.runtime.message.CompleteFailbackRequestMessage;
-import org.apache.asterix.runtime.message.CompleteFailbackResponseMessage;
-import org.apache.asterix.runtime.message.NodeFailbackPlan;
-import org.apache.asterix.runtime.message.NodeFailbackPlan.FailbackPlanState;
-import org.apache.asterix.runtime.message.PreparePartitionsFailbackRequestMessage;
-import org.apache.asterix.runtime.message.PreparePartitionsFailbackResponseMessage;
-import org.apache.asterix.runtime.message.ReplicaEventMessage;
-import org.apache.asterix.runtime.message.TakeoverMetadataNodeRequestMessage;
-import org.apache.asterix.runtime.message.TakeoverMetadataNodeResponseMessage;
-import org.apache.asterix.runtime.message.TakeoverPartitionsRequestMessage;
-import org.apache.asterix.runtime.message.TakeoverPartitionsResponseMessage;
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
-import org.apache.hyracks.api.application.IClusterLifecycleListener.ClusterEventType;
-import org.json.JSONException;
-import org.json.JSONObject;
-
-/**
- * A holder class for properties related to the Asterix cluster.
- */
-
-public class AsterixClusterProperties {
-    /*
-     * TODO: currently after instance restarts we require all nodes to join again,
-     * otherwise the cluster wont be ACTIVE. we may overcome this by storing the cluster state before the instance
-     * shutdown and using it on startup to identify the nodes that are expected the join.
-     */
-
-    private static final Logger LOGGER = Logger.getLogger(AsterixClusterProperties.class.getName());
-    public static final AsterixClusterProperties INSTANCE = new AsterixClusterProperties();
-    public static final String CLUSTER_CONFIGURATION_FILE = "cluster.xml";
-
-    private static final String CLUSTER_NET_IP_ADDRESS_KEY = "cluster-net-ip-address";
-    private static final String IO_DEVICES = "iodevices";
-    private static final String DEFAULT_STORAGE_DIR_NAME = "storage";
-    private Map<String, Map<String, String>> activeNcConfiguration = new HashMap<>();
-
-    private final Cluster cluster;
-    private ClusterState state = ClusterState.UNUSABLE;
-
-    private AlgebricksAbsolutePartitionConstraint clusterPartitionConstraint;
-
-    private boolean globalRecoveryCompleted = false;
-
-    private Map<String, ClusterPartition[]> node2PartitionsMap = null;
-    private SortedMap<Integer, ClusterPartition> clusterPartitions = null;
-    private Map<Long, TakeoverPartitionsRequestMessage> pendingTakeoverRequests = null;
-
-    private long clusterRequestId = 0;
-    private String currentMetadataNode = null;
-    private boolean metadataNodeActive = false;
-    private boolean autoFailover = false;
-    private boolean replicationEnabled = false;
-    private Set<String> failedNodes = new HashSet<>();
-    private LinkedList<NodeFailbackPlan> pendingProcessingFailbackPlans;
-    private Map<Long, NodeFailbackPlan> planId2FailbackPlanMap;
-
-    private AsterixClusterProperties() {
-        InputStream is = this.getClass().getClassLoader().getResourceAsStream(CLUSTER_CONFIGURATION_FILE);
-        if (is != null) {
-            try {
-                JAXBContext ctx = JAXBContext.newInstance(Cluster.class);
-                Unmarshaller unmarshaller = ctx.createUnmarshaller();
-                cluster = (Cluster) unmarshaller.unmarshal(is);
-            } catch (JAXBException e) {
-                throw new IllegalStateException("Failed to read configuration file " + CLUSTER_CONFIGURATION_FILE, e);
-            }
-        } else {
-            cluster = null;
-        }
-        // if this is the CC process
-        if (AsterixAppContextInfo.INSTANCE.initialized()
-                && AsterixAppContextInfo.INSTANCE.getCCApplicationContext() != null) {
-            node2PartitionsMap = AsterixAppContextInfo.INSTANCE.getMetadataProperties().getNodePartitions();
-            clusterPartitions = AsterixAppContextInfo.INSTANCE.getMetadataProperties().getClusterPartitions();
-            currentMetadataNode = AsterixAppContextInfo.INSTANCE.getMetadataProperties().getMetadataNodeName();
-            replicationEnabled = isReplicationEnabled();
-            autoFailover = isAutoFailoverEnabled();
-            if (autoFailover) {
-                pendingTakeoverRequests = new HashMap<>();
-                pendingProcessingFailbackPlans = new LinkedList<>();
-                planId2FailbackPlanMap = new HashMap<>();
-            }
-        }
-    }
-
-    public synchronized void removeNCConfiguration(String nodeId) {
-        if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info("Removing configuration parameters for node id " + nodeId);
-        }
-        activeNcConfiguration.remove(nodeId);
-
-        //if this node was waiting for failback and failed before it completed
-        if (failedNodes.contains(nodeId)) {
-            if (autoFailover) {
-                notifyFailbackPlansNodeFailure(nodeId);
-                revertFailedFailbackPlanEffects();
-            }
-        } else {
-            //an active node failed
-            failedNodes.add(nodeId);
-            if (nodeId.equals(currentMetadataNode)) {
-                metadataNodeActive = false;
-                LOGGER.info("Metadata node is now inactive");
-            }
-            updateNodePartitions(nodeId, false);
-            if (replicationEnabled) {
-                notifyImpactedReplicas(nodeId, ClusterEventType.NODE_FAILURE);
-                if (autoFailover) {
-                    notifyFailbackPlansNodeFailure(nodeId);
-                    requestPartitionsTakeover(nodeId);
-                }
-            }
-        }
-    }
-
-    public synchronized void addNCConfiguration(String nodeId, Map<String, String> configuration) {
-        if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info("Registering configuration parameters for node id " + nodeId);
-        }
-        activeNcConfiguration.put(nodeId, configuration);
-
-        //a node trying to come back after failure
-        if (failedNodes.contains(nodeId)) {
-            if (autoFailover) {
-                prepareFailbackPlan(nodeId);
-                return;
-            } else {
-                //a node completed local or remote recovery and rejoined
-                failedNodes.remove(nodeId);
-                if (replicationEnabled) {
-                    //notify other replica to reconnect to this node
-                    notifyImpactedReplicas(nodeId, ClusterEventType.NODE_JOIN);
-                }
-            }
-        }
-
-        if (nodeId.equals(currentMetadataNode)) {
-            metadataNodeActive = true;
-            LOGGER.info("Metadata node is now active");
-        }
-        updateNodePartitions(nodeId, true);
-    }
-
-    private synchronized void updateNodePartitions(String nodeId, boolean added) {
-        ClusterPartition[] nodePartitions = node2PartitionsMap.get(nodeId);
-        // if this isn't a storage node, it will not have cluster partitions
-        if (nodePartitions != null) {
-            for (ClusterPartition p : nodePartitions) {
-                // set the active node for this node's partitions
-                p.setActive(added);
-                if (added) {
-                    p.setActiveNodeId(nodeId);
-                }
-            }
-            resetClusterPartitionConstraint();
-            updateClusterState();
-        }
-    }
-
-    private synchronized void updateClusterState() {
-        for (ClusterPartition p : clusterPartitions.values()) {
-            if (!p.isActive()) {
-                state = ClusterState.UNUSABLE;
-                LOGGER.info("Cluster is in UNUSABLE state");
-                return;
-            }
-        }
-        //if all storage partitions are active as well as the metadata node, then the cluster is active
-        if (metadataNodeActive) {
-            state = ClusterState.ACTIVE;
-            LOGGER.info("Cluster is now ACTIVE");
-            //start global recovery
-            AsterixAppContextInfo.INSTANCE.getGlobalRecoveryManager().startGlobalRecovery();
-            if (autoFailover && !pendingProcessingFailbackPlans.isEmpty()) {
-                processPendingFailbackPlans();
-            }
-        } else {
-            requestMetadataNodeTakeover();
-        }
-    }
-
-    /**
-     * Returns the IO devices configured for a Node Controller
-     *
-     * @param nodeId
-     *            unique identifier of the Node Controller
-     * @return a list of IO devices.
-     */
-    public synchronized String[] getIODevices(String nodeId) {
-        Map<String, String> ncConfig = activeNcConfiguration.get(nodeId);
-        if (ncConfig == null) {
-            if (LOGGER.isLoggable(Level.WARNING)) {
-                LOGGER.warning("Configuration parameters for nodeId " + nodeId
-                        + " not found. The node has not joined yet or has left.");
-            }
-            return new String[0];
-        }
-        return ncConfig.get(IO_DEVICES).split(",");
-    }
-
-    public ClusterState getState() {
-        return state;
-    }
-
-    public Cluster getCluster() {
-        return cluster;
-    }
-
-    public synchronized Node getAvailableSubstitutionNode() {
-        List<Node> subNodes = cluster.getSubstituteNodes() == null ? null : cluster.getSubstituteNodes().getNode();
-        return subNodes == null || subNodes.isEmpty() ? null : subNodes.get(0);
-    }
-
-    public synchronized Set<String> getParticipantNodes() {
-        Set<String> participantNodes = new HashSet<>();
-        for (String pNode : activeNcConfiguration.keySet()) {
-            participantNodes.add(pNode);
-        }
-        return participantNodes;
-    }
-
-    public synchronized AlgebricksAbsolutePartitionConstraint getClusterLocations() {
-        if (clusterPartitionConstraint == null) {
-            resetClusterPartitionConstraint();
-        }
-        return clusterPartitionConstraint;
-    }
-
-    private synchronized void resetClusterPartitionConstraint() {
-        ArrayList<String> clusterActiveLocations = new ArrayList<>();
-        for (ClusterPartition p : clusterPartitions.values()) {
-            if (p.isActive()) {
-                clusterActiveLocations.add(p.getActiveNodeId());
-            }
-        }
-        clusterPartitionConstraint = new AlgebricksAbsolutePartitionConstraint(
-                clusterActiveLocations.toArray(new String[] {}));
-    }
-
-    public boolean isGlobalRecoveryCompleted() {
-        return globalRecoveryCompleted;
-    }
-
-    public void setGlobalRecoveryCompleted(boolean globalRecoveryCompleted) {
-        this.globalRecoveryCompleted = globalRecoveryCompleted;
-    }
-
-    public boolean isClusterActive() {
-        if (cluster == null) {
-            // this is a virtual cluster
-            return true;
-        }
-        return state == ClusterState.ACTIVE;
-    }
-
-    public static int getNumberOfNodes() {
-        return AsterixAppContextInfo.INSTANCE.getMetadataProperties().getNodeNames().size();
-    }
-
-    public synchronized ClusterPartition[] getNodePartitions(String nodeId) {
-        return node2PartitionsMap.get(nodeId);
-    }
-
-    public synchronized int getNodePartitionsCount(String node) {
-        if (node2PartitionsMap.containsKey(node)) {
-            return node2PartitionsMap.get(node).length;
-        }
-        return 0;
-    }
-
-    public synchronized ClusterPartition[] getClusterPartitons() {
-        ArrayList<ClusterPartition> partitons = new ArrayList<>();
-        for (ClusterPartition partition : clusterPartitions.values()) {
-            partitons.add(partition);
-        }
-        return partitons.toArray(new ClusterPartition[] {});
-    }
-
-    public String getStorageDirectoryName() {
-        if (cluster != null) {
-            return cluster.getStore();
-        }
-        // virtual cluster without cluster config file
-        return DEFAULT_STORAGE_DIR_NAME;
-    }
-
-    private synchronized void requestPartitionsTakeover(String failedNodeId) {
-        //replica -> list of partitions to takeover
-        Map<String, List<Integer>> partitionRecoveryPlan = new HashMap<>();
-        AsterixReplicationProperties replicationProperties = AsterixAppContextInfo.INSTANCE
-                .getReplicationProperties();
-
-        //collect the partitions of the failed NC
-        List<ClusterPartition> lostPartitions = getNodeAssignedPartitions(failedNodeId);
-        if (!lostPartitions.isEmpty()) {
-            for (ClusterPartition partition : lostPartitions) {
-                //find replicas for this partitions
-                Set<String> partitionReplicas = replicationProperties.getNodeReplicasIds(partition.getNodeId());
-                //find a replica that is still active
-                for (String replica : partitionReplicas) {
-                    //TODO (mhubail) currently this assigns the partition to the first found active replica.
-                    //It needs to be modified to consider load balancing.
-                    addActiveReplica(replica, partition, partitionRecoveryPlan);
-                    // bug? will always break on first loop execution
-                    break;
-                }
-            }
-
-            if (partitionRecoveryPlan.size() == 0) {
-                //no active replicas were found for the failed node
-                LOGGER.severe("Could not find active replicas for the partitions " + lostPartitions);
-                return;
-            } else {
-                LOGGER.info("Partitions to recover: " + lostPartitions);
-            }
-            ICCMessageBroker messageBroker = (ICCMessageBroker) AsterixAppContextInfo.INSTANCE
-                    .getCCApplicationContext().getMessageBroker();
-            //For each replica, send a request to takeover the assigned partitions
-            for (Entry<String, List<Integer>> entry : partitionRecoveryPlan.entrySet()) {
-                String replica = entry.getKey();
-                Integer[] partitionsToTakeover = entry.getValue().toArray(new Integer[entry.getValue().size()]);
-                long requestId = clusterRequestId++;
-                TakeoverPartitionsRequestMessage takeoverRequest = new TakeoverPartitionsRequestMessage(requestId,
-                        replica, partitionsToTakeover);
-                pendingTakeoverRequests.put(requestId, takeoverRequest);
-                try {
-                    messageBroker.sendApplicationMessageToNC(takeoverRequest, replica);
-                } catch (Exception e) {
-                    /**
-                     * if we fail to send the request, it means the NC we tried to send the request to
-                     * has failed. When the failure notification arrives, we will send any pending request
-                     * that belongs to the failed NC to a different active replica.
-                     */
-                    LOGGER.log(Level.WARNING, "Failed to send takeover request: " + takeoverRequest, e);
-                }
-            }
-        }
-    }
-
-    private void addActiveReplica(String replica, ClusterPartition partition,
-            Map<String, List<Integer>> partitionRecoveryPlan) {
-        if (activeNcConfiguration.containsKey(replica) && !failedNodes.contains(replica)) {
-            if (!partitionRecoveryPlan.containsKey(replica)) {
-                List<Integer> replicaPartitions = new ArrayList<>();
-                replicaPartitions.add(partition.getPartitionId());
-                partitionRecoveryPlan.put(replica, replicaPartitions);
-            } else {
-                partitionRecoveryPlan.get(replica).add(partition.getPartitionId());
-            }
-        }
-    }
-
-    private synchronized List<ClusterPartition> getNodeAssignedPartitions(String nodeId) {
-        List<ClusterPartition> nodePartitions = new ArrayList<>();
-        for (ClusterPartition partition : clusterPartitions.values()) {
-            if (partition.getActiveNodeId().equals(nodeId)) {
-                nodePartitions.add(partition);
-            }
-        }
-        /**
-         * if there is any pending takeover request that this node was supposed to handle,
-         * it needs to be sent to a different replica
-         */
-        List<Long> failedTakeoverRequests = new ArrayList<>();
-        for (TakeoverPartitionsRequestMessage request : pendingTakeoverRequests.values()) {
-            if (request.getNodeId().equals(nodeId)) {
-                for (Integer partitionId : request.getPartitions()) {
-                    nodePartitions.add(clusterPartitions.get(partitionId));
-                }
-                failedTakeoverRequests.add(request.getRequestId());
-            }
-        }
-
-        //remove failed requests
-        for (Long requestId : failedTakeoverRequests) {
-            pendingTakeoverRequests.remove(requestId);
-        }
-        return nodePartitions;
-    }
-
-    private synchronized void requestMetadataNodeTakeover() {
-        //need a new node to takeover metadata node
-        ClusterPartition metadataPartiton = AsterixAppContextInfo.INSTANCE.getMetadataProperties()
-                .getMetadataPartition();
-        //request the metadataPartition node to register itself as the metadata node
-        TakeoverMetadataNodeRequestMessage takeoverRequest = new TakeoverMetadataNodeRequestMessage();
-        ICCMessageBroker messageBroker = (ICCMessageBroker) AsterixAppContextInfo.INSTANCE
-                .getCCApplicationContext().getMessageBroker();
-        try {
-            messageBroker.sendApplicationMessageToNC(takeoverRequest, metadataPartiton.getActiveNodeId());
-        } catch (Exception e) {
-            /**
-             * if we fail to send the request, it means the NC we tried to send the request to
-             * has failed. When the failure notification arrives, a new NC will be assigned to
-             * the metadata partition and a new metadata node takeover request will be sent to it.
-             */
-            LOGGER.log(Level.WARNING,
-                    "Failed to send metadata node takeover request to: " + metadataPartiton.getActiveNodeId(), e);
-        }
-    }
-
-    public synchronized void processPartitionTakeoverResponse(TakeoverPartitionsResponseMessage reponse) {
-        for (Integer partitonId : reponse.getPartitions()) {
-            ClusterPartition partition = clusterPartitions.get(partitonId);
-            partition.setActive(true);
-            partition.setActiveNodeId(reponse.getNodeId());
-        }
-        pendingTakeoverRequests.remove(reponse.getRequestId());
-        resetClusterPartitionConstraint();
-        updateClusterState();
-    }
-
-    public synchronized void processMetadataNodeTakeoverResponse(TakeoverMetadataNodeResponseMessage reponse) {
-        currentMetadataNode = reponse.getNodeId();
-        metadataNodeActive = true;
-        LOGGER.info("Current metadata node: " + currentMetadataNode);
-        updateClusterState();
-    }
-
-    private synchronized void prepareFailbackPlan(String failingBackNodeId) {
-        NodeFailbackPlan plan = NodeFailbackPlan.createPlan(failingBackNodeId);
-        pendingProcessingFailbackPlans.add(plan);
-        planId2FailbackPlanMap.put(plan.getPlanId(), plan);
-
-        //get all partitions this node requires to resync
-        AsterixReplicationProperties replicationProperties = AsterixAppContextInfo.INSTANCE
-                .getReplicationProperties();
-        Set<String> nodeReplicas = replicationProperties.getNodeReplicationClients(failingBackNodeId);
-        for (String replicaId : nodeReplicas) {
-            ClusterPartition[] nodePartitions = node2PartitionsMap.get(replicaId);
-            for (ClusterPartition partition : nodePartitions) {
-                plan.addParticipant(partition.getActiveNodeId());
-                /**
-                 * if the partition original node is the returning node,
-                 * add it to the list of the partitions which will be failed back
-                 */
-                if (partition.getNodeId().equals(failingBackNodeId)) {
-                    plan.addPartitionToFailback(partition.getPartitionId(), partition.getActiveNodeId());
-                }
-            }
-        }
-
-        if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info("Prepared Failback plan: " + plan.toString());
-        }
-
-        processPendingFailbackPlans();
-    }
-
-    private synchronized void processPendingFailbackPlans() {
-        /**
-         * if the cluster state is not ACTIVE, then failbacks should not be processed
-         * since some partitions are not active
-         */
-        if (state == ClusterState.ACTIVE) {
-            while (!pendingProcessingFailbackPlans.isEmpty()) {
-                //take the first pending failback plan
-                NodeFailbackPlan plan = pendingProcessingFailbackPlans.pop();
-                /**
-                 * A plan at this stage will be in one of two states:
-                 * 1. PREPARING -> the participants were selected but we haven't sent any request.
-                 * 2. PENDING_ROLLBACK -> a participant failed before we send any requests
-                 */
-                if (plan.getState() == FailbackPlanState.PREPARING) {
-                    //set the partitions that will be failed back as inactive
-                    String failbackNode = plan.getNodeId();
-                    for (Integer partitionId : plan.getPartitionsToFailback()) {
-                        ClusterPartition clusterPartition = clusterPartitions.get(partitionId);
-                        clusterPartition.setActive(false);
-                        //partition expected to be returned to the failing back node
-                        clusterPartition.setActiveNodeId(failbackNode);
-                    }
-
-                    /**
-                     * if the returning node is the original metadata node,
-                     * then metadata node will change after the failback completes
-                     */
-                    String originalMetadataNode = AsterixAppContextInfo.INSTANCE.getMetadataProperties()
-                            .getMetadataNodeName();
-                    if (originalMetadataNode.equals(failbackNode)) {
-                        plan.setNodeToReleaseMetadataManager(currentMetadataNode);
-                        currentMetadataNode = "";
-                        metadataNodeActive = false;
-                    }
-
-                    //force new jobs to wait
-                    state = ClusterState.REBALANCING;
-                    ICCMessageBroker messageBroker = (ICCMessageBroker) AsterixAppContextInfo.INSTANCE
-                            .getCCApplicationContext().getMessageBroker();
-                    handleFailbackRequests(plan, messageBroker);
-                    /**
-                     * wait until the current plan is completed before processing the next plan.
-                     * when the current one completes or is reverted, the cluster state will be
-                     * ACTIVE again, and the next failback plan (if any) will be processed.
-                     */
-                    break;
-                } else if (plan.getState() == FailbackPlanState.PENDING_ROLLBACK) {
-                    //this plan failed before sending any requests -> nothing to rollback
-                    planId2FailbackPlanMap.remove(plan.getPlanId());
-                }
-            }
-        }
-    }
-
-    private void handleFailbackRequests(NodeFailbackPlan plan, ICCMessageBroker messageBroker) {
-        //send requests to other nodes to complete on-going jobs and prepare partitions for failback
-        for (PreparePartitionsFailbackRequestMessage request : plan.getPlanFailbackRequests()) {
-            try {
-                messageBroker.sendApplicationMessageToNC(request, request.getNodeID());
-                plan.addPendingRequest(request);
-            } catch (Exception e) {
-                LOGGER.log(Level.WARNING, "Failed to send failback request to: " + request.getNodeID(), e);
-                plan.notifyNodeFailure(request.getNodeID());
-                revertFailedFailbackPlanEffects();
-                break;
-            }
-        }
-    }
-
-    public synchronized void processPreparePartitionsFailbackResponse(PreparePartitionsFailbackResponseMessage msg) {
-        NodeFailbackPlan plan = planId2FailbackPlanMap.get(msg.getPlanId());
-        plan.markRequestCompleted(msg.getRequestId());
-        /**
-         * A plan at this stage will be in one of three states:
-         * 1. PENDING_PARTICIPANT_REPONSE -> one or more responses are still expected (wait).
-         * 2. PENDING_COMPLETION -> all responses received (time to send completion request).
-         * 3. PENDING_ROLLBACK -> the plan failed and we just received the final pending response (revert).
-         */
-        if (plan.getState() == FailbackPlanState.PENDING_COMPLETION) {
-            CompleteFailbackRequestMessage request = plan.getCompleteFailbackRequestMessage();
-
-            //send complete resync and takeover partitions to the failing back node
-            ICCMessageBroker messageBroker = (ICCMessageBroker) AsterixAppContextInfo.INSTANCE
-                    .getCCApplicationContext().getMessageBroker();
-            try {
-                messageBroker.sendApplicationMessageToNC(request, request.getNodeId());
-            } catch (Exception e) {
-                LOGGER.log(Level.WARNING, "Failed to send complete failback request to: " + request.getNodeId(), e);
-                notifyFailbackPlansNodeFailure(request.getNodeId());
-                revertFailedFailbackPlanEffects();
-            }
-        } else if (plan.getState() == FailbackPlanState.PENDING_ROLLBACK) {
-            revertFailedFailbackPlanEffects();
-        }
-    }
-
-    public synchronized void processCompleteFailbackResponse(CompleteFailbackResponseMessage reponse) {
-        /**
-         * the failback plan completed successfully:
-         * Remove all references to it.
-         * Remove the the failing back node from the failed nodes list.
-         * Notify its replicas to reconnect to it.
-         * Set the failing back node partitions as active.
-         */
-        NodeFailbackPlan plan = planId2FailbackPlanMap.remove(reponse.getPlanId());
-        String nodeId = plan.getNodeId();
-        failedNodes.remove(nodeId);
-        //notify impacted replicas they can reconnect to this node
-        notifyImpactedReplicas(nodeId, ClusterEventType.NODE_JOIN);
-        updateNodePartitions(nodeId, true);
-    }
-
-    private synchronized void notifyImpactedReplicas(String nodeId, ClusterEventType event) {
-        AsterixReplicationProperties replicationProperties = AsterixAppContextInfo.INSTANCE
-                .getReplicationProperties();
-        Set<String> remoteReplicas = replicationProperties.getRemoteReplicasIds(nodeId);
-        String nodeIdAddress = "";
-        //in case the node joined with a new IP address, we need to send it to the other replicas
-        if (event == ClusterEventType.NODE_JOIN) {
-            nodeIdAddress = activeNcConfiguration.get(nodeId).get(CLUSTER_NET_IP_ADDRESS_KEY);
-        }
-
-        ReplicaEventMessage msg = new ReplicaEventMessage(nodeId, nodeIdAddress, event);
-        ICCMessageBroker messageBroker = (ICCMessageBroker) AsterixAppContextInfo.INSTANCE
-                .getCCApplicationContext().getMessageBroker();
-        for (String replica : remoteReplicas) {
-            //if the remote replica is alive, send the event
-            if (activeNcConfiguration.containsKey(replica)) {
-                try {
-                    messageBroker.sendApplicationMessageToNC(msg, replica);
-                } catch (Exception e) {
-                    LOGGER.log(Level.WARNING, "Failed sending an application message to an NC", e);
-                }
-            }
-        }
-    }
-
-    private synchronized void revertFailedFailbackPlanEffects() {
-        Iterator<NodeFailbackPlan> iterator = planId2FailbackPlanMap.values().iterator();
-        while (iterator.hasNext()) {
-            NodeFailbackPlan plan = iterator.next();
-            if (plan.getState() == FailbackPlanState.PENDING_ROLLBACK) {
-                //TODO if the failing back node is still active, notify it to construct a new plan for it
-                iterator.remove();
-
-                //reassign the partitions that were supposed to be failed back to an active replica
-                requestPartitionsTakeover(plan.getNodeId());
-            }
-        }
-    }
-
-    private synchronized void notifyFailbackPlansNodeFailure(String nodeId) {
-        Iterator<NodeFailbackPlan> iterator = planId2FailbackPlanMap.values().iterator();
-        while (iterator.hasNext()) {
-            NodeFailbackPlan plan = iterator.next();
-            plan.notifyNodeFailure(nodeId);
-        }
-    }
-
-    public synchronized boolean isMetadataNodeActive() {
-        return metadataNodeActive;
-    }
-
-    public boolean isReplicationEnabled() {
-        if (cluster != null && cluster.getDataReplication() != null) {
-            return cluster.getDataReplication().isEnabled();
-        }
-        return false;
-    }
-
-    public boolean isAutoFailoverEnabled() {
-        return isReplicationEnabled() && cluster.getDataReplication().isAutoFailover();
-    }
-
-    public synchronized JSONObject getClusterStateDescription() throws JSONException {
-        JSONObject stateDescription = new JSONObject();
-        stateDescription.put("state", state.name());
-        stateDescription.put("metadata_node", currentMetadataNode);
-        for (Map.Entry<String, ClusterPartition[]> entry : node2PartitionsMap.entrySet()) {
-            JSONObject nodeJSON = new JSONObject();
-            nodeJSON.put("node_id", entry.getKey());
-            List<String> partitions = new ArrayList<>();
-            for (ClusterPartition part : entry.getValue()) {
-                partitions.add("partition_" + part.getPartitionId());
-            }
-            nodeJSON.put("partitions", partitions);
-            stateDescription.accumulate("ncs", nodeJSON);
-        }
-        return stateDescription;
-    }
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1d5412dc/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/util/ClusterStateManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/util/ClusterStateManager.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/util/ClusterStateManager.java
new file mode 100644
index 0000000..487b8fa
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/util/ClusterStateManager.java
@@ -0,0 +1,641 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.util;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
+import org.apache.asterix.common.cluster.ClusterPartition;
+import org.apache.asterix.common.config.AsterixReplicationProperties;
+import org.apache.asterix.common.config.ClusterProperties;
+import org.apache.asterix.common.messaging.api.ICCMessageBroker;
+import org.apache.asterix.event.schema.cluster.Cluster;
+import org.apache.asterix.event.schema.cluster.Node;
+import org.apache.asterix.runtime.message.CompleteFailbackRequestMessage;
+import org.apache.asterix.runtime.message.CompleteFailbackResponseMessage;
+import org.apache.asterix.runtime.message.NodeFailbackPlan;
+import org.apache.asterix.runtime.message.NodeFailbackPlan.FailbackPlanState;
+import org.apache.asterix.runtime.message.PreparePartitionsFailbackRequestMessage;
+import org.apache.asterix.runtime.message.PreparePartitionsFailbackResponseMessage;
+import org.apache.asterix.runtime.message.ReplicaEventMessage;
+import org.apache.asterix.runtime.message.TakeoverMetadataNodeRequestMessage;
+import org.apache.asterix.runtime.message.TakeoverMetadataNodeResponseMessage;
+import org.apache.asterix.runtime.message.TakeoverPartitionsRequestMessage;
+import org.apache.asterix.runtime.message.TakeoverPartitionsResponseMessage;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import org.apache.hyracks.api.application.IClusterLifecycleListener.ClusterEventType;
+import org.json.JSONException;
+import org.json.JSONObject;
+
+/**
+ * A holder class for properties related to the Asterix cluster.
+ */
+
+public class ClusterStateManager {
+    /*
+     * TODO: currently after instance restarts we require all nodes to join again,
+     * otherwise the cluster wont be ACTIVE. we may overcome this by storing the cluster state before the instance
+     * shutdown and using it on startup to identify the nodes that are expected the join.
+     */
+
+    private static final Logger LOGGER = Logger.getLogger(ClusterStateManager.class.getName());
+    public static final ClusterStateManager INSTANCE = new ClusterStateManager();
+    private static final String CLUSTER_NET_IP_ADDRESS_KEY = "cluster-net-ip-address";
+    private static final String IO_DEVICES = "iodevices";
+    private Map<String, Map<String, String>> activeNcConfiguration = new HashMap<>();
+
+    private final Cluster cluster;
+    private ClusterState state = ClusterState.UNUSABLE;
+
+    private AlgebricksAbsolutePartitionConstraint clusterPartitionConstraint;
+
+    private boolean globalRecoveryCompleted = false;
+
+    private Map<String, ClusterPartition[]> node2PartitionsMap = null;
+    private SortedMap<Integer, ClusterPartition> clusterPartitions = null;
+    private Map<Long, TakeoverPartitionsRequestMessage> pendingTakeoverRequests = null;
+
+    private long clusterRequestId = 0;
+    private String currentMetadataNode = null;
+    private boolean metadataNodeActive = false;
+    private boolean autoFailover = false;
+    private boolean replicationEnabled = false;
+    private Set<String> failedNodes = new HashSet<>();
+    private LinkedList<NodeFailbackPlan> pendingProcessingFailbackPlans;
+    private Map<Long, NodeFailbackPlan> planId2FailbackPlanMap;
+
+    private ClusterStateManager() {
+        cluster = ClusterProperties.INSTANCE.getCluster();
+        // if this is the CC process
+        if (AsterixAppContextInfo.INSTANCE.initialized()
+                && AsterixAppContextInfo.INSTANCE.getCCApplicationContext() != null) {
+            node2PartitionsMap = AsterixAppContextInfo.INSTANCE.getMetadataProperties().getNodePartitions();
+            clusterPartitions = AsterixAppContextInfo.INSTANCE.getMetadataProperties().getClusterPartitions();
+            currentMetadataNode = AsterixAppContextInfo.INSTANCE.getMetadataProperties().getMetadataNodeName();
+            replicationEnabled = ClusterProperties.INSTANCE.isReplicationEnabled();
+            autoFailover = ClusterProperties.INSTANCE.isAutoFailoverEnabled();
+            if (autoFailover) {
+                pendingTakeoverRequests = new HashMap<>();
+                pendingProcessingFailbackPlans = new LinkedList<>();
+                planId2FailbackPlanMap = new HashMap<>();
+            }
+        }
+    }
+
+    public synchronized void removeNCConfiguration(String nodeId) {
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("Removing configuration parameters for node id " + nodeId);
+        }
+        activeNcConfiguration.remove(nodeId);
+
+        //if this node was waiting for failback and failed before it completed
+        if (failedNodes.contains(nodeId)) {
+            if (autoFailover) {
+                notifyFailbackPlansNodeFailure(nodeId);
+                revertFailedFailbackPlanEffects();
+            }
+        } else {
+            //an active node failed
+            failedNodes.add(nodeId);
+            if (nodeId.equals(currentMetadataNode)) {
+                metadataNodeActive = false;
+                LOGGER.info("Metadata node is now inactive");
+            }
+            updateNodePartitions(nodeId, false);
+            if (replicationEnabled) {
+                notifyImpactedReplicas(nodeId, ClusterEventType.NODE_FAILURE);
+                if (autoFailover) {
+                    notifyFailbackPlansNodeFailure(nodeId);
+                    requestPartitionsTakeover(nodeId);
+                }
+            }
+        }
+    }
+
+    public synchronized void addNCConfiguration(String nodeId, Map<String, String> configuration) {
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("Registering configuration parameters for node id " + nodeId);
+        }
+        activeNcConfiguration.put(nodeId, configuration);
+
+        //a node trying to come back after failure
+        if (failedNodes.contains(nodeId)) {
+            if (autoFailover) {
+                prepareFailbackPlan(nodeId);
+                return;
+            } else {
+                //a node completed local or remote recovery and rejoined
+                failedNodes.remove(nodeId);
+                if (replicationEnabled) {
+                    //notify other replica to reconnect to this node
+                    notifyImpactedReplicas(nodeId, ClusterEventType.NODE_JOIN);
+                }
+            }
+        }
+
+        if (nodeId.equals(currentMetadataNode)) {
+            metadataNodeActive = true;
+            LOGGER.info("Metadata node is now active");
+        }
+        updateNodePartitions(nodeId, true);
+    }
+
+    private synchronized void updateNodePartitions(String nodeId, boolean added) {
+        ClusterPartition[] nodePartitions = node2PartitionsMap.get(nodeId);
+        // if this isn't a storage node, it will not have cluster partitions
+        if (nodePartitions != null) {
+            for (ClusterPartition p : nodePartitions) {
+                // set the active node for this node's partitions
+                p.setActive(added);
+                if (added) {
+                    p.setActiveNodeId(nodeId);
+                }
+            }
+            resetClusterPartitionConstraint();
+            updateClusterState();
+        }
+    }
+
+    private synchronized void updateClusterState() {
+        for (ClusterPartition p : clusterPartitions.values()) {
+            if (!p.isActive()) {
+                state = ClusterState.UNUSABLE;
+                LOGGER.info("Cluster is in UNUSABLE state");
+                return;
+            }
+        }
+        //if all storage partitions are active as well as the metadata node, then the cluster is active
+        if (metadataNodeActive) {
+            state = ClusterState.ACTIVE;
+            LOGGER.info("Cluster is now ACTIVE");
+            //start global recovery
+            AsterixAppContextInfo.INSTANCE.getGlobalRecoveryManager().startGlobalRecovery();
+            if (autoFailover && !pendingProcessingFailbackPlans.isEmpty()) {
+                processPendingFailbackPlans();
+            }
+        } else {
+            requestMetadataNodeTakeover();
+        }
+    }
+
+    /**
+     * Returns the IO devices configured for a Node Controller
+     *
+     * @param nodeId
+     *            unique identifier of the Node Controller
+     * @return a list of IO devices.
+     */
+    public synchronized String[] getIODevices(String nodeId) {
+        Map<String, String> ncConfig = activeNcConfiguration.get(nodeId);
+        if (ncConfig == null) {
+            if (LOGGER.isLoggable(Level.WARNING)) {
+                LOGGER.warning("Configuration parameters for nodeId " + nodeId
+                        + " not found. The node has not joined yet or has left.");
+            }
+            return new String[0];
+        }
+        return ncConfig.get(IO_DEVICES).split(",");
+    }
+
+    public ClusterState getState() {
+        return state;
+    }
+
+    public synchronized Node getAvailableSubstitutionNode() {
+        List<Node> subNodes = cluster.getSubstituteNodes() == null ? null : cluster.getSubstituteNodes().getNode();
+        return subNodes == null || subNodes.isEmpty() ? null : subNodes.get(0);
+    }
+
+    public synchronized Set<String> getParticipantNodes() {
+        Set<String> participantNodes = new HashSet<>();
+        for (String pNode : activeNcConfiguration.keySet()) {
+            participantNodes.add(pNode);
+        }
+        return participantNodes;
+    }
+
+    public synchronized AlgebricksAbsolutePartitionConstraint getClusterLocations() {
+        if (clusterPartitionConstraint == null) {
+            resetClusterPartitionConstraint();
+        }
+        return clusterPartitionConstraint;
+    }
+
+    private synchronized void resetClusterPartitionConstraint() {
+        ArrayList<String> clusterActiveLocations = new ArrayList<>();
+        for (ClusterPartition p : clusterPartitions.values()) {
+            if (p.isActive()) {
+                clusterActiveLocations.add(p.getActiveNodeId());
+            }
+        }
+        clusterPartitionConstraint = new AlgebricksAbsolutePartitionConstraint(
+                clusterActiveLocations.toArray(new String[] {}));
+    }
+
+    public boolean isGlobalRecoveryCompleted() {
+        return globalRecoveryCompleted;
+    }
+
+    public void setGlobalRecoveryCompleted(boolean globalRecoveryCompleted) {
+        this.globalRecoveryCompleted = globalRecoveryCompleted;
+    }
+
+    public boolean isClusterActive() {
+        if (cluster == null) {
+            // this is a virtual cluster
+            return true;
+        }
+        return state == ClusterState.ACTIVE;
+    }
+
+    public static int getNumberOfNodes() {
+        return AsterixAppContextInfo.INSTANCE.getMetadataProperties().getNodeNames().size();
+    }
+
+    public synchronized ClusterPartition[] getNodePartitions(String nodeId) {
+        return node2PartitionsMap.get(nodeId);
+    }
+
+    public synchronized int getNodePartitionsCount(String node) {
+        if (node2PartitionsMap.containsKey(node)) {
+            return node2PartitionsMap.get(node).length;
+        }
+        return 0;
+    }
+
+    public synchronized ClusterPartition[] getClusterPartitons() {
+        ArrayList<ClusterPartition> partitons = new ArrayList<>();
+        for (ClusterPartition partition : clusterPartitions.values()) {
+            partitons.add(partition);
+        }
+        return partitons.toArray(new ClusterPartition[] {});
+    }
+
+    private synchronized void requestPartitionsTakeover(String failedNodeId) {
+        //replica -> list of partitions to takeover
+        Map<String, List<Integer>> partitionRecoveryPlan = new HashMap<>();
+        AsterixReplicationProperties replicationProperties = AsterixAppContextInfo.INSTANCE.getReplicationProperties();
+
+        //collect the partitions of the failed NC
+        List<ClusterPartition> lostPartitions = getNodeAssignedPartitions(failedNodeId);
+        if (!lostPartitions.isEmpty()) {
+            for (ClusterPartition partition : lostPartitions) {
+                //find replicas for this partitions
+                Set<String> partitionReplicas = replicationProperties.getNodeReplicasIds(partition.getNodeId());
+                //find a replica that is still active
+                for (String replica : partitionReplicas) {
+                    //TODO (mhubail) currently this assigns the partition to the first found active replica.
+                    //It needs to be modified to consider load balancing.
+                    addActiveReplica(replica, partition, partitionRecoveryPlan);
+                    // bug? will always break on first loop execution
+                    break;
+                }
+            }
+
+            if (partitionRecoveryPlan.size() == 0) {
+                //no active replicas were found for the failed node
+                LOGGER.severe("Could not find active replicas for the partitions " + lostPartitions);
+                return;
+            } else {
+                LOGGER.info("Partitions to recover: " + lostPartitions);
+            }
+            ICCMessageBroker messageBroker = (ICCMessageBroker) AsterixAppContextInfo.INSTANCE.getCCApplicationContext()
+                    .getMessageBroker();
+            //For each replica, send a request to takeover the assigned partitions
+            for (Entry<String, List<Integer>> entry : partitionRecoveryPlan.entrySet()) {
+                String replica = entry.getKey();
+                Integer[] partitionsToTakeover = entry.getValue().toArray(new Integer[entry.getValue().size()]);
+                long requestId = clusterRequestId++;
+                TakeoverPartitionsRequestMessage takeoverRequest = new TakeoverPartitionsRequestMessage(requestId,
+                        replica, partitionsToTakeover);
+                pendingTakeoverRequests.put(requestId, takeoverRequest);
+                try {
+                    messageBroker.sendApplicationMessageToNC(takeoverRequest, replica);
+                } catch (Exception e) {
+                    /**
+                     * if we fail to send the request, it means the NC we tried to send the request to
+                     * has failed. When the failure notification arrives, we will send any pending request
+                     * that belongs to the failed NC to a different active replica.
+                     */
+                    LOGGER.log(Level.WARNING, "Failed to send takeover request: " + takeoverRequest, e);
+                }
+            }
+        }
+    }
+
+    private void addActiveReplica(String replica, ClusterPartition partition,
+            Map<String, List<Integer>> partitionRecoveryPlan) {
+        if (activeNcConfiguration.containsKey(replica) && !failedNodes.contains(replica)) {
+            if (!partitionRecoveryPlan.containsKey(replica)) {
+                List<Integer> replicaPartitions = new ArrayList<>();
+                replicaPartitions.add(partition.getPartitionId());
+                partitionRecoveryPlan.put(replica, replicaPartitions);
+            } else {
+                partitionRecoveryPlan.get(replica).add(partition.getPartitionId());
+            }
+        }
+    }
+
+    private synchronized List<ClusterPartition> getNodeAssignedPartitions(String nodeId) {
+        List<ClusterPartition> nodePartitions = new ArrayList<>();
+        for (ClusterPartition partition : clusterPartitions.values()) {
+            if (partition.getActiveNodeId().equals(nodeId)) {
+                nodePartitions.add(partition);
+            }
+        }
+        /**
+         * if there is any pending takeover request that this node was supposed to handle,
+         * it needs to be sent to a different replica
+         */
+        List<Long> failedTakeoverRequests = new ArrayList<>();
+        for (TakeoverPartitionsRequestMessage request : pendingTakeoverRequests.values()) {
+            if (request.getNodeId().equals(nodeId)) {
+                for (Integer partitionId : request.getPartitions()) {
+                    nodePartitions.add(clusterPartitions.get(partitionId));
+                }
+                failedTakeoverRequests.add(request.getRequestId());
+            }
+        }
+
+        //remove failed requests
+        for (Long requestId : failedTakeoverRequests) {
+            pendingTakeoverRequests.remove(requestId);
+        }
+        return nodePartitions;
+    }
+
+    private synchronized void requestMetadataNodeTakeover() {
+        //need a new node to takeover metadata node
+        ClusterPartition metadataPartiton = AsterixAppContextInfo.INSTANCE.getMetadataProperties()
+                .getMetadataPartition();
+        //request the metadataPartition node to register itself as the metadata node
+        TakeoverMetadataNodeRequestMessage takeoverRequest = new TakeoverMetadataNodeRequestMessage();
+        ICCMessageBroker messageBroker = (ICCMessageBroker) AsterixAppContextInfo.INSTANCE.getCCApplicationContext()
+                .getMessageBroker();
+        try {
+            messageBroker.sendApplicationMessageToNC(takeoverRequest, metadataPartiton.getActiveNodeId());
+        } catch (Exception e) {
+            /**
+             * if we fail to send the request, it means the NC we tried to send the request to
+             * has failed. When the failure notification arrives, a new NC will be assigned to
+             * the metadata partition and a new metadata node takeover request will be sent to it.
+             */
+            LOGGER.log(Level.WARNING,
+                    "Failed to send metadata node takeover request to: " + metadataPartiton.getActiveNodeId(), e);
+        }
+    }
+
+    public synchronized void processPartitionTakeoverResponse(TakeoverPartitionsResponseMessage reponse) {
+        for (Integer partitonId : reponse.getPartitions()) {
+            ClusterPartition partition = clusterPartitions.get(partitonId);
+            partition.setActive(true);
+            partition.setActiveNodeId(reponse.getNodeId());
+        }
+        pendingTakeoverRequests.remove(reponse.getRequestId());
+        resetClusterPartitionConstraint();
+        updateClusterState();
+    }
+
+    public synchronized void processMetadataNodeTakeoverResponse(TakeoverMetadataNodeResponseMessage reponse) {
+        currentMetadataNode = reponse.getNodeId();
+        metadataNodeActive = true;
+        LOGGER.info("Current metadata node: " + currentMetadataNode);
+        updateClusterState();
+    }
+
+    private synchronized void prepareFailbackPlan(String failingBackNodeId) {
+        NodeFailbackPlan plan = NodeFailbackPlan.createPlan(failingBackNodeId);
+        pendingProcessingFailbackPlans.add(plan);
+        planId2FailbackPlanMap.put(plan.getPlanId(), plan);
+
+        //get all partitions this node requires to resync
+        AsterixReplicationProperties replicationProperties = AsterixAppContextInfo.INSTANCE.getReplicationProperties();
+        Set<String> nodeReplicas = replicationProperties.getNodeReplicationClients(failingBackNodeId);
+        for (String replicaId : nodeReplicas) {
+            ClusterPartition[] nodePartitions = node2PartitionsMap.get(replicaId);
+            for (ClusterPartition partition : nodePartitions) {
+                plan.addParticipant(partition.getActiveNodeId());
+                /**
+                 * if the partition original node is the returning node,
+                 * add it to the list of the partitions which will be failed back
+                 */
+                if (partition.getNodeId().equals(failingBackNodeId)) {
+                    plan.addPartitionToFailback(partition.getPartitionId(), partition.getActiveNodeId());
+                }
+            }
+        }
+
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("Prepared Failback plan: " + plan.toString());
+        }
+
+        processPendingFailbackPlans();
+    }
+
+    private synchronized void processPendingFailbackPlans() {
+        /**
+         * if the cluster state is not ACTIVE, then failbacks should not be processed
+         * since some partitions are not active
+         */
+        if (state == ClusterState.ACTIVE) {
+            while (!pendingProcessingFailbackPlans.isEmpty()) {
+                //take the first pending failback plan
+                NodeFailbackPlan plan = pendingProcessingFailbackPlans.pop();
+                /**
+                 * A plan at this stage will be in one of two states:
+                 * 1. PREPARING -> the participants were selected but we haven't sent any request.
+                 * 2. PENDING_ROLLBACK -> a participant failed before we sent any requests.
+                 */
+                if (plan.getState() == FailbackPlanState.PREPARING) {
+                    //set the partitions that will be failed back as inactive
+                    String failbackNode = plan.getNodeId();
+                    for (Integer partitionId : plan.getPartitionsToFailback()) {
+                        ClusterPartition clusterPartition = clusterPartitions.get(partitionId);
+                        clusterPartition.setActive(false);
+                        //partition expected to be returned to the failing back node
+                        clusterPartition.setActiveNodeId(failbackNode);
+                    }
+
+                    /**
+                     * if the returning node is the original metadata node,
+                     * then the metadata node will change after the failback completes
+                     */
+                    String originalMetadataNode = AsterixAppContextInfo.INSTANCE.getMetadataProperties()
+                            .getMetadataNodeName();
+                    if (originalMetadataNode.equals(failbackNode)) {
+                        plan.setNodeToReleaseMetadataManager(currentMetadataNode);
+                        currentMetadataNode = "";
+                        metadataNodeActive = false;
+                    }
+
+                    //force new jobs to wait
+                    state = ClusterState.REBALANCING;
+                    ICCMessageBroker messageBroker = (ICCMessageBroker) AsterixAppContextInfo.INSTANCE
+                            .getCCApplicationContext().getMessageBroker();
+                    handleFailbackRequests(plan, messageBroker);
+                    /**
+                     * wait until the current plan is completed before processing the next plan.
+                     * when the current one completes or is reverted, the cluster state will be
+                     * ACTIVE again, and the next failback plan (if any) will be processed.
+                     */
+                    break;
+                } else if (plan.getState() == FailbackPlanState.PENDING_ROLLBACK) {
+                    //this plan failed before sending any requests -> nothing to rollback
+                    planId2FailbackPlanMap.remove(plan.getPlanId());
+                }
+            }
+        }
+    }
+
+    private void handleFailbackRequests(NodeFailbackPlan plan, ICCMessageBroker messageBroker) {
+        //send requests to other nodes to complete ongoing jobs and prepare partitions for failback
+        for (PreparePartitionsFailbackRequestMessage request : plan.getPlanFailbackRequests()) {
+            try {
+                messageBroker.sendApplicationMessageToNC(request, request.getNodeID());
+                plan.addPendingRequest(request);
+            } catch (Exception e) {
+                LOGGER.log(Level.WARNING, "Failed to send failback request to: " + request.getNodeID(), e);
+                plan.notifyNodeFailure(request.getNodeID());
+                revertFailedFailbackPlanEffects();
+                break;
+            }
+        }
+    }
+
+    public synchronized void processPreparePartitionsFailbackResponse(PreparePartitionsFailbackResponseMessage msg) {
+        NodeFailbackPlan plan = planId2FailbackPlanMap.get(msg.getPlanId());
+        plan.markRequestCompleted(msg.getRequestId());
+        /**
+         * A plan at this stage will be in one of three states:
+         * 1. PENDING_PARTICIPANT_REPONSE -> one or more responses are still expected (wait).
+         * 2. PENDING_COMPLETION -> all responses received (time to send completion request).
+         * 3. PENDING_ROLLBACK -> the plan failed and we just received the final pending response (revert).
+         */
+        if (plan.getState() == FailbackPlanState.PENDING_COMPLETION) {
+            CompleteFailbackRequestMessage request = plan.getCompleteFailbackRequestMessage();
+
+            //ask the failing back node to complete the resync and take over its partitions
+            ICCMessageBroker messageBroker = (ICCMessageBroker) AsterixAppContextInfo.INSTANCE.getCCApplicationContext()
+                    .getMessageBroker();
+            try {
+                messageBroker.sendApplicationMessageToNC(request, request.getNodeId());
+            } catch (Exception e) {
+                LOGGER.log(Level.WARNING, "Failed to send complete failback request to: " + request.getNodeId(), e);
+                notifyFailbackPlansNodeFailure(request.getNodeId());
+                revertFailedFailbackPlanEffects();
+            }
+        } else if (plan.getState() == FailbackPlanState.PENDING_ROLLBACK) {
+            revertFailedFailbackPlanEffects();
+        }
+    }
+
+    public synchronized void processCompleteFailbackResponse(CompleteFailbackResponseMessage response) {
+        /**
+         * the failback plan completed successfully:
+         * Remove all references to it.
+         * Remove the failing back node from the failed nodes list.
+         * Notify its replicas to reconnect to it.
+         * Set the failing back node partitions as active.
+         */
+        NodeFailbackPlan plan = planId2FailbackPlanMap.remove(response.getPlanId());
+        String nodeId = plan.getNodeId();
+        failedNodes.remove(nodeId);
+        //notify impacted replicas they can reconnect to this node
+        notifyImpactedReplicas(nodeId, ClusterEventType.NODE_JOIN);
+        updateNodePartitions(nodeId, true);
+    }
+
+    private synchronized void notifyImpactedReplicas(String nodeId, ClusterEventType event) {
+        AsterixReplicationProperties replicationProperties = AsterixAppContextInfo.INSTANCE.getReplicationProperties();
+        Set<String> remoteReplicas = replicationProperties.getRemoteReplicasIds(nodeId);
+        String nodeIdAddress = "";
+        //in case the node joined with a new IP address, we need to send it to the other replicas
+        if (event == ClusterEventType.NODE_JOIN) {
+            nodeIdAddress = activeNcConfiguration.get(nodeId).get(CLUSTER_NET_IP_ADDRESS_KEY);
+        }
+
+        ReplicaEventMessage msg = new ReplicaEventMessage(nodeId, nodeIdAddress, event);
+        ICCMessageBroker messageBroker = (ICCMessageBroker) AsterixAppContextInfo.INSTANCE.getCCApplicationContext()
+                .getMessageBroker();
+        for (String replica : remoteReplicas) {
+            //if the remote replica is alive, send the event
+            if (activeNcConfiguration.containsKey(replica)) {
+                try {
+                    messageBroker.sendApplicationMessageToNC(msg, replica);
+                } catch (Exception e) {
+                    LOGGER.log(Level.WARNING, "Failed sending an application message to an NC", e);
+                }
+            }
+        }
+    }
+
+    private synchronized void revertFailedFailbackPlanEffects() {
+        Iterator<NodeFailbackPlan> iterator = planId2FailbackPlanMap.values().iterator();
+        while (iterator.hasNext()) {
+            NodeFailbackPlan plan = iterator.next();
+            if (plan.getState() == FailbackPlanState.PENDING_ROLLBACK) {
+                //TODO if the failing back node is still active, notify it to construct a new plan for it
+                iterator.remove();
+
+                //reassign the partitions that were supposed to be failed back to an active replica
+                requestPartitionsTakeover(plan.getNodeId());
+            }
+        }
+    }
+
+    private synchronized void notifyFailbackPlansNodeFailure(String nodeId) {
+        Iterator<NodeFailbackPlan> iterator = planId2FailbackPlanMap.values().iterator();
+        while (iterator.hasNext()) {
+            NodeFailbackPlan plan = iterator.next();
+            plan.notifyNodeFailure(nodeId);
+        }
+    }
+
+    public synchronized boolean isMetadataNodeActive() {
+        return metadataNodeActive;
+    }
+
+    public synchronized JSONObject getClusterStateDescription() throws JSONException {
+        JSONObject stateDescription = new JSONObject();
+        stateDescription.put("state", state.name());
+        stateDescription.put("metadata_node", currentMetadataNode);
+        for (Map.Entry<String, ClusterPartition[]> entry : node2PartitionsMap.entrySet()) {
+            JSONObject nodeJSON = new JSONObject();
+            nodeJSON.put("node_id", entry.getKey());
+            List<String> partitions = new ArrayList<>();
+            for (ClusterPartition part : entry.getValue()) {
+                partitions.add("partition_" + part.getPartitionId());
+            }
+            nodeJSON.put("partitions", partitions);
+            stateDescription.accumulate("ncs", nodeJSON);
+        }
+        return stateDescription;
+    }
+}
\ No newline at end of file
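
The comments in processPendingFailbackPlans and processPreparePartitionsFailbackResponse above describe a small per-plan state machine. The FailbackPlanState type itself is not part of this hunk, so the sketch below is only an illustration of the states and transitions those comments imply; the constant names (including the PENDING_PARTICIPANT_REPONSE spelling) are taken from the comments rather than from the real enum.

    // Hedged sketch only -- the real FailbackPlanState is defined elsewhere in the code base.
    public enum FailbackPlanState {
        // participants selected, no prepare requests sent yet
        PREPARING,
        // prepare requests sent, one or more responses still expected
        PENDING_PARTICIPANT_REPONSE,
        // all prepare responses received, the complete-failback request can be sent
        PENDING_COMPLETION,
        // a participant failed at some point, so the plan must be reverted
        PENDING_ROLLBACK
    }

Read this way, a plan moves from PREPARING to PENDING_PARTICIPANT_REPONSE when the prepare requests go out, to PENDING_COMPLETION when the last response arrives, and to PENDING_ROLLBACK from any state when a participating node fails.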

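For illustration, the snippet below builds a document with the same shape as the one assembled by getClusterStateDescription() above, assuming the same org.json JSONObject/JSONException types the class imports. The node and partition names are invented; the real values come from the cluster state and node2PartitionsMap.

    import java.util.Arrays;

    import org.json.JSONException;
    import org.json.JSONObject;

    public class ClusterStateDescriptionSample {
        public static void main(String[] args) throws JSONException {
            JSONObject stateDescription = new JSONObject();
            stateDescription.put("state", "ACTIVE");
            stateDescription.put("metadata_node", "nc1");

            JSONObject nc1 = new JSONObject();
            nc1.put("node_id", "nc1");
            nc1.put("partitions", Arrays.asList("partition_0", "partition_1"));

            JSONObject nc2 = new JSONObject();
            nc2.put("node_id", "nc2");
            nc2.put("partitions", Arrays.asList("partition_2", "partition_3"));

            // repeated accumulate() calls gather the per-node objects into a JSON array under "ncs"
            stateDescription.accumulate("ncs", nc1);
            stateDescription.accumulate("ncs", nc2);

            System.out.println(stateDescription.toString(2));
        }
    }
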
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1d5412dc/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java
index eb22c22..6eb758c 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java
@@ -51,6 +51,7 @@ import org.apache.asterix.common.api.IDatasetLifecycleManager;
 import org.apache.asterix.common.api.ILocalResourceMetadata;
 import org.apache.asterix.common.cluster.ClusterPartition;
 import org.apache.asterix.common.config.AsterixMetadataProperties;
+import org.apache.asterix.common.config.ClusterProperties;
 import org.apache.asterix.common.config.IAsterixPropertiesProvider;
 import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback;
@@ -108,9 +109,7 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent {
         this.txnSubsystem = txnSubsystem;
         logMgr = (LogManager) txnSubsystem.getLogManager();
         checkpointHistory = txnSubsystem.getTransactionProperties().getCheckpointHistory();
-        IAsterixPropertiesProvider propertiesProvider =
-                (IAsterixPropertiesProvider) txnSubsystem.getAsterixAppRuntimeContextProvider().getAppContext();
-        replicationEnabled = propertiesProvider.getReplicationProperties().isReplicationEnabled();
+        replicationEnabled = ClusterProperties.INSTANCE.isReplicationEnabled();
         localResourceRepository = (PersistentLocalResourceRepository) txnSubsystem.getAsterixAppRuntimeContextProvider()
                 .getLocalResourceRepository();
         cachedEntityCommitsPerJobSize = txnSubsystem.getTransactionProperties().getJobRecoveryMemorySize();

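The RecoveryManager change above is typical of this change set: instead of casting the app runtime context to IAsterixPropertiesProvider just to learn whether replication is on, the constructor now reads the flag from the process-wide ClusterProperties singleton. The real ClusterProperties (introduced elsewhere in this commit) parses the cluster description and exposes more than a single flag; the fragment below is only a minimal sketch of the enum-singleton accessor pattern that call relies on.

    // Minimal illustration of the accessor pattern; not the actual ClusterProperties class.
    public enum ClusterPropertiesSketch {
        INSTANCE;

        private volatile boolean replicationEnabled;

        public boolean isReplicationEnabled() {
            return replicationEnabled;
        }

        // in the real class the flag is derived from the parsed cluster configuration
        void setReplicationEnabled(boolean enabled) {
            this.replicationEnabled = enabled;
        }
    }

With this shape, any component can ask ClusterPropertiesSketch.INSTANCE.isReplicationEnabled() without holding a reference to the application context, which is what the new RecoveryManager constructor line does with the real class.
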
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1d5412dc/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionSubsystem.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionSubsystem.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionSubsystem.java
index 6b25542..5f06b8a 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionSubsystem.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionSubsystem.java
@@ -24,6 +24,7 @@ import java.util.logging.Logger;
 
 import org.apache.asterix.common.config.AsterixReplicationProperties;
 import org.apache.asterix.common.config.AsterixTransactionProperties;
+import org.apache.asterix.common.config.ClusterProperties;
 import org.apache.asterix.common.config.IAsterixPropertiesProvider;
 import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.transactions.IAsterixAppRuntimeContextProvider;
@@ -72,7 +73,7 @@ public class TransactionSubsystem implements ITransactionSubsystem {
                     .getAppContext()).getReplicationProperties();
         }
 
-        if (asterixReplicationProperties != null && asterixReplicationProperties.isReplicationEnabled()) {
+        if (asterixReplicationProperties != null && ClusterProperties.INSTANCE.isReplicationEnabled()) {
             this.logManager = new LogManagerWithReplication(this);
         } else {
             this.logManager = new LogManager(this);
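
The hunk above keeps the null check on the per-instance replication properties but takes the on/off decision from the cluster-level flag. Factored out, the choice of log manager reduces to a two-input decision like the hedged sketch below; the stub types are placeholders for illustration, not the real LogManager classes.

    // Placeholder types for illustration only.
    class LogManagerStub {
    }

    class LogManagerWithReplicationStub extends LogManagerStub {
    }

    final class LogManagerChoiceSketch {
        static LogManagerStub choose(boolean replicationConfigured, boolean replicationEnabledClusterWide) {
            // replication must be configured for this instance and enabled for the whole cluster
            if (replicationConfigured && replicationEnabledClusterWide) {
                return new LogManagerWithReplicationStub();
            }
            return new LogManagerStub();
        }
    }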
