This is an automated email from the ASF dual-hosted git repository.

mhubail pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git


The following commit(s) were added to refs/heads/master by this push:
     new de51cc4799 [ASTERIXDB-3144][RT] Run correlated index bulkload at storage parallelism
de51cc4799 is described below

commit de51cc479920e26e88f041d8c5d064764f788a42
Author: Murtadha Hubail <murtadha.hub...@couchbase.com>
AuthorDate: Fri May 5 21:22:32 2023 +0300

    [ASTERIXDB-3144][RT] Run correlated index bulkload at storage parallelism
    
    - user model changes: no
    - storage format changes: no
    - interface changes: no
    
    Details:
    
    - To ensure correctness, always run correlated secondary index
      bulkload jobs using storage parallelism, regardless of the number
      of compute partitions. This ensures that each storage partition
      produces its corresponding secondary index components.
    
    Change-Id: I5f38e4b06bcd91479bae544619bf3a96dde3c500
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17512
    Integration-Tests: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
    Tested-by: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
    Reviewed-by: Ali Alsuliman <ali.al.solai...@gmail.com>
---
 .../apache/asterix/metadata/utils/DatasetUtil.java | 15 ++++++++++--
 .../SecondaryCorrelatedBTreeOperationsHelper.java  |  3 ++-
 ...aryCorrelatedInvertedIndexOperationsHelper.java |  3 ++-
 .../SecondaryCorrelatedRTreeOperationsHelper.java  |  3 ++-
 .../utils/SecondaryIndexOperationsHelper.java      | 27 ++++++++++++++++++----
 5 files changed, 42 insertions(+), 9 deletions(-)

diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
index 130e39c357..e882129823 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
@@ -585,7 +585,20 @@ public class DatasetUtil {
             MetadataProvider metadataProvider) throws AlgebricksException {
         PartitioningProperties partitioningProperties = 
metadataProvider.getPartitioningProperties(dataset);
         AlgebricksPartitionConstraint primaryPartitionConstraint = 
partitioningProperties.getConstraints();
+        IOperatorDescriptor dummyKeyProviderOp = 
createDummyKeyProviderOp(spec);
+        
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, 
dummyKeyProviderOp,
+                primaryPartitionConstraint);
+        return dummyKeyProviderOp;
+    }
 
+    public static IOperatorDescriptor 
createCorrelatedDummyKeyProviderOp(JobSpecification spec,
+            AlgebricksPartitionConstraint apc) throws AlgebricksException {
+        IOperatorDescriptor dummyKeyProviderOp = 
createDummyKeyProviderOp(spec);
+        
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, 
dummyKeyProviderOp, apc);
+        return dummyKeyProviderOp;
+    }
+
+    private static IOperatorDescriptor 
createDummyKeyProviderOp(JobSpecification spec) throws AlgebricksException {
         // Build dummy tuple containing one field with a dummy value inside.
         ArrayTupleBuilder tb = new ArrayTupleBuilder(1);
         DataOutput dos = tb.getDataOutput();
@@ -602,8 +615,6 @@ public class DatasetUtil {
         RecordDescriptor keyRecDesc = new RecordDescriptor(keyRecDescSers);
         ConstantTupleSourceOperatorDescriptor keyProviderOp = new 
ConstantTupleSourceOperatorDescriptor(spec,
                 keyRecDesc, tb.getFieldEndOffsets(), tb.getByteArray(), 
tb.getSize());
-        
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, 
keyProviderOp,
-                primaryPartitionConstraint);
         return keyProviderOp;
     }
 
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedBTreeOperationsHelper.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedBTreeOperationsHelper.java
index c59613781b..2b948ef623 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedBTreeOperationsHelper.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedBTreeOperationsHelper.java
@@ -75,7 +75,8 @@ public class SecondaryCorrelatedBTreeOperationsHelper extends 
SecondaryCorrelate
         IndexUtil.bindJobEventListener(spec, metadataProvider);
 
         // Create dummy key provider for feeding the primary index scan.
-        IOperatorDescriptor keyProviderOp = 
DatasetUtil.createDummyKeyProviderOp(spec, dataset, metadataProvider);
+        IOperatorDescriptor keyProviderOp =
+                DatasetUtil.createCorrelatedDummyKeyProviderOp(spec, 
primaryPartitionConstraint);
 
         // Create primary index scan op.
         IOperatorDescriptor primaryScanOp = 
createPrimaryIndexScanDiskComponentsOp(spec, metadataProvider,
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedInvertedIndexOperationsHelper.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedInvertedIndexOperationsHelper.java
index 00cc5950b3..cd3f01c85d 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedInvertedIndexOperationsHelper.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedInvertedIndexOperationsHelper.java
@@ -216,7 +216,8 @@ public class 
SecondaryCorrelatedInvertedIndexOperationsHelper extends SecondaryC
         IndexUtil.bindJobEventListener(spec, metadataProvider);
 
         // Create dummy key provider for feeding the primary index scan.
-        IOperatorDescriptor keyProviderOp = 
DatasetUtil.createDummyKeyProviderOp(spec, dataset, metadataProvider);
+        IOperatorDescriptor keyProviderOp =
+                DatasetUtil.createCorrelatedDummyKeyProviderOp(spec, 
primaryPartitionConstraint);
 
         // Create primary index scan op.
         IOperatorDescriptor primaryScanOp = 
createPrimaryIndexScanDiskComponentsOp(spec, metadataProvider,
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedRTreeOperationsHelper.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedRTreeOperationsHelper.java
index e26aab3731..302ad74a01 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedRTreeOperationsHelper.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedRTreeOperationsHelper.java
@@ -188,7 +188,8 @@ public class SecondaryCorrelatedRTreeOperationsHelper 
extends SecondaryCorrelate
         assert dataset.getDatasetType() == DatasetType.INTERNAL;
 
         // Create dummy key provider for feeding the primary index scan.
-        IOperatorDescriptor keyProviderOp = 
DatasetUtil.createDummyKeyProviderOp(spec, dataset, metadataProvider);
+        IOperatorDescriptor keyProviderOp =
+                DatasetUtil.createCorrelatedDummyKeyProviderOp(spec, 
primaryPartitionConstraint);
         IndexUtil.bindJobEventListener(spec, metadataProvider);
 
         // Create primary index scan op.
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java
index 5d6c13c792..1a58423246 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java
@@ -30,6 +30,7 @@ import 
org.apache.asterix.common.config.DatasetConfig.DatasetType;
 import org.apache.asterix.common.config.OptimizationConfUtil;
 import org.apache.asterix.common.exceptions.CompilationException;
 import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.utils.StoragePathUtil;
 import org.apache.asterix.formats.base.IDataFormat;
 import org.apache.asterix.formats.nontagged.BinaryBooleanInspector;
 import org.apache.asterix.formats.nontagged.BinaryComparatorFactoryProvider;
@@ -74,6 +75,7 @@ import 
org.apache.hyracks.api.dataflow.value.ITuplePartitionerFactory;
 import org.apache.hyracks.api.dataflow.value.ITypeTraits;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.SourceLocation;
+import org.apache.hyracks.api.io.FileSplit;
 import org.apache.hyracks.api.job.JobSpecification;
 import 
org.apache.hyracks.dataflow.common.data.partition.FieldHashPartitionerFactory;
 import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
@@ -211,8 +213,9 @@ public abstract class SecondaryIndexOperationsHelper 
implements ISecondaryIndexO
         payloadSerde = 
SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(itemType);
         metaSerde =
                 metaType == null ? null : 
SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(metaType);
-        PartitioningProperties partitioningProperties =
-                metadataProvider.getPartitioningProperties(dataset, 
index.getIndexName());
+        PartitioningProperties partitioningProperties;
+        partitioningProperties =
+                
getSecondaryIndexBulkloadPartitioningProperties(metadataProvider, dataset, 
index.getIndexName());
         secondaryFileSplitProvider = 
partitioningProperties.getSpiltsProvider();
         secondaryPartitionConstraint = partitioningProperties.getConstraints();
         numPrimaryKeys = dataset.getPrimaryKeys().size();
@@ -223,8 +226,8 @@ public abstract class SecondaryIndexOperationsHelper 
implements ISecondaryIndexO
             } else {
                 numFilterFields = 0;
             }
-
-            PartitioningProperties datasetPartitioningProperties = 
metadataProvider.getPartitioningProperties(dataset);
+            PartitioningProperties datasetPartitioningProperties = 
getSecondaryIndexBulkloadPartitioningProperties(
+                    metadataProvider, dataset, dataset.getDatasetName());
             primaryFileSplitProvider = 
datasetPartitioningProperties.getSpiltsProvider();
             primaryPartitionConstraint = 
datasetPartitioningProperties.getConstraints();
             setPrimaryRecDescAndComparators();
@@ -527,4 +530,20 @@ public abstract class SecondaryIndexOperationsHelper 
implements ISecondaryIndexO
     public AlgebricksPartitionConstraint getSecondaryPartitionConstraint() {
         return secondaryPartitionConstraint;
     }
+
+    private PartitioningProperties 
getSecondaryIndexBulkloadPartitioningProperties(MetadataProvider mp, Dataset 
dataset,
+            String indexName) throws AlgebricksException {
+        PartitioningProperties partitioningProperties = 
mp.getPartitioningProperties(dataset, indexName);
+        // special case for bulkloading secondary indexes for datasets with 
correldated merge policy
+        // to ensure correctness, we will run in as many locations as storage 
partitions
+        // this will not be needed once ASTERIXDB-3176 is implemented
+        if (this instanceof SecondaryCorrelatedTreeIndexOperationsHelper) {
+            FileSplit[] fileSplits = 
partitioningProperties.getSpiltsProvider().getFileSplits();
+            Pair<IFileSplitProvider, AlgebricksPartitionConstraint> sp =
+                    
StoragePathUtil.splitProviderAndPartitionConstraints(fileSplits);
+            return PartitioningProperties.of(sp.getFirst(), sp.getSecond(),
+                    
DataPartitioningProvider.getOneToOnePartitionsMap(fileSplits.length));
+        }
+        return partitioningProperties;
+    }
 }

Reply via email to