>From Murtadha Hubail <[email protected]>: Murtadha Hubail has uploaded this change for review. ( https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17509 )
Change subject: WIP: Static bucketing ...................................................................... WIP: Static bucketing Change-Id: Ieca7ffb0f48e16fba4dc5beb0868c1ef8ac9245e --- A asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/ComputePartition.java M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java A asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/SplitComputeLocations.java M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DataPartitioningProvider.java M asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorNodePushable.java M asterixdb/asterix-app/src/test/resources/cc.conf M asterixdb/asterix-app/src/main/resources/cc.conf A asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/StoragePartitionsMap.java M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java M asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java 10 files changed, 286 insertions(+), 10 deletions(-) git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/09/17509/1 diff --git a/asterixdb/asterix-app/src/main/resources/cc.conf b/asterixdb/asterix-app/src/main/resources/cc.conf index dc6e5a2..cbc051b 100644 --- a/asterixdb/asterix-app/src/main/resources/cc.conf +++ b/asterixdb/asterix-app/src/main/resources/cc.conf @@ -65,3 +65,4 @@ compiler.internal.sanitycheck=true messaging.frame.size=4096 messaging.frame.count=512 +storage.partitioning=static diff --git a/asterixdb/asterix-app/src/test/resources/cc.conf b/asterixdb/asterix-app/src/test/resources/cc.conf index 953284964..ec7566a 100644 --- a/asterixdb/asterix-app/src/test/resources/cc.conf +++ b/asterixdb/asterix-app/src/test/resources/cc.conf @@ -61,3 +61,4 @@ compiler.windowmemory=192KB messaging.frame.size=4096 messaging.frame.count=512 +storage.partitioning=static diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/ComputePartition.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/ComputePartition.java new file mode 100644 index 0000000..1d11c30 --- /dev/null +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/ComputePartition.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.common.cluster; + +public class ComputePartition { + private final String nodeId; + private final int id; + + public ComputePartition(int id, String nodeId) { + this.id = id; + this.nodeId = nodeId; + } + + public String getNodeId() { + return nodeId; + } + + public int getId() { + return id; + } +} diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/SplitComputeLocations.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/SplitComputeLocations.java new file mode 100644 index 0000000..b58c39f --- /dev/null +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/SplitComputeLocations.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.common.cluster; + +import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint; +import org.apache.hyracks.dataflow.std.file.IFileSplitProvider; + +public class SplitComputeLocations { + + private final IFileSplitProvider splitsProvider; + private final AlgebricksPartitionConstraint constraints; + + public SplitComputeLocations(IFileSplitProvider splitsProvider, AlgebricksPartitionConstraint constraints) { + this.splitsProvider = splitsProvider; + this.constraints = constraints; + } + + public IFileSplitProvider getSplitsProvider() { + return splitsProvider; + } + + public AlgebricksPartitionConstraint getConstraints() { + return constraints; + } +} diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/StoragePartitionsMap.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/StoragePartitionsMap.java new file mode 100644 index 0000000..a8029b4 --- /dev/null +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/StoragePartitionsMap.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.common.cluster; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.asterix.common.utils.StorageConstants; + +public class StoragePartitionsMap { + + private Map<Integer, ComputePartition> stoToComputeLocation = new HashMap<>(); + + public void addStoragePartition(int stoPart, ComputePartition compute){ + stoToComputeLocation.put(stoPart, compute); + } + + public int[][] getComputeToStorageMap(boolean metadataDataset) { + Map<Integer, List<Integer>> computeToStoragePartitions = new HashMap<>(); + if (metadataDataset) { + final int computePartitionIdForMetadata = 0; + computeToStoragePartitions.put(computePartitionIdForMetadata, + Collections.singletonList(computePartitionIdForMetadata)); + } else { + for(int i = 0; i < StorageConstants.NUM_STORAGE_PARTITIONS; i++){ + ComputePartition computePartition = getComputePartition(i); + int computeId = computePartition.getId(); + List<Integer> storagePartitions = computeToStoragePartitions.computeIfAbsent(computeId, k -> new ArrayList<>()); + storagePartitions.add(i); + } + } + int[][] computerToStoArray = new int[computeToStoragePartitions.size()][]; + for (Map.Entry<Integer, List<Integer>> integerListEntry : computeToStoragePartitions.entrySet()) { + computerToStoArray[integerListEntry.getKey()] = + integerListEntry.getValue().stream().mapToInt(i -> i).toArray(); + } + return computerToStoArray; + } + + public synchronized ComputePartition getComputePartition(int storagePartition) { + return stoToComputeLocation.get(storagePartition); + } +} diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java index 2d231d3..84b8b6d 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java @@ -46,6 +46,8 @@ public static final String DEFAULT_COMPACTION_POLICY_NAME = ConcurrentMergePolicyFactory.NAME; public static final String DEFAULT_FILTERED_DATASET_COMPACTION_POLICY_NAME = "correlated-prefix"; public static final Map<String, String> DEFAULT_COMPACTION_POLICY_PROPERTIES; + public static final int METADATA_PARTITION = -1; + public static final int NUM_STORAGE_PARTITIONS = 16; /** * The storage version of AsterixDB related artifacts (e.g. log files, checkpoint files, etc..). diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java index a8df92a..9a6abec 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java @@ -933,7 +933,7 @@ public FileSplit[] splitsForIndex(MetadataTransactionContext mdTxnCtx, Dataset dataset, String indexName) throws AlgebricksException { - return SplitsAndConstraintsUtil.getIndexSplits(dataset, indexName, mdTxnCtx, appCtx.getClusterStateManager()); + return dataPartitioningProvider.getPartitioningProperties(mdTxnCtx, dataset, indexName).getSpiltsProvider().getFileSplits(); } public DatasourceAdapter getAdapter(MetadataTransactionContext mdTxnCtx, DataverseName dataverseName, diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DataPartitioningProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DataPartitioningProvider.java index ec4c985..d8509e9 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DataPartitioningProvider.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DataPartitioningProvider.java @@ -20,23 +20,34 @@ import static org.apache.asterix.common.utils.PartitioningScheme.DYNAMIC; import static org.apache.asterix.common.utils.PartitioningScheme.STATIC; +import static org.apache.asterix.metadata.utils.MetadataConstants.METADATA_NODEGROUP_NAME; +import java.io.File; +import java.util.ArrayList; import java.util.Arrays; +import java.util.HashSet; +import java.util.List; import java.util.Set; import java.util.TreeSet; +import org.apache.asterix.common.cluster.ComputePartition; import org.apache.asterix.common.cluster.IClusterStateManager; import org.apache.asterix.common.cluster.PartitioningProperties; +import org.apache.asterix.common.cluster.SplitComputeLocations; +import org.apache.asterix.common.cluster.StoragePartitionsMap; import org.apache.asterix.common.dataflow.ICcApplicationContext; import org.apache.asterix.common.dataflow.IDataPartitioningProvider; import org.apache.asterix.common.exceptions.AsterixException; import org.apache.asterix.common.metadata.DataverseName; +import org.apache.asterix.common.metadata.MetadataIndexImmutableProperties; import org.apache.asterix.common.utils.PartitioningScheme; +import org.apache.asterix.common.utils.StorageConstants; import org.apache.asterix.common.utils.StoragePathUtil; import org.apache.asterix.external.util.FeedUtils; import org.apache.asterix.metadata.MetadataTransactionContext; import org.apache.asterix.metadata.entities.Dataset; import org.apache.asterix.metadata.entities.Feed; +import org.apache.asterix.runtime.utils.ClusterStateManager; import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint; import org.apache.hyracks.algebricks.common.constraints.AlgebricksCountPartitionConstraint; import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint; @@ -44,30 +55,87 @@ import org.apache.hyracks.algebricks.common.exceptions.NotImplementedException; import org.apache.hyracks.algebricks.common.utils.Pair; import org.apache.hyracks.api.io.FileSplit; +import org.apache.hyracks.api.io.MappedFileSplit; +import org.apache.hyracks.dataflow.std.file.ConstantFileSplitProvider; import org.apache.hyracks.dataflow.std.file.IFileSplitProvider; public class DataPartitioningProvider implements IDataPartitioningProvider { private final ICcApplicationContext appCtx; private final PartitioningScheme scheme; + private final ClusterStateManager clusterStateManager; public DataPartitioningProvider(ICcApplicationContext appCtx) { this.appCtx = appCtx; + this.clusterStateManager = (ClusterStateManager) appCtx.getClusterStateManager(); scheme = appCtx.getStorageProperties().getPartitioningScheme(); } public PartitioningProperties getPartitioningProperties(DataverseName dataverseName) { if (scheme == DYNAMIC) { - Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraints = SplitsAndConstraintsUtil - .getDataverseSplitProviderAndConstraints(appCtx.getClusterStateManager(), dataverseName); + Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraints = + SplitsAndConstraintsUtil.getDataverseSplitProviderAndConstraints(appCtx.getClusterStateManager(), + dataverseName); int[][] partitionsMap = getPartitionsMap(getNumPartitions(splitsAndConstraints.second)); return PartitioningProperties.of(splitsAndConstraints.first, splitsAndConstraints.second, partitionsMap); } else if (scheme == STATIC) { - throw new NotImplementedException(); + SplitComputeLocations dataverseSplits = getDataverseSplits(dataverseName); + StoragePartitionsMap partitionMap = clusterStateManager.getPartitionMap(); + int[][] partitionsMap = partitionMap.getComputeToStorageMap(false); + return PartitioningProperties.of(dataverseSplits.getSplitsProvider(), dataverseSplits.getConstraints(), + partitionsMap); } throw new IllegalStateException(); } + private SplitComputeLocations getDataverseSplits(DataverseName dataverseName) { + List<FileSplit> splits = new ArrayList<>(); + List<String> locations = new ArrayList<>(); + Set<Integer> uniqueLocations = new HashSet<>(); + StoragePartitionsMap partitionMap = clusterStateManager.getPartitionMap(); + for (int i = 0; i < StorageConstants.NUM_STORAGE_PARTITIONS; i++) { + File f = new File(StoragePathUtil.prepareStoragePartitionPath(i), + StoragePathUtil.prepareDataverseName(dataverseName)); + ComputePartition computePartition = partitionMap.getComputePartition(i); + splits.add(new MappedFileSplit(computePartition.getNodeId(), f.getPath(), 0)); + if (!uniqueLocations.contains(computePartition.getId())) { + locations.add(computePartition.getNodeId()); + } + uniqueLocations.add(computePartition.getId()); + } + IFileSplitProvider splitProvider = StoragePathUtil.splitProvider(splits.toArray(new FileSplit[0])); + AlgebricksPartitionConstraint constraints = + new AlgebricksAbsolutePartitionConstraint(locations.toArray(new String[0])); + return new SplitComputeLocations(splitProvider, constraints); + } + + private SplitComputeLocations getDatasetSplits(Dataset dataset, String indexName) { + List<FileSplit> splits = new ArrayList<>(); + List<String> locations = new ArrayList<>(); + Set<Integer> uniqueLocations = new HashSet<>(); + StoragePartitionsMap partitionMap = clusterStateManager.getPartitionMap(); + + final int datasetPartitons = getNumberOfPartitions(dataset); + boolean metadataDataset = MetadataIndexImmutableProperties.isMetadataDataset(dataset.getDatasetId()); + for (int i = 0; i < datasetPartitons; i++) { + int storagePartition = metadataDataset ? StorageConstants.METADATA_PARTITION : i; + final String relPath = + StoragePathUtil.prepareDataverseIndexName(dataset.getDataverseName(), dataset.getDatasetName(), + indexName, dataset.getRebalanceCount()); + File f = new File(StoragePathUtil.prepareStoragePartitionPath(storagePartition), relPath); + ComputePartition computePartition = partitionMap.getComputePartition(storagePartition); + splits.add(new MappedFileSplit(computePartition.getNodeId(), f.getPath(), 0)); + if (!uniqueLocations.contains(computePartition.getId())) { + locations.add(computePartition.getNodeId()); + } + uniqueLocations.add(computePartition.getId()); + } + IFileSplitProvider splitProvider = StoragePathUtil.splitProvider(splits.toArray(new FileSplit[0])); + AlgebricksPartitionConstraint constraints = + new AlgebricksAbsolutePartitionConstraint(locations.toArray(new String[0])); + return new SplitComputeLocations(splitProvider, constraints); + } + public PartitioningProperties getPartitioningProperties(MetadataTransactionContext mdTxnCtx, Dataset ds, String indexName) throws AlgebricksException { if (scheme == DYNAMIC) { @@ -78,7 +146,12 @@ int[][] partitionsMap = getPartitionsMap(getNumPartitions(splitsAndConstraints.second)); return PartitioningProperties.of(splitsAndConstraints.first, splitsAndConstraints.second, partitionsMap); } else if (scheme == STATIC) { - throw new NotImplementedException(); + SplitComputeLocations dataverseSplits = getDatasetSplits(ds, indexName); + StoragePartitionsMap partitionMap = clusterStateManager.getPartitionMap(); + int[][] partitionsMap = partitionMap.getComputeToStorageMap( + MetadataIndexImmutableProperties.isMetadataDataset(ds.getDatasetId())); + return PartitioningProperties.of(dataverseSplits.getSplitsProvider(), dataverseSplits.getConstraints(), + partitionsMap); } throw new IllegalStateException(); } @@ -97,7 +170,7 @@ int[][] partitionsMap = getPartitionsMap(getNumPartitions(spC.second)); return PartitioningProperties.of(spC.first, spC.second, partitionsMap); } else if (scheme == STATIC) { - throw new NotImplementedException(); + return getPartitioningProperties(feed.getDataverseName()); } throw new IllegalStateException(); } @@ -117,4 +190,10 @@ } return map; } + + private static int getNumberOfPartitions(Dataset ds) { + return MetadataIndexImmutableProperties.isMetadataDataset(ds.getDatasetId()) ? + MetadataIndexImmutableProperties.METADATA_DATASETS_PARTITIONS : + StorageConstants.NUM_STORAGE_PARTITIONS; + } } diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorNodePushable.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorNodePushable.java index 7e51ec1..e26b436 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorNodePushable.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorNodePushable.java @@ -205,7 +205,8 @@ try { INcApplicationContext appCtx = (INcApplicationContext) ctx.getJobletContext().getServiceContext().getApplicationContext(); - + writer.open(); + writerOpen = true; for (int i = 0; i < partitions.length; i++) { IIndexDataflowHelper indexHelper = indexHelpers[i]; indexHelper.open(); @@ -224,8 +225,6 @@ new PrimaryIndexLogMarkerCallback((AbstractLSMIndex) indexes[0]); TaskUtil.put(ILogMarkerCallback.KEY_MARKER_CALLBACK, callback, ctx); } - writer.open(); - writerOpen = true; modCallbacks[i] = modOpCallbackFactory.createModificationOperationCallback(indexHelper.getResource(), ctx, this); searchCallbacks[i] = (LockThenSearchOperationCallback) searchCallbackFactory diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java index d3c87ff..fda0d6a 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java @@ -33,13 +33,17 @@ import org.apache.asterix.common.api.IClusterManagementWork.ClusterState; import org.apache.asterix.common.cluster.ClusterPartition; +import org.apache.asterix.common.cluster.ComputePartition; import org.apache.asterix.common.cluster.IClusterStateManager; +import org.apache.asterix.common.cluster.StoragePartitionsMap; import org.apache.asterix.common.dataflow.ICcApplicationContext; import org.apache.asterix.common.exceptions.AsterixException; import org.apache.asterix.common.exceptions.ErrorCode; import org.apache.asterix.common.replication.INcLifecycleCoordinator; import org.apache.asterix.common.transactions.IResourceIdManager; import org.apache.asterix.common.utils.NcLocalCounters; +import org.apache.asterix.common.utils.PartitioningScheme; +import org.apache.asterix.common.utils.StorageConstants; import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.api.config.IOption; @@ -79,6 +83,10 @@ private ICcApplicationContext appCtx; private ClusterPartition metadataPartition; private boolean rebalanceRequired; + private StoragePartitionsMap partitionMap; + public synchronized StoragePartitionsMap getPartitionMap() { + return partitionMap; + } @Override public void setCcAppCtx(ICcApplicationContext appCtx) { @@ -86,7 +94,15 @@ node2PartitionsMap = appCtx.getMetadataProperties().getNodePartitions(); clusterPartitions = appCtx.getMetadataProperties().getClusterPartitions(); currentMetadataNode = appCtx.getMetadataProperties().getMetadataNodeName(); - metadataPartition = node2PartitionsMap.get(currentMetadataNode)[0]; + + PartitioningScheme partitioningScheme = appCtx.getStorageProperties().getPartitioningScheme(); + if (partitioningScheme == PartitioningScheme.DYNAMIC) { + metadataPartition = node2PartitionsMap.get(currentMetadataNode)[0]; + } else { + final ClusterPartition fixedMetadataPartition = new ClusterPartition(StorageConstants.METADATA_PARTITION, + appCtx.getMetadataProperties().getMetadataNodeName(), 0); + metadataPartition = fixedMetadataPartition; + } lifecycleCoordinator = appCtx.getNcLifecycleCoordinator(); lifecycleCoordinator.bindTo(this); } @@ -299,6 +315,7 @@ clusterActiveLocations.removeAll(pendingRemoval); clusterPartitionConstraint = new AlgebricksAbsolutePartitionConstraint(clusterActiveLocations.toArray(new String[] {})); + computePartitionsMap(); } @Override @@ -489,6 +506,34 @@ return nodeIds.stream().anyMatch(failedNodes::contains); } + private synchronized void computePartitionsMap() { + StoragePartitionsMap newMap = new StoragePartitionsMap(); + newMap.addStoragePartition(metadataPartition.getPartitionId(), + new ComputePartition(metadataPartition.getPartitionId(), metadataPartition.getActiveNodeId())); + int storagePartitionsPerComputePartition = StorageConstants.NUM_STORAGE_PARTITIONS / clusterPartitions.size(); + int storagePartitionId = 0; + int lastComputePartition = 1; + int remainingStoragePartition = StorageConstants.NUM_STORAGE_PARTITIONS % clusterPartitions.size(); + for (Map.Entry<Integer, ClusterPartition> cp : clusterPartitions.entrySet()) { + ClusterPartition clusterPartition = cp.getValue(); + for (int i = 0; i < storagePartitionsPerComputePartition; i++) { + newMap.addStoragePartition(storagePartitionId, + new ComputePartition(clusterPartition.getPartitionId(), clusterPartition.getActiveNodeId())); + storagePartitionId++; + } + if (lastComputePartition == clusterPartitions.size() && remainingStoragePartition != 0) { + // assign all remaining partitions to last compute partition + for (int k = 0; k < remainingStoragePartition; k++) { + newMap.addStoragePartition(storagePartitionId, new ComputePartition(clusterPartition.getPartitionId(), + clusterPartition.getActiveNodeId())); + storagePartitionId++; + } + } + lastComputePartition++; + } + partitionMap = newMap; + } + private void updateClusterCounters(String nodeId, NcLocalCounters localCounters) { final IResourceIdManager resourceIdManager = appCtx.getResourceIdManager(); resourceIdManager.report(nodeId, localCounters.getMaxResourceId()); -- To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17509 To unsubscribe, or for help writing mail filters, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Change-Id: Ieca7ffb0f48e16fba4dc5beb0868c1ef8ac9245e Gerrit-Change-Number: 17509 Gerrit-PatchSet: 1 Gerrit-Owner: Murtadha Hubail <[email protected]> Gerrit-MessageType: newchange
