Murtadha Hubail has uploaded a new change for review.
https://asterix-gerrit.ics.uci.edu/3304
Change subject: [NO ISSUE][RT] Fix Secondary Indexes Bulkload/Upsert
......................................................................
[NO ISSUE][RT] Fix Secondary Indexes Bulkload/Upsert
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
- Prevent null values from being upserted into
secondary indexes.
- Use the secondary index tuple filter when bulk loading
to prevent null/missing values from being inserted
into indexes on optional fields.
Change-Id: I7b847aece06a4387cc41389b0566d0c0dc98487d
---
M
asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalIndexBulkLoadOperatorDescriptor.java
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalIndexBulkLoadOperatorNodePushable.java
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalIndexBulkModifyOperatorDescriptor.java
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalIndexBulkModifyOperatorNodePushable.java
M
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
M
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java
M
asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMIndexBulkLoadOperatorDescriptor.java
M
asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMIndexBulkLoadOperatorNodePushable.java
M
asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertOperatorNodePushable.java
M
hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexBulkLoadOperatorNodePushable.java
M
hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/TreeIndexBulkLoadOperatorDescriptor.java
12 files changed, 87 insertions(+), 32 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/04/3304/1
diff --git
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
index b07a30e..1a9df0b 100644
---
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
+++
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
@@ -220,7 +220,7 @@
LSMIndexBulkLoadOperatorNodePushable op =
new
LSMIndexBulkLoadOperatorNodePushable(secondaryIndexHelperFactory,
primaryIndexHelperFactory,
ctx, 0, fieldPermutation, 1.0F, false,
numElementsHint, true, secondaryIndexInfo.rDesc,
- BulkLoadUsage.CREATE_INDEX,
dataset.getDatasetId());
+ BulkLoadUsage.CREATE_INDEX,
dataset.getDatasetId(), null);
op.setOutputFrameWriter(0, new
SinkRuntimeFactory().createPushRuntime(ctx)[0], null);
return Pair.of(secondaryIndexInfo, op);
} catch (Throwable th) {
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalIndexBulkLoadOperatorDescriptor.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalIndexBulkLoadOperatorDescriptor.java
index 83e4566..f910fb4 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalIndexBulkLoadOperatorDescriptor.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalIndexBulkLoadOperatorDescriptor.java
@@ -24,6 +24,7 @@
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
+import org.apache.hyracks.storage.am.common.api.ITupleFilterFactory;
import
org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
import
org.apache.hyracks.storage.am.common.dataflow.TreeIndexBulkLoadOperatorDescriptor;
@@ -31,13 +32,16 @@
private static final long serialVersionUID = 1L;
private final int version;
+ private final ITupleFilterFactory tupleFilterFactory;
public ExternalIndexBulkLoadOperatorDescriptor(IOperatorDescriptorRegistry
spec, RecordDescriptor outRecDesc,
int[] fieldPermutation, float fillFactor, boolean verifyInput,
long numElementsHint,
- boolean checkIfEmptyIndex, IIndexDataflowHelperFactory
indexHelperFactory, int version) {
+ boolean checkIfEmptyIndex, IIndexDataflowHelperFactory
indexHelperFactory, int version,
+ ITupleFilterFactory tupleFilterFactory) {
super(spec, outRecDesc, fieldPermutation, fillFactor, verifyInput,
numElementsHint, checkIfEmptyIndex,
indexHelperFactory);
this.version = version;
+ this.tupleFilterFactory = tupleFilterFactory;
}
@Override
@@ -45,7 +49,7 @@
IRecordDescriptorProvider recordDescProvider, int partition, int
nPartitions) throws HyracksDataException {
return new
ExternalIndexBulkLoadOperatorNodePushable(indexHelperFactory, ctx, partition,
fieldPermutation,
fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex,
-
recordDescProvider.getInputRecordDescriptor(this.getActivityId(), 0), version);
+
recordDescProvider.getInputRecordDescriptor(this.getActivityId(), 0), version,
tupleFilterFactory);
}
}
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalIndexBulkLoadOperatorNodePushable.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalIndexBulkLoadOperatorNodePushable.java
index 74bc0dc..e321a94 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalIndexBulkLoadOperatorNodePushable.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalIndexBulkLoadOperatorNodePushable.java
@@ -25,6 +25,7 @@
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.common.api.ITupleFilterFactory;
import
org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
import
org.apache.hyracks.storage.am.common.dataflow.IndexBulkLoadOperatorNodePushable;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
@@ -34,14 +35,16 @@
public class ExternalIndexBulkLoadOperatorNodePushable extends
IndexBulkLoadOperatorNodePushable {
private final int version;
+ private final ITupleFilterFactory tupleFilterFactory;
public
ExternalIndexBulkLoadOperatorNodePushable(IIndexDataflowHelperFactory
indexDataflowHelperFactory,
IHyracksTaskContext ctx, int partition, int[] fieldPermutation,
float fillFactor, boolean verifyInput,
- long numElementsHint, boolean checkIfEmptyIndex, RecordDescriptor
recDesc, int version)
- throws HyracksDataException {
+ long numElementsHint, boolean checkIfEmptyIndex, RecordDescriptor
recDesc, int version,
+ ITupleFilterFactory tupleFilterFactory) throws
HyracksDataException {
super(indexDataflowHelperFactory, ctx, partition, fieldPermutation,
fillFactor, verifyInput, numElementsHint,
- checkIfEmptyIndex, recDesc);
+ checkIfEmptyIndex, recDesc, tupleFilterFactory);
this.version = version;
+ this.tupleFilterFactory = tupleFilterFactory;
}
@Override
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalIndexBulkModifyOperatorDescriptor.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalIndexBulkModifyOperatorDescriptor.java
index 60f1721..674ee42 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalIndexBulkModifyOperatorDescriptor.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalIndexBulkModifyOperatorDescriptor.java
@@ -23,6 +23,7 @@
import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
+import org.apache.hyracks.storage.am.common.api.ITupleFilterFactory;
import
org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
import
org.apache.hyracks.storage.am.common.dataflow.TreeIndexBulkLoadOperatorDescriptor;
@@ -30,12 +31,15 @@
private static final long serialVersionUID = 1L;
private final int[] deletedFiles;
+ private final ITupleFilterFactory tupleFilterFactory;
public
ExternalIndexBulkModifyOperatorDescriptor(IOperatorDescriptorRegistry spec,
IIndexDataflowHelperFactory dataflowHelperFactory, int[]
deletedFiles, int[] fieldPermutation,
- float fillFactor, boolean verifyInput, long numElementsHint) {
- super(spec, null, fieldPermutation, fillFactor, verifyInput,
numElementsHint, false, dataflowHelperFactory);
+ float fillFactor, boolean verifyInput, long numElementsHint,
ITupleFilterFactory tupleFilterFactory) {
+ super(spec, null, fieldPermutation, fillFactor, verifyInput,
numElementsHint, false, dataflowHelperFactory,
+ tupleFilterFactory);
this.deletedFiles = deletedFiles;
+ this.tupleFilterFactory = tupleFilterFactory;
}
@Override
@@ -43,7 +47,7 @@
IRecordDescriptorProvider recordDescProvider, int partition, int
nPartitions) throws HyracksDataException {
return new
ExternalIndexBulkModifyOperatorNodePushable(indexHelperFactory, ctx, partition,
fieldPermutation,
fillFactor, verifyInput, numElementsHint,
- recordDescProvider.getInputRecordDescriptor(getActivityId(),
0), deletedFiles);
+ recordDescProvider.getInputRecordDescriptor(getActivityId(),
0), deletedFiles, tupleFilterFactory);
}
}
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalIndexBulkModifyOperatorNodePushable.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalIndexBulkModifyOperatorNodePushable.java
index 57e2917..745853e 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalIndexBulkModifyOperatorNodePushable.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalIndexBulkModifyOperatorNodePushable.java
@@ -30,6 +30,7 @@
import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleReference;
import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import org.apache.hyracks.storage.am.common.api.ITupleFilterFactory;
import org.apache.hyracks.storage.am.common.api.ITwoPCIndexBulkLoader;
import
org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
import
org.apache.hyracks.storage.am.common.dataflow.IndexBulkLoadOperatorNodePushable;
@@ -45,9 +46,10 @@
public
ExternalIndexBulkModifyOperatorNodePushable(IIndexDataflowHelperFactory
indexHelperFactory,
IHyracksTaskContext ctx, int partition, int[] fieldPermutation,
float fillFactor, boolean verifyInput,
- long numElementsHint, RecordDescriptor inputRecDesc, int[]
deletedFiles) throws HyracksDataException {
+ long numElementsHint, RecordDescriptor inputRecDesc, int[]
deletedFiles,
+ ITupleFilterFactory tupleFilterFactory) throws
HyracksDataException {
super(indexHelperFactory, ctx, partition, fieldPermutation,
fillFactor, verifyInput, numElementsHint, false,
- inputRecDesc);
+ inputRecDesc, tupleFilterFactory);
this.deletedFiles = deletedFiles;
}
diff --git
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
index e212d11..55bf261 100644
---
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
+++
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -642,7 +642,7 @@
new
IndexDataflowHelperFactory(storageComponentProvider.getStorageManager(),
splitsAndConstraint.first);
LSMIndexBulkLoadOperatorDescriptor btreeBulkLoad = new
LSMIndexBulkLoadOperatorDescriptor(spec, null,
fieldPermutation, GlobalConfig.DEFAULT_TREE_FILL_FACTOR,
false, numElementsHint, true,
- indexHelperFactory, null, BulkLoadUsage.LOAD,
dataset.getDatasetId());
+ indexHelperFactory, null, BulkLoadUsage.LOAD,
dataset.getDatasetId(), null);
return new Pair<>(btreeBulkLoad, splitsAndConstraint.second);
}
@@ -1034,7 +1034,7 @@
long numElementsHint = getCardinalityPerPartitionHint(dataset);
op = new LSMIndexBulkLoadOperatorDescriptor(spec, inputRecordDesc,
fieldPermutation,
GlobalConfig.DEFAULT_TREE_FILL_FACTOR, true,
numElementsHint, true, idfh, null, BulkLoadUsage.LOAD,
- dataset.getDatasetId());
+ dataset.getDatasetId(), null);
} else {
op = new LSMTreeInsertDeleteOperatorDescriptor(spec,
inputRecordDesc, fieldPermutation, indexOp, idfh, null,
true, modificationCallbackFactory);
@@ -1159,7 +1159,7 @@
long numElementsHint = getCardinalityPerPartitionHint(dataset);
op = new LSMIndexBulkLoadOperatorDescriptor(spec,
inputRecordDesc, fieldPermutation,
GlobalConfig.DEFAULT_TREE_FILL_FACTOR, false,
numElementsHint, false, idfh, null,
- BulkLoadUsage.LOAD, dataset.getDatasetId());
+ BulkLoadUsage.LOAD, dataset.getDatasetId(),
filterFactory);
} else if (indexOp == IndexOperation.UPSERT) {
int upsertIndicatorFieldIndex =
propagatedSchema.findVariable(upsertIndicatorVar);
op = new LSMSecondaryUpsertOperatorDescriptor(spec,
inputRecordDesc, fieldPermutation, idfh,
@@ -1259,7 +1259,7 @@
long numElementsHint = getCardinalityPerPartitionHint(dataset);
op = new LSMIndexBulkLoadOperatorDescriptor(spec, recordDesc,
fieldPermutation,
GlobalConfig.DEFAULT_TREE_FILL_FACTOR, false,
numElementsHint, false, indexDataflowHelperFactory,
- null, BulkLoadUsage.LOAD, dataset.getDatasetId());
+ null, BulkLoadUsage.LOAD, dataset.getDatasetId(),
filterFactory);
} else if (indexOp == IndexOperation.UPSERT) {
int upsertIndicatorFieldIndex =
propagatedSchema.findVariable(upsertIndicatorVar);
op = new LSMSecondaryUpsertOperatorDescriptor(spec, recordDesc,
fieldPermutation,
@@ -1370,7 +1370,7 @@
long numElementsHint = getCardinalityPerPartitionHint(dataset);
op = new LSMIndexBulkLoadOperatorDescriptor(spec, recordDesc,
fieldPermutation,
GlobalConfig.DEFAULT_TREE_FILL_FACTOR, false,
numElementsHint, false, indexDataFlowFactory,
- null, BulkLoadUsage.LOAD, dataset.getDatasetId());
+ null, BulkLoadUsage.LOAD, dataset.getDatasetId(),
filterFactory);
} else if (indexOp == IndexOperation.UPSERT) {
int upsertIndicatorFieldIndex =
propagatedSchema.findVariable(upsertIndicatorVar);
op = new LSMSecondaryUpsertOperatorDescriptor(spec,
recordDesc, fieldPermutation, indexDataFlowFactory,
diff --git
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java
index 5f9e6ef..efee1db 100644
---
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java
+++
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java
@@ -361,10 +361,10 @@
throws AlgebricksException {
IndexDataflowHelperFactory primaryIndexDataflowHelperFactory = new
IndexDataflowHelperFactory(
metadataProvider.getStorageComponentProvider().getStorageManager(),
primaryFileSplitProvider);
-
+ // when an index is being created (not loaded) the filtration is
introduced in the pipeline -> no tuple filter
LSMIndexBulkLoadOperatorDescriptor treeIndexBulkLoadOp = new
LSMIndexBulkLoadOperatorDescriptor(spec,
secondaryRecDesc, fieldPermutation, fillFactor, false,
numElementsHint, false, dataflowHelperFactory,
- primaryIndexDataflowHelperFactory, BulkLoadUsage.CREATE_INDEX,
dataset.getDatasetId());
+ primaryIndexDataflowHelperFactory, BulkLoadUsage.CREATE_INDEX,
dataset.getDatasetId(), null);
treeIndexBulkLoadOp.setSourceLocation(sourceLoc);
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec,
treeIndexBulkLoadOp,
secondaryPartitionConstraint);
@@ -376,7 +376,7 @@
throws AlgebricksException {
ExternalIndexBulkLoadOperatorDescriptor treeIndexBulkLoadOp = new
ExternalIndexBulkLoadOperatorDescriptor(spec,
secondaryRecDesc, fieldPermutation, fillFactor, false,
numElementsHint, false, dataflowHelperFactory,
-
ExternalDatasetsRegistry.INSTANCE.getAndLockDatasetVersion(dataset,
metadataProvider));
+
ExternalDatasetsRegistry.INSTANCE.getAndLockDatasetVersion(dataset,
metadataProvider), null);
treeIndexBulkLoadOp.setSourceLocation(sourceLoc);
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec,
treeIndexBulkLoadOp,
secondaryPartitionConstraint);
@@ -494,7 +494,7 @@
}
}
ExternalIndexBulkModifyOperatorDescriptor treeIndexBulkLoadOp = new
ExternalIndexBulkModifyOperatorDescriptor(
- spec, dataflowHelperFactory, deletedFiles, fieldPermutation,
fillFactor, false, numElementsHint);
+ spec, dataflowHelperFactory, deletedFiles, fieldPermutation,
fillFactor, false, numElementsHint, null);
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec,
treeIndexBulkLoadOp,
secondaryPartitionConstraint);
return treeIndexBulkLoadOp;
diff --git
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMIndexBulkLoadOperatorDescriptor.java
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMIndexBulkLoadOperatorDescriptor.java
index 74590c7..ea84ca3 100644
---
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMIndexBulkLoadOperatorDescriptor.java
+++
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMIndexBulkLoadOperatorDescriptor.java
@@ -25,6 +25,7 @@
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
+import org.apache.hyracks.storage.am.common.api.ITupleFilterFactory;
import
org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
import
org.apache.hyracks.storage.am.common.dataflow.TreeIndexBulkLoadOperatorDescriptor;
@@ -43,15 +44,19 @@
protected final int datasetId;
+ protected final ITupleFilterFactory tupleFilterFactory;
+
public LSMIndexBulkLoadOperatorDescriptor(IOperatorDescriptorRegistry
spec, RecordDescriptor outRecDesc,
int[] fieldPermutation, float fillFactor, boolean verifyInput,
long numElementsHint,
boolean checkIfEmptyIndex, IIndexDataflowHelperFactory
indexHelperFactory,
- IIndexDataflowHelperFactory primaryIndexHelperFactory,
BulkLoadUsage usage, int datasetId) {
+ IIndexDataflowHelperFactory primaryIndexHelperFactory,
BulkLoadUsage usage, int datasetId,
+ ITupleFilterFactory tupleFilterFactory) {
super(spec, outRecDesc, fieldPermutation, fillFactor, verifyInput,
numElementsHint, checkIfEmptyIndex,
indexHelperFactory);
this.primaryIndexHelperFactory = primaryIndexHelperFactory;
this.usage = usage;
this.datasetId = datasetId;
+ this.tupleFilterFactory = tupleFilterFactory;
}
@Override
@@ -59,6 +64,7 @@
IRecordDescriptorProvider recordDescProvider, int partition, int
nPartitions) throws HyracksDataException {
return new LSMIndexBulkLoadOperatorNodePushable(indexHelperFactory,
primaryIndexHelperFactory, ctx, partition,
fieldPermutation, fillFactor, verifyInput, numElementsHint,
checkIfEmptyIndex,
-
recordDescProvider.getInputRecordDescriptor(this.getActivityId(), 0), usage,
datasetId);
+
recordDescProvider.getInputRecordDescriptor(this.getActivityId(), 0), usage,
datasetId,
+ tupleFilterFactory);
}
}
diff --git
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMIndexBulkLoadOperatorNodePushable.java
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMIndexBulkLoadOperatorNodePushable.java
index 4130490..46dfe65 100644
---
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMIndexBulkLoadOperatorNodePushable.java
+++
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMIndexBulkLoadOperatorNodePushable.java
@@ -30,6 +30,7 @@
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.storage.am.common.api.IIndexDataflowHelper;
+import org.apache.hyracks.storage.am.common.api.ITupleFilterFactory;
import
org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
import
org.apache.hyracks.storage.am.common.dataflow.IndexBulkLoadOperatorNodePushable;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId;
@@ -45,16 +46,15 @@
protected final IDatasetLifecycleManager datasetManager;
protected final int datasetId;
protected final int partition;
-
protected ILSMIndex primaryIndex;
public LSMIndexBulkLoadOperatorNodePushable(IIndexDataflowHelperFactory
indexDataflowHelperFactory,
IIndexDataflowHelperFactory priamryIndexDataflowHelperFactory,
IHyracksTaskContext ctx, int partition,
int[] fieldPermutation, float fillFactor, boolean verifyInput,
long numElementsHint,
- boolean checkIfEmptyIndex, RecordDescriptor recDesc, BulkLoadUsage
usage, int datasetId)
- throws HyracksDataException {
+ boolean checkIfEmptyIndex, RecordDescriptor recDesc, BulkLoadUsage
usage, int datasetId,
+ ITupleFilterFactory tupleFilterFactory) throws
HyracksDataException {
super(indexDataflowHelperFactory, ctx, partition, fieldPermutation,
fillFactor, verifyInput, numElementsHint,
- checkIfEmptyIndex, recDesc);
+ checkIfEmptyIndex, recDesc, tupleFilterFactory);
if (priamryIndexDataflowHelperFactory != null) {
this.primaryIndexHelper =
diff --git
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertOperatorNodePushable.java
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertOperatorNodePushable.java
index 2dc7f5e..86f1192 100644
---
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertOperatorNodePushable.java
+++
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertOperatorNodePushable.java
@@ -32,6 +32,7 @@
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference;
+import org.apache.hyracks.dataflow.common.utils.FrameDebugUtils;
import org.apache.hyracks.dataflow.common.utils.TupleUtils;
import
org.apache.hyracks.storage.am.common.api.IModificationOperationCallbackFactory;
import org.apache.hyracks.storage.am.common.api.ITupleFilterFactory;
@@ -93,6 +94,7 @@
accessor.reset(buffer);
ILSMIndexAccessor lsmAccessor = (ILSMIndexAccessor) indexAccessor;
int tupleCount = accessor.getTupleCount();
+ FrameDebugUtils.prettyPrint(accessor, inputRecDesc);
for (int i = 0; i < tupleCount; i++) {
try {
frameTuple.reset(accessor, i);
@@ -105,7 +107,9 @@
prevValueTuple.reset(accessor, i);
boolean isNewValueMissing = isMissing(tuple, 0);
+ boolean isNewValueNull = isNull(tuple, 0);
boolean isOldValueMissing = isMissing(prevValueTuple, 0);
+ boolean isOldValueNull = isNull(prevValueTuple, 0);
if (isNewValueMissing && isOldValueMissing) {
// No op
continue;
@@ -117,12 +121,12 @@
// which are always the same
continue;
}
- if (!isOldValueMissing) {
+ if (!isOldValueMissing && !isOldValueNull) {
// We need to delete previous
abstractModCallback.setOp(Operation.DELETE);
lsmAccessor.forceDelete(prevValueTuple);
}
- if (isUpsert && !isNewValueMissing) {
+ if (isUpsert && !isNewValueMissing && !isNewValueNull) {
// we need to insert the new value
abstractModCallback.setOp(Operation.INSERT);
lsmAccessor.forceInsert(tuple);
@@ -140,4 +144,8 @@
private boolean isMissing(PermutingFrameTupleReference tuple, int
fieldIdx) {
return TypeTagUtil.isType(tuple, fieldIdx,
ATypeTag.SERIALIZED_MISSING_TYPE_TAG);
}
+
+ private boolean isNull(PermutingFrameTupleReference tuple, int fieldIdx) {
+ return TypeTagUtil.isType(tuple, fieldIdx,
ATypeTag.SERIALIZED_NULL_TYPE_TAG);
+ }
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexBulkLoadOperatorNodePushable.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexBulkLoadOperatorNodePushable.java
index 095159b..c2a058c 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexBulkLoadOperatorNodePushable.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexBulkLoadOperatorNodePushable.java
@@ -25,11 +25,15 @@
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
+import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference;
import
org.apache.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
import org.apache.hyracks.storage.am.common.api.IIndexDataflowHelper;
+import org.apache.hyracks.storage.am.common.api.ITupleFilter;
+import org.apache.hyracks.storage.am.common.api.ITupleFilterFactory;
import
org.apache.hyracks.storage.am.common.tuples.PermutingFrameTupleReference;
import org.apache.hyracks.storage.common.IIndex;
import org.apache.hyracks.storage.common.IIndexBulkLoader;
+import org.apache.hyracks.util.annotations.CriticalPath;
public class IndexBulkLoadOperatorNodePushable extends
AbstractUnaryInputUnaryOutputOperatorNodePushable {
protected final IHyracksTaskContext ctx;
@@ -40,13 +44,17 @@
protected final IIndexDataflowHelper indexHelper;
protected final RecordDescriptor recDesc;
protected final PermutingFrameTupleReference tuple = new
PermutingFrameTupleReference();
+ protected final ITupleFilterFactory tupleFilterFactory;
protected FrameTupleAccessor accessor;
protected IIndex index;
protected IIndexBulkLoader bulkLoader;
+ protected ITupleFilter tupleFilter;
+ protected FrameTupleReference frameTuple;
public IndexBulkLoadOperatorNodePushable(IIndexDataflowHelperFactory
indexDataflowHelperFactory,
IHyracksTaskContext ctx, int partition, int[] fieldPermutation,
float fillFactor, boolean verifyInput,
- long numElementsHint, boolean checkIfEmptyIndex, RecordDescriptor
recDesc) throws HyracksDataException {
+ long numElementsHint, boolean checkIfEmptyIndex, RecordDescriptor
recDesc,
+ ITupleFilterFactory tupleFilterFactory) throws
HyracksDataException {
this.ctx = ctx;
this.indexHelper =
indexDataflowHelperFactory.create(ctx.getJobletContext().getServiceContext(),
partition);
this.fillFactor = fillFactor;
@@ -54,8 +62,8 @@
this.numElementsHint = numElementsHint;
this.checkIfEmptyIndex = checkIfEmptyIndex;
this.recDesc = recDesc;
+ this.tupleFilterFactory = tupleFilterFactory;
tuple.setFieldPermutation(fieldPermutation);
-
}
@Override
@@ -65,6 +73,10 @@
index = indexHelper.getIndexInstance();
try {
writer.open();
+ if (tupleFilterFactory != null) {
+ tupleFilter = tupleFilterFactory.createTupleFilter(ctx);
+ frameTuple = new FrameTupleReference();
+ }
initializeBulkLoader();
} catch (Exception e) {
throw HyracksDataException.create(e);
@@ -75,8 +87,13 @@
public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
accessor.reset(buffer);
int tupleCount = accessor.getTupleCount();
-
for (int i = 0; i < tupleCount; i++) {
+ if (tupleFilter != null) {
+ frameTuple.reset(accessor, i);
+ if (!tupleFilter.accept(frameTuple)) {
+ continue;
+ }
+ }
tuple.reset(accessor, i);
bulkLoader.add(tuple);
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/TreeIndexBulkLoadOperatorDescriptor.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/TreeIndexBulkLoadOperatorDescriptor.java
index fad344d..8346f62 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/TreeIndexBulkLoadOperatorDescriptor.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/TreeIndexBulkLoadOperatorDescriptor.java
@@ -26,6 +26,7 @@
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
import
org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+import org.apache.hyracks.storage.am.common.api.ITupleFilterFactory;
public class TreeIndexBulkLoadOperatorDescriptor extends
AbstractSingleActivityOperatorDescriptor {
@@ -37,10 +38,19 @@
protected final long numElementsHint;
protected final boolean checkIfEmptyIndex;
protected final IIndexDataflowHelperFactory indexHelperFactory;
+ private final ITupleFilterFactory tupleFilterFactory;
public TreeIndexBulkLoadOperatorDescriptor(IOperatorDescriptorRegistry
spec, RecordDescriptor outRecDesc,
int[] fieldPermutation, float fillFactor, boolean verifyInput,
long numElementsHint,
boolean checkIfEmptyIndex, IIndexDataflowHelperFactory
indexHelperFactory) {
+ this(spec, outRecDesc, fieldPermutation, fillFactor, verifyInput,
numElementsHint, checkIfEmptyIndex,
+ indexHelperFactory, null);
+ }
+
+ public TreeIndexBulkLoadOperatorDescriptor(IOperatorDescriptorRegistry
spec, RecordDescriptor outRecDesc,
+ int[] fieldPermutation, float fillFactor, boolean verifyInput,
long numElementsHint,
+ boolean checkIfEmptyIndex, IIndexDataflowHelperFactory
indexHelperFactory,
+ ITupleFilterFactory tupleFilterFactory) {
super(spec, 1, 1);
this.indexHelperFactory = indexHelperFactory;
this.fieldPermutation = fieldPermutation;
@@ -49,6 +59,7 @@
this.numElementsHint = numElementsHint;
this.checkIfEmptyIndex = checkIfEmptyIndex;
this.outRecDescs[0] = outRecDesc;
+ this.tupleFilterFactory = tupleFilterFactory;
}
@Override
@@ -56,6 +67,6 @@
IRecordDescriptorProvider recordDescProvider, int partition, int
nPartitions) throws HyracksDataException {
return new IndexBulkLoadOperatorNodePushable(indexHelperFactory, ctx,
partition, fieldPermutation, fillFactor,
verifyInput, numElementsHint, checkIfEmptyIndex,
-
recordDescProvider.getInputRecordDescriptor(this.getActivityId(), 0));
+
recordDescProvider.getInputRecordDescriptor(this.getActivityId(), 0),
tupleFilterFactory);
}
}
--
To view, visit https://asterix-gerrit.ics.uci.edu/3304
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I7b847aece06a4387cc41389b0566d0c0dc98487d
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: stabilization-f69489
Gerrit-Owner: Murtadha Hubail <[email protected]>