This is an automated email from the ASF dual-hosted git repository.

mhubail pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 9acfce145e [ASTERIXDB-3144][HYR][RT] Make dump_index() function 
support multiple partitions
9acfce145e is described below

commit 9acfce145e846edcf719a009b8b925d027354604
Author: Ali Alsuliman <ali.al.solai...@gmail.com>
AuthorDate: Fri May 5 17:21:15 2023 -0700

    [ASTERIXDB-3144][HYR][RT] Make dump_index() function support multiple 
partitions
    
    - user model changes: no
    - storage format changes: no
    - interface changes: no
    
    Details:
    This patch changes the dump_index() function to support
    operating on multiple partitions.
    
    Change-Id: I8754069a7340c0d9e3bf69e1fe5c94eb333b73b5
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17513
    Integration-Tests: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
    Tested-by: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
    Reviewed-by: Ali Alsuliman <ali.al.solai...@gmail.com>
    Reviewed-by: Murtadha Hubail <mhub...@apache.org>
---
 .../asterix/app/function/DumpIndexDatasource.java  | 13 +++--
 .../asterix/app/function/DumpIndexFunction.java    | 14 +++--
 .../asterix/app/function/DumpIndexReader.java      | 62 ++++++++++++++--------
 .../asterix/app/function/DumpIndexRewriter.java    |  9 ++--
 .../LSMSecondaryIndexBulkLoadNodePushable.java     |  2 +-
 5 files changed, 65 insertions(+), 35 deletions(-)

diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DumpIndexDatasource.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DumpIndexDatasource.java
index 691be4706c..bd72a66d66 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DumpIndexDatasource.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DumpIndexDatasource.java
@@ -39,27 +39,30 @@ public class DumpIndexDatasource extends FunctionDataSource 
{
     private final IndexDataflowHelperFactory indexDataflowHelperFactory;
     private final RecordDescriptor recDesc;
     private final IBinaryComparatorFactory[] comparatorFactories;
-    private final AlgebricksAbsolutePartitionConstraint storageLocations;
+    private final AlgebricksAbsolutePartitionConstraint constraint;
+    private final int[][] partitionsMap;
 
     public DumpIndexDatasource(INodeDomain domain, IndexDataflowHelperFactory 
indexDataflowHelperFactory,
             RecordDescriptor recDesc, IBinaryComparatorFactory[] 
comparatorFactories,
-            AlgebricksAbsolutePartitionConstraint storageLocations) throws 
AlgebricksException {
+            AlgebricksAbsolutePartitionConstraint constraint, int[][] 
partitionsMap) throws AlgebricksException {
         super(DUMP_INDEX_DATASOURCE_ID, DumpIndexRewriter.DUMP_INDEX, domain);
         this.indexDataflowHelperFactory = indexDataflowHelperFactory;
         this.recDesc = recDesc;
         this.comparatorFactories = comparatorFactories;
-        this.storageLocations = storageLocations;
+        this.constraint = constraint;
+        this.partitionsMap = partitionsMap;
     }
 
     @Override
     protected AlgebricksAbsolutePartitionConstraint 
getLocations(IClusterStateManager csm) {
-        return storageLocations;
+        return constraint;
     }
 
     @Override
     protected IDatasourceFunction createFunction(MetadataProvider 
metadataProvider,
             AlgebricksAbsolutePartitionConstraint locations) {
-        return new DumpIndexFunction(locations, indexDataflowHelperFactory, 
recDesc, comparatorFactories);
+        return new DumpIndexFunction(locations, indexDataflowHelperFactory, 
recDesc, comparatorFactories,
+                partitionsMap);
     }
 
     @Override
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DumpIndexFunction.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DumpIndexFunction.java
index 2fdbef3f75..fcfe3e0b96 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DumpIndexFunction.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DumpIndexFunction.java
@@ -31,25 +31,31 @@ import 
org.apache.hyracks.storage.am.common.dataflow.IndexDataflowHelperFactory;
 
 public class DumpIndexFunction extends AbstractDatasourceFunction {
 
-    private static final long serialVersionUID = 1L;
+    private static final long serialVersionUID = 2L;
     private final IndexDataflowHelperFactory indexDataflowHelperFactory;
     private final RecordDescriptor recDesc;
     private final IBinaryComparatorFactory[] comparatorFactories;
+    private final int[][] partitionsMap;
 
     public DumpIndexFunction(AlgebricksAbsolutePartitionConstraint locations,
             IndexDataflowHelperFactory indexDataflowHelperFactory, 
RecordDescriptor recDesc,
-            IBinaryComparatorFactory[] comparatorFactories) {
+            IBinaryComparatorFactory[] comparatorFactories, int[][] 
partitionsMap) {
         super(locations);
         this.indexDataflowHelperFactory = indexDataflowHelperFactory;
         this.recDesc = recDesc;
         this.comparatorFactories = comparatorFactories;
+        this.partitionsMap = partitionsMap;
     }
 
     @Override
     public IRecordReader<char[]> createRecordReader(IHyracksTaskContext ctx, 
int partition)
             throws HyracksDataException {
         INCServiceContext serviceCtx = 
ctx.getJobletContext().getServiceContext();
-        final IIndexDataflowHelper indexDataflowHelper = 
indexDataflowHelperFactory.create(serviceCtx, partition);
-        return new DumpIndexReader(indexDataflowHelper, recDesc, 
comparatorFactories);
+        int[] partitions = partitionsMap[partition];
+        final IIndexDataflowHelper[] indexDataflowHelpers = new 
IIndexDataflowHelper[partitions.length];
+        for (int i = 0; i < partitions.length; i++) {
+            indexDataflowHelpers[i] = 
indexDataflowHelperFactory.create(serviceCtx, partitions[i]);
+        }
+        return new DumpIndexReader(indexDataflowHelpers, recDesc, 
comparatorFactories);
     }
 }
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DumpIndexReader.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DumpIndexReader.java
index 8ef094e923..5c5a218084 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DumpIndexReader.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DumpIndexReader.java
@@ -34,12 +34,13 @@ import org.apache.asterix.om.types.ATypeTag;
 import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.util.CleanupUtils;
 import org.apache.hyracks.dataflow.common.comm.util.ByteBufferInputStream;
 import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
 import org.apache.hyracks.storage.am.btree.impls.RangePredicate;
 import org.apache.hyracks.storage.am.common.api.IIndexDataflowHelper;
 import org.apache.hyracks.storage.am.common.impls.NoOpIndexAccessParameters;
-import org.apache.hyracks.storage.common.IIndex;
+import org.apache.hyracks.storage.am.common.util.ResourceReleaseUtils;
 import org.apache.hyracks.storage.common.IIndexAccessor;
 import org.apache.hyracks.storage.common.IIndexCursor;
 import org.apache.hyracks.storage.common.MultiComparator;
@@ -48,38 +49,50 @@ import org.apache.hyracks.util.JSONUtil;
 public class DumpIndexReader extends FunctionReader {
 
     private final CharArrayRecord record;
-    private final IIndexCursor searchCursor;
+    private final IIndexCursor[] searchCursors;
     private final RecordDescriptor secondaryRecDesc;
     private final StringBuilder recordBuilder = new StringBuilder();
     private final ByteBufferInputStream bbis = new ByteBufferInputStream();
     private final DataInputStream dis = new DataInputStream(bbis);
-    private final IIndexDataflowHelper indexDataflowHelper;
-    private final IIndexAccessor accessor;
+    private final IIndexDataflowHelper[] indexDataflowHelpers;
+    private final IIndexAccessor[] accessors;
+    private int currentSearchIdx;
 
-    public DumpIndexReader(IIndexDataflowHelper indexDataflowHelper, 
RecordDescriptor secondaryRecDesc,
+    public DumpIndexReader(IIndexDataflowHelper[] indexDataflowHelpers, 
RecordDescriptor secondaryRecDesc,
             IBinaryComparatorFactory[] comparatorFactories) throws 
HyracksDataException {
-        this.indexDataflowHelper = indexDataflowHelper;
+        this.indexDataflowHelpers = indexDataflowHelpers;
         this.secondaryRecDesc = secondaryRecDesc;
-        indexDataflowHelper.open();
-        IIndex indexInstance = indexDataflowHelper.getIndexInstance();
-        accessor = 
indexInstance.createAccessor(NoOpIndexAccessParameters.INSTANCE);
-        searchCursor = accessor.createSearchCursor(false);
         MultiComparator searchMultiComparator = 
MultiComparator.create(comparatorFactories);
         RangePredicate rangePredicate =
                 new RangePredicate(null, null, true, true, 
searchMultiComparator, searchMultiComparator, null, null);
-        accessor.search(searchCursor, rangePredicate);
+        this.accessors = new IIndexAccessor[indexDataflowHelpers.length];
+        this.searchCursors = new IIndexCursor[indexDataflowHelpers.length];
+        for (int i = 0; i < indexDataflowHelpers.length; i++) {
+            IIndexDataflowHelper indexDataflowHelper = indexDataflowHelpers[i];
+            indexDataflowHelper.open();
+            accessors[i] = 
indexDataflowHelper.getIndexInstance().createAccessor(NoOpIndexAccessParameters.INSTANCE);
+            searchCursors[i] = accessors[i].createSearchCursor(false);
+            accessors[i].search(searchCursors[i], rangePredicate);
+        }
+        currentSearchIdx = 0;
         record = new CharArrayRecord();
     }
 
     @Override
     public boolean hasNext() throws Exception {
-        return searchCursor.hasNext();
+        while (currentSearchIdx < searchCursors.length) {
+            if (searchCursors[currentSearchIdx].hasNext()) {
+                return true;
+            }
+            currentSearchIdx++;
+        }
+        return false;
     }
 
     @Override
     public IRawRecord<char[]> next() throws IOException, InterruptedException {
-        searchCursor.next();
-        ITupleReference tuple = searchCursor.getTuple();
+        searchCursors[currentSearchIdx].next();
+        ITupleReference tuple = searchCursors[currentSearchIdx].getTuple();
         buildJsonRecord(tuple);
         record.reset();
         record.append(recordBuilder.toString().toCharArray());
@@ -89,16 +102,21 @@ public class DumpIndexReader extends FunctionReader {
 
     @Override
     public void close() throws IOException {
-        bbis.close();
-        dis.close();
-        if (searchCursor != null) {
-            searchCursor.close();
-            searchCursor.destroy();
+        Throwable failure = releaseResources();
+        if (failure != null) {
+            throw HyracksDataException.create(failure);
         }
-        if (accessor != null) {
-            accessor.destroy();
+    }
+
+    private Throwable releaseResources() {
+        Throwable failure = CleanupUtils.close(bbis, null);
+        failure = CleanupUtils.close(dis, failure);
+        for (int i = 0; i < indexDataflowHelpers.length; i++) {
+            failure = ResourceReleaseUtils.close(searchCursors[i], failure);
+            failure = CleanupUtils.destroy(failure, searchCursors[i], 
accessors[i]);
+            failure = ResourceReleaseUtils.close(indexDataflowHelpers[i], 
failure);
         }
-        indexDataflowHelper.close();
+        return failure;
     }
 
     private void buildJsonRecord(ITupleReference tuple) throws 
HyracksDataException {
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DumpIndexRewriter.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DumpIndexRewriter.java
index 6c0382d52b..dac1ac7224 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DumpIndexRewriter.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DumpIndexRewriter.java
@@ -18,6 +18,7 @@
  */
 package org.apache.asterix.app.function;
 
+import org.apache.asterix.common.cluster.PartitioningProperties;
 import org.apache.asterix.common.exceptions.CompilationException;
 import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.common.functions.FunctionConstants;
@@ -66,13 +67,15 @@ public class DumpIndexRewriter extends FunctionRewriter {
         }
         ISecondaryIndexOperationsHelper secondaryIndexHelper =
                 
SecondaryIndexOperationsHelper.createIndexOperationsHelper(dataset, index, 
metadataProvider, loc);
+        PartitioningProperties partitioningProperties =
+                metadataProvider.getPartitioningProperties(dataset, 
index.getIndexName());
         IndexDataflowHelperFactory indexDataflowHelperFactory =
                 new 
IndexDataflowHelperFactory(metadataProvider.getStorageComponentProvider().getStorageManager(),
-                        secondaryIndexHelper.getSecondaryFileSplitProvider());
+                        partitioningProperties.getSpiltsProvider());
         AlgebricksAbsolutePartitionConstraint secondaryPartitionConstraint =
-                (AlgebricksAbsolutePartitionConstraint) 
secondaryIndexHelper.getSecondaryPartitionConstraint();
+                (AlgebricksAbsolutePartitionConstraint) 
partitioningProperties.getConstraints();
         return new DumpIndexDatasource(context.getComputationNodeDomain(), 
indexDataflowHelperFactory,
                 secondaryIndexHelper.getSecondaryRecDesc(), 
secondaryIndexHelper.getSecondaryComparatorFactories(),
-                secondaryPartitionConstraint);
+                secondaryPartitionConstraint, 
partitioningProperties.getComputeStorageMap());
     }
 }
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexBulkLoadNodePushable.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexBulkLoadNodePushable.java
index 1b4fd23824..64dce1ea27 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexBulkLoadNodePushable.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexBulkLoadNodePushable.java
@@ -63,7 +63,7 @@ public class LSMSecondaryIndexBulkLoadNodePushable extends 
AbstractLSMSecondaryI
             IIndexDataflowHelperFactory secondaryIndexHelperFactory, int[] 
fieldPermutation, int numTagFields,
             int numSecondaryKeys, int numPrimaryKeys, boolean hasBuddyBTree) 
throws HyracksDataException {
         super(ctx, partition, inputRecDesc, numTagFields, numSecondaryKeys, 
numPrimaryKeys, hasBuddyBTree);
-
+        //TODO(partitioning) correlated
         this.primaryIndexHelper =
                 
primaryIndexHelperFactory.create(ctx.getJobletContext().getServiceContext(), 
partition);
         this.secondaryIndexHelper =

Reply via email to