From Ali Alsuliman <[email protected]>: Ali Alsuliman has submitted this change. ( https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17976 )
Change subject: [ASTERIXDB-3318][RUN] Create index builder factory per partition ...................................................................... [ASTERIXDB-3318][RUN] Create index builder factory per partition - user model changes: no - storage format changes: no - interface changes: no Details: LSMIndexIOOperationCallbackFactory factory is currently shared among index builder factories. When index builders run concurrently and without synchronization, this causes an issue because LSMIndexIOOperationCallbackFactory is stateful. Change-Id: I4c3b0b982b206ac3eea3653c68e3fecc31145cda Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17976 Reviewed-by: Ali Alsuliman <[email protected]> Reviewed-by: Murtadha Hubail <[email protected]> Tested-by: Murtadha Hubail <[email protected]> --- M hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexCreateOperatorDescriptor.java M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryTreeIndexOperationsHelper.java M hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/btree/AbstractBTreeOperatorTest.java M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java M hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/rtree/AbstractRTreeOperatorTest.java M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SampleOperationsHelper.java 6 files changed, 78 insertions(+), 36 deletions(-) Approvals: Murtadha Hubail: Looks good to me, approved; Verified Ali Alsuliman: Looks good to me, but someone else must approve diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java index 93802c2..43f40eb 100644 --- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java @@ -339,7 +339,8 @@ JobSpecification spec = RuntimeUtils.createJobSpecification(metadataProvider.getApplicationContext()); PartitioningProperties partitioningProperties = metadataProvider.getPartitioningProperties(dataset); - FileSplit[] fs = partitioningProperties.getSplitsProvider().getFileSplits(); + IFileSplitProvider splitsProvider = partitioningProperties.getSplitsProvider(); + FileSplit[] fs = splitsProvider.getFileSplits(); StringBuilder sb = new StringBuilder(); for (FileSplit f : fs) { sb.append(f).append(" "); @@ -349,19 +350,37 @@ DatasetUtil.getMergePolicyFactory(dataset, metadataProvider.getMetadataTxnContext()); // prepare a LocalResourceMetadata which will be stored in NC's local resource // repository - IResourceFactory resourceFactory = dataset.getResourceFactory(metadataProvider, index, itemType, metaItemType, - compactionInfo.first, compactionInfo.second); - IndexBuilderFactory indexBuilderFactory = - new IndexBuilderFactory(metadataProvider.getStorageComponentProvider().getStorageManager(), - partitioningProperties.getSplitsProvider(), resourceFactory, true); - IndexCreateOperatorDescriptor indexCreateOp = new IndexCreateOperatorDescriptor(spec, indexBuilderFactory, - partitioningProperties.getComputeStorageMap()); + int[][] computeStorageMap = partitioningProperties.getComputeStorageMap(); + IndexBuilderFactory[][] indexBuilderFactories = getIndexBuilderFactories(dataset, metadataProvider, index, + itemType, metaItemType, splitsProvider, compactionInfo.first, compactionInfo.second, computeStorageMap); + IndexCreateOperatorDescriptor indexCreateOp = + new IndexCreateOperatorDescriptor(spec, indexBuilderFactories, computeStorageMap); AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, indexCreateOp, 
partitioningProperties.getConstraints()); spec.addRoot(indexCreateOp); return spec; } + public static IndexBuilderFactory[][] getIndexBuilderFactories(Dataset dataset, MetadataProvider metadataProvider, + Index index, ARecordType itemType, ARecordType metaItemType, IFileSplitProvider fileSplitProvider, + ILSMMergePolicyFactory mergePolicyFactory, Map<String, String> mergePolicyProperties, + int[][] computeStorageMap) throws AlgebricksException { + IndexBuilderFactory[][] indexBuilderFactories = new IndexBuilderFactory[computeStorageMap.length][]; + for (int i = 0; i < computeStorageMap.length; i++) { + int len = computeStorageMap[i].length; + indexBuilderFactories[i] = new IndexBuilderFactory[len]; + for (int k = 0; k < len; k++) { + IResourceFactory resourceFactory = dataset.getResourceFactory(metadataProvider, index, itemType, + metaItemType, mergePolicyFactory, mergePolicyProperties); + IndexBuilderFactory indexBuilderFactory = + new IndexBuilderFactory(metadataProvider.getStorageComponentProvider().getStorageManager(), + fileSplitProvider, resourceFactory, true); + indexBuilderFactories[i][k] = indexBuilderFactory; + } + } + return indexBuilderFactories; + } + public static JobSpecification compactDatasetJobSpec(Dataverse dataverse, String datasetName, MetadataProvider metadataProvider) throws AlgebricksException { DataverseName dataverseName = dataverse.getDataverseName(); diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SampleOperationsHelper.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SampleOperationsHelper.java index 3dd84ec..e874663 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SampleOperationsHelper.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SampleOperationsHelper.java @@ -27,7 +27,6 @@ import org.apache.asterix.common.cluster.PartitioningProperties; import 
org.apache.asterix.common.config.OptimizationConfUtil; -import org.apache.asterix.common.context.IStorageComponentProvider; import org.apache.asterix.common.utils.StorageConstants; import org.apache.asterix.formats.base.IDataFormat; import org.apache.asterix.metadata.declared.MetadataProvider; @@ -81,14 +80,12 @@ import org.apache.hyracks.dataflow.std.file.IFileSplitProvider; import org.apache.hyracks.dataflow.std.group.AbstractAggregatorDescriptorFactory; import org.apache.hyracks.dataflow.std.group.sort.SortGroupByOperatorDescriptor; -import org.apache.hyracks.storage.am.common.api.IIndexBuilderFactory; import org.apache.hyracks.storage.am.common.build.IndexBuilderFactory; import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory; import org.apache.hyracks.storage.am.common.dataflow.IndexCreateOperatorDescriptor; import org.apache.hyracks.storage.am.common.dataflow.IndexDataflowHelperFactory; import org.apache.hyracks.storage.am.common.dataflow.IndexDropOperatorDescriptor; import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory; -import org.apache.hyracks.storage.common.IResourceFactory; import org.apache.hyracks.storage.common.IStorageManager; import org.apache.hyracks.storage.common.projection.ITupleProjectorFactory; @@ -158,13 +155,11 @@ @Override public JobSpecification buildCreationJobSpec() throws AlgebricksException { JobSpecification spec = RuntimeUtils.createJobSpecification(metadataProvider.getApplicationContext()); - IStorageComponentProvider storageComponentProvider = metadataProvider.getStorageComponentProvider(); - IResourceFactory resourceFactory = dataset.getResourceFactory(metadataProvider, sampleIdx, itemType, metaType, - mergePolicyFactory, mergePolicyProperties); - IIndexBuilderFactory indexBuilderFactory = new IndexBuilderFactory(storageComponentProvider.getStorageManager(), - fileSplitProvider, resourceFactory, true); + IndexBuilderFactory[][] indexBuilderFactories = + 
DatasetUtil.getIndexBuilderFactories(dataset, metadataProvider, sampleIdx, itemType, metaType, + fileSplitProvider, mergePolicyFactory, mergePolicyProperties, computeStorageMap); IndexCreateOperatorDescriptor indexCreateOp = - new IndexCreateOperatorDescriptor(spec, indexBuilderFactory, computeStorageMap); + new IndexCreateOperatorDescriptor(spec, indexBuilderFactories, computeStorageMap); indexCreateOp.setSourceLocation(sourceLoc); AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, indexCreateOp, partitionConstraint); spec.addRoot(indexCreateOp); diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryTreeIndexOperationsHelper.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryTreeIndexOperationsHelper.java index 0cda625..62352bf 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryTreeIndexOperationsHelper.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryTreeIndexOperationsHelper.java @@ -24,7 +24,6 @@ import java.util.Set; import org.apache.asterix.common.cluster.PartitioningProperties; -import org.apache.asterix.common.context.IStorageComponentProvider; import org.apache.asterix.metadata.declared.MetadataProvider; import org.apache.asterix.metadata.entities.Dataset; import org.apache.asterix.metadata.entities.Index; @@ -34,14 +33,12 @@ import org.apache.hyracks.algebricks.core.jobgen.impl.ConnectorPolicyAssignmentPolicy; import org.apache.hyracks.api.exceptions.SourceLocation; import org.apache.hyracks.api.job.JobSpecification; -import org.apache.hyracks.storage.am.common.api.IIndexBuilderFactory; import org.apache.hyracks.storage.am.common.build.IndexBuilderFactory; import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory; import org.apache.hyracks.storage.am.common.dataflow.IndexCreateOperatorDescriptor; import 
org.apache.hyracks.storage.am.common.dataflow.IndexDataflowHelperFactory; import org.apache.hyracks.storage.am.common.dataflow.IndexDropOperatorDescriptor; import org.apache.hyracks.storage.am.lsm.common.dataflow.LSMTreeIndexCompactOperatorDescriptor; -import org.apache.hyracks.storage.common.IResourceFactory; public abstract class SecondaryTreeIndexOperationsHelper extends SecondaryIndexOperationsHelper { @@ -53,14 +50,13 @@ @Override public JobSpecification buildCreationJobSpec() throws AlgebricksException { JobSpecification spec = RuntimeUtils.createJobSpecification(metadataProvider.getApplicationContext()); - IStorageComponentProvider storageComponentProvider = metadataProvider.getStorageComponentProvider(); - IResourceFactory resourceFactory = dataset.getResourceFactory(metadataProvider, index, itemType, metaType, - mergePolicyFactory, mergePolicyProperties); - IIndexBuilderFactory indexBuilderFactory = new IndexBuilderFactory(storageComponentProvider.getStorageManager(), - secondaryFileSplitProvider, resourceFactory, true); PartitioningProperties partitioningProperties = metadataProvider.getPartitioningProperties(dataset); - IndexCreateOperatorDescriptor secondaryIndexCreateOp = new IndexCreateOperatorDescriptor(spec, - indexBuilderFactory, partitioningProperties.getComputeStorageMap()); + int[][] computeStorageMap = partitioningProperties.getComputeStorageMap(); + IndexBuilderFactory[][] indexBuilderFactories = + DatasetUtil.getIndexBuilderFactories(dataset, metadataProvider, index, itemType, metaType, + secondaryFileSplitProvider, mergePolicyFactory, mergePolicyProperties, computeStorageMap); + IndexCreateOperatorDescriptor secondaryIndexCreateOp = + new IndexCreateOperatorDescriptor(spec, indexBuilderFactories, computeStorageMap); secondaryIndexCreateOp.setSourceLocation(sourceLoc); AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, secondaryIndexCreateOp, secondaryPartitionConstraint); diff --git 
a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/btree/AbstractBTreeOperatorTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/btree/AbstractBTreeOperatorTest.java index d90a212..4948a66 100644 --- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/btree/AbstractBTreeOperatorTest.java +++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/btree/AbstractBTreeOperatorTest.java @@ -120,8 +120,10 @@ IResourceFactory primaryResourceFactory = createPrimaryResourceFactory(); IIndexBuilderFactory indexBuilderFactory = new IndexBuilderFactory(storageManager, primarySplitProvider, primaryResourceFactory, false); + IIndexBuilderFactory[][] indexBuilderFactories = new IIndexBuilderFactory[1][1]; + indexBuilderFactories[0][0] = indexBuilderFactory; IndexCreateOperatorDescriptor primaryCreateOp = - new IndexCreateOperatorDescriptor(spec, indexBuilderFactory, TestUtil.getPartitionsMap(1)); + new IndexCreateOperatorDescriptor(spec, indexBuilderFactories, TestUtil.getPartitionsMap(1)); PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, primaryCreateOp, NC1_ID); spec.addRoot(primaryCreateOp); runTest(spec); @@ -168,8 +170,10 @@ IResourceFactory secondaryResourceFactory = createSecondaryResourceFactory(); IIndexBuilderFactory indexBuilderFactory = new IndexBuilderFactory(storageManager, secondarySplitProvider, secondaryResourceFactory, false); + IIndexBuilderFactory[][] indexBuilderFactories = new IIndexBuilderFactory[1][1]; + indexBuilderFactories[0][0] = indexBuilderFactory; IndexCreateOperatorDescriptor secondaryCreateOp = - new IndexCreateOperatorDescriptor(spec, indexBuilderFactory, TestUtil.getPartitionsMap(1)); + new IndexCreateOperatorDescriptor(spec, indexBuilderFactories, TestUtil.getPartitionsMap(1)); 
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, secondaryCreateOp, NC1_ID); spec.addRoot(secondaryCreateOp); runTest(spec); diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/rtree/AbstractRTreeOperatorTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/rtree/AbstractRTreeOperatorTest.java index 1634b2d..a8f2ef1 100644 --- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/rtree/AbstractRTreeOperatorTest.java +++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/rtree/AbstractRTreeOperatorTest.java @@ -242,8 +242,10 @@ pageManagerFactory, null, null); IIndexBuilderFactory indexBuilderFactory = new IndexBuilderFactory(storageManager, primarySplitProvider, btreeFactory, false); + IIndexBuilderFactory[][] indexBuilderFactories = new IIndexBuilderFactory[1][1]; + indexBuilderFactories[0][0] = indexBuilderFactory; IndexCreateOperatorDescriptor primaryCreateOp = - new IndexCreateOperatorDescriptor(spec, indexBuilderFactory, TestUtil.getPartitionsMap(1)); + new IndexCreateOperatorDescriptor(spec, indexBuilderFactories, TestUtil.getPartitionsMap(1)); PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, primaryCreateOp, NC1_ID); spec.addRoot(primaryCreateOp); runTest(spec); @@ -305,8 +307,10 @@ JobSpecification spec = new JobSpecification(); IndexBuilderFactory indexBuilderFactory = new IndexBuilderFactory(storageManager, secondarySplitProvider, rtreeFactory, false); + IIndexBuilderFactory[][] indexBuilderFactories = new IIndexBuilderFactory[1][1]; + indexBuilderFactories[0][0] = indexBuilderFactory; IndexCreateOperatorDescriptor secondaryCreateOp = - new IndexCreateOperatorDescriptor(spec, indexBuilderFactory, TestUtil.getPartitionsMap(1)); + new IndexCreateOperatorDescriptor(spec, 
indexBuilderFactories, TestUtil.getPartitionsMap(1)); PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, secondaryCreateOp, NC1_ID); spec.addRoot(secondaryCreateOp); runTest(spec); diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexCreateOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexCreateOperatorDescriptor.java index 61b600a..32636e8 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexCreateOperatorDescriptor.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexCreateOperatorDescriptor.java @@ -31,13 +31,13 @@ public class IndexCreateOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor { private static final long serialVersionUID = 2L; - private final IIndexBuilderFactory indexBuilderFactory; + private final IIndexBuilderFactory[][] indexBuilderFactories; private final int[][] partitionsMap; - public IndexCreateOperatorDescriptor(IOperatorDescriptorRegistry spec, IIndexBuilderFactory indexBuilderFactory, - int[][] partitionsMap) { + public IndexCreateOperatorDescriptor(IOperatorDescriptorRegistry spec, + IIndexBuilderFactory[][] indexBuilderFactories, int[][] partitionsMap) { super(spec, 0, 0); - this.indexBuilderFactory = indexBuilderFactory; + this.indexBuilderFactories = indexBuilderFactories; this.partitionsMap = partitionsMap; } @@ -45,9 +45,10 @@ public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx, IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException { int[] storagePartitions = partitionsMap[partition]; + IIndexBuilderFactory[] partitionIndexBuilderFactories = indexBuilderFactories[partition]; IIndexBuilder[] indexBuilders = new 
IIndexBuilder[storagePartitions.length]; for (int i = 0; i < storagePartitions.length; i++) { - indexBuilders[i] = indexBuilderFactory.create(ctx, storagePartitions[i]); + indexBuilders[i] = partitionIndexBuilderFactories[i].create(ctx, storagePartitions[i]); } return new IndexCreateOperatorNodePushable(indexBuilders); } -- To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17976 To unsubscribe, or for help writing mail filters, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Change-Id: I4c3b0b982b206ac3eea3653c68e3fecc31145cda Gerrit-Change-Number: 17976 Gerrit-PatchSet: 3 Gerrit-Owner: Ali Alsuliman <[email protected]> Gerrit-Reviewer: Ali Alsuliman <[email protected]> Gerrit-Reviewer: Anon. E. Moose #1000171 Gerrit-Reviewer: Jenkins <[email protected]> Gerrit-Reviewer: Murtadha Hubail <[email protected]> Gerrit-Reviewer: Wail Alkowaileet <[email protected]> Gerrit-MessageType: merged
