This is an automated email from the ASF dual-hosted git repository. mhubail pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/asterixdb.git
The following commit(s) were added to refs/heads/master by this push: new f400d33287 [ASTERIXDB-3144][RT] Implement Static Partitioning f400d33287 is described below commit f400d33287536d81408c1475e510c50bdd344c27 Author: Murtadha Hubail <murtadha.hub...@couchbase.com> AuthorDate: Fri May 5 03:31:47 2023 +0300 [ASTERIXDB-3144][RT] Implement Static Partitioning - user model changes: no - storage format changes: no - interface changes: yes Details: - Implement static partitioning based on storage/compute partitions map. - Fix LSMPrimaryInsertOperatorNodePushable state tracking so that it works across multiple storage partitions. Change-Id: Ieca7ffb0f48e16fba4dc5beb0868c1ef8ac9245e Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17509 Integration-Tests: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Tested-by: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Reviewed-by: Murtadha Hubail <mhub...@apache.org> Reviewed-by: Ali Alsuliman <ali.al.solai...@gmail.com> --- .../asterix/app/cc/CcApplicationContext.java | 2 +- .../asterix/runtime/ClusterStateManagerTest.java | 11 ++ .../asterix/common/cluster/ComputePartition.java | 37 +++++++ .../common/cluster/SplitComputeLocations.java | 41 ++++++++ .../cluster/StorageComputePartitionsMap.java | 93 +++++++++++++++++ .../asterix/common/utils/StorageConstants.java | 2 + .../adapter/factory/GenericAdapterFactory.java | 1 + .../metadata/declared/MetadataProvider.java | 5 +- .../metadata/utils/DataPartitioningProvider.java | 90 +++++++--------- .../utils/DynamicDataPartitioningProvider.java | 57 ++++++++++ .../utils/StaticDataPartitioningProvider.java | 116 +++++++++++++++++++++ .../LSMPrimaryInsertOperatorNodePushable.java | 38 +++---- .../asterix/runtime/utils/ClusterStateManager.java | 20 +++- .../PersistentLocalResourceRepository.java | 2 +- 14 files changed, 440 insertions(+), 75 deletions(-) diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/CcApplicationContext.java 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/CcApplicationContext.java index a2d99a0332..e4247f0dd9 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/CcApplicationContext.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/CcApplicationContext.java @@ -157,7 +157,7 @@ public class CcApplicationContext implements ICcApplicationContext { requestTracker = new RequestTracker(this); configValidator = configValidatorFactory.create(); this.adapterFactoryService = adapterFactoryService; - dataPartitioningProvider = new DataPartitioningProvider(this); + dataPartitioningProvider = DataPartitioningProvider.create(this); } @Override diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/ClusterStateManagerTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/ClusterStateManagerTest.java index a2a3b48393..53b9294e2b 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/ClusterStateManagerTest.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/ClusterStateManagerTest.java @@ -33,8 +33,10 @@ import org.apache.asterix.common.api.IClusterManagementWork.ClusterState; import org.apache.asterix.common.cluster.ClusterPartition; import org.apache.asterix.common.cluster.IGlobalRecoveryManager; import org.apache.asterix.common.config.MetadataProperties; +import org.apache.asterix.common.config.StorageProperties; import org.apache.asterix.common.metadata.IMetadataBootstrap; import org.apache.asterix.common.utils.NcLocalCounters; +import org.apache.asterix.common.utils.PartitioningScheme; import org.apache.asterix.hyracks.bootstrap.CCApplication; import org.apache.asterix.runtime.transaction.ResourceIdManager; import org.apache.asterix.runtime.utils.BulkTxnIdFactory; @@ -231,6 +233,9 @@ public class ClusterStateManagerTest { MetadataProperties metadataProperties = mockMetadataProperties(); 
Mockito.when(ccApplicationContext.getMetadataProperties()).thenReturn(metadataProperties); + StorageProperties storageProperties = mockStorageProperties(); + Mockito.when(ccApplicationContext.getStorageProperties()).thenReturn(storageProperties); + ResourceIdManager resourceIdManager = new ResourceIdManager(csm); Mockito.when(ccApplicationContext.getResourceIdManager()).thenReturn(resourceIdManager); @@ -258,6 +263,12 @@ public class ClusterStateManagerTest { return metadataProperties; } + private StorageProperties mockStorageProperties() { + StorageProperties storageProperties = Mockito.mock(StorageProperties.class); + Mockito.when(storageProperties.getPartitioningScheme()).thenReturn(PartitioningScheme.DYNAMIC); + return storageProperties; + } + private NcLocalCounters mockLocalCounters() { final NcLocalCounters localCounters = Mockito.mock(NcLocalCounters.class); Mockito.when(localCounters.getMaxJobId()).thenReturn(1000L); diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/ComputePartition.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/ComputePartition.java new file mode 100644 index 0000000000..1d11c302cc --- /dev/null +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/ComputePartition.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.common.cluster; + +public class ComputePartition { + private final String nodeId; + private final int id; + + public ComputePartition(int id, String nodeId) { + this.id = id; + this.nodeId = nodeId; + } + + public String getNodeId() { + return nodeId; + } + + public int getId() { + return id; + } +} diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/SplitComputeLocations.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/SplitComputeLocations.java new file mode 100644 index 0000000000..b58c39f66f --- /dev/null +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/SplitComputeLocations.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.asterix.common.cluster; + +import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint; +import org.apache.hyracks.dataflow.std.file.IFileSplitProvider; + +public class SplitComputeLocations { + + private final IFileSplitProvider splitsProvider; + private final AlgebricksPartitionConstraint constraints; + + public SplitComputeLocations(IFileSplitProvider splitsProvider, AlgebricksPartitionConstraint constraints) { + this.splitsProvider = splitsProvider; + this.constraints = constraints; + } + + public IFileSplitProvider getSplitsProvider() { + return splitsProvider; + } + + public AlgebricksPartitionConstraint getConstraints() { + return constraints; + } +} diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/StorageComputePartitionsMap.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/StorageComputePartitionsMap.java new file mode 100644 index 0000000000..874371e09a --- /dev/null +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/StorageComputePartitionsMap.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.asterix.common.cluster; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.asterix.common.utils.StorageConstants; + +public class StorageComputePartitionsMap { + + private final Map<Integer, ComputePartition> stoToComputeLocation = new HashMap<>(); + + public void addStoragePartition(int stoPart, ComputePartition compute) { + stoToComputeLocation.put(stoPart, compute); + } + + public int[][] getComputeToStorageMap(boolean metadataDataset) { + Map<Integer, List<Integer>> computeToStoragePartitions = new HashMap<>(); + if (metadataDataset) { + final int computePartitionIdForMetadata = 0; + computeToStoragePartitions.put(computePartitionIdForMetadata, + Collections.singletonList(computePartitionIdForMetadata)); + } else { + for (int i = 0; i < StorageConstants.NUM_STORAGE_PARTITIONS; i++) { + ComputePartition computePartition = getComputePartition(i); + int computeId = computePartition.getId(); + List<Integer> storagePartitions = + computeToStoragePartitions.computeIfAbsent(computeId, k -> new ArrayList<>()); + storagePartitions.add(i); + } + } + int[][] computerToStoArray = new int[computeToStoragePartitions.size()][]; + for (Map.Entry<Integer, List<Integer>> integerListEntry : computeToStoragePartitions.entrySet()) { + computerToStoArray[integerListEntry.getKey()] = + integerListEntry.getValue().stream().mapToInt(i -> i).toArray(); + } + return computerToStoArray; + } + + public ComputePartition getComputePartition(int storagePartition) { + return stoToComputeLocation.get(storagePartition); + } + + public static StorageComputePartitionsMap computePartitionsMap(IClusterStateManager clusterStateManager) { + ClusterPartition metadataPartition = clusterStateManager.getMetadataPartition(); + Map<Integer, ClusterPartition> clusterPartitions = clusterStateManager.getClusterPartitions(); + StorageComputePartitionsMap newMap = new 
StorageComputePartitionsMap(); + newMap.addStoragePartition(metadataPartition.getPartitionId(), + new ComputePartition(metadataPartition.getPartitionId(), metadataPartition.getActiveNodeId())); + int storagePartitionsPerComputePartition = StorageConstants.NUM_STORAGE_PARTITIONS / clusterPartitions.size(); + int storagePartitionId = 0; + int lastComputePartition = 1; + int remainingStoragePartition = StorageConstants.NUM_STORAGE_PARTITIONS % clusterPartitions.size(); + for (Map.Entry<Integer, ClusterPartition> cp : clusterPartitions.entrySet()) { + ClusterPartition clusterPartition = cp.getValue(); + for (int i = 0; i < storagePartitionsPerComputePartition; i++) { + newMap.addStoragePartition(storagePartitionId, + new ComputePartition(clusterPartition.getPartitionId(), clusterPartition.getActiveNodeId())); + storagePartitionId++; + } + if (lastComputePartition == clusterPartitions.size() && remainingStoragePartition != 0) { + // assign all remaining partitions to last compute partition + for (int k = 0; k < remainingStoragePartition; k++) { + newMap.addStoragePartition(storagePartitionId, new ComputePartition( + clusterPartition.getPartitionId(), clusterPartition.getActiveNodeId())); + storagePartitionId++; + } + } + lastComputePartition++; + } + return newMap; + } +} diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java index 2d231d3fb8..c26fe76156 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java @@ -46,6 +46,8 @@ public class StorageConstants { public static final String DEFAULT_COMPACTION_POLICY_NAME = ConcurrentMergePolicyFactory.NAME; public static final String DEFAULT_FILTERED_DATASET_COMPACTION_POLICY_NAME = "correlated-prefix"; public static final Map<String, 
String> DEFAULT_COMPACTION_POLICY_PROPERTIES; + public static final int METADATA_PARTITION = -1; + public static final int NUM_STORAGE_PARTITIONS = 8; /** * The storage version of AsterixDB related artifacts (e.g. log files, checkpoint files, etc..). diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java index 9ca87b873f..3ac8a0292e 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java @@ -145,6 +145,7 @@ public class GenericAdapterFactory implements ITypedAdapterFactory { throws HyracksDataException, AlgebricksException { this.isFeed = ExternalDataUtils.isFeed(configuration); if (isFeed) { + //TODO(partitioning) make this code reuse DataPartitioningProvider feedLogFileSplits = FeedUtils.splitsForAdapter(appCtx, ExternalDataUtils.getDatasetDataverse(configuration), ExternalDataUtils.getFeedName(configuration), dataSourceFactory.getPartitionConstraint()); } diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java index a8df92aa2e..0b011d0c0d 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java @@ -933,7 +933,8 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String> public FileSplit[] splitsForIndex(MetadataTransactionContext mdTxnCtx, Dataset dataset, String indexName) throws AlgebricksException { - return 
SplitsAndConstraintsUtil.getIndexSplits(dataset, indexName, mdTxnCtx, appCtx.getClusterStateManager()); + return dataPartitioningProvider.getPartitioningProperties(mdTxnCtx, dataset, indexName).getSpiltsProvider() + .getFileSplits(); } public DatasourceAdapter getAdapter(MetadataTransactionContext mdTxnCtx, DataverseName dataverseName, @@ -1788,8 +1789,6 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String> } public PartitioningProperties getPartitioningProperties(Dataset ds, String indexName) throws AlgebricksException { - //TODO(partitioning) pass splits rather than mdTxnCtx? - // FileSplit[] splits = splitsForIndex(mdTxnCtx, ds, indexName); return dataPartitioningProvider.getPartitioningProperties(mdTxnCtx, ds, indexName); } diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DataPartitioningProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DataPartitioningProvider.java index ec4c985369..7257958f1d 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DataPartitioningProvider.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DataPartitioningProvider.java @@ -18,9 +18,6 @@ */ package org.apache.asterix.metadata.utils; -import static org.apache.asterix.common.utils.PartitioningScheme.DYNAMIC; -import static org.apache.asterix.common.utils.PartitioningScheme.STATIC; - import java.util.Arrays; import java.util.Set; import java.util.TreeSet; @@ -31,78 +28,71 @@ import org.apache.asterix.common.dataflow.ICcApplicationContext; import org.apache.asterix.common.dataflow.IDataPartitioningProvider; import org.apache.asterix.common.exceptions.AsterixException; import org.apache.asterix.common.metadata.DataverseName; +import org.apache.asterix.common.metadata.MetadataIndexImmutableProperties; import org.apache.asterix.common.utils.PartitioningScheme; +import 
org.apache.asterix.common.utils.StorageConstants; import org.apache.asterix.common.utils.StoragePathUtil; import org.apache.asterix.external.util.FeedUtils; import org.apache.asterix.metadata.MetadataTransactionContext; import org.apache.asterix.metadata.entities.Dataset; import org.apache.asterix.metadata.entities.Feed; +import org.apache.asterix.runtime.utils.ClusterStateManager; import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint; import org.apache.hyracks.algebricks.common.constraints.AlgebricksCountPartitionConstraint; import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; -import org.apache.hyracks.algebricks.common.exceptions.NotImplementedException; import org.apache.hyracks.algebricks.common.utils.Pair; import org.apache.hyracks.api.io.FileSplit; import org.apache.hyracks.dataflow.std.file.IFileSplitProvider; -public class DataPartitioningProvider implements IDataPartitioningProvider { +public abstract class DataPartitioningProvider implements IDataPartitioningProvider { - private final ICcApplicationContext appCtx; - private final PartitioningScheme scheme; + protected final ICcApplicationContext appCtx; + protected final ClusterStateManager clusterStateManager; - public DataPartitioningProvider(ICcApplicationContext appCtx) { + DataPartitioningProvider(ICcApplicationContext appCtx) { this.appCtx = appCtx; - scheme = appCtx.getStorageProperties().getPartitioningScheme(); + this.clusterStateManager = (ClusterStateManager) appCtx.getClusterStateManager(); } - public PartitioningProperties getPartitioningProperties(DataverseName dataverseName) { - if (scheme == DYNAMIC) { - Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraints = SplitsAndConstraintsUtil - .getDataverseSplitProviderAndConstraints(appCtx.getClusterStateManager(), dataverseName); - int[][] partitionsMap = 
getPartitionsMap(getNumPartitions(splitsAndConstraints.second)); - return PartitioningProperties.of(splitsAndConstraints.first, splitsAndConstraints.second, partitionsMap); - } else if (scheme == STATIC) { - throw new NotImplementedException(); + public static DataPartitioningProvider create(ICcApplicationContext appCtx) { + PartitioningScheme partitioningScheme = appCtx.getStorageProperties().getPartitioningScheme(); + switch (partitioningScheme) { + case DYNAMIC: + return new DynamicDataPartitioningProvider(appCtx); + case STATIC: + return new StaticDataPartitioningProvider(appCtx); + default: + throw new IllegalStateException("unknown partitioning scheme: " + partitioningScheme); } - throw new IllegalStateException(); } - public PartitioningProperties getPartitioningProperties(MetadataTransactionContext mdTxnCtx, Dataset ds, - String indexName) throws AlgebricksException { - if (scheme == DYNAMIC) { - FileSplit[] splits = - SplitsAndConstraintsUtil.getIndexSplits(ds, indexName, mdTxnCtx, appCtx.getClusterStateManager()); - Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraints = - StoragePathUtil.splitProviderAndPartitionConstraints(splits); - int[][] partitionsMap = getPartitionsMap(getNumPartitions(splitsAndConstraints.second)); - return PartitioningProperties.of(splitsAndConstraints.first, splitsAndConstraints.second, partitionsMap); - } else if (scheme == STATIC) { - throw new NotImplementedException(); - } - throw new IllegalStateException(); - } + public abstract PartitioningProperties getPartitioningProperties(DataverseName dataverseName); + + public abstract PartitioningProperties getPartitioningProperties(MetadataTransactionContext mdTxnCtx, Dataset ds, + String indexName) throws AlgebricksException; public PartitioningProperties getPartitioningProperties(Feed feed) throws AsterixException { - if (scheme == DYNAMIC) { - IClusterStateManager csm = appCtx.getClusterStateManager(); - AlgebricksAbsolutePartitionConstraint allCluster = 
csm.getClusterLocations(); - Set<String> nodes = new TreeSet<>(Arrays.asList(allCluster.getLocations())); - AlgebricksAbsolutePartitionConstraint locations = - new AlgebricksAbsolutePartitionConstraint(nodes.toArray(new String[0])); - FileSplit[] feedLogFileSplits = - FeedUtils.splitsForAdapter(appCtx, feed.getDataverseName(), feed.getFeedName(), locations); - Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spC = - StoragePathUtil.splitProviderAndPartitionConstraints(feedLogFileSplits); - int[][] partitionsMap = getPartitionsMap(getNumPartitions(spC.second)); - return PartitioningProperties.of(spC.first, spC.second, partitionsMap); - } else if (scheme == STATIC) { - throw new NotImplementedException(); - } - throw new IllegalStateException(); + IClusterStateManager csm = appCtx.getClusterStateManager(); + AlgebricksAbsolutePartitionConstraint allCluster = csm.getClusterLocations(); + Set<String> nodes = new TreeSet<>(Arrays.asList(allCluster.getLocations())); + AlgebricksAbsolutePartitionConstraint locations = + new AlgebricksAbsolutePartitionConstraint(nodes.toArray(new String[0])); + FileSplit[] feedLogFileSplits = + FeedUtils.splitsForAdapter(appCtx, feed.getDataverseName(), feed.getFeedName(), locations); + Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spC = + StoragePathUtil.splitProviderAndPartitionConstraints(feedLogFileSplits); + int[][] partitionsMap = getOneToOnePartitionsMap(getLocationsCount(spC.second)); + return PartitioningProperties.of(spC.first, spC.second, partitionsMap); + } + + protected static int getNumberOfPartitions(Dataset ds) { + return MetadataIndexImmutableProperties.isMetadataDataset(ds.getDatasetId()) + ? 
MetadataIndexImmutableProperties.METADATA_DATASETS_PARTITIONS + : StorageConstants.NUM_STORAGE_PARTITIONS; } - private static int getNumPartitions(AlgebricksPartitionConstraint constraint) { + protected static int getLocationsCount(AlgebricksPartitionConstraint constraint) { if (constraint.getPartitionConstraintType() == AlgebricksPartitionConstraint.PartitionConstraintType.COUNT) { return ((AlgebricksCountPartitionConstraint) constraint).getCount(); } else { @@ -110,7 +100,7 @@ public class DataPartitioningProvider implements IDataPartitioningProvider { } } - private static int[][] getPartitionsMap(int numPartitions) { + protected static int[][] getOneToOnePartitionsMap(int numPartitions) { int[][] map = new int[numPartitions][1]; for (int i = 0; i < numPartitions; i++) { map[i] = new int[] { i }; diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DynamicDataPartitioningProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DynamicDataPartitioningProvider.java new file mode 100644 index 0000000000..95dae4a267 --- /dev/null +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DynamicDataPartitioningProvider.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.metadata.utils; + +import org.apache.asterix.common.cluster.PartitioningProperties; +import org.apache.asterix.common.dataflow.ICcApplicationContext; +import org.apache.asterix.common.metadata.DataverseName; +import org.apache.asterix.common.utils.StoragePathUtil; +import org.apache.asterix.metadata.MetadataTransactionContext; +import org.apache.asterix.metadata.entities.Dataset; +import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint; +import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; +import org.apache.hyracks.algebricks.common.utils.Pair; +import org.apache.hyracks.api.io.FileSplit; +import org.apache.hyracks.dataflow.std.file.IFileSplitProvider; + +public class DynamicDataPartitioningProvider extends DataPartitioningProvider { + + public DynamicDataPartitioningProvider(ICcApplicationContext appCtx) { + super(appCtx); + } + + @Override + public PartitioningProperties getPartitioningProperties(DataverseName dataverseName) { + Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraints = SplitsAndConstraintsUtil + .getDataverseSplitProviderAndConstraints(appCtx.getClusterStateManager(), dataverseName); + int[][] partitionsMap = getOneToOnePartitionsMap(getLocationsCount(splitsAndConstraints.second)); + return PartitioningProperties.of(splitsAndConstraints.first, splitsAndConstraints.second, partitionsMap); + } + + @Override + public PartitioningProperties getPartitioningProperties(MetadataTransactionContext mdTxnCtx, Dataset ds, + String indexName) throws AlgebricksException { + FileSplit[] splits = + SplitsAndConstraintsUtil.getIndexSplits(ds, indexName, mdTxnCtx, appCtx.getClusterStateManager()); + Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraints = + StoragePathUtil.splitProviderAndPartitionConstraints(splits); + int[][] 
partitionsMap = getOneToOnePartitionsMap(getLocationsCount(splitsAndConstraints.second)); + return PartitioningProperties.of(splitsAndConstraints.first, splitsAndConstraints.second, partitionsMap); + } +} diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/StaticDataPartitioningProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/StaticDataPartitioningProvider.java new file mode 100644 index 0000000000..eaafc6ce57 --- /dev/null +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/StaticDataPartitioningProvider.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.asterix.metadata.utils; + +import java.io.File; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import org.apache.asterix.common.cluster.ComputePartition; +import org.apache.asterix.common.cluster.PartitioningProperties; +import org.apache.asterix.common.cluster.SplitComputeLocations; +import org.apache.asterix.common.cluster.StorageComputePartitionsMap; +import org.apache.asterix.common.dataflow.ICcApplicationContext; +import org.apache.asterix.common.metadata.DataverseName; +import org.apache.asterix.common.metadata.MetadataIndexImmutableProperties; +import org.apache.asterix.common.utils.StorageConstants; +import org.apache.asterix.common.utils.StoragePathUtil; +import org.apache.asterix.metadata.MetadataTransactionContext; +import org.apache.asterix.metadata.entities.Dataset; +import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint; +import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint; +import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; +import org.apache.hyracks.api.io.FileSplit; +import org.apache.hyracks.api.io.MappedFileSplit; +import org.apache.hyracks.dataflow.std.file.IFileSplitProvider; + +public class StaticDataPartitioningProvider extends DataPartitioningProvider { + + public StaticDataPartitioningProvider(ICcApplicationContext appCtx) { + super(appCtx); + } + + @Override + public PartitioningProperties getPartitioningProperties(DataverseName dataverseName) { + SplitComputeLocations dataverseSplits = getDataverseSplits(dataverseName); + StorageComputePartitionsMap partitionMap = clusterStateManager.getStorageComputeMap(); + int[][] partitionsMap = partitionMap.getComputeToStorageMap(false); + return PartitioningProperties.of(dataverseSplits.getSplitsProvider(), dataverseSplits.getConstraints(), + partitionsMap); + } + + @Override + public PartitioningProperties 
getPartitioningProperties(MetadataTransactionContext mdTxnCtx, Dataset ds, + String indexName) throws AlgebricksException { + SplitComputeLocations dataverseSplits = getDatasetSplits(ds, indexName); + StorageComputePartitionsMap partitionMap = clusterStateManager.getStorageComputeMap(); + int[][] partitionsMap = partitionMap + .getComputeToStorageMap(MetadataIndexImmutableProperties.isMetadataDataset(ds.getDatasetId())); + return PartitioningProperties.of(dataverseSplits.getSplitsProvider(), dataverseSplits.getConstraints(), + partitionsMap); + } + + private SplitComputeLocations getDataverseSplits(DataverseName dataverseName) { + List<FileSplit> splits = new ArrayList<>(); + List<String> locations = new ArrayList<>(); + Set<Integer> uniqueLocations = new HashSet<>(); + StorageComputePartitionsMap partitionMap = clusterStateManager.getStorageComputeMap(); + for (int i = 0; i < StorageConstants.NUM_STORAGE_PARTITIONS; i++) { + File f = new File(StoragePathUtil.prepareStoragePartitionPath(i), + StoragePathUtil.prepareDataverseName(dataverseName)); + ComputePartition computePartition = partitionMap.getComputePartition(i); + splits.add(new MappedFileSplit(computePartition.getNodeId(), f.getPath(), 0)); + if (!uniqueLocations.contains(computePartition.getId())) { + locations.add(computePartition.getNodeId()); + } + uniqueLocations.add(computePartition.getId()); + } + IFileSplitProvider splitProvider = StoragePathUtil.splitProvider(splits.toArray(new FileSplit[0])); + AlgebricksPartitionConstraint constraints = + new AlgebricksAbsolutePartitionConstraint(locations.toArray(new String[0])); + return new SplitComputeLocations(splitProvider, constraints); + } + + private SplitComputeLocations getDatasetSplits(Dataset dataset, String indexName) { + List<FileSplit> splits = new ArrayList<>(); + List<String> locations = new ArrayList<>(); + Set<Integer> uniqueLocations = new HashSet<>(); + StorageComputePartitionsMap partitionMap = clusterStateManager.getStorageComputeMap(); + 
final int datasetPartitons = getNumberOfPartitions(dataset); + boolean metadataDataset = MetadataIndexImmutableProperties.isMetadataDataset(dataset.getDatasetId()); + for (int i = 0; i < datasetPartitons; i++) { + int storagePartition = metadataDataset ? StorageConstants.METADATA_PARTITION : i; + final String relPath = StoragePathUtil.prepareDataverseIndexName(dataset.getDataverseName(), + dataset.getDatasetName(), indexName, dataset.getRebalanceCount()); + File f = new File(StoragePathUtil.prepareStoragePartitionPath(storagePartition), relPath); + ComputePartition computePartition = partitionMap.getComputePartition(storagePartition); + splits.add(new MappedFileSplit(computePartition.getNodeId(), f.getPath(), 0)); + if (!uniqueLocations.contains(computePartition.getId())) { + locations.add(computePartition.getNodeId()); + } + uniqueLocations.add(computePartition.getId()); + } + IFileSplitProvider splitProvider = StoragePathUtil.splitProvider(splits.toArray(new FileSplit[0])); + AlgebricksPartitionConstraint constraints = + new AlgebricksAbsolutePartitionConstraint(locations.toArray(new String[0])); + return new SplitComputeLocations(splitProvider, constraints); + } +} diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorNodePushable.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorNodePushable.java index 7e51ec16db..3762e82991 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorNodePushable.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorNodePushable.java @@ -65,6 +65,8 @@ import org.apache.hyracks.storage.common.MultiComparator; import it.unimi.dsi.fastutil.ints.Int2ObjectMap; import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap; +import it.unimi.dsi.fastutil.ints.IntArrayList; +import 
it.unimi.dsi.fastutil.ints.IntList; import it.unimi.dsi.fastutil.ints.IntOpenHashSet; import it.unimi.dsi.fastutil.ints.IntSet; @@ -82,11 +84,10 @@ public class LSMPrimaryInsertOperatorNodePushable extends LSMIndexInsertUpdateDe private final IFrameOperationCallback[] frameOpCallbacks; private boolean flushedPartialTuples; - private int currentTupleIdx; - private int lastFlushedTupleIdx; - private final PermutingFrameTupleReference keyTuple; private final Int2ObjectMap<IntSet> partition2TuplesMap = new Int2ObjectOpenHashMap<>(); + private final IntSet processedTuples = new IntOpenHashSet(); + private final IntSet flushedTuples = new IntOpenHashSet(); private final SourceLocation sourceLoc; public LSMPrimaryInsertOperatorNodePushable(IHyracksTaskContext ctx, int partition, @@ -142,7 +143,7 @@ public class LSMPrimaryInsertOperatorNodePushable extends LSMIndexInsertUpdateDe @Override public void process(FrameTupleAccessor accessor, ITupleReference tuple, int index) throws HyracksDataException { - if (index < currentTupleIdx) { + if (processedTuples.contains(index)) { // already processed; skip return; } @@ -174,7 +175,7 @@ public class LSMPrimaryInsertOperatorNodePushable extends LSMIndexInsertUpdateDe throw HyracksDataException.create(ErrorCode.ERROR_PROCESSING_TUPLE, HyracksDataException.create(ErrorCode.DUPLICATE_KEY), sourceLoc, index); } - currentTupleIdx = index + 1; + processedTuples.add(index); } @Override @@ -197,15 +198,14 @@ public class LSMPrimaryInsertOperatorNodePushable extends LSMIndexInsertUpdateDe @Override public void open() throws HyracksDataException { - currentTupleIdx = 0; - lastFlushedTupleIdx = 0; flushedPartialTuples = false; accessor = new FrameTupleAccessor(inputRecDesc); writeBuffer = new VSizeFrame(ctx); try { INcApplicationContext appCtx = (INcApplicationContext) ctx.getJobletContext().getServiceContext().getApplicationContext(); - + writer.open(); + writerOpen = true; for (int i = 0; i < partitions.length; i++) { IIndexDataflowHelper 
indexHelper = indexHelpers[i]; indexHelper.open(); @@ -224,8 +224,6 @@ public class LSMPrimaryInsertOperatorNodePushable extends LSMIndexInsertUpdateDe new PrimaryIndexLogMarkerCallback((AbstractLSMIndex) indexes[0]); TaskUtil.put(ILogMarkerCallback.KEY_MARKER_CALLBACK, callback, ctx); } - writer.open(); - writerOpen = true; modCallbacks[i] = modOpCallbackFactory.createModificationOperationCallback(indexHelper.getResource(), ctx, this); searchCallbacks[i] = (LockThenSearchOperationCallback) searchCallbackFactory @@ -283,9 +281,9 @@ public class LSMPrimaryInsertOperatorNodePushable extends LSMIndexInsertUpdateDe FrameUtils.copyAndFlip(buffer, writeBuffer.getBuffer()); FrameUtils.flushFrame(writeBuffer.getBuffer(), writer); } - currentTupleIdx = 0; - lastFlushedTupleIdx = 0; flushedPartialTuples = false; + processedTuples.clear(); + flushedTuples.clear(); } /** @@ -293,15 +291,17 @@ public class LSMPrimaryInsertOperatorNodePushable extends LSMIndexInsertUpdateDe */ @Override public void flushPartialFrame() throws HyracksDataException { - if (lastFlushedTupleIdx == currentTupleIdx) { - //nothing to flush - return; - } - for (int i = lastFlushedTupleIdx; i < currentTupleIdx; i++) { - FrameUtils.appendToWriter(writer, appender, accessor, i); + IntList tuplesToFlush = new IntArrayList(); + processedTuples.iterator().forEachRemaining(tIdx -> { + if (!flushedTuples.contains(tIdx)) { + tuplesToFlush.add(tIdx); + flushedTuples.add(tIdx); + } + }); + for (int i = 0; i < tuplesToFlush.size(); i++) { + FrameUtils.appendToWriter(writer, appender, accessor, tuplesToFlush.getInt(i)); } appender.write(writer, true); - lastFlushedTupleIdx = currentTupleIdx; flushedPartialTuples = true; } diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java index d3c87ff341..984b3ce22e 100644 --- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java @@ -34,12 +34,15 @@ import java.util.function.Predicate; import org.apache.asterix.common.api.IClusterManagementWork.ClusterState; import org.apache.asterix.common.cluster.ClusterPartition; import org.apache.asterix.common.cluster.IClusterStateManager; +import org.apache.asterix.common.cluster.StorageComputePartitionsMap; import org.apache.asterix.common.dataflow.ICcApplicationContext; import org.apache.asterix.common.exceptions.AsterixException; import org.apache.asterix.common.exceptions.ErrorCode; import org.apache.asterix.common.replication.INcLifecycleCoordinator; import org.apache.asterix.common.transactions.IResourceIdManager; import org.apache.asterix.common.utils.NcLocalCounters; +import org.apache.asterix.common.utils.PartitioningScheme; +import org.apache.asterix.common.utils.StorageConstants; import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.api.config.IOption; @@ -79,6 +82,7 @@ public class ClusterStateManager implements IClusterStateManager { private ICcApplicationContext appCtx; private ClusterPartition metadataPartition; private boolean rebalanceRequired; + private StorageComputePartitionsMap storageComputePartitionsMap; @Override public void setCcAppCtx(ICcApplicationContext appCtx) { @@ -86,7 +90,14 @@ public class ClusterStateManager implements IClusterStateManager { node2PartitionsMap = appCtx.getMetadataProperties().getNodePartitions(); clusterPartitions = appCtx.getMetadataProperties().getClusterPartitions(); currentMetadataNode = appCtx.getMetadataProperties().getMetadataNodeName(); - metadataPartition = node2PartitionsMap.get(currentMetadataNode)[0]; + PartitioningScheme partitioningScheme = 
appCtx.getStorageProperties().getPartitioningScheme(); + if (partitioningScheme == PartitioningScheme.DYNAMIC) { + metadataPartition = node2PartitionsMap.get(currentMetadataNode)[0]; + } else { + final ClusterPartition fixedMetadataPartition = new ClusterPartition(StorageConstants.METADATA_PARTITION, + appCtx.getMetadataProperties().getMetadataNodeName(), 0); + metadataPartition = fixedMetadataPartition; + } lifecycleCoordinator = appCtx.getNcLifecycleCoordinator(); lifecycleCoordinator.bindTo(this); } @@ -299,6 +310,9 @@ public class ClusterStateManager implements IClusterStateManager { clusterActiveLocations.removeAll(pendingRemoval); clusterPartitionConstraint = new AlgebricksAbsolutePartitionConstraint(clusterActiveLocations.toArray(new String[] {})); + if (appCtx.getStorageProperties().getPartitioningScheme() == PartitioningScheme.STATIC) { + storageComputePartitionsMap = StorageComputePartitionsMap.computePartitionsMap(this); + } } @Override @@ -489,6 +503,10 @@ public class ClusterStateManager implements IClusterStateManager { return nodeIds.stream().anyMatch(failedNodes::contains); } + public synchronized StorageComputePartitionsMap getStorageComputeMap() { + return storageComputePartitionsMap; + } + private void updateClusterCounters(String nodeId, NcLocalCounters localCounters) { final IResourceIdManager resourceIdManager = appCtx.getResourceIdManager(); resourceIdManager.report(nodeId, localCounters.getMaxResourceId()); diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java index bb3cde5944..1e813462b3 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java +++ 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java @@ -211,7 +211,7 @@ public class PersistentLocalResourceRepository implements ILocalResourceReposito public void delete(String relativePath) throws HyracksDataException { FileReference resourceFile = getLocalResourceFileByName(ioManager, relativePath); boolean resourceExists = resourceFile.getFile().exists(); - if (resourceExists) { + if (isReplicationEnabled && resourceExists) { try { createReplicationJob(ReplicationOperation.DELETE, resourceFile); } catch (Exception e) {