Repository: asterixdb Updated Branches: refs/heads/master 9b9dc22a2 -> 424413743
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/42441374/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FeedDataSource.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FeedDataSource.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FeedDataSource.java index 26cec1e..0b6608c 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FeedDataSource.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FeedDataSource.java @@ -22,7 +22,7 @@ import java.util.ArrayList; import java.util.List; import org.apache.asterix.active.EntityId; -import org.apache.asterix.external.feed.api.IFeed; +import org.apache.asterix.common.dataflow.ICcApplicationContext; import org.apache.asterix.external.feed.management.FeedConnectionId; import org.apache.asterix.external.operators.FeedCollectOperatorDescriptor; import org.apache.asterix.external.util.FeedUtils.FeedRuntimeType; @@ -34,7 +34,6 @@ import org.apache.asterix.metadata.feeds.BuiltinFeedPolicies; import org.apache.asterix.om.types.ARecordType; import org.apache.asterix.om.types.IAType; import org.apache.asterix.runtime.formats.NonTaggedDataFormat; -import org.apache.asterix.runtime.utils.ClusterStateManager; import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint; import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; @@ -63,12 +62,12 @@ public class FeedDataSource extends DataSource implements IMutationDataSource { private final List<ScalarFunctionCallExpression> keyAccessExpression; private final FeedConnection feedConnection; - public FeedDataSource(Feed feed, DataSourceId id, String targetDataset, IAType itemType, IAType metaType, - List<IAType> pkTypes, List<List<String>> partitioningKeys, - List<ScalarFunctionCallExpression> keyAccessExpression, EntityId sourceFeedId, - FeedRuntimeType location, String[] locations, INodeDomain domain, FeedConnection feedConnection) - throws AlgebricksException { + public FeedDataSource(MetadataProvider metadataProvider, Feed feed, DataSourceId id, String targetDataset, + IAType itemType, IAType metaType, List<IAType> pkTypes, + List<ScalarFunctionCallExpression> keyAccessExpression, EntityId sourceFeedId, FeedRuntimeType location, + String[] locations, INodeDomain domain, FeedConnection feedConnection) throws AlgebricksException { super(id, itemType, metaType, Type.FEED, domain); + ICcApplicationContext appCtx = metadataProvider.getApplicationContext(); this.feed = feed; this.targetDataset = targetDataset; this.sourceFeedId = sourceFeedId; @@ -76,7 +75,7 @@ public class FeedDataSource extends DataSource implements IMutationDataSource { this.locations = locations; this.pkTypes = pkTypes; this.keyAccessExpression = keyAccessExpression; - this.computeCardinality = ClusterStateManager.INSTANCE.getParticipantNodes().size(); + this.computeCardinality = appCtx.getClusterStateManager().getParticipantNodes().size(); this.feedConnection = feedConnection; initFeedDataSource(); } @@ -170,8 +169,8 @@ public class FeedDataSource extends DataSource implements IMutationDataSource { throws AlgebricksException { try { ARecordType feedOutputType = (ARecordType) itemType; - ISerializerDeserializer payloadSerde = NonTaggedDataFormat.INSTANCE.getSerdeProvider() - .getSerializerDeserializer(feedOutputType); + ISerializerDeserializer payloadSerde = + NonTaggedDataFormat.INSTANCE.getSerdeProvider().getSerializerDeserializer(feedOutputType); ArrayList<ISerializerDeserializer> serdes = new ArrayList<>(); serdes.add(payloadSerde); if (metaItemType != null) { @@ -182,16 +181,16 @@ public class FeedDataSource extends DataSource implements IMutationDataSource { serdes.add(SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(type)); } } - RecordDescriptor feedDesc = new RecordDescriptor( - serdes.toArray(new ISerializerDeserializer[serdes.size()])); - FeedPolicyEntity feedPolicy = (FeedPolicyEntity) getProperties() - .get(BuiltinFeedPolicies.CONFIG_FEED_POLICY_KEY); + RecordDescriptor feedDesc = + new RecordDescriptor(serdes.toArray(new ISerializerDeserializer[serdes.size()])); + FeedPolicyEntity feedPolicy = + (FeedPolicyEntity) getProperties().get(BuiltinFeedPolicies.CONFIG_FEED_POLICY_KEY); if (feedPolicy == null) { throw new AlgebricksException("Feed not configured with a policy"); } feedPolicy.getProperties().put(BuiltinFeedPolicies.CONFIG_FEED_POLICY_KEY, feedPolicy.getPolicyName()); - FeedConnectionId feedConnectionId = new FeedConnectionId(getId().getDataverseName(), - getId().getDatasourceName(), getTargetDataset()); + FeedConnectionId feedConnectionId = + new FeedConnectionId(getId().getDataverseName(), getId().getDatasourceName(), getTargetDataset()); FeedCollectOperatorDescriptor feedCollector = new FeedCollectOperatorDescriptor(jobSpec, feedConnectionId, feedOutputType, feedDesc, feedPolicy.getProperties(), getLocation()); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/42441374/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataManagerUtil.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataManagerUtil.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataManagerUtil.java index c23755d..97c6ed2 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataManagerUtil.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataManagerUtil.java @@ -21,6 +21,7 @@ package org.apache.asterix.metadata.declared; import java.util.ArrayList; import java.util.List; +import org.apache.asterix.common.cluster.IClusterStateManager; import org.apache.asterix.common.config.DatasetConfig.DatasetType; import org.apache.asterix.common.exceptions.MetadataException; import org.apache.asterix.metadata.MetadataManager; @@ -36,7 +37,6 @@ import org.apache.asterix.metadata.entities.NodeGroup; import org.apache.asterix.metadata.utils.MetadataConstants; import org.apache.asterix.om.types.ARecordType; import org.apache.asterix.om.types.IAType; -import org.apache.asterix.runtime.utils.ClusterStateManager; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.algebricks.core.algebra.properties.DefaultNodeGroupDomain; import org.apache.hyracks.algebricks.core.algebra.properties.INodeDomain; @@ -111,12 +111,12 @@ public class MetadataManagerUtil { return dataset; } - public static INodeDomain findNodeDomain(MetadataTransactionContext mdTxnCtx, String nodeGroupName) - throws AlgebricksException { + public static INodeDomain findNodeDomain(IClusterStateManager clusterStateManager, + MetadataTransactionContext mdTxnCtx, String nodeGroupName) throws AlgebricksException { NodeGroup nodeGroup = MetadataManager.INSTANCE.getNodegroup(mdTxnCtx, nodeGroupName); List<String> partitions = new ArrayList<>(); for (String location : nodeGroup.getNodeNames()) { - int numPartitions = ClusterStateManager.INSTANCE.getNodePartitionsCount(location); + int numPartitions = clusterStateManager.getNodePartitionsCount(location); for (int i = 0; i < numPartitions; i++) { partitions.add(location); } @@ -165,24 +165,24 @@ public class MetadataManagerUtil { } } - public static DataSource findDataSource(MetadataTransactionContext mdTxnCtx, DataSourceId id) - throws AlgebricksException { + public static DataSource findDataSource(IClusterStateManager clusterStateManager, + MetadataTransactionContext mdTxnCtx, DataSourceId id) throws AlgebricksException { try { - return lookupSourceInMetadata(mdTxnCtx, id); + return lookupSourceInMetadata(clusterStateManager, mdTxnCtx, id); } catch (MetadataException e) { throw new AlgebricksException(e); } } - public static DataSource lookupSourceInMetadata(MetadataTransactionContext mdTxnCtx, DataSourceId aqlId) - throws AlgebricksException { + public static DataSource lookupSourceInMetadata(IClusterStateManager clusterStateManager, + MetadataTransactionContext mdTxnCtx, DataSourceId aqlId) throws AlgebricksException { Dataset dataset = findDataset(mdTxnCtx, aqlId.getDataverseName(), aqlId.getDatasourceName()); if (dataset == null) { throw new AlgebricksException("Datasource with id " + aqlId + " was not found."); } IAType itemType = findType(mdTxnCtx, dataset.getItemTypeDataverseName(), dataset.getItemTypeName()); IAType metaItemType = findType(mdTxnCtx, dataset.getMetaItemTypeDataverseName(), dataset.getMetaItemTypeName()); - INodeDomain domain = findNodeDomain(mdTxnCtx, dataset.getNodeGroupName()); + INodeDomain domain = findNodeDomain(clusterStateManager, mdTxnCtx, dataset.getNodeGroupName()); byte datasourceType = dataset.getDatasetType().equals(DatasetType.EXTERNAL) ? DataSource.Type.EXTERNAL_DATASET : DataSource.Type.INTERNAL_DATASET; return new DatasetDataSource(aqlId, dataset, itemType, metaItemType, datasourceType, http://git-wip-us.apache.org/repos/asf/asterixdb/blob/42441374/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java index 8971a90..d6a3f21 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java @@ -26,6 +26,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import org.apache.asterix.common.cluster.IClusterStateManager; import org.apache.asterix.common.config.DatasetConfig.DatasetType; import org.apache.asterix.common.config.DatasetConfig.ExternalFilePendingOp; import org.apache.asterix.common.config.DatasetConfig.IndexType; @@ -84,7 +85,6 @@ import org.apache.asterix.runtime.base.AsterixTupleFilterFactory; import org.apache.asterix.runtime.formats.FormatUtils; import org.apache.asterix.runtime.job.listener.JobEventListenerFactory; import org.apache.asterix.runtime.operators.LSMSecondaryUpsertOperatorDescriptor; -import org.apache.asterix.runtime.utils.ClusterStateManager; import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint; import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; @@ -295,7 +295,7 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String> } public INodeDomain findNodeDomain(String nodeGroupName) throws AlgebricksException { - return MetadataManagerUtil.findNodeDomain(mdTxnCtx, nodeGroupName); + return MetadataManagerUtil.findNodeDomain(appCtx.getClusterStateManager(), mdTxnCtx, nodeGroupName); } public List<String> findNodes(String nodeGroupName) throws AlgebricksException { @@ -329,11 +329,11 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String> @Override public DataSource findDataSource(DataSourceId id) throws AlgebricksException { - return MetadataManagerUtil.findDataSource(mdTxnCtx, id); + return MetadataManagerUtil.findDataSource(appCtx.getClusterStateManager(), mdTxnCtx, id); } public DataSource lookupSourceInMetadata(DataSourceId aqlId) throws AlgebricksException { - return MetadataManagerUtil.lookupSourceInMetadata(mdTxnCtx, aqlId); + return MetadataManagerUtil.lookupSourceInMetadata(appCtx.getClusterStateManager(), mdTxnCtx, aqlId); } @Override @@ -709,8 +709,9 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String> int numPartitions = 0; List<String> nodeGroup = MetadataManager.INSTANCE.getNodegroup(mdTxnCtx, dataset.getNodeGroupName()).getNodeNames(); + IClusterStateManager csm = appCtx.getClusterStateManager(); for (String nd : nodeGroup) { - numPartitions += ClusterStateManager.INSTANCE.getNodePartitionsCount(nd); + numPartitions += csm.getNodePartitionsCount(nd); } return numElementsHint / numPartitions; } @@ -755,12 +756,13 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String> } public Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitAndConstraints(String dataverse) { - return SplitsAndConstraintsUtil.getDataverseSplitProviderAndConstraints(dataverse); + return SplitsAndConstraintsUtil.getDataverseSplitProviderAndConstraints(appCtx.getClusterStateManager(), + dataverse); } public FileSplit[] splitsForIndex(MetadataTransactionContext mdTxnCtx, Dataset dataset, String indexName) throws AlgebricksException { - return SplitsAndConstraintsUtil.getIndexSplits(dataset, indexName, mdTxnCtx); + return SplitsAndConstraintsUtil.getIndexSplits(appCtx.getClusterStateManager(), dataset, indexName, mdTxnCtx); } public DatasourceAdapter getAdapter(MetadataTransactionContext mdTxnCtx, String dataverseName, String adapterName) @@ -777,7 +779,7 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String> } public AlgebricksAbsolutePartitionConstraint getClusterLocations() { - return ClusterStateManager.INSTANCE.getClusterLocations(); + return appCtx.getClusterStateManager().getClusterLocations(); } public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildExternalDataLookupRuntime( http://git-wip-us.apache.org/repos/asf/asterixdb/blob/42441374/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SplitsAndConstraintsUtil.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SplitsAndConstraintsUtil.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SplitsAndConstraintsUtil.java index 6825f10..5b7ea59 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SplitsAndConstraintsUtil.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SplitsAndConstraintsUtil.java @@ -23,6 +23,7 @@ import java.util.ArrayList; import java.util.List; import org.apache.asterix.common.cluster.ClusterPartition; +import org.apache.asterix.common.cluster.IClusterStateManager; import org.apache.asterix.common.config.ClusterProperties; import org.apache.asterix.common.exceptions.MetadataException; import org.apache.asterix.common.utils.StoragePathUtil; @@ -30,7 +31,6 @@ import org.apache.asterix.metadata.MetadataManager; import org.apache.asterix.metadata.MetadataTransactionContext; import org.apache.asterix.metadata.entities.Dataset; import org.apache.asterix.metadata.entities.NodeGroup; -import org.apache.asterix.runtime.utils.ClusterStateManager; import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.algebricks.common.utils.Pair; @@ -42,11 +42,11 @@ public class SplitsAndConstraintsUtil { private SplitsAndConstraintsUtil() { } - private static FileSplit[] getDataverseSplits(String dataverseName) { + private static FileSplit[] getDataverseSplits(IClusterStateManager clusterStateManager, String dataverseName) { File relPathFile = new File(dataverseName); List<FileSplit> splits = new ArrayList<>(); // get all partitions - ClusterPartition[] clusterPartition = ClusterStateManager.INSTANCE.getClusterPartitons(); + ClusterPartition[] clusterPartition = clusterStateManager.getClusterPartitons(); String storageDirName = ClusterProperties.INSTANCE.getStorageDirectoryName(); for (int j = 0; j < clusterPartition.length; j++) { File f = new File( @@ -57,28 +57,29 @@ public class SplitsAndConstraintsUtil { return splits.toArray(new FileSplit[] {}); } - public static FileSplit[] getIndexSplits(Dataset dataset, String indexName, MetadataTransactionContext mdTxnCtx) - throws AlgebricksException { + public static FileSplit[] getIndexSplits(IClusterStateManager clusterStateManager, Dataset dataset, + String indexName, MetadataTransactionContext mdTxnCtx) throws AlgebricksException { try { NodeGroup nodeGroup = MetadataManager.INSTANCE.getNodegroup(mdTxnCtx, dataset.getNodeGroupName()); if (nodeGroup == null) { throw new AlgebricksException("Couldn't find node group " + dataset.getNodeGroupName()); } List<String> nodeList = nodeGroup.getNodeNames(); - return getIndexSplits(dataset, indexName, nodeList); + return getIndexSplits(clusterStateManager, dataset, indexName, nodeList); } catch (MetadataException me) { throw new AlgebricksException(me); } } - public static FileSplit[] getIndexSplits(Dataset dataset, String indexName, List<String> nodes) { + public static FileSplit[] getIndexSplits(IClusterStateManager clusterStateManager, Dataset dataset, + String indexName, List<String> nodes) { File relPathFile = new File(StoragePathUtil.prepareDataverseIndexName(dataset.getDataverseName(), dataset.getDatasetName(), indexName, dataset.getRebalanceCount())); String storageDirName = ClusterProperties.INSTANCE.getStorageDirectoryName(); List<FileSplit> splits = new ArrayList<>(); for (String nd : nodes) { - int numPartitions = ClusterStateManager.INSTANCE.getNodePartitionsCount(nd); - ClusterPartition[] nodePartitions = ClusterStateManager.INSTANCE.getNodePartitions(nd); + int numPartitions = clusterStateManager.getNodePartitionsCount(nd); + ClusterPartition[] nodePartitions = clusterStateManager.getNodePartitions(nd); // currently this case is never executed since the metadata group doesn't exists if (dataset.getNodeGroupName().compareTo(MetadataConstants.METADATA_NODEGROUP_NAME) == 0) { numPartitions = 1; @@ -97,8 +98,8 @@ public class SplitsAndConstraintsUtil { } public static Pair<IFileSplitProvider, AlgebricksPartitionConstraint> getDataverseSplitProviderAndConstraints( - String dataverse) { - FileSplit[] splits = getDataverseSplits(dataverse); + IClusterStateManager clusterStateManager, String dataverse) { + FileSplit[] splits = getDataverseSplits(clusterStateManager, dataverse); return StoragePathUtil.splitProviderAndPartitionConstraints(splits); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/42441374/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java index 194fd59..82a1177 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java @@ -20,11 +20,11 @@ package org.apache.asterix.runtime.message; import java.util.Set; +import org.apache.asterix.common.cluster.IClusterStateManager; import org.apache.asterix.common.dataflow.ICcApplicationContext; import org.apache.asterix.common.messaging.api.ICCMessageBroker; import org.apache.asterix.common.messaging.api.ICcAddressedMessage; import org.apache.asterix.common.transactions.IResourceIdManager; -import org.apache.asterix.runtime.utils.ClusterStateManager; import org.apache.hyracks.api.exceptions.HyracksDataException; public class ResourceIdRequestMessage implements ICcAddressedMessage { @@ -40,7 +40,8 @@ public class ResourceIdRequestMessage implements ICcAddressedMessage { try { ICCMessageBroker broker = (ICCMessageBroker) appCtx.getServiceContext().getMessageBroker(); ResourceIdRequestResponseMessage reponse = new ResourceIdRequestResponseMessage(); - if (!ClusterStateManager.INSTANCE.isClusterActive()) { + IClusterStateManager clusterStateManager = appCtx.getClusterStateManager(); + if (!clusterStateManager.isClusterActive()) { reponse.setResourceId(-1); reponse.setException(new Exception("Cannot generate global resource id when cluster is not active.")); } else { @@ -49,7 +50,7 @@ public class ResourceIdRequestMessage implements ICcAddressedMessage { if (reponse.getResourceId() < 0) { reponse.setException(new Exception("One or more nodes has not reported max resource id.")); } - requestMaxResourceID(resourceIdManager, broker); + requestMaxResourceID(clusterStateManager, resourceIdManager, broker); } broker.sendApplicationMessageToNC(reponse, src); } catch (Exception e) { @@ -57,8 +58,9 @@ public class ResourceIdRequestMessage implements ICcAddressedMessage { } } - private void requestMaxResourceID(IResourceIdManager resourceIdManager, ICCMessageBroker broker) throws Exception { - Set<String> getParticipantNodes = ClusterStateManager.INSTANCE.getParticipantNodes(); + private void requestMaxResourceID(IClusterStateManager clusterStateManager, IResourceIdManager resourceIdManager, + ICCMessageBroker broker) throws Exception { + Set<String> getParticipantNodes = clusterStateManager.getParticipantNodes(); ReportMaxResourceIdRequestMessage msg = new ReportMaxResourceIdRequestMessage(); for (String nodeId : getParticipantNodes) { if (!resourceIdManager.reported(nodeId)) { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/42441374/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/ResourceIdManager.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/ResourceIdManager.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/ResourceIdManager.java new file mode 100644 index 0000000..6a5ed08 --- /dev/null +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/ResourceIdManager.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.runtime.transaction; + +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.asterix.common.cluster.IClusterStateManager; +import org.apache.asterix.common.transactions.IResourceIdManager; + +public class ResourceIdManager implements IResourceIdManager { + + private final IClusterStateManager csm; + private final AtomicLong globalResourceId = new AtomicLong(); + private volatile Set<String> reportedNodes = new HashSet<>(); + private volatile boolean allReported = false; + + public ResourceIdManager(IClusterStateManager csm) { + this.csm = csm; + } + + @Override + public long createResourceId() { + if (!allReported) { + synchronized (this) { + if (!allReported) { + if (reportedNodes.size() < csm.getNumberOfNodes()) { + return -1; + } else { + reportedNodes = null; + allReported = true; + } + } + } + } + return globalResourceId.incrementAndGet(); + } + + @Override + public synchronized boolean reported(String nodeId) { + return allReported || reportedNodes.contains(nodeId); + } + + @Override + public synchronized void report(String nodeId, long maxResourceId) { + if (!allReported) { + globalResourceId.set(Math.max(maxResourceId, globalResourceId.get())); + reportedNodes.add(nodeId); + if (reportedNodes.size() == csm.getNumberOfNodes()) { + reportedNodes = null; + allReported = true; + } + } + } + +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/42441374/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java index 28c480f..e4cc7f4 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java @@ -43,6 +43,7 @@ import org.apache.asterix.common.library.ILibraryManager; import org.apache.asterix.common.metadata.IMetadataBootstrap; import org.apache.asterix.common.replication.IFaultToleranceStrategy; import org.apache.asterix.common.transactions.IResourceIdManager; +import org.apache.asterix.runtime.transaction.ResourceIdManager; import org.apache.hyracks.api.application.ICCServiceContext; import org.apache.hyracks.api.client.IHyracksClientConnection; import org.apache.hyracks.api.job.IJobLifecycleListener; @@ -77,17 +78,16 @@ public class CcApplicationContext implements ICcApplicationContext { private IFaultToleranceStrategy ftStrategy; private IJobLifecycleListener activeLifeCycleListener; private IMetadataLockManager mdLockManager; + private IClusterStateManager clusterStateManager; public CcApplicationContext(ICCServiceContext ccServiceCtx, IHyracksClientConnection hcc, - ILibraryManager libraryManager, IResourceIdManager resourceIdManager, - Supplier<IMetadataBootstrap> metadataBootstrapSupplier, IGlobalRecoveryManager globalRecoveryManager, - IFaultToleranceStrategy ftStrategy, IJobLifecycleListener activeLifeCycleListener, - IStorageComponentProvider storageComponentProvider, IMetadataLockManager mdLockManager) - throws AsterixException, IOException { + ILibraryManager libraryManager, Supplier<IMetadataBootstrap> metadataBootstrapSupplier, + IGlobalRecoveryManager globalRecoveryManager, IFaultToleranceStrategy ftStrategy, + IJobLifecycleListener activeLifeCycleListener, IStorageComponentProvider storageComponentProvider, + IMetadataLockManager mdLockManager) throws AsterixException, IOException { this.ccServiceCtx = ccServiceCtx; this.hcc = hcc; this.libraryManager = libraryManager; - this.resourceIdManager = resourceIdManager; this.activeLifeCycleListener = activeLifeCycleListener; // Determine whether to use old-style asterix-configuration.xml or new-style configuration. // QQQ strip this out eventually @@ -109,6 +109,9 @@ public class CcApplicationContext implements ICcApplicationContext { this.globalRecoveryManager = globalRecoveryManager; this.storageComponentProvider = storageComponentProvider; this.mdLockManager = mdLockManager; + clusterStateManager = new ClusterStateManager(); + clusterStateManager.setCcAppCtx(this); + this.resourceIdManager = new ResourceIdManager(clusterStateManager); } @Override @@ -204,6 +207,7 @@ public class CcApplicationContext implements ICcApplicationContext { return resourceIdManager; } + @Override public IMetadataBootstrap getMetadataBootstrap() { return metadataBootstrapSupplier.get(); } @@ -230,6 +234,6 @@ public class CcApplicationContext implements ICcApplicationContext { @Override public IClusterStateManager getClusterStateManager() { - return ClusterStateManager.INSTANCE; + return clusterStateManager; } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/42441374/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java index cdb3112..36cb10d 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java @@ -36,6 +36,7 @@ import org.apache.asterix.common.api.IClusterManagementWork.ClusterState; import org.apache.asterix.common.cluster.ClusterPartition; import org.apache.asterix.common.cluster.IClusterStateManager; import org.apache.asterix.common.config.ClusterProperties; +import org.apache.asterix.common.dataflow.ICcApplicationContext; import org.apache.asterix.common.exceptions.AsterixException; import org.apache.asterix.common.exceptions.ErrorCode; import org.apache.asterix.common.replication.IFaultToleranceStrategy; @@ -66,7 +67,6 @@ public class ClusterStateManager implements IClusterStateManager { */ private static final Logger LOGGER = Logger.getLogger(ClusterStateManager.class.getName()); - public static final ClusterStateManager INSTANCE = new ClusterStateManager(); private final Map<String, Map<IOption, Object>> activeNcConfiguration = new HashMap<>(); private Set<String> pendingRemoval = new HashSet<>(); private final Cluster cluster; @@ -78,13 +78,14 @@ public class ClusterStateManager implements IClusterStateManager { private boolean metadataNodeActive = false; private Set<String> failedNodes = new HashSet<>(); private IFaultToleranceStrategy ftStrategy; - private CcApplicationContext appCtx; + private ICcApplicationContext appCtx; - private ClusterStateManager() { + public ClusterStateManager() { cluster = ClusterProperties.INSTANCE.getCluster(); } - public void setCcAppCtx(CcApplicationContext appCtx) { + @Override + public void setCcAppCtx(ICcApplicationContext appCtx) { this.appCtx = appCtx; node2PartitionsMap = appCtx.getMetadataProperties().getNodePartitions(); clusterPartitions = appCtx.getMetadataProperties().getClusterPartitions(); @@ -93,6 +94,7 @@ public class ClusterStateManager implements IClusterStateManager { ftStrategy.bindTo(this); } + @Override public synchronized void removeNCConfiguration(String nodeId) throws HyracksException { if (LOGGER.isLoggable(Level.INFO)) { LOGGER.info("Removing configuration parameters for node id " + nodeId); @@ -102,6 +104,7 @@ public class ClusterStateManager implements IClusterStateManager { pendingRemoval.remove(nodeId); } + @Override public synchronized void addNCConfiguration(String nodeId, Map<IOption, Object> configuration) throws HyracksException { if (LOGGER.isLoggable(Level.INFO)) { @@ -209,13 +212,7 @@ public class ClusterStateManager implements IClusterStateManager { return true; } - /** - * Returns the IO devices configured for a Node Controller - * - * @param nodeId - * unique identifier of the Node Controller - * @return a list of IO devices. - */ + @Override public synchronized String[] getIODevices(String nodeId) { Map<IOption, Object> ncConfig = activeNcConfiguration.get(nodeId); if (ncConfig == null) { @@ -233,11 +230,13 @@ public class ClusterStateManager implements IClusterStateManager { return state; } + @Override public synchronized Node getAvailableSubstitutionNode() { List<Node> subNodes = cluster.getSubstituteNodes() == null ? null : cluster.getSubstituteNodes().getNode(); return subNodes == null || subNodes.isEmpty() ? null : subNodes.get(0); } + @Override public synchronized Set<String> getParticipantNodes() { Set<String> participantNodes = new HashSet<>(); for (String pNode : activeNcConfiguration.keySet()) { @@ -246,6 +245,7 @@ public class ClusterStateManager implements IClusterStateManager { return participantNodes; } + @Override public synchronized Set<String> getParticipantNodes(boolean excludePendingRemoval) { Set<String> participantNodes = getParticipantNodes(); if (excludePendingRemoval) { @@ -254,6 +254,7 @@ public class ClusterStateManager implements IClusterStateManager { return participantNodes; } + @Override public synchronized AlgebricksAbsolutePartitionConstraint getClusterLocations() { if (clusterPartitionConstraint == null) { resetClusterPartitionConstraint(); @@ -272,6 +273,7 @@ public class ClusterStateManager implements IClusterStateManager { new AlgebricksAbsolutePartitionConstraint(clusterActiveLocations.toArray(new String[] {})); } + @Override public synchronized boolean isClusterActive() { if (cluster == null) { // this is a virtual cluster @@ -280,6 +282,7 @@ public class ClusterStateManager implements IClusterStateManager { return state == ClusterState.ACTIVE; } + @Override public int getNumberOfNodes() { return appCtx.getMetadataProperties().getNodeNames().size(); } @@ -289,6 +292,7 @@ public class ClusterStateManager implements IClusterStateManager { return node2PartitionsMap.get(nodeId); } + @Override public synchronized int getNodePartitionsCount(String node) { if (node2PartitionsMap.containsKey(node)) { return node2PartitionsMap.get(node).length; @@ -305,10 +309,12 @@ public class ClusterStateManager implements IClusterStateManager { return partitons.toArray(new ClusterPartition[] {}); } + @Override public synchronized boolean isMetadataNodeActive() { return metadataNodeActive; } + @Override public synchronized ObjectNode getClusterStateDescription() { ObjectMapper om = new ObjectMapper(); ObjectNode stateDescription = om.createObjectNode(); @@ -342,6 +348,7 @@ public class ClusterStateManager implements IClusterStateManager { return stateDescription; } + @Override public synchronized ObjectNode getClusterStateSummary() { ObjectMapper om = new ObjectMapper(); ObjectNode stateDescription = om.createObjectNode(); @@ -395,6 +402,7 @@ public class ClusterStateManager implements IClusterStateManager { } } + @Override public synchronized void removePending(String nodeId) { if (LOGGER.isLoggable(Level.INFO)) { LOGGER.info("Registering intention to remove node id " + nodeId); @@ -406,6 +414,7 @@ public class ClusterStateManager implements IClusterStateManager { } } + @Override public synchronized boolean cancelRemovePending(String nodeId) { if (LOGGER.isLoggable(Level.INFO)) { LOGGER.info("Deregistering intention to remove node id " + nodeId); @@ -423,8 +432,8 @@ public class ClusterStateManager implements IClusterStateManager { } private void updateNodeConfig(String nodeId, Map<IOption, Object> configuration) { - ConfigManager configManager = ((ConfigManagerApplicationConfig) appCtx.getServiceContext().getAppConfig()) - .getConfigManager(); + ConfigManager configManager = + ((ConfigManagerApplicationConfig) appCtx.getServiceContext().getAppConfig()).getConfigManager(); for (Map.Entry<IOption, Object> entry : configuration.entrySet()) { if (entry.getKey().section() == Section.NC) { configManager.set(nodeId, entry.getKey(), entry.getValue());