Murtadha Hubail has submitted this change and it was merged.

Change subject: [NO ISSUE][RT] Fix Secondary Indexes Bulkload/Upsert
......................................................................
[NO ISSUE][RT] Fix Secondary Indexes Bulkload/Upsert - user model changes: no - storage format changes: no - interface changes: no Details: - Prevent null values from being upserted into secondary indexes. - Use secondary index tuple filter when bulkloading to prevent null/missing values from being inserted into indexes of optional fields. Change-Id: I7b847aece06a4387cc41389b0566d0c0dc98487d Reviewed-on: https://asterix-gerrit.ics.uci.edu/3304 Reviewed-by: Luo Chen <[email protected]> Tested-by: Jenkins <[email protected]> Reviewed-by: Ali Alsuliman <[email protected]> Integration-Tests: Jenkins <[email protected]> --- M asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalIndexBulkLoadOperatorDescriptor.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalIndexBulkLoadOperatorNodePushable.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalIndexBulkModifyOperatorDescriptor.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalIndexBulkModifyOperatorNodePushable.java M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java M asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMIndexBulkLoadOperatorDescriptor.java M asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMIndexBulkLoadOperatorNodePushable.java M asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertOperatorNodePushable.java M hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexBulkLoadOperatorNodePushable.java M 
hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/TreeIndexBulkLoadOperatorDescriptor.java 12 files changed, 84 insertions(+), 37 deletions(-) Approvals: Luo Chen: Looks good to me, but someone else must approve Anon. E. Moose #1000171: Ali Alsuliman: Looks good to me, approved Jenkins: Verified; Verified Objections: Jenkins: Violations found diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java index b07a30e..1a9df0b 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java @@ -220,7 +220,7 @@ LSMIndexBulkLoadOperatorNodePushable op = new LSMIndexBulkLoadOperatorNodePushable(secondaryIndexHelperFactory, primaryIndexHelperFactory, ctx, 0, fieldPermutation, 1.0F, false, numElementsHint, true, secondaryIndexInfo.rDesc, - BulkLoadUsage.CREATE_INDEX, dataset.getDatasetId()); + BulkLoadUsage.CREATE_INDEX, dataset.getDatasetId(), null); op.setOutputFrameWriter(0, new SinkRuntimeFactory().createPushRuntime(ctx)[0], null); return Pair.of(secondaryIndexInfo, op); } catch (Throwable th) { diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalIndexBulkLoadOperatorDescriptor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalIndexBulkLoadOperatorDescriptor.java index 83e4566..f910fb4 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalIndexBulkLoadOperatorDescriptor.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalIndexBulkLoadOperatorDescriptor.java @@ -24,6 +24,7 @@ import 
org.apache.hyracks.api.dataflow.value.RecordDescriptor; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.job.IOperatorDescriptorRegistry; +import org.apache.hyracks.storage.am.common.api.ITupleFilterFactory; import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory; import org.apache.hyracks.storage.am.common.dataflow.TreeIndexBulkLoadOperatorDescriptor; @@ -31,13 +32,16 @@ private static final long serialVersionUID = 1L; private final int version; + private final ITupleFilterFactory tupleFilterFactory; public ExternalIndexBulkLoadOperatorDescriptor(IOperatorDescriptorRegistry spec, RecordDescriptor outRecDesc, int[] fieldPermutation, float fillFactor, boolean verifyInput, long numElementsHint, - boolean checkIfEmptyIndex, IIndexDataflowHelperFactory indexHelperFactory, int version) { + boolean checkIfEmptyIndex, IIndexDataflowHelperFactory indexHelperFactory, int version, + ITupleFilterFactory tupleFilterFactory) { super(spec, outRecDesc, fieldPermutation, fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex, indexHelperFactory); this.version = version; + this.tupleFilterFactory = tupleFilterFactory; } @Override @@ -45,7 +49,7 @@ IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException { return new ExternalIndexBulkLoadOperatorNodePushable(indexHelperFactory, ctx, partition, fieldPermutation, fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex, - recordDescProvider.getInputRecordDescriptor(this.getActivityId(), 0), version); + recordDescProvider.getInputRecordDescriptor(this.getActivityId(), 0), version, tupleFilterFactory); } } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalIndexBulkLoadOperatorNodePushable.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalIndexBulkLoadOperatorNodePushable.java index 74bc0dc..39f23ed 100644 --- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalIndexBulkLoadOperatorNodePushable.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalIndexBulkLoadOperatorNodePushable.java @@ -25,6 +25,7 @@ import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.api.dataflow.value.RecordDescriptor; import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.storage.am.common.api.ITupleFilterFactory; import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory; import org.apache.hyracks.storage.am.common.dataflow.IndexBulkLoadOperatorNodePushable; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex; @@ -37,10 +38,10 @@ public ExternalIndexBulkLoadOperatorNodePushable(IIndexDataflowHelperFactory indexDataflowHelperFactory, IHyracksTaskContext ctx, int partition, int[] fieldPermutation, float fillFactor, boolean verifyInput, - long numElementsHint, boolean checkIfEmptyIndex, RecordDescriptor recDesc, int version) - throws HyracksDataException { + long numElementsHint, boolean checkIfEmptyIndex, RecordDescriptor recDesc, int version, + ITupleFilterFactory tupleFilterFactory) throws HyracksDataException { super(indexDataflowHelperFactory, ctx, partition, fieldPermutation, fillFactor, verifyInput, numElementsHint, - checkIfEmptyIndex, recDesc); + checkIfEmptyIndex, recDesc, tupleFilterFactory); this.version = version; } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalIndexBulkModifyOperatorDescriptor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalIndexBulkModifyOperatorDescriptor.java index 60f1721..674ee42 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalIndexBulkModifyOperatorDescriptor.java +++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalIndexBulkModifyOperatorDescriptor.java @@ -23,6 +23,7 @@ import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.job.IOperatorDescriptorRegistry; +import org.apache.hyracks.storage.am.common.api.ITupleFilterFactory; import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory; import org.apache.hyracks.storage.am.common.dataflow.TreeIndexBulkLoadOperatorDescriptor; @@ -30,12 +31,15 @@ private static final long serialVersionUID = 1L; private final int[] deletedFiles; + private final ITupleFilterFactory tupleFilterFactory; public ExternalIndexBulkModifyOperatorDescriptor(IOperatorDescriptorRegistry spec, IIndexDataflowHelperFactory dataflowHelperFactory, int[] deletedFiles, int[] fieldPermutation, - float fillFactor, boolean verifyInput, long numElementsHint) { - super(spec, null, fieldPermutation, fillFactor, verifyInput, numElementsHint, false, dataflowHelperFactory); + float fillFactor, boolean verifyInput, long numElementsHint, ITupleFilterFactory tupleFilterFactory) { + super(spec, null, fieldPermutation, fillFactor, verifyInput, numElementsHint, false, dataflowHelperFactory, + tupleFilterFactory); this.deletedFiles = deletedFiles; + this.tupleFilterFactory = tupleFilterFactory; } @Override @@ -43,7 +47,7 @@ IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException { return new ExternalIndexBulkModifyOperatorNodePushable(indexHelperFactory, ctx, partition, fieldPermutation, fillFactor, verifyInput, numElementsHint, - recordDescProvider.getInputRecordDescriptor(getActivityId(), 0), deletedFiles); + recordDescProvider.getInputRecordDescriptor(getActivityId(), 0), deletedFiles, tupleFilterFactory); } } diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalIndexBulkModifyOperatorNodePushable.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalIndexBulkModifyOperatorNodePushable.java index 57e2917..745853e 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalIndexBulkModifyOperatorNodePushable.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalIndexBulkModifyOperatorNodePushable.java @@ -30,6 +30,7 @@ import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder; import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleReference; import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor; +import org.apache.hyracks.storage.am.common.api.ITupleFilterFactory; import org.apache.hyracks.storage.am.common.api.ITwoPCIndexBulkLoader; import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory; import org.apache.hyracks.storage.am.common.dataflow.IndexBulkLoadOperatorNodePushable; @@ -45,9 +46,10 @@ public ExternalIndexBulkModifyOperatorNodePushable(IIndexDataflowHelperFactory indexHelperFactory, IHyracksTaskContext ctx, int partition, int[] fieldPermutation, float fillFactor, boolean verifyInput, - long numElementsHint, RecordDescriptor inputRecDesc, int[] deletedFiles) throws HyracksDataException { + long numElementsHint, RecordDescriptor inputRecDesc, int[] deletedFiles, + ITupleFilterFactory tupleFilterFactory) throws HyracksDataException { super(indexHelperFactory, ctx, partition, fieldPermutation, fillFactor, verifyInput, numElementsHint, false, - inputRecDesc); + inputRecDesc, tupleFilterFactory); this.deletedFiles = deletedFiles; } diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java index e212d11..55bf261 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java @@ -642,7 +642,7 @@ new IndexDataflowHelperFactory(storageComponentProvider.getStorageManager(), splitsAndConstraint.first); LSMIndexBulkLoadOperatorDescriptor btreeBulkLoad = new LSMIndexBulkLoadOperatorDescriptor(spec, null, fieldPermutation, GlobalConfig.DEFAULT_TREE_FILL_FACTOR, false, numElementsHint, true, - indexHelperFactory, null, BulkLoadUsage.LOAD, dataset.getDatasetId()); + indexHelperFactory, null, BulkLoadUsage.LOAD, dataset.getDatasetId(), null); return new Pair<>(btreeBulkLoad, splitsAndConstraint.second); } @@ -1034,7 +1034,7 @@ long numElementsHint = getCardinalityPerPartitionHint(dataset); op = new LSMIndexBulkLoadOperatorDescriptor(spec, inputRecordDesc, fieldPermutation, GlobalConfig.DEFAULT_TREE_FILL_FACTOR, true, numElementsHint, true, idfh, null, BulkLoadUsage.LOAD, - dataset.getDatasetId()); + dataset.getDatasetId(), null); } else { op = new LSMTreeInsertDeleteOperatorDescriptor(spec, inputRecordDesc, fieldPermutation, indexOp, idfh, null, true, modificationCallbackFactory); @@ -1159,7 +1159,7 @@ long numElementsHint = getCardinalityPerPartitionHint(dataset); op = new LSMIndexBulkLoadOperatorDescriptor(spec, inputRecordDesc, fieldPermutation, GlobalConfig.DEFAULT_TREE_FILL_FACTOR, false, numElementsHint, false, idfh, null, - BulkLoadUsage.LOAD, dataset.getDatasetId()); + BulkLoadUsage.LOAD, dataset.getDatasetId(), filterFactory); } else if (indexOp == IndexOperation.UPSERT) { int upsertIndicatorFieldIndex = propagatedSchema.findVariable(upsertIndicatorVar); op = new LSMSecondaryUpsertOperatorDescriptor(spec, inputRecordDesc, fieldPermutation, idfh, @@ -1259,7 +1259,7 @@ long numElementsHint = 
getCardinalityPerPartitionHint(dataset); op = new LSMIndexBulkLoadOperatorDescriptor(spec, recordDesc, fieldPermutation, GlobalConfig.DEFAULT_TREE_FILL_FACTOR, false, numElementsHint, false, indexDataflowHelperFactory, - null, BulkLoadUsage.LOAD, dataset.getDatasetId()); + null, BulkLoadUsage.LOAD, dataset.getDatasetId(), filterFactory); } else if (indexOp == IndexOperation.UPSERT) { int upsertIndicatorFieldIndex = propagatedSchema.findVariable(upsertIndicatorVar); op = new LSMSecondaryUpsertOperatorDescriptor(spec, recordDesc, fieldPermutation, @@ -1370,7 +1370,7 @@ long numElementsHint = getCardinalityPerPartitionHint(dataset); op = new LSMIndexBulkLoadOperatorDescriptor(spec, recordDesc, fieldPermutation, GlobalConfig.DEFAULT_TREE_FILL_FACTOR, false, numElementsHint, false, indexDataFlowFactory, - null, BulkLoadUsage.LOAD, dataset.getDatasetId()); + null, BulkLoadUsage.LOAD, dataset.getDatasetId(), filterFactory); } else if (indexOp == IndexOperation.UPSERT) { int upsertIndicatorFieldIndex = propagatedSchema.findVariable(upsertIndicatorVar); op = new LSMSecondaryUpsertOperatorDescriptor(spec, recordDesc, fieldPermutation, indexDataFlowFactory, diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java index 5f9e6ef..efee1db 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java @@ -361,10 +361,10 @@ throws AlgebricksException { IndexDataflowHelperFactory primaryIndexDataflowHelperFactory = new IndexDataflowHelperFactory( metadataProvider.getStorageComponentProvider().getStorageManager(), primaryFileSplitProvider); - + // when an index is being created (not loaded) the filtration is introduced in the 
pipeline -> no tuple filter LSMIndexBulkLoadOperatorDescriptor treeIndexBulkLoadOp = new LSMIndexBulkLoadOperatorDescriptor(spec, secondaryRecDesc, fieldPermutation, fillFactor, false, numElementsHint, false, dataflowHelperFactory, - primaryIndexDataflowHelperFactory, BulkLoadUsage.CREATE_INDEX, dataset.getDatasetId()); + primaryIndexDataflowHelperFactory, BulkLoadUsage.CREATE_INDEX, dataset.getDatasetId(), null); treeIndexBulkLoadOp.setSourceLocation(sourceLoc); AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, treeIndexBulkLoadOp, secondaryPartitionConstraint); @@ -376,7 +376,7 @@ throws AlgebricksException { ExternalIndexBulkLoadOperatorDescriptor treeIndexBulkLoadOp = new ExternalIndexBulkLoadOperatorDescriptor(spec, secondaryRecDesc, fieldPermutation, fillFactor, false, numElementsHint, false, dataflowHelperFactory, - ExternalDatasetsRegistry.INSTANCE.getAndLockDatasetVersion(dataset, metadataProvider)); + ExternalDatasetsRegistry.INSTANCE.getAndLockDatasetVersion(dataset, metadataProvider), null); treeIndexBulkLoadOp.setSourceLocation(sourceLoc); AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, treeIndexBulkLoadOp, secondaryPartitionConstraint); @@ -494,7 +494,7 @@ } } ExternalIndexBulkModifyOperatorDescriptor treeIndexBulkLoadOp = new ExternalIndexBulkModifyOperatorDescriptor( - spec, dataflowHelperFactory, deletedFiles, fieldPermutation, fillFactor, false, numElementsHint); + spec, dataflowHelperFactory, deletedFiles, fieldPermutation, fillFactor, false, numElementsHint, null); AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, treeIndexBulkLoadOp, secondaryPartitionConstraint); return treeIndexBulkLoadOp; diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMIndexBulkLoadOperatorDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMIndexBulkLoadOperatorDescriptor.java index 74590c7..ea84ca3 100644 --- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMIndexBulkLoadOperatorDescriptor.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMIndexBulkLoadOperatorDescriptor.java @@ -25,6 +25,7 @@ import org.apache.hyracks.api.dataflow.value.RecordDescriptor; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.job.IOperatorDescriptorRegistry; +import org.apache.hyracks.storage.am.common.api.ITupleFilterFactory; import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory; import org.apache.hyracks.storage.am.common.dataflow.TreeIndexBulkLoadOperatorDescriptor; @@ -43,15 +44,19 @@ protected final int datasetId; + protected final ITupleFilterFactory tupleFilterFactory; + public LSMIndexBulkLoadOperatorDescriptor(IOperatorDescriptorRegistry spec, RecordDescriptor outRecDesc, int[] fieldPermutation, float fillFactor, boolean verifyInput, long numElementsHint, boolean checkIfEmptyIndex, IIndexDataflowHelperFactory indexHelperFactory, - IIndexDataflowHelperFactory primaryIndexHelperFactory, BulkLoadUsage usage, int datasetId) { + IIndexDataflowHelperFactory primaryIndexHelperFactory, BulkLoadUsage usage, int datasetId, + ITupleFilterFactory tupleFilterFactory) { super(spec, outRecDesc, fieldPermutation, fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex, indexHelperFactory); this.primaryIndexHelperFactory = primaryIndexHelperFactory; this.usage = usage; this.datasetId = datasetId; + this.tupleFilterFactory = tupleFilterFactory; } @Override @@ -59,6 +64,7 @@ IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException { return new LSMIndexBulkLoadOperatorNodePushable(indexHelperFactory, primaryIndexHelperFactory, ctx, partition, fieldPermutation, fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex, - recordDescProvider.getInputRecordDescriptor(this.getActivityId(), 0), usage, datasetId); + 
recordDescProvider.getInputRecordDescriptor(this.getActivityId(), 0), usage, datasetId, + tupleFilterFactory); } } diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMIndexBulkLoadOperatorNodePushable.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMIndexBulkLoadOperatorNodePushable.java index 4130490..46dfe65 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMIndexBulkLoadOperatorNodePushable.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMIndexBulkLoadOperatorNodePushable.java @@ -30,6 +30,7 @@ import org.apache.hyracks.api.dataflow.value.RecordDescriptor; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.storage.am.common.api.IIndexDataflowHelper; +import org.apache.hyracks.storage.am.common.api.ITupleFilterFactory; import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory; import org.apache.hyracks.storage.am.common.dataflow.IndexBulkLoadOperatorNodePushable; import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId; @@ -45,16 +46,15 @@ protected final IDatasetLifecycleManager datasetManager; protected final int datasetId; protected final int partition; - protected ILSMIndex primaryIndex; public LSMIndexBulkLoadOperatorNodePushable(IIndexDataflowHelperFactory indexDataflowHelperFactory, IIndexDataflowHelperFactory priamryIndexDataflowHelperFactory, IHyracksTaskContext ctx, int partition, int[] fieldPermutation, float fillFactor, boolean verifyInput, long numElementsHint, - boolean checkIfEmptyIndex, RecordDescriptor recDesc, BulkLoadUsage usage, int datasetId) - throws HyracksDataException { + boolean checkIfEmptyIndex, RecordDescriptor recDesc, BulkLoadUsage usage, int datasetId, + ITupleFilterFactory tupleFilterFactory) throws HyracksDataException { super(indexDataflowHelperFactory, ctx, partition, fieldPermutation, fillFactor, 
verifyInput, numElementsHint, - checkIfEmptyIndex, recDesc); + checkIfEmptyIndex, recDesc, tupleFilterFactory); if (priamryIndexDataflowHelperFactory != null) { this.primaryIndexHelper = diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertOperatorNodePushable.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertOperatorNodePushable.java index 2dc7f5e..a2e2a0b 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertOperatorNodePushable.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertOperatorNodePushable.java @@ -59,6 +59,7 @@ */ public class LSMSecondaryUpsertOperatorNodePushable extends LSMIndexInsertUpdateDeleteOperatorNodePushable { + private static final int NULL_MISSING_FIELD_INDEX = 0; private final PermutingFrameTupleReference prevValueTuple = new PermutingFrameTupleReference(); private final int upsertIndicatorFieldIndex; private final IBinaryBooleanInspector upsertIndicatorInspector; @@ -104,9 +105,9 @@ tuple.reset(accessor, i); prevValueTuple.reset(accessor, i); - boolean isNewValueMissing = isMissing(tuple, 0); - boolean isOldValueMissing = isMissing(prevValueTuple, 0); - if (isNewValueMissing && isOldValueMissing) { + boolean isNewValueNullOrMissing = isNullOrMissing(tuple); + boolean isOldValueNullOrMissing = isNullOrMissing(prevValueTuple); + if (isNewValueNullOrMissing && isOldValueNullOrMissing) { // No op continue; } @@ -117,12 +118,12 @@ // which are always the same continue; } - if (!isOldValueMissing) { + if (!isOldValueNullOrMissing) { // We need to delete previous abstractModCallback.setOp(Operation.DELETE); lsmAccessor.forceDelete(prevValueTuple); } - if (isUpsert && !isNewValueMissing) { + if (isUpsert && !isNewValueNullOrMissing) { // we need to insert the new value abstractModCallback.setOp(Operation.INSERT); 
lsmAccessor.forceInsert(tuple); @@ -137,7 +138,8 @@ FrameUtils.flushFrame(writeBuffer.getBuffer(), writer); } - private boolean isMissing(PermutingFrameTupleReference tuple, int fieldIdx) { - return TypeTagUtil.isType(tuple, fieldIdx, ATypeTag.SERIALIZED_MISSING_TYPE_TAG); + private static boolean isNullOrMissing(PermutingFrameTupleReference tuple) { + return TypeTagUtil.isType(tuple, NULL_MISSING_FIELD_INDEX, ATypeTag.SERIALIZED_NULL_TYPE_TAG) + || TypeTagUtil.isType(tuple, NULL_MISSING_FIELD_INDEX, ATypeTag.SERIALIZED_MISSING_TYPE_TAG); } } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexBulkLoadOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexBulkLoadOperatorNodePushable.java index 095159b..c2a058c 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexBulkLoadOperatorNodePushable.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexBulkLoadOperatorNodePushable.java @@ -25,11 +25,15 @@ import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor; import org.apache.hyracks.dataflow.common.comm.util.FrameUtils; +import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference; import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable; import org.apache.hyracks.storage.am.common.api.IIndexDataflowHelper; +import org.apache.hyracks.storage.am.common.api.ITupleFilter; +import org.apache.hyracks.storage.am.common.api.ITupleFilterFactory; import org.apache.hyracks.storage.am.common.tuples.PermutingFrameTupleReference; import org.apache.hyracks.storage.common.IIndex; import org.apache.hyracks.storage.common.IIndexBulkLoader; 
+import org.apache.hyracks.util.annotations.CriticalPath; public class IndexBulkLoadOperatorNodePushable extends AbstractUnaryInputUnaryOutputOperatorNodePushable { protected final IHyracksTaskContext ctx; @@ -40,13 +44,17 @@ protected final IIndexDataflowHelper indexHelper; protected final RecordDescriptor recDesc; protected final PermutingFrameTupleReference tuple = new PermutingFrameTupleReference(); + protected final ITupleFilterFactory tupleFilterFactory; protected FrameTupleAccessor accessor; protected IIndex index; protected IIndexBulkLoader bulkLoader; + protected ITupleFilter tupleFilter; + protected FrameTupleReference frameTuple; public IndexBulkLoadOperatorNodePushable(IIndexDataflowHelperFactory indexDataflowHelperFactory, IHyracksTaskContext ctx, int partition, int[] fieldPermutation, float fillFactor, boolean verifyInput, - long numElementsHint, boolean checkIfEmptyIndex, RecordDescriptor recDesc) throws HyracksDataException { + long numElementsHint, boolean checkIfEmptyIndex, RecordDescriptor recDesc, + ITupleFilterFactory tupleFilterFactory) throws HyracksDataException { this.ctx = ctx; this.indexHelper = indexDataflowHelperFactory.create(ctx.getJobletContext().getServiceContext(), partition); this.fillFactor = fillFactor; @@ -54,8 +62,8 @@ this.numElementsHint = numElementsHint; this.checkIfEmptyIndex = checkIfEmptyIndex; this.recDesc = recDesc; + this.tupleFilterFactory = tupleFilterFactory; tuple.setFieldPermutation(fieldPermutation); - } @Override @@ -65,6 +73,10 @@ index = indexHelper.getIndexInstance(); try { writer.open(); + if (tupleFilterFactory != null) { + tupleFilter = tupleFilterFactory.createTupleFilter(ctx); + frameTuple = new FrameTupleReference(); + } initializeBulkLoader(); } catch (Exception e) { throw HyracksDataException.create(e); @@ -75,8 +87,13 @@ public void nextFrame(ByteBuffer buffer) throws HyracksDataException { accessor.reset(buffer); int tupleCount = accessor.getTupleCount(); - for (int i = 0; i < tupleCount; i++) { + 
if (tupleFilter != null) { + frameTuple.reset(accessor, i); + if (!tupleFilter.accept(frameTuple)) { + continue; + } + } tuple.reset(accessor, i); bulkLoader.add(tuple); } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/TreeIndexBulkLoadOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/TreeIndexBulkLoadOperatorDescriptor.java index fad344d..8346f62 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/TreeIndexBulkLoadOperatorDescriptor.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/TreeIndexBulkLoadOperatorDescriptor.java @@ -26,6 +26,7 @@ import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.job.IOperatorDescriptorRegistry; import org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor; +import org.apache.hyracks.storage.am.common.api.ITupleFilterFactory; public class TreeIndexBulkLoadOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor { @@ -37,10 +38,19 @@ protected final long numElementsHint; protected final boolean checkIfEmptyIndex; protected final IIndexDataflowHelperFactory indexHelperFactory; + private final ITupleFilterFactory tupleFilterFactory; public TreeIndexBulkLoadOperatorDescriptor(IOperatorDescriptorRegistry spec, RecordDescriptor outRecDesc, int[] fieldPermutation, float fillFactor, boolean verifyInput, long numElementsHint, boolean checkIfEmptyIndex, IIndexDataflowHelperFactory indexHelperFactory) { + this(spec, outRecDesc, fieldPermutation, fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex, + indexHelperFactory, null); + } + + public TreeIndexBulkLoadOperatorDescriptor(IOperatorDescriptorRegistry spec, RecordDescriptor outRecDesc, + int[] 
fieldPermutation, float fillFactor, boolean verifyInput, long numElementsHint, + boolean checkIfEmptyIndex, IIndexDataflowHelperFactory indexHelperFactory, + ITupleFilterFactory tupleFilterFactory) { super(spec, 1, 1); this.indexHelperFactory = indexHelperFactory; this.fieldPermutation = fieldPermutation; @@ -49,6 +59,7 @@ this.numElementsHint = numElementsHint; this.checkIfEmptyIndex = checkIfEmptyIndex; this.outRecDescs[0] = outRecDesc; + this.tupleFilterFactory = tupleFilterFactory; } @Override @@ -56,6 +67,6 @@ IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException { return new IndexBulkLoadOperatorNodePushable(indexHelperFactory, ctx, partition, fieldPermutation, fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex, - recordDescProvider.getInputRecordDescriptor(this.getActivityId(), 0)); + recordDescProvider.getInputRecordDescriptor(this.getActivityId(), 0), tupleFilterFactory); } } -- To view, visit https://asterix-gerrit.ics.uci.edu/3304 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: merged Gerrit-Change-Id: I7b847aece06a4387cc41389b0566d0c0dc98487d Gerrit-PatchSet: 6 Gerrit-Project: asterixdb Gerrit-Branch: stabilization-f69489 Gerrit-Owner: Murtadha Hubail <[email protected]> Gerrit-Reviewer: Ali Alsuliman <[email protected]> Gerrit-Reviewer: Anon. E. Moose #1000171 Gerrit-Reviewer: Jenkins <[email protected]> Gerrit-Reviewer: Luo Chen <[email protected]> Gerrit-Reviewer: Murtadha Hubail <[email protected]> Gerrit-Reviewer: Till Westmann <[email protected]>
