Murtadha Hubail has uploaded a new change for review. https://asterix-gerrit.ics.uci.edu/2348
Change subject: [NO ISSUE][CLUS] Add Metadata Cluster Partition ...................................................................... [NO ISSUE][CLUS] Add Metadata Cluster Partition - user model changes: no - storage format changes: no - interface changes: yes Details: - Add a cluster partition reference to the cluster partition in which metadata is stored. This allows the initial metadata node to be removed from the cluster and another metadata node to be assigned to that metadata cluster partition. Initially, it is assigned to the first partition of the first metadata node. - Use metadata cluster partition in defining metadata datasets file splits instead of the assumption of the first partition on the initial metadata node. Change-Id: I2ac99252cacba92b4c4484c0d34cdc77fee307e8 --- M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/MetadataNodeResponseMessage.java M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SplitsAndConstraintsUtil.java M asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java 4 files changed, 60 insertions(+), 33 deletions(-) git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/48/2348/1 diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/MetadataNodeResponseMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/MetadataNodeResponseMessage.java index 815d878..1d4a0a9 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/MetadataNodeResponseMessage.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/MetadataNodeResponseMessage.java @@ -40,8 +40,8 @@ } @Override - public void handle(ICcApplicationContext appCtx) throws HyracksDataException, InterruptedException { - 
((CcApplicationContext) appCtx).getNcLifecycleCoordinator().process(this); + public void handle(ICcApplicationContext appCtx) throws HyracksDataException { + appCtx.getNcLifecycleCoordinator().process(this); } @Override diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java index 5b7e5a7..ba647b8 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java @@ -92,7 +92,7 @@ /** * @return A copy of the current state of the cluster partitions. */ - ClusterPartition[] getClusterPartitons(); + ClusterPartition[] getClusterPartitions(); /** * Blocks until the cluster state becomes {@code state} @@ -139,8 +139,7 @@ /** * Returns the IO devices configured for a Node Controller * - * @param nodeId - * unique identifier of the Node Controller + * @param nodeId unique identifier of the Node Controller * @return a list of IO devices. 
*/ String[] getIODevices(String nodeId); @@ -151,15 +150,13 @@ AlgebricksAbsolutePartitionConstraint getClusterLocations(); /** - * @param excludePendingRemoval - * true, if the desired set shouldn't have pending removal nodes + * @param excludePendingRemoval true, if the desired set shouldn't have pending removal nodes * @return the set of participant nodes */ Set<String> getParticipantNodes(boolean excludePendingRemoval); /** - * @param node - * the node id + * @param node the node id * @return the number of partitions on that node */ int getNodePartitionsCount(String node); @@ -224,4 +221,18 @@ boolean cancelRemovePending(String nodeId); Map<String, Map<IOption, Object>> getActiveNcConfiguration(); + + /** + * Sets the cluster partition in which metadata datasets are stored + * + * @param partition the cluster partition in which metadata datasets are stored + */ + void setMetadataPartitionId(ClusterPartition partition); + + /** + * Gets the cluster partition in which metadata datasets are stored + * + * @return The metadata cluster partition + */ + ClusterPartition getMetadataPartition(); } diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SplitsAndConstraintsUtil.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SplitsAndConstraintsUtil.java index 2a6d0e8..5fde7e6 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SplitsAndConstraintsUtil.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SplitsAndConstraintsUtil.java @@ -20,6 +20,8 @@ import java.io.File; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; import java.util.List; import org.apache.asterix.common.cluster.ClusterPartition; @@ -44,7 +46,7 @@ private static FileSplit[] getDataverseSplits(IClusterStateManager clusterStateManager, String dataverseName) { List<FileSplit> splits = new ArrayList<>(); // get all partitions - ClusterPartition[] clusterPartition = 
clusterStateManager.getClusterPartitons(); + ClusterPartition[] clusterPartition = clusterStateManager.getClusterPartitions(); for (int j = 0; j < clusterPartition.length; j++) { File f = new File(StoragePathUtil.prepareStoragePartitionPath(clusterPartition[j].getPartitionId()), dataverseName); @@ -71,20 +73,11 @@ String indexName, List<String> nodes) { final String relPath = StoragePathUtil.prepareDataverseIndexName(dataset.getDataverseName(), dataset.getDatasetName(), indexName, dataset.getRebalanceCount()); - List<FileSplit> splits = new ArrayList<>(); - for (String nd : nodes) { - int numPartitions = clusterStateManager.getNodePartitionsCount(nd); - ClusterPartition[] nodePartitions = clusterStateManager.getNodePartitions(nd); - // currently this case is never executed since the metadata group doesn't exists - if (dataset.getNodeGroupName().compareTo(MetadataConstants.METADATA_NODEGROUP_NAME) == 0) { - numPartitions = 1; - } - - for (int k = 0; k < numPartitions; k++) { - File f = new File(StoragePathUtil.prepareStoragePartitionPath(nodePartitions[k].getPartitionId()), - relPath); - splits.add(StoragePathUtil.getFileSplitForClusterPartition(nodePartitions[k], f.getPath())); - } + final List<ClusterPartition> datasetPartitions = getDatasetPartitions(clusterStateManager, dataset, nodes); + final List<FileSplit> splits = new ArrayList<>(); + for (ClusterPartition partition : datasetPartitions) { + File f = new File(StoragePathUtil.prepareStoragePartitionPath(partition.getPartitionId()), relPath); + splits.add(StoragePathUtil.getFileSplitForClusterPartition(partition, f.getPath())); } return splits.toArray(new FileSplit[] {}); } @@ -94,4 +87,17 @@ FileSplit[] splits = getDataverseSplits(clusterStateManager, dataverse); return StoragePathUtil.splitProviderAndPartitionConstraints(splits); } + + private static List<ClusterPartition> getDatasetPartitions(IClusterStateManager clusterStateManager, + Dataset dataset, List<String> nodes) { + if 
(dataset.getNodeGroupName().compareTo(MetadataConstants.METADATA_NODEGROUP_NAME) == 0) { + return Collections.singletonList(clusterStateManager.getMetadataPartition()); + } + final List<ClusterPartition> datasetPartitions = new ArrayList<>(); + for (String node : nodes) { + final ClusterPartition[] nodePartitions = clusterStateManager.getNodePartitions(node); + datasetPartitions.addAll(Arrays.asList(nodePartitions)); + } + return datasetPartitions; + } } diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java index c5eeb65..d864919 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java @@ -32,10 +32,10 @@ import org.apache.asterix.common.api.IClusterManagementWork.ClusterState; import org.apache.asterix.common.cluster.ClusterPartition; import org.apache.asterix.common.cluster.IClusterStateManager; +import org.apache.asterix.common.dataflow.ICcApplicationContext; import org.apache.asterix.common.exceptions.AsterixException; import org.apache.asterix.common.exceptions.ErrorCode; import org.apache.asterix.common.replication.INcLifecycleCoordinator; -import org.apache.asterix.common.dataflow.ICcApplicationContext; import org.apache.asterix.common.transactions.IResourceIdManager; import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; @@ -73,6 +73,7 @@ private Set<String> participantNodes = new HashSet<>(); private INcLifecycleCoordinator lifecycleCoordinator; private ICcApplicationContext appCtx; + private ClusterPartition metadataPartition; @Override public void setCcAppCtx(ICcApplicationContext appCtx) { @@ -80,6 +81,7 @@ node2PartitionsMap = 
appCtx.getMetadataProperties().getNodePartitions(); clusterPartitions = appCtx.getMetadataProperties().getClusterPartitions(); currentMetadataNode = appCtx.getMetadataProperties().getMetadataNodeName(); + metadataPartition = node2PartitionsMap.get(currentMetadataNode)[0]; lifecycleCoordinator = appCtx.getNcLifecycleCoordinator(); lifecycleCoordinator.bindTo(this); } @@ -121,16 +123,18 @@ } @Override - public void updateMetadataNode(String nodeId, boolean active) { + public synchronized void updateMetadataNode(String nodeId, boolean active) { currentMetadataNode = nodeId; metadataNodeActive = active; if (active) { + metadataPartition.setActiveNodeId(currentMetadataNode); LOGGER.info(String.format("Metadata node %s is now active", currentMetadataNode)); } + notifyAll(); } @Override - public synchronized void updateNodePartitions(String nodeId, boolean active) throws HyracksDataException { + public synchronized void updateNodePartitions(String nodeId, boolean active) { if (active) { participantNodes.add(nodeId); } else { @@ -305,12 +309,8 @@ } @Override - public synchronized ClusterPartition[] getClusterPartitons() { - ArrayList<ClusterPartition> partitons = new ArrayList<>(); - for (ClusterPartition partition : clusterPartitions.values()) { - partitons.add(partition); - } - return partitons.toArray(new ClusterPartition[] {}); + public synchronized ClusterPartition[] getClusterPartitions() { + return clusterPartitions.values().toArray(new ClusterPartition[] {}); } @Override @@ -442,6 +442,16 @@ return new HashSet<>(pendingRemoval); } + @Override + public synchronized void setMetadataPartitionId(ClusterPartition partition) { + metadataPartition = partition; + } + + @Override + public synchronized ClusterPartition getMetadataPartition() { + return metadataPartition; + } + private void updateNodeConfig(String nodeId, Map<IOption, Object> configuration) { ConfigManager configManager = ((ConfigManagerApplicationConfig) 
appCtx.getServiceContext().getAppConfig()).getConfigManager(); -- To view, visit https://asterix-gerrit.ics.uci.edu/2348 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: newchange Gerrit-Change-Id: I2ac99252cacba92b4c4484c0d34cdc77fee307e8 Gerrit-PatchSet: 1 Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Owner: Murtadha Hubail <mhub...@apache.org>