This is an automated email from the ASF dual-hosted git repository. mhubail pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/asterixdb.git
The following commit(s) were added to refs/heads/master by this push: new de51cc4799 [ASTERIXDB-3144][RT] Run correlated index bulkload at storage parallelism de51cc4799 is described below commit de51cc479920e26e88f041d8c5d064764f788a42 Author: Murtadha Hubail <murtadha.hub...@couchbase.com> AuthorDate: Fri May 5 21:22:32 2023 +0300 [ASTERIXDB-3144][RT] Run correlated index bulkload at storage parallelism - user model changes: no - storage format changes: no - interface changes: no Details: - To ensure correctness, always run correlated secondary index bulkload jobs using storage parallelism regardless of the number of compute partitions. This will ensure that each storage partition will produce the corresponding secondary index components. Change-Id: I5f38e4b06bcd91479bae544619bf3a96dde3c500 Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17512 Integration-Tests: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Tested-by: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Reviewed-by: Ali Alsuliman <ali.al.solai...@gmail.com> --- .../apache/asterix/metadata/utils/DatasetUtil.java | 15 ++++++++++-- .../SecondaryCorrelatedBTreeOperationsHelper.java | 3 ++- ...aryCorrelatedInvertedIndexOperationsHelper.java | 3 ++- .../SecondaryCorrelatedRTreeOperationsHelper.java | 3 ++- .../utils/SecondaryIndexOperationsHelper.java | 27 ++++++++++++++++++---- 5 files changed, 42 insertions(+), 9 deletions(-) diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java index 130e39c357..e882129823 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java @@ -585,7 +585,20 @@ public class DatasetUtil { MetadataProvider metadataProvider) throws AlgebricksException { PartitioningProperties 
partitioningProperties = metadataProvider.getPartitioningProperties(dataset); AlgebricksPartitionConstraint primaryPartitionConstraint = partitioningProperties.getConstraints(); + IOperatorDescriptor dummyKeyProviderOp = createDummyKeyProviderOp(spec); + AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, dummyKeyProviderOp, + primaryPartitionConstraint); + return dummyKeyProviderOp; + } + public static IOperatorDescriptor createCorrelatedDummyKeyProviderOp(JobSpecification spec, + AlgebricksPartitionConstraint apc) throws AlgebricksException { + IOperatorDescriptor dummyKeyProviderOp = createDummyKeyProviderOp(spec); + AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, dummyKeyProviderOp, apc); + return dummyKeyProviderOp; + } + + private static IOperatorDescriptor createDummyKeyProviderOp(JobSpecification spec) throws AlgebricksException { // Build dummy tuple containing one field with a dummy value inside. ArrayTupleBuilder tb = new ArrayTupleBuilder(1); DataOutput dos = tb.getDataOutput(); @@ -602,8 +615,6 @@ public class DatasetUtil { RecordDescriptor keyRecDesc = new RecordDescriptor(keyRecDescSers); ConstantTupleSourceOperatorDescriptor keyProviderOp = new ConstantTupleSourceOperatorDescriptor(spec, keyRecDesc, tb.getFieldEndOffsets(), tb.getByteArray(), tb.getSize()); - AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, keyProviderOp, - primaryPartitionConstraint); return keyProviderOp; } diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedBTreeOperationsHelper.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedBTreeOperationsHelper.java index c59613781b..2b948ef623 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedBTreeOperationsHelper.java +++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedBTreeOperationsHelper.java @@ -75,7 +75,8 @@ public class SecondaryCorrelatedBTreeOperationsHelper extends SecondaryCorrelate IndexUtil.bindJobEventListener(spec, metadataProvider); // Create dummy key provider for feeding the primary index scan. - IOperatorDescriptor keyProviderOp = DatasetUtil.createDummyKeyProviderOp(spec, dataset, metadataProvider); + IOperatorDescriptor keyProviderOp = + DatasetUtil.createCorrelatedDummyKeyProviderOp(spec, primaryPartitionConstraint); // Create primary index scan op. IOperatorDescriptor primaryScanOp = createPrimaryIndexScanDiskComponentsOp(spec, metadataProvider, diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedInvertedIndexOperationsHelper.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedInvertedIndexOperationsHelper.java index 00cc5950b3..cd3f01c85d 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedInvertedIndexOperationsHelper.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedInvertedIndexOperationsHelper.java @@ -216,7 +216,8 @@ public class SecondaryCorrelatedInvertedIndexOperationsHelper extends SecondaryC IndexUtil.bindJobEventListener(spec, metadataProvider); // Create dummy key provider for feeding the primary index scan. - IOperatorDescriptor keyProviderOp = DatasetUtil.createDummyKeyProviderOp(spec, dataset, metadataProvider); + IOperatorDescriptor keyProviderOp = + DatasetUtil.createCorrelatedDummyKeyProviderOp(spec, primaryPartitionConstraint); // Create primary index scan op. 
IOperatorDescriptor primaryScanOp = createPrimaryIndexScanDiskComponentsOp(spec, metadataProvider, diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedRTreeOperationsHelper.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedRTreeOperationsHelper.java index e26aab3731..302ad74a01 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedRTreeOperationsHelper.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedRTreeOperationsHelper.java @@ -188,7 +188,8 @@ public class SecondaryCorrelatedRTreeOperationsHelper extends SecondaryCorrelate assert dataset.getDatasetType() == DatasetType.INTERNAL; // Create dummy key provider for feeding the primary index scan. - IOperatorDescriptor keyProviderOp = DatasetUtil.createDummyKeyProviderOp(spec, dataset, metadataProvider); + IOperatorDescriptor keyProviderOp = + DatasetUtil.createCorrelatedDummyKeyProviderOp(spec, primaryPartitionConstraint); IndexUtil.bindJobEventListener(spec, metadataProvider); // Create primary index scan op. 
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java index 5d6c13c792..1a58423246 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java @@ -30,6 +30,7 @@ import org.apache.asterix.common.config.DatasetConfig.DatasetType; import org.apache.asterix.common.config.OptimizationConfUtil; import org.apache.asterix.common.exceptions.CompilationException; import org.apache.asterix.common.exceptions.ErrorCode; +import org.apache.asterix.common.utils.StoragePathUtil; import org.apache.asterix.formats.base.IDataFormat; import org.apache.asterix.formats.nontagged.BinaryBooleanInspector; import org.apache.asterix.formats.nontagged.BinaryComparatorFactoryProvider; @@ -74,6 +75,7 @@ import org.apache.hyracks.api.dataflow.value.ITuplePartitionerFactory; import org.apache.hyracks.api.dataflow.value.ITypeTraits; import org.apache.hyracks.api.dataflow.value.RecordDescriptor; import org.apache.hyracks.api.exceptions.SourceLocation; +import org.apache.hyracks.api.io.FileSplit; import org.apache.hyracks.api.job.JobSpecification; import org.apache.hyracks.dataflow.common.data.partition.FieldHashPartitionerFactory; import org.apache.hyracks.dataflow.std.file.IFileSplitProvider; @@ -211,8 +213,9 @@ public abstract class SecondaryIndexOperationsHelper implements ISecondaryIndexO payloadSerde = SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(itemType); metaSerde = metaType == null ? 
null : SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(metaType); - PartitioningProperties partitioningProperties = - metadataProvider.getPartitioningProperties(dataset, index.getIndexName()); + PartitioningProperties partitioningProperties; + partitioningProperties = + getSecondaryIndexBulkloadPartitioningProperties(metadataProvider, dataset, index.getIndexName()); secondaryFileSplitProvider = partitioningProperties.getSpiltsProvider(); secondaryPartitionConstraint = partitioningProperties.getConstraints(); numPrimaryKeys = dataset.getPrimaryKeys().size(); @@ -223,8 +226,8 @@ public abstract class SecondaryIndexOperationsHelper implements ISecondaryIndexO } else { numFilterFields = 0; } - - PartitioningProperties datasetPartitioningProperties = metadataProvider.getPartitioningProperties(dataset); + PartitioningProperties datasetPartitioningProperties = getSecondaryIndexBulkloadPartitioningProperties( + metadataProvider, dataset, dataset.getDatasetName()); primaryFileSplitProvider = datasetPartitioningProperties.getSpiltsProvider(); primaryPartitionConstraint = datasetPartitioningProperties.getConstraints(); setPrimaryRecDescAndComparators(); @@ -527,4 +530,20 @@ public abstract class SecondaryIndexOperationsHelper implements ISecondaryIndexO public AlgebricksPartitionConstraint getSecondaryPartitionConstraint() { return secondaryPartitionConstraint; } + + private PartitioningProperties getSecondaryIndexBulkloadPartitioningProperties(MetadataProvider mp, Dataset dataset, + String indexName) throws AlgebricksException { + PartitioningProperties partitioningProperties = mp.getPartitioningProperties(dataset, indexName); + // special case for bulkloading secondary indexes for datasets with correlated merge policy + // to ensure correctness, we will run in as many locations as there are storage partitions + // this will not be needed once ASTERIXDB-3176 is implemented + if (this instanceof SecondaryCorrelatedTreeIndexOperationsHelper) { + FileSplit[] fileSplits
= partitioningProperties.getSpiltsProvider().getFileSplits(); + Pair<IFileSplitProvider, AlgebricksPartitionConstraint> sp = + StoragePathUtil.splitProviderAndPartitionConstraints(fileSplits); + return PartitioningProperties.of(sp.getFirst(), sp.getSecond(), + DataPartitioningProvider.getOneToOnePartitionsMap(fileSplits.length)); + } + return partitioningProperties; + } }