This is an automated email from the ASF dual-hosted git repository. mhubail pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/asterixdb.git
The following commit(s) were added to refs/heads/master by this push: new 9acfce145e [ASTERIXDB-3144][HYR][RT] Make dump_index() function support multiple partitions 9acfce145e is described below commit 9acfce145e846edcf719a009b8b925d027354604 Author: Ali Alsuliman <ali.al.solai...@gmail.com> AuthorDate: Fri May 5 17:21:15 2023 -0700 [ASTERIXDB-3144][HYR][RT] Make dump_index() function support multiple partitions - user model changes: no - storage format changes: no - interface changes: no Details: This patch changes the dump_index() function to support operating on multiple partitions. Change-Id: I8754069a7340c0d9e3bf69e1fe5c94eb333b73b5 Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17513 Integration-Tests: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Tested-by: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Reviewed-by: Ali Alsuliman <ali.al.solai...@gmail.com> Reviewed-by: Murtadha Hubail <mhub...@apache.org> --- .../asterix/app/function/DumpIndexDatasource.java | 13 +++-- .../asterix/app/function/DumpIndexFunction.java | 14 +++-- .../asterix/app/function/DumpIndexReader.java | 62 ++++++++++++++-------- .../asterix/app/function/DumpIndexRewriter.java | 9 ++-- .../LSMSecondaryIndexBulkLoadNodePushable.java | 2 +- 5 files changed, 65 insertions(+), 35 deletions(-) diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DumpIndexDatasource.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DumpIndexDatasource.java index 691be4706c..bd72a66d66 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DumpIndexDatasource.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DumpIndexDatasource.java @@ -39,27 +39,30 @@ public class DumpIndexDatasource extends FunctionDataSource { private final IndexDataflowHelperFactory indexDataflowHelperFactory; private final RecordDescriptor recDesc; private final IBinaryComparatorFactory[] comparatorFactories; - private final AlgebricksAbsolutePartitionConstraint storageLocations; + private final AlgebricksAbsolutePartitionConstraint constraint; + private final int[][] partitionsMap; public DumpIndexDatasource(INodeDomain domain, IndexDataflowHelperFactory indexDataflowHelperFactory, RecordDescriptor recDesc, IBinaryComparatorFactory[] comparatorFactories, - AlgebricksAbsolutePartitionConstraint storageLocations) throws AlgebricksException { + AlgebricksAbsolutePartitionConstraint constraint, int[][] partitionsMap) throws AlgebricksException { super(DUMP_INDEX_DATASOURCE_ID, DumpIndexRewriter.DUMP_INDEX, domain); this.indexDataflowHelperFactory = indexDataflowHelperFactory; this.recDesc = recDesc; this.comparatorFactories = comparatorFactories; - this.storageLocations = storageLocations; + this.constraint = constraint; + this.partitionsMap = partitionsMap; } @Override protected AlgebricksAbsolutePartitionConstraint getLocations(IClusterStateManager csm) { - return storageLocations; + return constraint; } @Override protected IDatasourceFunction createFunction(MetadataProvider metadataProvider, AlgebricksAbsolutePartitionConstraint locations) { - return new DumpIndexFunction(locations, indexDataflowHelperFactory, recDesc, comparatorFactories); + return new DumpIndexFunction(locations, indexDataflowHelperFactory, recDesc, comparatorFactories, + partitionsMap); } @Override diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DumpIndexFunction.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DumpIndexFunction.java index 2fdbef3f75..fcfe3e0b96 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DumpIndexFunction.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DumpIndexFunction.java @@ -31,25 +31,31 @@ import org.apache.hyracks.storage.am.common.dataflow.IndexDataflowHelperFactory; public class DumpIndexFunction extends AbstractDatasourceFunction { - private static final long serialVersionUID = 1L; + private static final long serialVersionUID = 2L; private final IndexDataflowHelperFactory indexDataflowHelperFactory; private final RecordDescriptor recDesc; private final IBinaryComparatorFactory[] comparatorFactories; + private final int[][] partitionsMap; public DumpIndexFunction(AlgebricksAbsolutePartitionConstraint locations, IndexDataflowHelperFactory indexDataflowHelperFactory, RecordDescriptor recDesc, - IBinaryComparatorFactory[] comparatorFactories) { + IBinaryComparatorFactory[] comparatorFactories, int[][] partitionsMap) { super(locations); this.indexDataflowHelperFactory = indexDataflowHelperFactory; this.recDesc = recDesc; this.comparatorFactories = comparatorFactories; + this.partitionsMap = partitionsMap; } @Override public IRecordReader<char[]> createRecordReader(IHyracksTaskContext ctx, int partition) throws HyracksDataException { INCServiceContext serviceCtx = ctx.getJobletContext().getServiceContext(); - final IIndexDataflowHelper indexDataflowHelper = indexDataflowHelperFactory.create(serviceCtx, partition); - return new DumpIndexReader(indexDataflowHelper, recDesc, comparatorFactories); + int[] partitions = partitionsMap[partition]; + final IIndexDataflowHelper[] indexDataflowHelpers = new IIndexDataflowHelper[partitions.length]; + for (int i = 0; i < partitions.length; i++) { + indexDataflowHelpers[i] = indexDataflowHelperFactory.create(serviceCtx, partitions[i]); + } + return new DumpIndexReader(indexDataflowHelpers, recDesc, comparatorFactories); } } diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DumpIndexReader.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DumpIndexReader.java index 8ef094e923..5c5a218084 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DumpIndexReader.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DumpIndexReader.java @@ -34,12 +34,13 @@ import org.apache.asterix.om.types.ATypeTag; import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory; import org.apache.hyracks.api.dataflow.value.RecordDescriptor; import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.util.CleanupUtils; import org.apache.hyracks.dataflow.common.comm.util.ByteBufferInputStream; import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference; import org.apache.hyracks.storage.am.btree.impls.RangePredicate; import org.apache.hyracks.storage.am.common.api.IIndexDataflowHelper; import org.apache.hyracks.storage.am.common.impls.NoOpIndexAccessParameters; -import org.apache.hyracks.storage.common.IIndex; +import org.apache.hyracks.storage.am.common.util.ResourceReleaseUtils; import org.apache.hyracks.storage.common.IIndexAccessor; import org.apache.hyracks.storage.common.IIndexCursor; import org.apache.hyracks.storage.common.MultiComparator; @@ -48,38 +49,50 @@ import org.apache.hyracks.util.JSONUtil; public class DumpIndexReader extends FunctionReader { private final CharArrayRecord record; - private final IIndexCursor searchCursor; + private final IIndexCursor[] searchCursors; private final RecordDescriptor secondaryRecDesc; private final StringBuilder recordBuilder = new StringBuilder(); private final ByteBufferInputStream bbis = new ByteBufferInputStream(); private final DataInputStream dis = new DataInputStream(bbis); - private final IIndexDataflowHelper indexDataflowHelper; - private final IIndexAccessor accessor; + private final IIndexDataflowHelper[] indexDataflowHelpers; + private final IIndexAccessor[] accessors; + private int currentSearchIdx; - public DumpIndexReader(IIndexDataflowHelper indexDataflowHelper, RecordDescriptor secondaryRecDesc, + public DumpIndexReader(IIndexDataflowHelper[] indexDataflowHelpers, RecordDescriptor secondaryRecDesc, IBinaryComparatorFactory[] comparatorFactories) throws HyracksDataException { - this.indexDataflowHelper = indexDataflowHelper; + this.indexDataflowHelpers = indexDataflowHelpers; this.secondaryRecDesc = secondaryRecDesc; - indexDataflowHelper.open(); - IIndex indexInstance = indexDataflowHelper.getIndexInstance(); - accessor = indexInstance.createAccessor(NoOpIndexAccessParameters.INSTANCE); - searchCursor = accessor.createSearchCursor(false); MultiComparator searchMultiComparator = MultiComparator.create(comparatorFactories); RangePredicate rangePredicate = new RangePredicate(null, null, true, true, searchMultiComparator, searchMultiComparator, null, null); - accessor.search(searchCursor, rangePredicate); + this.accessors = new IIndexAccessor[indexDataflowHelpers.length]; + this.searchCursors = new IIndexCursor[indexDataflowHelpers.length]; + for (int i = 0; i < indexDataflowHelpers.length; i++) { + IIndexDataflowHelper indexDataflowHelper = indexDataflowHelpers[i]; + indexDataflowHelper.open(); + accessors[i] = indexDataflowHelper.getIndexInstance().createAccessor(NoOpIndexAccessParameters.INSTANCE); + searchCursors[i] = accessors[i].createSearchCursor(false); + accessors[i].search(searchCursors[i], rangePredicate); + } + currentSearchIdx = 0; record = new CharArrayRecord(); } @Override public boolean hasNext() throws Exception { - return searchCursor.hasNext(); + while (currentSearchIdx < searchCursors.length) { + if (searchCursors[currentSearchIdx].hasNext()) { + return true; + } + currentSearchIdx++; + } + return false; } @Override public IRawRecord<char[]> next() throws IOException, InterruptedException { - searchCursor.next(); - ITupleReference tuple = searchCursor.getTuple(); + searchCursors[currentSearchIdx].next(); + ITupleReference tuple = searchCursors[currentSearchIdx].getTuple(); buildJsonRecord(tuple); record.reset(); record.append(recordBuilder.toString().toCharArray()); @@ -89,16 +102,21 @@ public class DumpIndexReader extends FunctionReader { @Override public void close() throws IOException { - bbis.close(); - dis.close(); - if (searchCursor != null) { - searchCursor.close(); - searchCursor.destroy(); + Throwable failure = releaseResources(); + if (failure != null) { + throw HyracksDataException.create(failure); } - if (accessor != null) { - accessor.destroy(); + } + + private Throwable releaseResources() { + Throwable failure = CleanupUtils.close(bbis, null); + failure = CleanupUtils.close(dis, failure); + for (int i = 0; i < indexDataflowHelpers.length; i++) { + failure = ResourceReleaseUtils.close(searchCursors[i], failure); + failure = CleanupUtils.destroy(failure, searchCursors[i], accessors[i]); + failure = ResourceReleaseUtils.close(indexDataflowHelpers[i], failure); } - indexDataflowHelper.close(); + return failure; } private void buildJsonRecord(ITupleReference tuple) throws HyracksDataException { diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DumpIndexRewriter.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DumpIndexRewriter.java index 6c0382d52b..dac1ac7224 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DumpIndexRewriter.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DumpIndexRewriter.java @@ -18,6 +18,7 @@ */ package org.apache.asterix.app.function; +import org.apache.asterix.common.cluster.PartitioningProperties; import org.apache.asterix.common.exceptions.CompilationException; import org.apache.asterix.common.exceptions.ErrorCode; import org.apache.asterix.common.functions.FunctionConstants; @@ -66,13 +67,15 @@ public class DumpIndexRewriter extends FunctionRewriter { } ISecondaryIndexOperationsHelper secondaryIndexHelper = SecondaryIndexOperationsHelper.createIndexOperationsHelper(dataset, index, metadataProvider, loc); + PartitioningProperties partitioningProperties = + metadataProvider.getPartitioningProperties(dataset, index.getIndexName()); IndexDataflowHelperFactory indexDataflowHelperFactory = new IndexDataflowHelperFactory(metadataProvider.getStorageComponentProvider().getStorageManager(), - secondaryIndexHelper.getSecondaryFileSplitProvider()); + partitioningProperties.getSpiltsProvider()); AlgebricksAbsolutePartitionConstraint secondaryPartitionConstraint = - (AlgebricksAbsolutePartitionConstraint) secondaryIndexHelper.getSecondaryPartitionConstraint(); + (AlgebricksAbsolutePartitionConstraint) partitioningProperties.getConstraints(); return new DumpIndexDatasource(context.getComputationNodeDomain(), indexDataflowHelperFactory, secondaryIndexHelper.getSecondaryRecDesc(), secondaryIndexHelper.getSecondaryComparatorFactories(), - secondaryPartitionConstraint); + secondaryPartitionConstraint, partitioningProperties.getComputeStorageMap()); } } diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexBulkLoadNodePushable.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexBulkLoadNodePushable.java index 1b4fd23824..64dce1ea27 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexBulkLoadNodePushable.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexBulkLoadNodePushable.java @@ -63,7 +63,7 @@ public class LSMSecondaryIndexBulkLoadNodePushable extends AbstractLSMSecondaryI IIndexDataflowHelperFactory secondaryIndexHelperFactory, int[] fieldPermutation, int numTagFields, int numSecondaryKeys, int numPrimaryKeys, boolean hasBuddyBTree) throws HyracksDataException { super(ctx, partition, inputRecDesc, numTagFields, numSecondaryKeys, numPrimaryKeys, hasBuddyBTree); - + //TODO(partitioning) correlated this.primaryIndexHelper = primaryIndexHelperFactory.create(ctx.getJobletContext().getServiceContext(), partition); this.secondaryIndexHelper =