Repository: asterixdb
Updated Branches:
  refs/heads/master 9b9dc22a2 -> 424413743


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/42441374/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FeedDataSource.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FeedDataSource.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FeedDataSource.java
index 26cec1e..0b6608c 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FeedDataSource.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FeedDataSource.java
@@ -22,7 +22,7 @@ import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.asterix.active.EntityId;
-import org.apache.asterix.external.feed.api.IFeed;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.external.feed.management.FeedConnectionId;
 import org.apache.asterix.external.operators.FeedCollectOperatorDescriptor;
 import org.apache.asterix.external.util.FeedUtils.FeedRuntimeType;
@@ -34,7 +34,6 @@ import org.apache.asterix.metadata.feeds.BuiltinFeedPolicies;
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.om.types.IAType;
 import org.apache.asterix.runtime.formats.NonTaggedDataFormat;
-import org.apache.asterix.runtime.utils.ClusterStateManager;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
@@ -63,12 +62,12 @@ public class FeedDataSource extends DataSource implements IMutationDataSource {
     private final List<ScalarFunctionCallExpression> keyAccessExpression;
     private final FeedConnection feedConnection;
 
-    public FeedDataSource(Feed feed, DataSourceId id, String targetDataset, IAType itemType, IAType metaType,
-            List<IAType> pkTypes, List<List<String>> partitioningKeys,
-            List<ScalarFunctionCallExpression> keyAccessExpression, EntityId sourceFeedId,
-            FeedRuntimeType location, String[] locations, INodeDomain domain, FeedConnection feedConnection)
-            throws AlgebricksException {
+    public FeedDataSource(MetadataProvider metadataProvider, Feed feed, DataSourceId id, String targetDataset,
+            IAType itemType, IAType metaType, List<IAType> pkTypes,
+            List<ScalarFunctionCallExpression> keyAccessExpression, EntityId sourceFeedId, FeedRuntimeType location,
+            String[] locations, INodeDomain domain, FeedConnection feedConnection) throws AlgebricksException {
         super(id, itemType, metaType, Type.FEED, domain);
+        ICcApplicationContext appCtx = metadataProvider.getApplicationContext();
         this.feed = feed;
         this.targetDataset = targetDataset;
         this.sourceFeedId = sourceFeedId;
@@ -76,7 +75,7 @@ public class FeedDataSource extends DataSource implements IMutationDataSource {
         this.locations = locations;
         this.pkTypes = pkTypes;
         this.keyAccessExpression = keyAccessExpression;
-        this.computeCardinality = ClusterStateManager.INSTANCE.getParticipantNodes().size();
+        this.computeCardinality = appCtx.getClusterStateManager().getParticipantNodes().size();
         this.feedConnection = feedConnection;
         initFeedDataSource();
     }
@@ -170,8 +169,8 @@ public class FeedDataSource extends DataSource implements IMutationDataSource {
             throws AlgebricksException {
         try {
             ARecordType feedOutputType = (ARecordType) itemType;
-            ISerializerDeserializer payloadSerde = NonTaggedDataFormat.INSTANCE.getSerdeProvider()
-                    .getSerializerDeserializer(feedOutputType);
+            ISerializerDeserializer payloadSerde =
+                    NonTaggedDataFormat.INSTANCE.getSerdeProvider().getSerializerDeserializer(feedOutputType);
             ArrayList<ISerializerDeserializer> serdes = new ArrayList<>();
             serdes.add(payloadSerde);
             if (metaItemType != null) {
@@ -182,16 +181,16 @@ public class FeedDataSource extends DataSource implements IMutationDataSource {
                     serdes.add(SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(type));
                 }
             }
-            RecordDescriptor feedDesc = new RecordDescriptor(
-                    serdes.toArray(new ISerializerDeserializer[serdes.size()]));
-            FeedPolicyEntity feedPolicy = (FeedPolicyEntity) getProperties()
-                    .get(BuiltinFeedPolicies.CONFIG_FEED_POLICY_KEY);
+            RecordDescriptor feedDesc =
+                    new RecordDescriptor(serdes.toArray(new ISerializerDeserializer[serdes.size()]));
+            FeedPolicyEntity feedPolicy =
+                    (FeedPolicyEntity) getProperties().get(BuiltinFeedPolicies.CONFIG_FEED_POLICY_KEY);
             if (feedPolicy == null) {
                 throw new AlgebricksException("Feed not configured with a policy");
             }
             feedPolicy.getProperties().put(BuiltinFeedPolicies.CONFIG_FEED_POLICY_KEY, feedPolicy.getPolicyName());
-            FeedConnectionId feedConnectionId = new FeedConnectionId(getId().getDataverseName(),
-                    getId().getDatasourceName(), getTargetDataset());
+            FeedConnectionId feedConnectionId =
+                    new FeedConnectionId(getId().getDataverseName(), getId().getDatasourceName(), getTargetDataset());
             FeedCollectOperatorDescriptor feedCollector = new FeedCollectOperatorDescriptor(jobSpec, feedConnectionId,
                     feedOutputType, feedDesc, feedPolicy.getProperties(), getLocation());
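
The pattern shown in FeedDataSource above recurs through the rest of this commit: call sites stop reaching for the ClusterStateManager.INSTANCE singleton and instead obtain the cluster state manager from the CC application context. A minimal sketch of such a call site, assuming a class that already holds a MetadataProvider (the helper name computeCardinality is made up for illustration):

    // Sketch only -- not part of the diff. Mirrors what the new FeedDataSource
    // constructor does with its MetadataProvider argument.
    static int computeCardinality(MetadataProvider metadataProvider) {
        // before: ClusterStateManager.INSTANCE.getParticipantNodes().size()
        ICcApplicationContext appCtx = metadataProvider.getApplicationContext();
        IClusterStateManager csm = appCtx.getClusterStateManager();
        return csm.getParticipantNodes().size();
    }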
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/42441374/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataManagerUtil.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataManagerUtil.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataManagerUtil.java
index c23755d..97c6ed2 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataManagerUtil.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataManagerUtil.java
@@ -21,6 +21,7 @@ package org.apache.asterix.metadata.declared;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.asterix.common.cluster.IClusterStateManager;
 import org.apache.asterix.common.config.DatasetConfig.DatasetType;
 import org.apache.asterix.common.exceptions.MetadataException;
 import org.apache.asterix.metadata.MetadataManager;
@@ -36,7 +37,6 @@ import org.apache.asterix.metadata.entities.NodeGroup;
 import org.apache.asterix.metadata.utils.MetadataConstants;
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.om.types.IAType;
-import org.apache.asterix.runtime.utils.ClusterStateManager;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.core.algebra.properties.DefaultNodeGroupDomain;
 import org.apache.hyracks.algebricks.core.algebra.properties.INodeDomain;
@@ -111,12 +111,12 @@ public class MetadataManagerUtil {
         return dataset;
     }
 
-    public static INodeDomain findNodeDomain(MetadataTransactionContext mdTxnCtx, String nodeGroupName)
-            throws AlgebricksException {
+    public static INodeDomain findNodeDomain(IClusterStateManager clusterStateManager,
+            MetadataTransactionContext mdTxnCtx, String nodeGroupName) throws AlgebricksException {
         NodeGroup nodeGroup = MetadataManager.INSTANCE.getNodegroup(mdTxnCtx, nodeGroupName);
         List<String> partitions = new ArrayList<>();
         for (String location : nodeGroup.getNodeNames()) {
-            int numPartitions = ClusterStateManager.INSTANCE.getNodePartitionsCount(location);
+            int numPartitions = clusterStateManager.getNodePartitionsCount(location);
             for (int i = 0; i < numPartitions; i++) {
                 partitions.add(location);
             }
@@ -165,24 +165,24 @@ public class MetadataManagerUtil {
         }
     }
 
-    public static DataSource findDataSource(MetadataTransactionContext mdTxnCtx, DataSourceId id)
-            throws AlgebricksException {
+    public static DataSource findDataSource(IClusterStateManager clusterStateManager,
+            MetadataTransactionContext mdTxnCtx, DataSourceId id) throws AlgebricksException {
         try {
-            return lookupSourceInMetadata(mdTxnCtx, id);
+            return lookupSourceInMetadata(clusterStateManager, mdTxnCtx, id);
         } catch (MetadataException e) {
             throw new AlgebricksException(e);
         }
     }
 
-    public static DataSource lookupSourceInMetadata(MetadataTransactionContext mdTxnCtx, DataSourceId aqlId)
-            throws AlgebricksException {
+    public static DataSource lookupSourceInMetadata(IClusterStateManager clusterStateManager,
+            MetadataTransactionContext mdTxnCtx, DataSourceId aqlId) throws AlgebricksException {
         Dataset dataset = findDataset(mdTxnCtx, aqlId.getDataverseName(), aqlId.getDatasourceName());
         if (dataset == null) {
             throw new AlgebricksException("Datasource with id " + aqlId + " was not found.");
         }
         IAType itemType = findType(mdTxnCtx, dataset.getItemTypeDataverseName(), dataset.getItemTypeName());
         IAType metaItemType = findType(mdTxnCtx, dataset.getMetaItemTypeDataverseName(), dataset.getMetaItemTypeName());
-        INodeDomain domain = findNodeDomain(mdTxnCtx, dataset.getNodeGroupName());
+        INodeDomain domain = findNodeDomain(clusterStateManager, mdTxnCtx, dataset.getNodeGroupName());
         byte datasourceType = dataset.getDatasetType().equals(DatasetType.EXTERNAL) ? DataSource.Type.EXTERNAL_DATASET
                 : DataSource.Type.INTERNAL_DATASET;
         return new DatasetDataSource(aqlId, dataset, itemType, metaItemType, datasourceType,

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/42441374/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
index 8971a90..d6a3f21 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -26,6 +26,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.asterix.common.cluster.IClusterStateManager;
 import org.apache.asterix.common.config.DatasetConfig.DatasetType;
 import org.apache.asterix.common.config.DatasetConfig.ExternalFilePendingOp;
 import org.apache.asterix.common.config.DatasetConfig.IndexType;
@@ -84,7 +85,6 @@ import org.apache.asterix.runtime.base.AsterixTupleFilterFactory;
 import org.apache.asterix.runtime.formats.FormatUtils;
 import org.apache.asterix.runtime.job.listener.JobEventListenerFactory;
 import org.apache.asterix.runtime.operators.LSMSecondaryUpsertOperatorDescriptor;
-import org.apache.asterix.runtime.utils.ClusterStateManager;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
@@ -295,7 +295,7 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
     }
 
     public INodeDomain findNodeDomain(String nodeGroupName) throws AlgebricksException {
-        return MetadataManagerUtil.findNodeDomain(mdTxnCtx, nodeGroupName);
+        return MetadataManagerUtil.findNodeDomain(appCtx.getClusterStateManager(), mdTxnCtx, nodeGroupName);
     }
 
     public List<String> findNodes(String nodeGroupName) throws AlgebricksException {
@@ -329,11 +329,11 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
 
     @Override
     public DataSource findDataSource(DataSourceId id) throws AlgebricksException {
-        return MetadataManagerUtil.findDataSource(mdTxnCtx, id);
+        return MetadataManagerUtil.findDataSource(appCtx.getClusterStateManager(), mdTxnCtx, id);
     }
 
     public DataSource lookupSourceInMetadata(DataSourceId aqlId) throws AlgebricksException {
-        return MetadataManagerUtil.lookupSourceInMetadata(mdTxnCtx, aqlId);
+        return MetadataManagerUtil.lookupSourceInMetadata(appCtx.getClusterStateManager(), mdTxnCtx, aqlId);
     }
 
     @Override
@@ -709,8 +709,9 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
         int numPartitions = 0;
         List<String> nodeGroup =
                 MetadataManager.INSTANCE.getNodegroup(mdTxnCtx, dataset.getNodeGroupName()).getNodeNames();
+        IClusterStateManager csm = appCtx.getClusterStateManager();
         for (String nd : nodeGroup) {
-            numPartitions += ClusterStateManager.INSTANCE.getNodePartitionsCount(nd);
+            numPartitions += csm.getNodePartitionsCount(nd);
         }
         return numElementsHint / numPartitions;
     }
@@ -755,12 +756,13 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
     }
 
     public Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitAndConstraints(String dataverse) {
-        return SplitsAndConstraintsUtil.getDataverseSplitProviderAndConstraints(dataverse);
+        return SplitsAndConstraintsUtil.getDataverseSplitProviderAndConstraints(appCtx.getClusterStateManager(),
+                dataverse);
     }
 
     public FileSplit[] splitsForIndex(MetadataTransactionContext mdTxnCtx, Dataset dataset, String indexName)
             throws AlgebricksException {
-        return SplitsAndConstraintsUtil.getIndexSplits(dataset, indexName, mdTxnCtx);
+        return SplitsAndConstraintsUtil.getIndexSplits(appCtx.getClusterStateManager(), dataset, indexName, mdTxnCtx);
     }
 
     public DatasourceAdapter getAdapter(MetadataTransactionContext mdTxnCtx, String dataverseName, String adapterName)
@@ -777,7 +779,7 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
     }
 
     public AlgebricksAbsolutePartitionConstraint getClusterLocations() {
-        return ClusterStateManager.INSTANCE.getClusterLocations();
+        return appCtx.getClusterStateManager().getClusterLocations();
     }
 
     public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildExternalDataLookupRuntime(

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/42441374/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SplitsAndConstraintsUtil.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SplitsAndConstraintsUtil.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SplitsAndConstraintsUtil.java
index 6825f10..5b7ea59 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SplitsAndConstraintsUtil.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SplitsAndConstraintsUtil.java
@@ -23,6 +23,7 @@ import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.asterix.common.cluster.ClusterPartition;
+import org.apache.asterix.common.cluster.IClusterStateManager;
 import org.apache.asterix.common.config.ClusterProperties;
 import org.apache.asterix.common.exceptions.MetadataException;
 import org.apache.asterix.common.utils.StoragePathUtil;
@@ -30,7 +31,6 @@ import org.apache.asterix.metadata.MetadataManager;
 import org.apache.asterix.metadata.MetadataTransactionContext;
 import org.apache.asterix.metadata.entities.Dataset;
 import org.apache.asterix.metadata.entities.NodeGroup;
-import org.apache.asterix.runtime.utils.ClusterStateManager;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.common.utils.Pair;
@@ -42,11 +42,11 @@ public class SplitsAndConstraintsUtil {
     private SplitsAndConstraintsUtil() {
     }
 
-    private static FileSplit[] getDataverseSplits(String dataverseName) {
+    private static FileSplit[] getDataverseSplits(IClusterStateManager clusterStateManager, String dataverseName) {
         File relPathFile = new File(dataverseName);
         List<FileSplit> splits = new ArrayList<>();
         // get all partitions
-        ClusterPartition[] clusterPartition = ClusterStateManager.INSTANCE.getClusterPartitons();
+        ClusterPartition[] clusterPartition = clusterStateManager.getClusterPartitons();
         String storageDirName = ClusterProperties.INSTANCE.getStorageDirectoryName();
         for (int j = 0; j < clusterPartition.length; j++) {
             File f = new File(
@@ -57,28 +57,29 @@ public class SplitsAndConstraintsUtil {
         return splits.toArray(new FileSplit[] {});
     }
 
-    public static FileSplit[] getIndexSplits(Dataset dataset, String indexName, MetadataTransactionContext mdTxnCtx)
-            throws AlgebricksException {
+    public static FileSplit[] getIndexSplits(IClusterStateManager clusterStateManager, Dataset dataset,
+            String indexName, MetadataTransactionContext mdTxnCtx) throws AlgebricksException {
         try {
             NodeGroup nodeGroup = MetadataManager.INSTANCE.getNodegroup(mdTxnCtx, dataset.getNodeGroupName());
             if (nodeGroup == null) {
                 throw new AlgebricksException("Couldn't find node group " + dataset.getNodeGroupName());
             }
             List<String> nodeList = nodeGroup.getNodeNames();
-            return getIndexSplits(dataset, indexName, nodeList);
+            return getIndexSplits(clusterStateManager, dataset, indexName, nodeList);
         } catch (MetadataException me) {
             throw new AlgebricksException(me);
         }
     }
 
-    public static FileSplit[] getIndexSplits(Dataset dataset, String indexName, List<String> nodes) {
+    public static FileSplit[] getIndexSplits(IClusterStateManager clusterStateManager, Dataset dataset,
+            String indexName, List<String> nodes) {
         File relPathFile = new File(StoragePathUtil.prepareDataverseIndexName(dataset.getDataverseName(),
                 dataset.getDatasetName(), indexName, dataset.getRebalanceCount()));
         String storageDirName = ClusterProperties.INSTANCE.getStorageDirectoryName();
         List<FileSplit> splits = new ArrayList<>();
         for (String nd : nodes) {
-            int numPartitions = ClusterStateManager.INSTANCE.getNodePartitionsCount(nd);
-            ClusterPartition[] nodePartitions = ClusterStateManager.INSTANCE.getNodePartitions(nd);
+            int numPartitions = clusterStateManager.getNodePartitionsCount(nd);
+            ClusterPartition[] nodePartitions = clusterStateManager.getNodePartitions(nd);
             // currently this case is never executed since the metadata group doesn't exists
             if (dataset.getNodeGroupName().compareTo(MetadataConstants.METADATA_NODEGROUP_NAME) == 0) {
                 numPartitions = 1;
@@ -97,8 +98,8 @@ public class SplitsAndConstraintsUtil {
     }
 
     public static Pair<IFileSplitProvider, AlgebricksPartitionConstraint> getDataverseSplitProviderAndConstraints(
-            String dataverse) {
-        FileSplit[] splits = getDataverseSplits(dataverse);
+            IClusterStateManager clusterStateManager, String dataverse) {
+        FileSplit[] splits = getDataverseSplits(clusterStateManager, dataverse);
         return StoragePathUtil.splitProviderAndPartitionConstraints(splits);
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/42441374/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java
index 194fd59..82a1177 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java
@@ -20,11 +20,11 @@ package org.apache.asterix.runtime.message;
 
 import java.util.Set;
 
+import org.apache.asterix.common.cluster.IClusterStateManager;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.common.messaging.api.ICCMessageBroker;
 import org.apache.asterix.common.messaging.api.ICcAddressedMessage;
 import org.apache.asterix.common.transactions.IResourceIdManager;
-import org.apache.asterix.runtime.utils.ClusterStateManager;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 public class ResourceIdRequestMessage implements ICcAddressedMessage {
@@ -40,7 +40,8 @@ public class ResourceIdRequestMessage implements ICcAddressedMessage {
         try {
             ICCMessageBroker broker = (ICCMessageBroker) appCtx.getServiceContext().getMessageBroker();
             ResourceIdRequestResponseMessage reponse = new ResourceIdRequestResponseMessage();
-            if (!ClusterStateManager.INSTANCE.isClusterActive()) {
+            IClusterStateManager clusterStateManager = appCtx.getClusterStateManager();
+            if (!clusterStateManager.isClusterActive()) {
                 reponse.setResourceId(-1);
                 reponse.setException(new Exception("Cannot generate global resource id when cluster is not active."));
             } else {
@@ -49,7 +50,7 @@ public class ResourceIdRequestMessage implements ICcAddressedMessage {
                 if (reponse.getResourceId() < 0) {
                     reponse.setException(new Exception("One or more nodes has not reported max resource id."));
                 }
-                requestMaxResourceID(resourceIdManager, broker);
+                requestMaxResourceID(clusterStateManager, resourceIdManager, broker);
             }
             broker.sendApplicationMessageToNC(reponse, src);
         } catch (Exception e) {
@@ -57,8 +58,9 @@ public class ResourceIdRequestMessage implements ICcAddressedMessage {
         }
     }
 
-    private void requestMaxResourceID(IResourceIdManager resourceIdManager, ICCMessageBroker broker) throws Exception {
-        Set<String> getParticipantNodes = ClusterStateManager.INSTANCE.getParticipantNodes();
+    private void requestMaxResourceID(IClusterStateManager clusterStateManager, IResourceIdManager resourceIdManager,
+            ICCMessageBroker broker) throws Exception {
+        Set<String> getParticipantNodes = clusterStateManager.getParticipantNodes();
         ReportMaxResourceIdRequestMessage msg = new ReportMaxResourceIdRequestMessage();
         for (String nodeId : getParticipantNodes) {
             if (!resourceIdManager.reported(nodeId)) {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/42441374/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/ResourceIdManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/ResourceIdManager.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/ResourceIdManager.java
new file mode 100644
index 0000000..6a5ed08
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/ResourceIdManager.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.transaction;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.asterix.common.cluster.IClusterStateManager;
+import org.apache.asterix.common.transactions.IResourceIdManager;
+
+public class ResourceIdManager implements IResourceIdManager {
+
+    private final IClusterStateManager csm;
+    private final AtomicLong globalResourceId = new AtomicLong();
+    private volatile Set<String> reportedNodes = new HashSet<>();
+    private volatile boolean allReported = false;
+
+    public ResourceIdManager(IClusterStateManager csm) {
+        this.csm = csm;
+    }
+
+    @Override
+    public long createResourceId() {
+        if (!allReported) {
+            synchronized (this) {
+                if (!allReported) {
+                    if (reportedNodes.size() < csm.getNumberOfNodes()) {
+                        return -1;
+                    } else {
+                        reportedNodes = null;
+                        allReported = true;
+                    }
+                }
+            }
+        }
+        return globalResourceId.incrementAndGet();
+    }
+
+    @Override
+    public synchronized boolean reported(String nodeId) {
+        return allReported || reportedNodes.contains(nodeId);
+    }
+
+    @Override
+    public synchronized void report(String nodeId, long maxResourceId) {
+        if (!allReported) {
+            globalResourceId.set(Math.max(maxResourceId, globalResourceId.get()));
+            reportedNodes.add(nodeId);
+            if (reportedNodes.size() == csm.getNumberOfNodes()) {
+                reportedNodes = null;
+                allReported = true;
+            }
+        }
+    }
+
+}
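
CcApplicationContext (further below) now constructs this class itself, passing in its cluster state manager via new ResourceIdManager(clusterStateManager). A rough sketch of the reporting protocol the class implements, assuming a two-node cluster whose NCs are named "nc1" and "nc2" (the helper and the numbers are illustrative only):

    // Sketch only -- not part of the diff. Assumes csm.getNumberOfNodes() == 2.
    static void sketchResourceIdProtocol(ResourceIdManager resourceIdManager) {
        long early = resourceIdManager.createResourceId();  // -1: not every NC has reported yet
        resourceIdManager.report("nc1", 1000L);              // each NC reports its current max resource id
        resourceIdManager.report("nc2", 2500L);              // the last report flips allReported
        long next = resourceIdManager.createResourceId();    // 2501: ids continue past the reported maximum
    }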

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/42441374/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java
index 28c480f..e4cc7f4 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java
@@ -43,6 +43,7 @@ import org.apache.asterix.common.library.ILibraryManager;
 import org.apache.asterix.common.metadata.IMetadataBootstrap;
 import org.apache.asterix.common.replication.IFaultToleranceStrategy;
 import org.apache.asterix.common.transactions.IResourceIdManager;
+import org.apache.asterix.runtime.transaction.ResourceIdManager;
 import org.apache.hyracks.api.application.ICCServiceContext;
 import org.apache.hyracks.api.client.IHyracksClientConnection;
 import org.apache.hyracks.api.job.IJobLifecycleListener;
@@ -77,17 +78,16 @@ public class CcApplicationContext implements ICcApplicationContext {
     private IFaultToleranceStrategy ftStrategy;
     private IJobLifecycleListener activeLifeCycleListener;
     private IMetadataLockManager mdLockManager;
+    private IClusterStateManager clusterStateManager;
 
     public CcApplicationContext(ICCServiceContext ccServiceCtx, IHyracksClientConnection hcc,
-            ILibraryManager libraryManager, IResourceIdManager resourceIdManager,
-            Supplier<IMetadataBootstrap> metadataBootstrapSupplier, IGlobalRecoveryManager globalRecoveryManager,
-            IFaultToleranceStrategy ftStrategy, IJobLifecycleListener activeLifeCycleListener,
-            IStorageComponentProvider storageComponentProvider, IMetadataLockManager mdLockManager)
-            throws AsterixException, IOException {
+            ILibraryManager libraryManager, Supplier<IMetadataBootstrap> metadataBootstrapSupplier,
+            IGlobalRecoveryManager globalRecoveryManager, IFaultToleranceStrategy ftStrategy,
+            IJobLifecycleListener activeLifeCycleListener, IStorageComponentProvider storageComponentProvider,
+            IMetadataLockManager mdLockManager) throws AsterixException, IOException {
         this.ccServiceCtx = ccServiceCtx;
         this.hcc = hcc;
         this.libraryManager = libraryManager;
-        this.resourceIdManager = resourceIdManager;
         this.activeLifeCycleListener = activeLifeCycleListener;
         // Determine whether to use old-style asterix-configuration.xml or new-style configuration.
         // QQQ strip this out eventually
@@ -109,6 +109,9 @@ public class CcApplicationContext implements ICcApplicationContext {
         this.globalRecoveryManager = globalRecoveryManager;
         this.storageComponentProvider = storageComponentProvider;
         this.mdLockManager = mdLockManager;
+        clusterStateManager = new ClusterStateManager();
+        clusterStateManager.setCcAppCtx(this);
+        this.resourceIdManager = new ResourceIdManager(clusterStateManager);
     }
 
     @Override
@@ -204,6 +207,7 @@ public class CcApplicationContext implements ICcApplicationContext {
         return resourceIdManager;
     }
 
+    @Override
     public IMetadataBootstrap getMetadataBootstrap() {
         return metadataBootstrapSupplier.get();
     }
@@ -230,6 +234,6 @@ public class CcApplicationContext implements ICcApplicationContext {
 
     @Override
     public IClusterStateManager getClusterStateManager() {
-        return ClusterStateManager.INSTANCE;
+        return clusterStateManager;
     }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/42441374/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
index cdb3112..36cb10d 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
@@ -36,6 +36,7 @@ import org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
 import org.apache.asterix.common.cluster.ClusterPartition;
 import org.apache.asterix.common.cluster.IClusterStateManager;
 import org.apache.asterix.common.config.ClusterProperties;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.common.replication.IFaultToleranceStrategy;
@@ -66,7 +67,6 @@ public class ClusterStateManager implements IClusterStateManager {
      */
 
     private static final Logger LOGGER = Logger.getLogger(ClusterStateManager.class.getName());
-    public static final ClusterStateManager INSTANCE = new ClusterStateManager();
     private final Map<String, Map<IOption, Object>> activeNcConfiguration = new HashMap<>();
     private Set<String> pendingRemoval = new HashSet<>();
     private final Cluster cluster;
@@ -78,13 +78,14 @@ public class ClusterStateManager implements IClusterStateManager {
     private boolean metadataNodeActive = false;
     private Set<String> failedNodes = new HashSet<>();
     private IFaultToleranceStrategy ftStrategy;
-    private CcApplicationContext appCtx;
+    private ICcApplicationContext appCtx;
 
-    private ClusterStateManager() {
+    public ClusterStateManager() {
         cluster = ClusterProperties.INSTANCE.getCluster();
     }
 
-    public void setCcAppCtx(CcApplicationContext appCtx) {
+    @Override
+    public void setCcAppCtx(ICcApplicationContext appCtx) {
         this.appCtx = appCtx;
         node2PartitionsMap = appCtx.getMetadataProperties().getNodePartitions();
         clusterPartitions = appCtx.getMetadataProperties().getClusterPartitions();
@@ -93,6 +94,7 @@ public class ClusterStateManager implements IClusterStateManager {
         ftStrategy.bindTo(this);
     }
 
+    @Override
     public synchronized void removeNCConfiguration(String nodeId) throws HyracksException {
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info("Removing configuration parameters for node id " + nodeId);
@@ -102,6 +104,7 @@ public class ClusterStateManager implements IClusterStateManager {
         pendingRemoval.remove(nodeId);
     }
 
+    @Override
     public synchronized void addNCConfiguration(String nodeId, Map<IOption, Object> configuration)
             throws HyracksException {
         if (LOGGER.isLoggable(Level.INFO)) {
@@ -209,13 +212,7 @@ public class ClusterStateManager implements IClusterStateManager {
         return true;
     }
 
-    /**
-     * Returns the IO devices configured for a Node Controller
-     *
-     * @param nodeId
-     *            unique identifier of the Node Controller
-     * @return a list of IO devices.
-     */
+    @Override
     public synchronized String[] getIODevices(String nodeId) {
         Map<IOption, Object> ncConfig = activeNcConfiguration.get(nodeId);
         if (ncConfig == null) {
@@ -233,11 +230,13 @@ public class ClusterStateManager implements IClusterStateManager {
         return state;
     }
 
+    @Override
     public synchronized Node getAvailableSubstitutionNode() {
         List<Node> subNodes = cluster.getSubstituteNodes() == null ? null : cluster.getSubstituteNodes().getNode();
         return subNodes == null || subNodes.isEmpty() ? null : subNodes.get(0);
     }
 
+    @Override
     public synchronized Set<String> getParticipantNodes() {
         Set<String> participantNodes = new HashSet<>();
         for (String pNode : activeNcConfiguration.keySet()) {
@@ -246,6 +245,7 @@ public class ClusterStateManager implements IClusterStateManager {
         return participantNodes;
     }
 
+    @Override
     public synchronized Set<String> getParticipantNodes(boolean excludePendingRemoval) {
         Set<String> participantNodes = getParticipantNodes();
         if (excludePendingRemoval) {
@@ -254,6 +254,7 @@ public class ClusterStateManager implements IClusterStateManager {
         return participantNodes;
     }
 
+    @Override
     public synchronized AlgebricksAbsolutePartitionConstraint getClusterLocations() {
         if (clusterPartitionConstraint == null) {
             resetClusterPartitionConstraint();
@@ -272,6 +273,7 @@ public class ClusterStateManager implements IClusterStateManager {
                 new AlgebricksAbsolutePartitionConstraint(clusterActiveLocations.toArray(new String[] {}));
     }
 
+    @Override
     public synchronized boolean isClusterActive() {
         if (cluster == null) {
             // this is a virtual cluster
@@ -280,6 +282,7 @@ public class ClusterStateManager implements IClusterStateManager {
         return state == ClusterState.ACTIVE;
     }
 
+    @Override
     public int getNumberOfNodes() {
         return appCtx.getMetadataProperties().getNodeNames().size();
     }
@@ -289,6 +292,7 @@ public class ClusterStateManager implements IClusterStateManager {
         return node2PartitionsMap.get(nodeId);
     }
 
+    @Override
     public synchronized int getNodePartitionsCount(String node) {
         if (node2PartitionsMap.containsKey(node)) {
             return node2PartitionsMap.get(node).length;
@@ -305,10 +309,12 @@ public class ClusterStateManager implements IClusterStateManager {
         return partitons.toArray(new ClusterPartition[] {});
     }
 
+    @Override
     public synchronized boolean isMetadataNodeActive() {
         return metadataNodeActive;
     }
 
+    @Override
     public synchronized ObjectNode getClusterStateDescription() {
         ObjectMapper om = new ObjectMapper();
         ObjectNode stateDescription = om.createObjectNode();
@@ -342,6 +348,7 @@ public class ClusterStateManager implements IClusterStateManager {
         return stateDescription;
     }
 
+    @Override
     public synchronized ObjectNode getClusterStateSummary() {
         ObjectMapper om = new ObjectMapper();
         ObjectNode stateDescription = om.createObjectNode();
@@ -395,6 +402,7 @@ public class ClusterStateManager implements IClusterStateManager {
         }
     }
 
+    @Override
     public synchronized void removePending(String nodeId) {
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info("Registering intention to remove node id " + nodeId);
@@ -406,6 +414,7 @@ public class ClusterStateManager implements IClusterStateManager {
         }
     }
 
+    @Override
     public synchronized boolean cancelRemovePending(String nodeId) {
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info("Deregistering intention to remove node id " + nodeId);
@@ -423,8 +432,8 @@ public class ClusterStateManager implements IClusterStateManager {
     }
 
     private void updateNodeConfig(String nodeId, Map<IOption, Object> configuration) {
-        ConfigManager configManager = ((ConfigManagerApplicationConfig) appCtx.getServiceContext().getAppConfig())
-                .getConfigManager();
+        ConfigManager configManager =
+                ((ConfigManagerApplicationConfig) appCtx.getServiceContext().getAppConfig()).getConfigManager();
         for (Map.Entry<IOption, Object> entry : configuration.entrySet()) {
             if (entry.getKey().section() == Section.NC) {
                 configManager.set(nodeId, entry.getKey(), entry.getValue());
