Murtadha Hubail has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/3304

Change subject: [NO ISSUE][RT] Fix Secondary Indexes Bulkload/Upsert
......................................................................

[NO ISSUE][RT] Fix Secondary Indexes Bulkload/Upsert

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
- Prevent null values from being upserted into
  secondary indexes.
- Use secondary index tuple filter when bulkloading
  to prevent null/missing values from being inserted
  into indexes of optional fields.

Change-Id: I7b847aece06a4387cc41389b0566d0c0dc98487d
---
M 
asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
M 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalIndexBulkLoadOperatorDescriptor.java
M 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalIndexBulkLoadOperatorNodePushable.java
M 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalIndexBulkModifyOperatorDescriptor.java
M 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalIndexBulkModifyOperatorNodePushable.java
M 
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
M 
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java
M 
asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMIndexBulkLoadOperatorDescriptor.java
M 
asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMIndexBulkLoadOperatorNodePushable.java
M 
asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertOperatorNodePushable.java
M 
hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexBulkLoadOperatorNodePushable.java
M 
hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/TreeIndexBulkLoadOperatorDescriptor.java
12 files changed, 87 insertions(+), 32 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb 
refs/changes/04/3304/1

diff --git 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
index b07a30e..1a9df0b 100644
--- 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
+++ 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
@@ -220,7 +220,7 @@
             LSMIndexBulkLoadOperatorNodePushable op =
                     new 
LSMIndexBulkLoadOperatorNodePushable(secondaryIndexHelperFactory, 
primaryIndexHelperFactory,
                             ctx, 0, fieldPermutation, 1.0F, false, 
numElementsHint, true, secondaryIndexInfo.rDesc,
-                            BulkLoadUsage.CREATE_INDEX, 
dataset.getDatasetId());
+                            BulkLoadUsage.CREATE_INDEX, 
dataset.getDatasetId(), null);
             op.setOutputFrameWriter(0, new 
SinkRuntimeFactory().createPushRuntime(ctx)[0], null);
             return Pair.of(secondaryIndexInfo, op);
         } catch (Throwable th) {
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalIndexBulkLoadOperatorDescriptor.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalIndexBulkLoadOperatorDescriptor.java
index 83e4566..f910fb4 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalIndexBulkLoadOperatorDescriptor.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalIndexBulkLoadOperatorDescriptor.java
@@ -24,6 +24,7 @@
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
+import org.apache.hyracks.storage.am.common.api.ITupleFilterFactory;
 import 
org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
 import 
org.apache.hyracks.storage.am.common.dataflow.TreeIndexBulkLoadOperatorDescriptor;
 
@@ -31,13 +32,16 @@
 
     private static final long serialVersionUID = 1L;
     private final int version;
+    private final ITupleFilterFactory tupleFilterFactory;
 
     public ExternalIndexBulkLoadOperatorDescriptor(IOperatorDescriptorRegistry 
spec, RecordDescriptor outRecDesc,
             int[] fieldPermutation, float fillFactor, boolean verifyInput, 
long numElementsHint,
-            boolean checkIfEmptyIndex, IIndexDataflowHelperFactory 
indexHelperFactory, int version) {
+            boolean checkIfEmptyIndex, IIndexDataflowHelperFactory 
indexHelperFactory, int version,
+            ITupleFilterFactory tupleFilterFactory) {
         super(spec, outRecDesc, fieldPermutation, fillFactor, verifyInput, 
numElementsHint, checkIfEmptyIndex,
                 indexHelperFactory);
         this.version = version;
+        this.tupleFilterFactory = tupleFilterFactory;
     }
 
     @Override
@@ -45,7 +49,7 @@
             IRecordDescriptorProvider recordDescProvider, int partition, int 
nPartitions) throws HyracksDataException {
         return new 
ExternalIndexBulkLoadOperatorNodePushable(indexHelperFactory, ctx, partition, 
fieldPermutation,
                 fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex,
-                
recordDescProvider.getInputRecordDescriptor(this.getActivityId(), 0), version);
+                
recordDescProvider.getInputRecordDescriptor(this.getActivityId(), 0), version, 
tupleFilterFactory);
     }
 
 }
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalIndexBulkLoadOperatorNodePushable.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalIndexBulkLoadOperatorNodePushable.java
index 74bc0dc..e321a94 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalIndexBulkLoadOperatorNodePushable.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalIndexBulkLoadOperatorNodePushable.java
@@ -25,6 +25,7 @@
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.common.api.ITupleFilterFactory;
 import 
org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
 import 
org.apache.hyracks.storage.am.common.dataflow.IndexBulkLoadOperatorNodePushable;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
@@ -34,14 +35,16 @@
 public class ExternalIndexBulkLoadOperatorNodePushable extends 
IndexBulkLoadOperatorNodePushable {
 
     private final int version;
+    private final ITupleFilterFactory tupleFilterFactory;
 
     public 
ExternalIndexBulkLoadOperatorNodePushable(IIndexDataflowHelperFactory 
indexDataflowHelperFactory,
             IHyracksTaskContext ctx, int partition, int[] fieldPermutation, 
float fillFactor, boolean verifyInput,
-            long numElementsHint, boolean checkIfEmptyIndex, RecordDescriptor 
recDesc, int version)
-            throws HyracksDataException {
+            long numElementsHint, boolean checkIfEmptyIndex, RecordDescriptor 
recDesc, int version,
+            ITupleFilterFactory tupleFilterFactory) throws 
HyracksDataException {
         super(indexDataflowHelperFactory, ctx, partition, fieldPermutation, 
fillFactor, verifyInput, numElementsHint,
-                checkIfEmptyIndex, recDesc);
+                checkIfEmptyIndex, recDesc, tupleFilterFactory);
         this.version = version;
+        this.tupleFilterFactory = tupleFilterFactory;
     }
 
     @Override
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalIndexBulkModifyOperatorDescriptor.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalIndexBulkModifyOperatorDescriptor.java
index 60f1721..674ee42 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalIndexBulkModifyOperatorDescriptor.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalIndexBulkModifyOperatorDescriptor.java
@@ -23,6 +23,7 @@
 import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
+import org.apache.hyracks.storage.am.common.api.ITupleFilterFactory;
 import 
org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
 import 
org.apache.hyracks.storage.am.common.dataflow.TreeIndexBulkLoadOperatorDescriptor;
 
@@ -30,12 +31,15 @@
 
     private static final long serialVersionUID = 1L;
     private final int[] deletedFiles;
+    private final ITupleFilterFactory tupleFilterFactory;
 
     public 
ExternalIndexBulkModifyOperatorDescriptor(IOperatorDescriptorRegistry spec,
             IIndexDataflowHelperFactory dataflowHelperFactory, int[] 
deletedFiles, int[] fieldPermutation,
-            float fillFactor, boolean verifyInput, long numElementsHint) {
-        super(spec, null, fieldPermutation, fillFactor, verifyInput, 
numElementsHint, false, dataflowHelperFactory);
+            float fillFactor, boolean verifyInput, long numElementsHint, 
ITupleFilterFactory tupleFilterFactory) {
+        super(spec, null, fieldPermutation, fillFactor, verifyInput, 
numElementsHint, false, dataflowHelperFactory,
+                tupleFilterFactory);
         this.deletedFiles = deletedFiles;
+        this.tupleFilterFactory = tupleFilterFactory;
     }
 
     @Override
@@ -43,7 +47,7 @@
             IRecordDescriptorProvider recordDescProvider, int partition, int 
nPartitions) throws HyracksDataException {
         return new 
ExternalIndexBulkModifyOperatorNodePushable(indexHelperFactory, ctx, partition, 
fieldPermutation,
                 fillFactor, verifyInput, numElementsHint,
-                recordDescProvider.getInputRecordDescriptor(getActivityId(), 
0), deletedFiles);
+                recordDescProvider.getInputRecordDescriptor(getActivityId(), 
0), deletedFiles, tupleFilterFactory);
     }
 
 }
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalIndexBulkModifyOperatorNodePushable.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalIndexBulkModifyOperatorNodePushable.java
index 57e2917..745853e 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalIndexBulkModifyOperatorNodePushable.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalIndexBulkModifyOperatorNodePushable.java
@@ -30,6 +30,7 @@
 import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleReference;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import org.apache.hyracks.storage.am.common.api.ITupleFilterFactory;
 import org.apache.hyracks.storage.am.common.api.ITwoPCIndexBulkLoader;
 import 
org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
 import 
org.apache.hyracks.storage.am.common.dataflow.IndexBulkLoadOperatorNodePushable;
@@ -45,9 +46,10 @@
 
     public 
ExternalIndexBulkModifyOperatorNodePushable(IIndexDataflowHelperFactory 
indexHelperFactory,
             IHyracksTaskContext ctx, int partition, int[] fieldPermutation, 
float fillFactor, boolean verifyInput,
-            long numElementsHint, RecordDescriptor inputRecDesc, int[] 
deletedFiles) throws HyracksDataException {
+            long numElementsHint, RecordDescriptor inputRecDesc, int[] 
deletedFiles,
+            ITupleFilterFactory tupleFilterFactory) throws 
HyracksDataException {
         super(indexHelperFactory, ctx, partition, fieldPermutation, 
fillFactor, verifyInput, numElementsHint, false,
-                inputRecDesc);
+                inputRecDesc, tupleFilterFactory);
         this.deletedFiles = deletedFiles;
     }
 
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
index e212d11..55bf261 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -642,7 +642,7 @@
                 new 
IndexDataflowHelperFactory(storageComponentProvider.getStorageManager(), 
splitsAndConstraint.first);
         LSMIndexBulkLoadOperatorDescriptor btreeBulkLoad = new 
LSMIndexBulkLoadOperatorDescriptor(spec, null,
                 fieldPermutation, GlobalConfig.DEFAULT_TREE_FILL_FACTOR, 
false, numElementsHint, true,
-                indexHelperFactory, null, BulkLoadUsage.LOAD, 
dataset.getDatasetId());
+                indexHelperFactory, null, BulkLoadUsage.LOAD, 
dataset.getDatasetId(), null);
         return new Pair<>(btreeBulkLoad, splitsAndConstraint.second);
     }
 
@@ -1034,7 +1034,7 @@
             long numElementsHint = getCardinalityPerPartitionHint(dataset);
             op = new LSMIndexBulkLoadOperatorDescriptor(spec, inputRecordDesc, 
fieldPermutation,
                     GlobalConfig.DEFAULT_TREE_FILL_FACTOR, true, 
numElementsHint, true, idfh, null, BulkLoadUsage.LOAD,
-                    dataset.getDatasetId());
+                    dataset.getDatasetId(), null);
         } else {
             op = new LSMTreeInsertDeleteOperatorDescriptor(spec, 
inputRecordDesc, fieldPermutation, indexOp, idfh, null,
                     true, modificationCallbackFactory);
@@ -1159,7 +1159,7 @@
                 long numElementsHint = getCardinalityPerPartitionHint(dataset);
                 op = new LSMIndexBulkLoadOperatorDescriptor(spec, 
inputRecordDesc, fieldPermutation,
                         GlobalConfig.DEFAULT_TREE_FILL_FACTOR, false, 
numElementsHint, false, idfh, null,
-                        BulkLoadUsage.LOAD, dataset.getDatasetId());
+                        BulkLoadUsage.LOAD, dataset.getDatasetId(), 
filterFactory);
             } else if (indexOp == IndexOperation.UPSERT) {
                 int upsertIndicatorFieldIndex = 
propagatedSchema.findVariable(upsertIndicatorVar);
                 op = new LSMSecondaryUpsertOperatorDescriptor(spec, 
inputRecordDesc, fieldPermutation, idfh,
@@ -1259,7 +1259,7 @@
             long numElementsHint = getCardinalityPerPartitionHint(dataset);
             op = new LSMIndexBulkLoadOperatorDescriptor(spec, recordDesc, 
fieldPermutation,
                     GlobalConfig.DEFAULT_TREE_FILL_FACTOR, false, 
numElementsHint, false, indexDataflowHelperFactory,
-                    null, BulkLoadUsage.LOAD, dataset.getDatasetId());
+                    null, BulkLoadUsage.LOAD, dataset.getDatasetId(), 
filterFactory);
         } else if (indexOp == IndexOperation.UPSERT) {
             int upsertIndicatorFieldIndex = 
propagatedSchema.findVariable(upsertIndicatorVar);
             op = new LSMSecondaryUpsertOperatorDescriptor(spec, recordDesc, 
fieldPermutation,
@@ -1370,7 +1370,7 @@
                 long numElementsHint = getCardinalityPerPartitionHint(dataset);
                 op = new LSMIndexBulkLoadOperatorDescriptor(spec, recordDesc, 
fieldPermutation,
                         GlobalConfig.DEFAULT_TREE_FILL_FACTOR, false, 
numElementsHint, false, indexDataFlowFactory,
-                        null, BulkLoadUsage.LOAD, dataset.getDatasetId());
+                        null, BulkLoadUsage.LOAD, dataset.getDatasetId(), 
filterFactory);
             } else if (indexOp == IndexOperation.UPSERT) {
                 int upsertIndicatorFieldIndex = 
propagatedSchema.findVariable(upsertIndicatorVar);
                 op = new LSMSecondaryUpsertOperatorDescriptor(spec, 
recordDesc, fieldPermutation, indexDataFlowFactory,
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java
index 5f9e6ef..efee1db 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java
@@ -361,10 +361,10 @@
             throws AlgebricksException {
         IndexDataflowHelperFactory primaryIndexDataflowHelperFactory = new 
IndexDataflowHelperFactory(
                 
metadataProvider.getStorageComponentProvider().getStorageManager(), 
primaryFileSplitProvider);
-
+        // when an index is being created (not loaded) the filtration is 
introduced in the pipeline -> no tuple filter
         LSMIndexBulkLoadOperatorDescriptor treeIndexBulkLoadOp = new 
LSMIndexBulkLoadOperatorDescriptor(spec,
                 secondaryRecDesc, fieldPermutation, fillFactor, false, 
numElementsHint, false, dataflowHelperFactory,
-                primaryIndexDataflowHelperFactory, BulkLoadUsage.CREATE_INDEX, 
dataset.getDatasetId());
+                primaryIndexDataflowHelperFactory, BulkLoadUsage.CREATE_INDEX, 
dataset.getDatasetId(), null);
         treeIndexBulkLoadOp.setSourceLocation(sourceLoc);
         
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, 
treeIndexBulkLoadOp,
                 secondaryPartitionConstraint);
@@ -376,7 +376,7 @@
             throws AlgebricksException {
         ExternalIndexBulkLoadOperatorDescriptor treeIndexBulkLoadOp = new 
ExternalIndexBulkLoadOperatorDescriptor(spec,
                 secondaryRecDesc, fieldPermutation, fillFactor, false, 
numElementsHint, false, dataflowHelperFactory,
-                
ExternalDatasetsRegistry.INSTANCE.getAndLockDatasetVersion(dataset, 
metadataProvider));
+                
ExternalDatasetsRegistry.INSTANCE.getAndLockDatasetVersion(dataset, 
metadataProvider), null);
         treeIndexBulkLoadOp.setSourceLocation(sourceLoc);
         
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, 
treeIndexBulkLoadOp,
                 secondaryPartitionConstraint);
@@ -494,7 +494,7 @@
             }
         }
         ExternalIndexBulkModifyOperatorDescriptor treeIndexBulkLoadOp = new 
ExternalIndexBulkModifyOperatorDescriptor(
-                spec, dataflowHelperFactory, deletedFiles, fieldPermutation, 
fillFactor, false, numElementsHint);
+                spec, dataflowHelperFactory, deletedFiles, fieldPermutation, 
fillFactor, false, numElementsHint, null);
         
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, 
treeIndexBulkLoadOp,
                 secondaryPartitionConstraint);
         return treeIndexBulkLoadOp;
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMIndexBulkLoadOperatorDescriptor.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMIndexBulkLoadOperatorDescriptor.java
index 74590c7..ea84ca3 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMIndexBulkLoadOperatorDescriptor.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMIndexBulkLoadOperatorDescriptor.java
@@ -25,6 +25,7 @@
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
+import org.apache.hyracks.storage.am.common.api.ITupleFilterFactory;
 import 
org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
 import 
org.apache.hyracks.storage.am.common.dataflow.TreeIndexBulkLoadOperatorDescriptor;
 
@@ -43,15 +44,19 @@
 
     protected final int datasetId;
 
+    protected final ITupleFilterFactory tupleFilterFactory;
+
     public LSMIndexBulkLoadOperatorDescriptor(IOperatorDescriptorRegistry 
spec, RecordDescriptor outRecDesc,
             int[] fieldPermutation, float fillFactor, boolean verifyInput, 
long numElementsHint,
             boolean checkIfEmptyIndex, IIndexDataflowHelperFactory 
indexHelperFactory,
-            IIndexDataflowHelperFactory primaryIndexHelperFactory, 
BulkLoadUsage usage, int datasetId) {
+            IIndexDataflowHelperFactory primaryIndexHelperFactory, 
BulkLoadUsage usage, int datasetId,
+            ITupleFilterFactory tupleFilterFactory) {
         super(spec, outRecDesc, fieldPermutation, fillFactor, verifyInput, 
numElementsHint, checkIfEmptyIndex,
                 indexHelperFactory);
         this.primaryIndexHelperFactory = primaryIndexHelperFactory;
         this.usage = usage;
         this.datasetId = datasetId;
+        this.tupleFilterFactory = tupleFilterFactory;
     }
 
     @Override
@@ -59,6 +64,7 @@
             IRecordDescriptorProvider recordDescProvider, int partition, int 
nPartitions) throws HyracksDataException {
         return new LSMIndexBulkLoadOperatorNodePushable(indexHelperFactory, 
primaryIndexHelperFactory, ctx, partition,
                 fieldPermutation, fillFactor, verifyInput, numElementsHint, 
checkIfEmptyIndex,
-                
recordDescProvider.getInputRecordDescriptor(this.getActivityId(), 0), usage, 
datasetId);
+                
recordDescProvider.getInputRecordDescriptor(this.getActivityId(), 0), usage, 
datasetId,
+                tupleFilterFactory);
     }
 }
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMIndexBulkLoadOperatorNodePushable.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMIndexBulkLoadOperatorNodePushable.java
index 4130490..46dfe65 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMIndexBulkLoadOperatorNodePushable.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMIndexBulkLoadOperatorNodePushable.java
@@ -30,6 +30,7 @@
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.storage.am.common.api.IIndexDataflowHelper;
+import org.apache.hyracks.storage.am.common.api.ITupleFilterFactory;
 import 
org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
 import 
org.apache.hyracks.storage.am.common.dataflow.IndexBulkLoadOperatorNodePushable;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId;
@@ -45,16 +46,15 @@
     protected final IDatasetLifecycleManager datasetManager;
     protected final int datasetId;
     protected final int partition;
-
     protected ILSMIndex primaryIndex;
 
     public LSMIndexBulkLoadOperatorNodePushable(IIndexDataflowHelperFactory 
indexDataflowHelperFactory,
             IIndexDataflowHelperFactory priamryIndexDataflowHelperFactory, 
IHyracksTaskContext ctx, int partition,
             int[] fieldPermutation, float fillFactor, boolean verifyInput, 
long numElementsHint,
-            boolean checkIfEmptyIndex, RecordDescriptor recDesc, BulkLoadUsage 
usage, int datasetId)
-            throws HyracksDataException {
+            boolean checkIfEmptyIndex, RecordDescriptor recDesc, BulkLoadUsage 
usage, int datasetId,
+            ITupleFilterFactory tupleFilterFactory) throws 
HyracksDataException {
         super(indexDataflowHelperFactory, ctx, partition, fieldPermutation, 
fillFactor, verifyInput, numElementsHint,
-                checkIfEmptyIndex, recDesc);
+                checkIfEmptyIndex, recDesc, tupleFilterFactory);
 
         if (priamryIndexDataflowHelperFactory != null) {
             this.primaryIndexHelper =
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertOperatorNodePushable.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertOperatorNodePushable.java
index 2dc7f5e..86f1192 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertOperatorNodePushable.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertOperatorNodePushable.java
@@ -32,6 +32,7 @@
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
 import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference;
+import org.apache.hyracks.dataflow.common.utils.FrameDebugUtils;
 import org.apache.hyracks.dataflow.common.utils.TupleUtils;
 import 
org.apache.hyracks.storage.am.common.api.IModificationOperationCallbackFactory;
 import org.apache.hyracks.storage.am.common.api.ITupleFilterFactory;
@@ -93,6 +94,7 @@
         accessor.reset(buffer);
         ILSMIndexAccessor lsmAccessor = (ILSMIndexAccessor) indexAccessor;
         int tupleCount = accessor.getTupleCount();
+        FrameDebugUtils.prettyPrint(accessor, inputRecDesc);
         for (int i = 0; i < tupleCount; i++) {
             try {
                 frameTuple.reset(accessor, i);
@@ -105,7 +107,9 @@
                 prevValueTuple.reset(accessor, i);
 
                 boolean isNewValueMissing = isMissing(tuple, 0);
+                boolean isNewValueNull = isNull(tuple, 0);
                 boolean isOldValueMissing = isMissing(prevValueTuple, 0);
+                boolean isOldValueNull = isNull(prevValueTuple, 0);
                 if (isNewValueMissing && isOldValueMissing) {
                     // No op
                     continue;
@@ -117,12 +121,12 @@
                     // which are always the same
                     continue;
                 }
-                if (!isOldValueMissing) {
+                if (!isOldValueMissing && !isOldValueNull) {
                     // We need to delete previous
                     abstractModCallback.setOp(Operation.DELETE);
                     lsmAccessor.forceDelete(prevValueTuple);
                 }
-                if (isUpsert && !isNewValueMissing) {
+                if (isUpsert && !isNewValueMissing && !isNewValueNull) {
                     // we need to insert the new value
                     abstractModCallback.setOp(Operation.INSERT);
                     lsmAccessor.forceInsert(tuple);
@@ -140,4 +144,8 @@
     private boolean isMissing(PermutingFrameTupleReference tuple, int 
fieldIdx) {
         return TypeTagUtil.isType(tuple, fieldIdx, 
ATypeTag.SERIALIZED_MISSING_TYPE_TAG);
     }
+
+    private boolean isNull(PermutingFrameTupleReference tuple, int fieldIdx) {
+        return TypeTagUtil.isType(tuple, fieldIdx, 
ATypeTag.SERIALIZED_NULL_TYPE_TAG);
+    }
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexBulkLoadOperatorNodePushable.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexBulkLoadOperatorNodePushable.java
index 095159b..c2a058c 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexBulkLoadOperatorNodePushable.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexBulkLoadOperatorNodePushable.java
@@ -25,11 +25,15 @@
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
+import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference;
 import 
org.apache.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
 import org.apache.hyracks.storage.am.common.api.IIndexDataflowHelper;
+import org.apache.hyracks.storage.am.common.api.ITupleFilter;
+import org.apache.hyracks.storage.am.common.api.ITupleFilterFactory;
 import 
org.apache.hyracks.storage.am.common.tuples.PermutingFrameTupleReference;
 import org.apache.hyracks.storage.common.IIndex;
 import org.apache.hyracks.storage.common.IIndexBulkLoader;
+import org.apache.hyracks.util.annotations.CriticalPath;
 
 public class IndexBulkLoadOperatorNodePushable extends 
AbstractUnaryInputUnaryOutputOperatorNodePushable {
     protected final IHyracksTaskContext ctx;
@@ -40,13 +44,17 @@
     protected final IIndexDataflowHelper indexHelper;
     protected final RecordDescriptor recDesc;
     protected final PermutingFrameTupleReference tuple = new 
PermutingFrameTupleReference();
+    protected final ITupleFilterFactory tupleFilterFactory;
     protected FrameTupleAccessor accessor;
     protected IIndex index;
     protected IIndexBulkLoader bulkLoader;
+    protected ITupleFilter tupleFilter;
+    protected FrameTupleReference frameTuple;
 
     public IndexBulkLoadOperatorNodePushable(IIndexDataflowHelperFactory 
indexDataflowHelperFactory,
             IHyracksTaskContext ctx, int partition, int[] fieldPermutation, 
float fillFactor, boolean verifyInput,
-            long numElementsHint, boolean checkIfEmptyIndex, RecordDescriptor 
recDesc) throws HyracksDataException {
+            long numElementsHint, boolean checkIfEmptyIndex, RecordDescriptor 
recDesc,
+            ITupleFilterFactory tupleFilterFactory) throws 
HyracksDataException {
         this.ctx = ctx;
         this.indexHelper = 
indexDataflowHelperFactory.create(ctx.getJobletContext().getServiceContext(), 
partition);
         this.fillFactor = fillFactor;
@@ -54,8 +62,8 @@
         this.numElementsHint = numElementsHint;
         this.checkIfEmptyIndex = checkIfEmptyIndex;
         this.recDesc = recDesc;
+        this.tupleFilterFactory = tupleFilterFactory;
         tuple.setFieldPermutation(fieldPermutation);
-
     }
 
     @Override
@@ -65,6 +73,10 @@
         index = indexHelper.getIndexInstance();
         try {
             writer.open();
+            if (tupleFilterFactory != null) {
+                tupleFilter = tupleFilterFactory.createTupleFilter(ctx);
+                frameTuple = new FrameTupleReference();
+            }
             initializeBulkLoader();
         } catch (Exception e) {
             throw HyracksDataException.create(e);
@@ -75,8 +87,13 @@
     public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
         accessor.reset(buffer);
         int tupleCount = accessor.getTupleCount();
-
         for (int i = 0; i < tupleCount; i++) {
+            if (tupleFilter != null) {
+                frameTuple.reset(accessor, i);
+                if (!tupleFilter.accept(frameTuple)) {
+                    continue;
+                }
+            }
             tuple.reset(accessor, i);
             bulkLoader.add(tuple);
         }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/TreeIndexBulkLoadOperatorDescriptor.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/TreeIndexBulkLoadOperatorDescriptor.java
index fad344d..8346f62 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/TreeIndexBulkLoadOperatorDescriptor.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/TreeIndexBulkLoadOperatorDescriptor.java
@@ -26,6 +26,7 @@
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
 import 
org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+import org.apache.hyracks.storage.am.common.api.ITupleFilterFactory;
 
 public class TreeIndexBulkLoadOperatorDescriptor extends 
AbstractSingleActivityOperatorDescriptor {
 
@@ -37,10 +38,19 @@
     protected final long numElementsHint;
     protected final boolean checkIfEmptyIndex;
     protected final IIndexDataflowHelperFactory indexHelperFactory;
+    private final ITupleFilterFactory tupleFilterFactory;
 
     public TreeIndexBulkLoadOperatorDescriptor(IOperatorDescriptorRegistry 
spec, RecordDescriptor outRecDesc,
             int[] fieldPermutation, float fillFactor, boolean verifyInput, 
long numElementsHint,
             boolean checkIfEmptyIndex, IIndexDataflowHelperFactory 
indexHelperFactory) {
+        this(spec, outRecDesc, fieldPermutation, fillFactor, verifyInput, 
numElementsHint, checkIfEmptyIndex,
+                indexHelperFactory, null);
+    }
+
+    public TreeIndexBulkLoadOperatorDescriptor(IOperatorDescriptorRegistry 
spec, RecordDescriptor outRecDesc,
+            int[] fieldPermutation, float fillFactor, boolean verifyInput, 
long numElementsHint,
+            boolean checkIfEmptyIndex, IIndexDataflowHelperFactory 
indexHelperFactory,
+            ITupleFilterFactory tupleFilterFactory) {
         super(spec, 1, 1);
         this.indexHelperFactory = indexHelperFactory;
         this.fieldPermutation = fieldPermutation;
@@ -49,6 +59,7 @@
         this.numElementsHint = numElementsHint;
         this.checkIfEmptyIndex = checkIfEmptyIndex;
         this.outRecDescs[0] = outRecDesc;
+        this.tupleFilterFactory = tupleFilterFactory;
     }
 
     @Override
@@ -56,6 +67,6 @@
             IRecordDescriptorProvider recordDescProvider, int partition, int 
nPartitions) throws HyracksDataException {
         return new IndexBulkLoadOperatorNodePushable(indexHelperFactory, ctx, 
partition, fieldPermutation, fillFactor,
                 verifyInput, numElementsHint, checkIfEmptyIndex,
-                
recordDescProvider.getInputRecordDescriptor(this.getActivityId(), 0));
+                
recordDescProvider.getInputRecordDescriptor(this.getActivityId(), 0), 
tupleFilterFactory);
     }
 }

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/3304
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I7b847aece06a4387cc41389b0566d0c0dc98487d
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: stabilization-f69489
Gerrit-Owner: Murtadha Hubail <[email protected]>

Reply via email to