abdullah alamoudi has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/621

Change subject: Support Change Feeds
......................................................................

Support Change Feeds

This change allows feeds to perform upserts and deletes
in order to replicate an external data source.
It does so by performing the following:
1. The adapter produces [PK][Record]. (Record == null --> delete)
2. The insert operator is replaced by an upsert operator.
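
For illustration only (not part of this patch): a minimal sketch of how an adapter
factory can mark its feed as a change feed using the helpers added to
ExternalDataUtils in this change. CouchbaseReaderFactory below does the same; the
class name here is hypothetical.

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.asterix.external.util.ExternalDataConstants;
    import org.apache.asterix.external.util.ExternalDataUtils;

    public class ChangeFeedConfigSketch {
        public static Map<String, String> configure() {
            Map<String, String> configuration = new HashMap<>();
            // mark the feed as a change feed so DataflowControllerProvider
            // instantiates a ChangeFeedDataFlowController
            ExternalDataUtils.setChangeFeed(configuration, ExternalDataConstants.TRUE);
            // declare how many primary key fields the adapter appends per record
            ExternalDataUtils.setNumberOfKeys(configuration, 1);
            return configuration;
        }
    }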

Change-Id: If136a03d424970132dfb09f0dda56e160d4c0078
---
M asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/ReplaceSinkOpWithCommitOpRule.java
M asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/UnnestToDataScanRule.java
M asterix-algebra/src/main/java/org/apache/asterix/translator/CompiledStatements.java
M asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
M asterix-app/src/main/java/org/apache/asterix/aql/translator/QueryTranslator.java
M asterix-app/src/test/resources/runtimets/testsuite.xml
M asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
M asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java
M asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java
A asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedDataFlowController.java
M asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
M asterix-external-data/src/main/java/org/apache/asterix/external/input/record/RecordWithMetadata.java
M asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/couchbase/CouchbaseReader.java
M asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/couchbase/CouchbaseReaderFactory.java
M asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorDescriptor.java
M asterix-external-data/src/main/java/org/apache/asterix/external/parser/RecordWithMetadataParser.java
M asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java
M asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
M asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
M asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/statement/SubscribeFeedStatement.java
M asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java
M asterix-om/src/main/java/org/apache/asterix/om/types/ARecordType.java
M asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AsterixLSMPrimaryUpsertOperatorNodePushable.java
M asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AsterixLSMSecondaryUpsertOperatorNodePushable.java
24 files changed, 353 insertions(+), 146 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/21/621/1

diff --git a/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/ReplaceSinkOpWithCommitOpRule.java b/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/ReplaceSinkOpWithCommitOpRule.java
index f8df183..b9ed260 100644
--- a/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/ReplaceSinkOpWithCommitOpRule.java
+++ b/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/ReplaceSinkOpWithCommitOpRule.java
@@ -90,15 +90,24 @@
                     if (insertDeleteUpsertOperator.getOperation() == 
Kind.UPSERT) {
                         //we need to add a function that checks if previous 
record was found
                         upsertVar = context.newVar();
-                        //introduce casting to enforced type
-                        AbstractFunctionCallExpression isNullFunc = new 
ScalarFunctionCallExpression(
+                        AbstractFunctionCallExpression orFunc = new 
ScalarFunctionCallExpression(
+                                
FunctionUtil.getFunctionInfo(AsterixBuiltinFunctions.OR));
+
+                        AbstractFunctionCallExpression isNewNullFunc = new 
ScalarFunctionCallExpression(
                                 
FunctionUtil.getFunctionInfo(AsterixBuiltinFunctions.IS_NULL));
-                        // The first argument is the record
-                        isNullFunc.getArguments().add(new 
MutableObject<ILogicalExpression>(
+                        
isNewNullFunc.getArguments().add(insertDeleteUpsertOperator.getPayloadExpression());
+
+                        AbstractFunctionCallExpression isPrevNullFunc = new 
ScalarFunctionCallExpression(
+                                
FunctionUtil.getFunctionInfo(AsterixBuiltinFunctions.IS_NULL));
+                        // argument is the previous record
+                        isPrevNullFunc.getArguments().add(new 
MutableObject<ILogicalExpression>(
                                 new 
VariableReferenceExpression(insertDeleteUpsertOperator.getPrevRecordVar())));
+
+                        orFunc.getArguments().add(new 
MutableObject<ILogicalExpression>(isPrevNullFunc));
+                        orFunc.getArguments().add(new 
MutableObject<ILogicalExpression>(isNewNullFunc));
+
                         // AssignOperator puts in the cast var the casted 
record
-                        upsertFlagAssign = new AssignOperator(upsertVar,
-                                new 
MutableObject<ILogicalExpression>(isNullFunc));
+                        upsertFlagAssign = new AssignOperator(upsertVar, new 
MutableObject<ILogicalExpression>(orFunc));
                         // Connect the current top of the plan to the cast 
operator
                         upsertFlagAssign.getInputs()
                                 .add(new 
MutableObject<ILogicalOperator>(sinkOperator.getInputs().get(0).getValue()));
diff --git a/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/UnnestToDataScanRule.java b/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/UnnestToDataScanRule.java
index 1945be3..2f6226b 100644
--- a/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/UnnestToDataScanRule.java
+++ b/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/UnnestToDataScanRule.java
@@ -63,7 +63,8 @@
 public class UnnestToDataScanRule implements IAlgebraicRewriteRule {
 
     @Override
-    public boolean rewritePre(Mutable<ILogicalOperator> opRef, 
IOptimizationContext context) throws AlgebricksException {
+    public boolean rewritePre(Mutable<ILogicalOperator> opRef, 
IOptimizationContext context)
+            throws AlgebricksException {
         return false;
     }
 
@@ -107,8 +108,8 @@
                 String datasetName = datasetReference.second;
                 Dataset dataset = metadataProvider.findDataset(dataverseName, 
datasetName);
                 if (dataset == null) {
-                    throw new AlgebricksException("Could not find dataset " + 
datasetName + " in dataverse "
-                            + dataverseName);
+                    throw new AlgebricksException(
+                            "Could not find dataset " + datasetName + " in 
dataverse " + dataverseName);
                 }
 
                 AqlSourceId asid = new AqlSourceId(dataverseName, datasetName);
@@ -173,8 +174,8 @@
                 scanInpList.addAll(unnest.getInputs());
                 opRef.setValue(scan);
                 addPrimaryKey(v, context);
+                scan.setKeyVars(unnest.getAdditionalVariables(), 
unnest.getAdditionalVariableTypes());
                 context.computeAndSetTypeEnvironmentForOperator(scan);
-
                 return true;
             }
 
@@ -195,9 +196,8 @@
     private AqlDataSource createFeedDataSource(AqlSourceId aqlId, String 
targetDataset, String sourceFeedName,
             String subscriptionLocation, AqlMetadataProvider metadataProvider, 
FeedPolicyEntity feedPolicy,
             String outputType, String locations) throws AlgebricksException {
-        if (!aqlId.getDataverseName().equals(
-                metadataProvider.getDefaultDataverse() == null ? null : 
metadataProvider.getDefaultDataverse()
-                        .getDataverseName())) {
+        if 
(!aqlId.getDataverseName().equals(metadataProvider.getDefaultDataverse() == 
null ? null
+                : metadataProvider.getDefaultDataverse().getDataverseName())) {
             return null;
         }
         IAType feedOutputType = 
metadataProvider.findType(aqlId.getDataverseName(), outputType);
diff --git a/asterix-algebra/src/main/java/org/apache/asterix/translator/CompiledStatements.java b/asterix-algebra/src/main/java/org/apache/asterix/translator/CompiledStatements.java
index 5dc9f18..edacbf4 100644
--- a/asterix-algebra/src/main/java/org/apache/asterix/translator/CompiledStatements.java
+++ b/asterix-algebra/src/main/java/org/apache/asterix/translator/CompiledStatements.java
@@ -23,10 +23,13 @@
 
 import org.apache.asterix.common.config.DatasetConfig.IndexType;
 import org.apache.asterix.external.feed.management.FeedConnectionRequest;
+import org.apache.asterix.lang.aql.statement.SubscribeFeedStatement;
 import org.apache.asterix.lang.common.base.Expression;
 import org.apache.asterix.lang.common.base.Statement.Kind;
 import org.apache.asterix.lang.common.expression.VariableExpr;
 import org.apache.asterix.lang.common.statement.Query;
+import org.apache.asterix.metadata.MetadataException;
+import org.apache.asterix.metadata.MetadataTransactionContext;
 import org.apache.asterix.om.types.IAType;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 
@@ -398,11 +401,14 @@
         private final FeedConnectionRequest request;
         private Query query;
         private final int varCounter;
+        private SubscribeFeedStatement stmt;
 
-        public CompiledSubscribeFeedStatement(FeedConnectionRequest request, 
Query query, int varCounter) {
+        public CompiledSubscribeFeedStatement(FeedConnectionRequest request, 
Query query, int varCounter,
+                SubscribeFeedStatement stmt) {
             this.request = request;
             this.query = query;
             this.varCounter = varCounter;
+            this.stmt = stmt;
         }
 
         @Override
@@ -432,6 +438,10 @@
             return Kind.SUBSCRIBE_FEED;
         }
 
+        public boolean isChangeFeed(MetadataTransactionContext mdTxnCtx) 
throws MetadataException, AlgebricksException {
+            return stmt.isChangeFeed(mdTxnCtx);
+        }
+
     }
 
     public static class CompiledDisconnectFeedStatement implements 
ICompiledDmlStatement {
diff --git a/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java b/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
index a1a6d4b..ec9230d 100644
--- a/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
+++ b/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
@@ -85,6 +85,7 @@
 import org.apache.asterix.om.util.AsterixAppContextInfo;
 import org.apache.asterix.runtime.formats.FormatUtils;
 import 
org.apache.asterix.translator.CompiledStatements.CompiledLoadFromFileStatement;
+import 
org.apache.asterix.translator.CompiledStatements.CompiledSubscribeFeedStatement;
 import org.apache.asterix.translator.CompiledStatements.ICompiledDmlStatement;
 import org.apache.commons.lang3.mutable.Mutable;
 import org.apache.commons.lang3.mutable.MutableObject;
@@ -312,6 +313,7 @@
             for (List<String> keyFieldName : partitionKeys) {
                 prepareVarAndExpression(keyFieldName, resVar, vars, exprs, 
varRefsForLoading);
             }
+            AssignOperator assign = new AssignOperator(vars, exprs);
 
             List<String> additionalFilteringField = 
DatasetUtils.getFilterField(targetDatasource.getDataset());
             List<LogicalVariable> additionalFilteringVars = null;
@@ -328,11 +330,6 @@
 
                 additionalFilteringAssign = new 
AssignOperator(additionalFilteringVars,
                         additionalFilteringAssignExpressions);
-            }
-
-            AssignOperator assign = new AssignOperator(vars, exprs);
-
-            if (additionalFilteringAssign != null) {
                 additionalFilteringAssign.getInputs().add(new 
MutableObject<ILogicalOperator>(project));
                 assign.getInputs().add(new 
MutableObject<ILogicalOperator>(additionalFilteringAssign));
             } else {
@@ -389,11 +386,41 @@
                     break;
                 }
                 case SUBSCRIBE_FEED: {
-                    ILogicalOperator insertOp = new 
InsertDeleteUpsertOperator(targetDatasource, varRef,
-                            varRefsForLoading, 
InsertDeleteUpsertOperator.Kind.INSERT, false);
-                    insertOp.getInputs().add(new 
MutableObject<ILogicalOperator>(assign));
+                    // if the feed is a change feed (i.e, performs different 
operations), we need to project op variable
+                    CompiledSubscribeFeedStatement sfs = 
(CompiledSubscribeFeedStatement) stmt;
+                    InsertDeleteUpsertOperator feedModificationOp;
+                    if 
(sfs.isChangeFeed(metadataProvider.getMetadataTxnContext())) {
+                        ARecordType recordType = (ARecordType) 
targetDatasource.getItemType();
+                        // Get the UnnestOperator
+                        UnnestOperator unnest = (UnnestOperator) 
assignCollectionToSequence.getInputs().get(0)
+                                .getValue();
+                        // add key variables
+                        unnest.setAdditionalVariables(new ArrayList<>());
+                        
unnest.getAdditionalVariables().addAll(assign.getVariables());
+                        List<IAType> types = new 
ArrayList<IAType>(partitionKeys.size());
+                        recordType.getFieldTypes(partitionKeys, types);
+                        unnest.setAdditionalVariableTypes(types);
+                        
project.getVariables().addAll(unnest.getAdditionalVariables());
+                        // Create an expression for the operation variable
+                        feedModificationOp = new 
InsertDeleteUpsertOperator(targetDatasource, varRef, varRefsForLoading,
+                                InsertDeleteUpsertOperator.Kind.UPSERT, false);
+                        // Create and add a new variable used for representing 
the original record
+                        feedModificationOp.setPrevRecordVar(context.newVar());
+                        feedModificationOp.setPrevRecordType(recordType);
+                        if (additionalFilteringField != null) {
+                            
feedModificationOp.setPrevFilterVar(context.newVar());
+                            feedModificationOp
+                                    
.setPrevFilterType(recordType.getFieldType(additionalFilteringField.get(0)));
+                        }
+                        
feedModificationOp.getInputs().add(assign.getInputs().get(0));
+                    } else {
+                        feedModificationOp = new 
InsertDeleteUpsertOperator(targetDatasource, varRef, varRefsForLoading,
+                                InsertDeleteUpsertOperator.Kind.INSERT, false);
+                        feedModificationOp.getInputs().add(new 
MutableObject<ILogicalOperator>(assign));
+                    }
+                    
feedModificationOp.setAdditionalFilteringExpressions(additionalFilteringExpressions);
                     leafOperator = new SinkOperator();
-                    leafOperator.getInputs().add(new 
MutableObject<ILogicalOperator>(insertOp));
+                    leafOperator.getInputs().add(new 
MutableObject<ILogicalOperator>(feedModificationOp));
                     break;
                 }
                 default:
diff --git a/asterix-app/src/main/java/org/apache/asterix/aql/translator/QueryTranslator.java b/asterix-app/src/main/java/org/apache/asterix/aql/translator/QueryTranslator.java
index 46ea72b..0709d27 100644
--- a/asterix-app/src/main/java/org/apache/asterix/aql/translator/QueryTranslator.java
+++ b/asterix-app/src/main/java/org/apache/asterix/aql/translator/QueryTranslator.java
@@ -2376,7 +2376,7 @@
         bfs.initialize(metadataProvider.getMetadataTxnContext());
 
         CompiledSubscribeFeedStatement csfs = new 
CompiledSubscribeFeedStatement(bfs.getSubscriptionRequest(),
-                bfs.getQuery(), bfs.getVarCounter());
+                bfs.getQuery(), bfs.getVarCounter(), bfs);
         
metadataProvider.getConfig().put(FunctionUtil.IMPORT_PRIVATE_FUNCTIONS, "" + 
Boolean.TRUE);
         metadataProvider.getConfig().put(FeedActivityDetails.FEED_POLICY_NAME, 
"" + bfs.getPolicy());
         metadataProvider.getConfig().put(FeedActivityDetails.COLLECT_LOCATIONS,
diff --git a/asterix-app/src/test/resources/runtimets/testsuite.xml b/asterix-app/src/test/resources/runtimets/testsuite.xml
index 9b03eaf..6075f67 100644
--- a/asterix-app/src/test/resources/runtimets/testsuite.xml
+++ b/asterix-app/src/test/resources/runtimets/testsuite.xml
@@ -28,6 +28,58 @@
         ResultOffsetPath="results"
         QueryOffsetPath="queries"
         QueryFileExtension=".aql">
+    <test-group name="upsert">
+        <test-case FilePath="upsert">
+            <compilation-unit name="primary-secondary-rtree">
+                <output-dir compare="Text">primary-secondary-rtree</output-dir>
+            </compilation-unit>
+        </test-case>
+        <test-case FilePath="upsert">
+            <compilation-unit name="upsert-with-self-read">
+                <output-dir compare="Text">upsert-with-self-read</output-dir>
+            </compilation-unit>
+        </test-case>
+        <test-case FilePath="upsert">
+            <compilation-unit name="filtered-dataset">
+                <output-dir compare="Text">filtered-dataset</output-dir>
+            </compilation-unit>
+        </test-case>
+        <test-case FilePath="upsert">
+            <compilation-unit name="nullable-index">
+                <output-dir compare="Text">nullable-index</output-dir>
+            </compilation-unit>
+        </test-case>
+        <test-case FilePath="upsert">
+            <compilation-unit name="nested-index">
+                <output-dir compare="Text">nested-index</output-dir>
+            </compilation-unit>
+        </test-case>
+        <test-case FilePath="upsert">
+            <compilation-unit name="open-index">
+                <output-dir compare="Text">open-index</output-dir>
+            </compilation-unit>
+        </test-case>
+        <test-case FilePath="upsert">
+            <compilation-unit name="primary-index">
+                <output-dir compare="Text">primary-index</output-dir>
+            </compilation-unit>
+        </test-case>
+        <test-case FilePath="upsert">
+            <compilation-unit name="primary-secondary-btree">
+                <output-dir compare="Text">primary-secondary-btree</output-dir>
+            </compilation-unit>
+        </test-case>
+        <test-case FilePath="upsert">
+            <compilation-unit name="primary-secondary-inverted">
+                <output-dir 
compare="Text">primary-secondary-inverted</output-dir>
+            </compilation-unit>
+        </test-case>
+        <test-case FilePath="upsert">
+            <compilation-unit name="multiple-secondaries">
+                <output-dir compare="Text">multiple-secondaries</output-dir>
+            </compilation-unit>
+        </test-case>
+    </test-group>
     <test-group name="feeds">
         <test-case FilePath="feeds">
             <compilation-unit name="feeds_01">
@@ -1239,58 +1291,6 @@
           </compilation-unit>
         </test-case>
         -->
-    </test-group>
-    <test-group name="upsert">
-        <test-case FilePath="upsert">
-            <compilation-unit name="primary-secondary-rtree">
-                <output-dir compare="Text">primary-secondary-rtree</output-dir>
-            </compilation-unit>
-        </test-case>
-        <test-case FilePath="upsert">
-            <compilation-unit name="upsert-with-self-read">
-                <output-dir compare="Text">upsert-with-self-read</output-dir>
-            </compilation-unit>
-        </test-case>
-        <test-case FilePath="upsert">
-            <compilation-unit name="filtered-dataset">
-                <output-dir compare="Text">filtered-dataset</output-dir>
-            </compilation-unit>
-        </test-case>
-        <test-case FilePath="upsert">
-            <compilation-unit name="nullable-index">
-                <output-dir compare="Text">nullable-index</output-dir>
-            </compilation-unit>
-        </test-case>
-        <test-case FilePath="upsert">
-            <compilation-unit name="nested-index">
-                <output-dir compare="Text">nested-index</output-dir>
-            </compilation-unit>
-        </test-case>
-        <test-case FilePath="upsert">
-            <compilation-unit name="open-index">
-                <output-dir compare="Text">open-index</output-dir>
-            </compilation-unit>
-        </test-case>
-        <test-case FilePath="upsert">
-            <compilation-unit name="primary-index">
-                <output-dir compare="Text">primary-index</output-dir>
-            </compilation-unit>
-        </test-case>
-        <test-case FilePath="upsert">
-            <compilation-unit name="primary-secondary-btree">
-                <output-dir compare="Text">primary-secondary-btree</output-dir>
-            </compilation-unit>
-        </test-case>
-        <test-case FilePath="upsert">
-            <compilation-unit name="primary-secondary-inverted">
-                <output-dir 
compare="Text">primary-secondary-inverted</output-dir>
-            </compilation-unit>
-        </test-case>
-        <test-case FilePath="upsert">
-            <compilation-unit name="multiple-secondaries">
-                <output-dir compare="Text">multiple-secondaries</output-dir>
-            </compilation-unit>
-        </test-case>
     </test-group>
     <test-group name="dml">
          <test-case FilePath="dml">
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java b/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
index e5a3473..275212c 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
@@ -129,7 +129,6 @@
             } catch (ACIDException e) {
                 throw new HyracksDataException("could not write flush log", e);
             }
-
             flushLogCreated = true;
             flushOnExit = false;
         }
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java
index 3cb8f37..e6dfa43 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java
@@ -21,6 +21,9 @@
 import java.io.DataOutput;
 import java.io.IOException;
 
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+
 public interface IRecordDataParser<T> extends IDataParser {
 
     /**
@@ -34,4 +37,8 @@
      * @return the record class
      */
     public Class<? extends T> getRecordClass();
+
+    public default void appendKeys(IRawRecord<? extends T> record, 
ArrayTupleBuilder tb) throws IOException {
+        throw new HyracksDataException("Unsupported operation");
+    }
 }
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java
index 461eaf9..09ea26f 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java
@@ -31,8 +31,8 @@
     protected FeedTupleForwarder tupleForwarder;
     protected IHyracksTaskContext ctx;
     protected Map<String, String> configuration;
-    protected static final int NUMBER_OF_TUPLE_FIELDS = 1;
-    protected ArrayTupleBuilder tb = new 
ArrayTupleBuilder(NUMBER_OF_TUPLE_FIELDS);
+    protected int numOfFields = 1;
+    protected ArrayTupleBuilder tb;
 
     @Override
     public ITupleForwarder getTupleForwarder() {
@@ -52,6 +52,8 @@
     public void configure(Map<String, String> configuration, 
IHyracksTaskContext ctx) {
         this.configuration = configuration;
         this.ctx = ctx;
+        this.numOfFields = 1;
+        this.tb = new ArrayTupleBuilder(numOfFields);
     }
 
     @Override
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedDataFlowController.java b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedDataFlowController.java
new file mode 100644
index 0000000..e70b208
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedDataFlowController.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.dataflow;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.asterix.external.api.IRawRecord;
+import org.apache.asterix.external.util.ExternalDataUtils;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+
+public class ChangeFeedDataFlowController<T> extends 
FeedRecordDataFlowController<T> {
+
+    @Override
+    protected void parse(IRawRecord<? extends T> record) throws IOException {
+        tb.reset();
+        dataParser.parse(record, tb.getDataOutput());
+        tb.addFieldEndOffset();
+        dataParser.appendKeys(record, tb);
+    }
+
+    @Override
+    public void configure(Map<String, String> configuration, 
IHyracksTaskContext ctx) {
+        this.configuration = configuration;
+        this.ctx = ctx;
+        // 1 for the record + 1 for each key field
+        this.numOfFields = 1 + 
ExternalDataUtils.getNumberOfKeys(configuration);
+        this.tb = new ArrayTupleBuilder(numOfFields);
+    }
+}
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
index 2a4eaf9..45427bd 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
@@ -18,6 +18,7 @@
  */
 package org.apache.asterix.external.dataflow;
 
+import java.io.IOException;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.asterix.external.api.IRawRecord;
@@ -47,9 +48,7 @@
                     Thread.sleep(interval);
                     continue;
                 }
-                tb.reset();
-                dataParser.parse(record, tb.getDataOutput());
-                tb.addFieldEndOffset();
+                parse(record);
                 tupleForwarder.addTuple(tb);
             }
         } catch (Throwable th) {
@@ -113,4 +112,7 @@
         this.recordReader = recordReader;
         recordReader.setController(this);
     }
+
+    protected void parse(IRawRecord<? extends T> record) throws IOException {
+    }
 }
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/RecordWithMetadata.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/RecordWithMetadata.java
index d5640a6..e46722e 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/RecordWithMetadata.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/RecordWithMetadata.java
@@ -39,6 +39,7 @@
 import org.apache.asterix.om.types.IAType;
 import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
 import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
+import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import org.apache.hyracks.dataflow.common.data.parsers.IValueParserFactory;
 
 public class RecordWithMetadata<T> {
@@ -48,6 +49,7 @@
     private IValueParserFactory[] valueParserFactories;
     private byte[] fieldTypeTags;
     private IRawRecord<T> record;
+    private int[] keyIdx;
 
     // Serializers
     @SuppressWarnings("unchecked")
@@ -135,4 +137,17 @@
         mutableDouble.setValue(value);
         IDataParser.toBytes(mutableDouble, fieldValueBuffers[index], 
doubleSerde);
     }
+
+    public void setKeyIdx(int[] keyIdx) {
+        this.keyIdx = keyIdx;
+    }
+
+    public void writeKey(ArrayTupleBuilder tb) throws IOException {
+        DataOutput out = tb.getDataOutput();
+        for (int idx : keyIdx) {
+            //out.writeByte(fieldTypeTags[idx]);
+            out.write(fieldValueBuffers[idx].getByteArray(), 0, 
fieldValueBuffers[idx].getLength());
+            tb.addFieldEndOffset();
+        }
+    }
 }
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/couchbase/CouchbaseReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/couchbase/CouchbaseReader.java
index 895af1b..1a83cc9 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/couchbase/CouchbaseReader.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/couchbase/CouchbaseReader.java
@@ -61,6 +61,7 @@
 
     private static final MutationMessage POISON_PILL = new 
MutationMessage((short) 0, null, null, 0, 0L, 0L, 0, 0, 0L,
             null);
+    private static final int[] IDX_KEY = { 0 };
     private final String feedName;
     private final short[] vbuckets;
     private final String bucket;
@@ -72,7 +73,7 @@
     private CouchbaseCore core;
     private DefaultCoreEnvironment env;
     private Thread pushThread;
-    private ArrayBlockingQueue<MutationMessage> messages;
+    private ArrayBlockingQueue<DCPRequest> messages;
     private GenericRecord<RecordWithMetadata<char[]>> record;
     private RecordWithMetadata<char[]> recordWithMetadata;
     private boolean done = false;
@@ -94,9 +95,10 @@
         this.couchbaseNodes = couchbaseNodes;
         this.vbuckets = vbuckets;
         this.recordWithMetadata = new RecordWithMetadata<char[]>(metaTypes, 
char[].class);
-        this.messages = new ArrayBlockingQueue<MutationMessage>(queueSize);
+        this.messages = new ArrayBlockingQueue<DCPRequest>(queueSize);
         this.value = new CharArrayRecord();
-        recordWithMetadata.setRecord(value);
+        this.recordWithMetadata.setRecord(value);
+        this.recordWithMetadata.setKeyIdx(IDX_KEY);
         this.record = new 
GenericRecord<RecordWithMetadata<char[]>>(recordWithMetadata);
     }
 
@@ -155,12 +157,8 @@
                             state.put(new 
BucketStreamState(message.partition(), oldState.vbucketUUID(),
                                     message.endSequenceNumber(), 
oldState.endSequenceNumber(),
                                     message.endSequenceNumber(), 
oldState.snapshotEndSequenceNumber()));
-                        } else if (dcpRequest instanceof MutationMessage) {
-
-                            messages.put((MutationMessage) dcpRequest);
-                        } else if (dcpRequest instanceof RemoveMessage) {
-                            RemoveMessage message = (RemoveMessage) dcpRequest;
-                            LOGGER.info(message.key() + " was deleted.");
+                        } else if (dcpRequest instanceof MutationMessage || 
dcpRequest instanceof RemoveMessage) {
+                            messages.put(dcpRequest);
                         }
                     } catch (Throwable th) {
                         LOGGER.error(th);
@@ -189,32 +187,39 @@
         if (messages.isEmpty()) {
             controller.flush();
         }
-        MutationMessage message = messages.take();
-        if (message == POISON_PILL) {
+        DCPRequest dcpReq = messages.take();
+        if (dcpReq == POISON_PILL) {
             return null;
+        } else if (dcpReq instanceof RemoveMessage) {
+            RemoveMessage message = (RemoveMessage) dcpReq;
+            String key = message.key();
+            recordWithMetadata.reset();
+            recordWithMetadata.setMetadata(0, key);
+        } else {
+            MutationMessage message = (MutationMessage) dcpReq;
+            String key = message.key();
+            int vbucket = message.partition();
+            long seq = message.bySequenceNumber();
+            String bucket = message.bucket();
+            long cas = message.cas();
+            long creationTime = message.creationTime();
+            int expiration = message.expiration();
+            int flags = message.flags();
+            long revSeqNumber = message.revisionSequenceNumber();
+            int lockTime = message.lockTime();
+            recordWithMetadata.reset();
+            recordWithMetadata.setMetadata(0, key);
+            recordWithMetadata.setMetadata(1, bucket);
+            recordWithMetadata.setMetadata(2, vbucket);
+            recordWithMetadata.setMetadata(3, seq);
+            recordWithMetadata.setMetadata(4, cas);
+            recordWithMetadata.setMetadata(5, creationTime);
+            recordWithMetadata.setMetadata(6, expiration);
+            recordWithMetadata.setMetadata(7, flags);
+            recordWithMetadata.setMetadata(8, revSeqNumber);
+            recordWithMetadata.setMetadata(9, lockTime);
+            CouchbaseReader.set(message.content(), decoder, bytes, chars, 
value);
         }
-        String key = message.key();
-        int vbucket = message.partition();
-        long seq = message.bySequenceNumber();
-        String bucket = message.bucket();
-        long cas = message.cas();
-        long creationTime = message.creationTime();
-        int expiration = message.expiration();
-        int flags = message.flags();
-        long revSeqNumber = message.revisionSequenceNumber();
-        int lockTime = message.lockTime();
-        recordWithMetadata.reset();
-        recordWithMetadata.setMetadata(0, key);
-        recordWithMetadata.setMetadata(1, bucket);
-        recordWithMetadata.setMetadata(2, vbucket);
-        recordWithMetadata.setMetadata(3, seq);
-        recordWithMetadata.setMetadata(4, cas);
-        recordWithMetadata.setMetadata(5, creationTime);
-        recordWithMetadata.setMetadata(6, expiration);
-        recordWithMetadata.setMetadata(7, flags);
-        recordWithMetadata.setMetadata(8, revSeqNumber);
-        recordWithMetadata.setMetadata(9, lockTime);
-        CouchbaseReader.set(message.content(), decoder, bytes, chars, value);
         return record;
     }
 
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/couchbase/CouchbaseReaderFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/couchbase/CouchbaseReaderFactory.java
index b9b6f65..6cbcbba 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/couchbase/CouchbaseReaderFactory.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/couchbase/CouchbaseReaderFactory.java
@@ -88,6 +88,8 @@
             password = configuration.get(ExternalDataConstants.KEY_PASSWORD);
         }
         this.configuration = configuration;
+        ExternalDataUtils.setNumberOfKeys(configuration, 1);
+        ExternalDataUtils.setChangeFeed(configuration, 
ExternalDataConstants.TRUE);
         bucket = configuration.get(ExternalDataConstants.KEY_BUCKET);
         couchbaseNodes = 
configuration.get(ExternalDataConstants.KEY_NODES).split(",");
         feedName = configuration.get(ExternalDataConstants.KEY_FEED_NAME);
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorDescriptor.java b/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorDescriptor.java
index a929eec..1524219 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorDescriptor.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorDescriptor.java
@@ -23,11 +23,11 @@
 import java.util.logging.Logger;
 
 import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
+import 
org.apache.asterix.external.feed.api.IFeedLifecycleListener.ConnectionLocation;
 import org.apache.asterix.external.feed.api.IFeedManager;
+import org.apache.asterix.external.feed.api.IFeedRuntime.FeedRuntimeType;
 import org.apache.asterix.external.feed.api.IFeedSubscriptionManager;
 import org.apache.asterix.external.feed.api.ISubscribableRuntime;
-import 
org.apache.asterix.external.feed.api.IFeedLifecycleListener.ConnectionLocation;
-import org.apache.asterix.external.feed.api.IFeedRuntime.FeedRuntimeType;
 import org.apache.asterix.external.feed.management.FeedConnectionId;
 import org.apache.asterix.external.feed.management.FeedId;
 import org.apache.asterix.external.feed.runtime.IngestionRuntime;
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/parser/RecordWithMetadataParser.java b/asterix-external-data/src/main/java/org/apache/asterix/external/parser/RecordWithMetadataParser.java
index 67d84b5..c834c64 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/parser/RecordWithMetadataParser.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/parser/RecordWithMetadataParser.java
@@ -32,10 +32,12 @@
 import org.apache.asterix.om.base.AMutableString;
 import org.apache.asterix.om.base.AString;
 import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.ATypeTag;
 import org.apache.asterix.om.types.BuiltinType;
 import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
+import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 
 public class RecordWithMetadataParser<T> implements 
IRecordDataParser<RecordWithMetadata<T>> {
 
@@ -95,17 +97,29 @@
             valueBuffer.reset();
             recBuilder.init();
             RecordWithMetadata<T> rwm = record.get();
-            for (int i = 0; i < numberOfFields; i++) {
-                if (i == valueIndex) {
-                    valueParser.parse(rwm.getRecord(), 
valueBuffer.getDataOutput());
-                    recBuilder.addField(i, valueBuffer);
-                } else {
-                    recBuilder.addField(i, rwm.getMetadata(metaIndexes[i]));
+            if (rwm.getRecord().size() == 0) {
+                // null record, delete message
+                out.writeByte(ATypeTag.SERIALIZED_NULL_TYPE_TAG);
+            } else {
+                for (int i = 0; i < numberOfFields; i++) {
+                    if (i == valueIndex) {
+                        valueParser.parse(rwm.getRecord(), 
valueBuffer.getDataOutput());
+                        recBuilder.addField(i, valueBuffer);
+                    } else {
+                        recBuilder.addField(i, 
rwm.getMetadata(metaIndexes[i]));
+                    }
                 }
+                recBuilder.write(out, true);
             }
-            recBuilder.write(out, true);
         } catch (IOException e) {
             throw new HyracksDataException(e);
         }
     }
+
+    @Override
+    public void appendKeys(IRawRecord<? extends RecordWithMetadata<T>> record, 
ArrayTupleBuilder tb)
+            throws IOException {
+        RecordWithMetadata<T> rwm = record.get();
+        rwm.writeKey(tb);
+    }
 }
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java b/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java
index dfe7aed..8fc4ce6 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java
@@ -34,6 +34,7 @@
 import org.apache.asterix.external.api.IStreamDataParser;
 import org.apache.asterix.external.api.IStreamDataParserFactory;
 import org.apache.asterix.external.api.IStreamFlowController;
+import org.apache.asterix.external.dataflow.ChangeFeedDataFlowController;
 import org.apache.asterix.external.dataflow.FeedRecordDataFlowController;
 import org.apache.asterix.external.dataflow.FeedStreamDataFlowController;
 import org.apache.asterix.external.dataflow.IndexingDataFlowController;
@@ -69,7 +70,11 @@
                 if (indexingOp) {
                     recordDataFlowController = new 
IndexingDataFlowController();
                 } else if (ExternalDataUtils.isFeed(configuration)) {
-                    recordDataFlowController = new 
FeedRecordDataFlowController();
+                    if (ExternalDataUtils.isChangeFeed(configuration)) {
+                        recordDataFlowController = new 
ChangeFeedDataFlowController();
+                    } else {
+                        recordDataFlowController = new 
FeedRecordDataFlowController();
+                    }
                 } else {
                     recordDataFlowController = new RecordDataFlowController();
                 }
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java b/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
index 3d7f60b..32a8bf0 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
@@ -91,6 +91,10 @@
     public static final String KEY_VALUE_INDEX = "value-index";
     // a string representing the format of the raw record in the value field 
in the data type
     public static final String KEY_VALUE_FORMAT = "value-format";
+    // a boolean indicating whether the feed is a change feed
+    public static final String KEY_IS_CHANGE_FEED = "change-feed";
+    // an integer representing the number of keys in a change feed
+    public static final String KEY_KEY_SIZE = "key-size";
     /**
      * HDFS class names
      */
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java b/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
index 7c03e4d..a9d91f3 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
@@ -264,4 +264,20 @@
                 ? 
Integer.parseInt(configuration.get(ExternalDataConstants.KEY_QUEUE_SIZE))
                 : ExternalDataConstants.DEFAULT_QUEUE_SIZE;
     }
+
+    public static boolean isChangeFeed(Map<String, String> configuration) {
+        return 
Boolean.parseBoolean(configuration.get(ExternalDataConstants.KEY_IS_CHANGE_FEED));
+    }
+
+    public static int getNumberOfKeys(Map<String, String> configuration) {
+        return 
Integer.parseInt(configuration.get(ExternalDataConstants.KEY_KEY_SIZE));
+    }
+
+    public static void setNumberOfKeys(Map<String, String> configuration, int 
value) {
+        configuration.put(ExternalDataConstants.KEY_KEY_SIZE, 
String.valueOf(value));
+    }
+
+    public static void setChangeFeed(Map<String, String> configuration, String 
booleanString) {
+        configuration.put(ExternalDataConstants.KEY_IS_CHANGE_FEED, 
booleanString);
+    }
 }
diff --git a/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/statement/SubscribeFeedStatement.java b/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/statement/SubscribeFeedStatement.java
index 3f85ba9..2689cc4 100644
--- a/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/statement/SubscribeFeedStatement.java
+++ b/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/statement/SubscribeFeedStatement.java
@@ -31,6 +31,7 @@
 import org.apache.asterix.external.feed.management.FeedId;
 import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
 import org.apache.asterix.external.feed.watch.FeedActivity;
+import org.apache.asterix.external.util.ExternalDataUtils;
 import org.apache.asterix.lang.aql.parser.AQLParserFactory;
 import org.apache.asterix.lang.common.base.IParser;
 import org.apache.asterix.lang.common.base.IParserFactory;
@@ -178,6 +179,14 @@
         return connectionRequest.getReceivingFeedId().getDataverse();
     }
 
+    public boolean isChangeFeed(MetadataTransactionContext mdTxnCtx) throws 
MetadataException, AlgebricksException {
+        FeedId feedId = connectionRequest.getReceivingFeedId();
+        Feed feed = MetadataManager.INSTANCE.getFeed(mdTxnCtx, 
feedId.getDataverse(), feedId.getFeedName());
+        FeedPolicyAccessor policyAccessor = new 
FeedPolicyAccessor(connectionRequest.getPolicyParameters());
+        FeedMetadataUtil.getPrimaryFeedFactoryAndOutput(feed, policyAccessor, 
mdTxnCtx);
+        return ExternalDataUtils.isChangeFeed(feed.getAdapterConfiguration());
+    }
+
     private String getOutputType(MetadataTransactionContext mdTxnCtx) throws 
MetadataException {
         String outputType = null;
         FeedId feedId = connectionRequest.getReceivingFeedId();
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java
index f3523da..0704be1 100644
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java
+++ b/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java
@@ -70,6 +70,7 @@
 import org.apache.asterix.formats.base.IDataFormat;
 import org.apache.asterix.formats.nontagged.AqlBinaryComparatorFactoryProvider;
 import 
org.apache.asterix.formats.nontagged.AqlLinearizeComparatorFactoryProvider;
+import org.apache.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
 import org.apache.asterix.formats.nontagged.AqlTypeTraitProvider;
 import org.apache.asterix.metadata.MetadataException;
 import org.apache.asterix.metadata.MetadataManager;
@@ -330,11 +331,12 @@
             IDataSource<AqlSourceId> dataSource, List<LogicalVariable> 
scanVariables,
             List<LogicalVariable> projectVariables, boolean projectPushed, 
List<LogicalVariable> minFilterVars,
             List<LogicalVariable> maxFilterVars, IOperatorSchema opSchema, 
IVariableTypeEnvironment typeEnv,
-            JobGenContext context, JobSpecification jobSpec, Object 
implConfig) throws AlgebricksException {
+            JobGenContext context, JobSpecification jobSpec, Object 
implConfig, List<LogicalVariable> keyVars)
+                    throws AlgebricksException {
         try {
             switch (((AqlDataSource) dataSource).getDatasourceType()) {
                 case FEED:
-                    return buildFeedCollectRuntime(jobSpec, dataSource);
+                    return buildFeedCollectRuntime(jobSpec, dataSource, 
typeEnv, keyVars);
                 case INTERNAL_DATASET: {
                     // querying an internal dataset
                     return buildInternalDatasetScan(jobSpec, scanVariables, 
minFilterVars, maxFilterVars, opSchema,
@@ -380,7 +382,8 @@
 
     @SuppressWarnings("rawtypes")
     public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> 
buildFeedCollectRuntime(JobSpecification jobSpec,
-            IDataSource<AqlSourceId> dataSource) throws AlgebricksException {
+            IDataSource<AqlSourceId> dataSource, IVariableTypeEnvironment 
typeEnv, List<LogicalVariable> keyVars)
+                    throws AlgebricksException {
 
         FeedDataSource feedDataSource = (FeedDataSource) dataSource;
         FeedCollectOperatorDescriptor feedCollector = null;
@@ -389,7 +392,20 @@
             ARecordType feedOutputType = (ARecordType) 
feedDataSource.getItemType();
             ISerializerDeserializer payloadSerde = 
NonTaggedDataFormat.INSTANCE.getSerdeProvider()
                     .getSerializerDeserializer(feedOutputType);
-            RecordDescriptor feedDesc = new RecordDescriptor(new 
ISerializerDeserializer[] { payloadSerde });
+            RecordDescriptor feedDesc;
+            if (keyVars != null) {
+                ArrayList<ISerializerDeserializer> serdes = new ArrayList<>();
+                serdes.add(payloadSerde);
+                for (LogicalVariable var : keyVars) {
+                    IAType type = (IAType) typeEnv.getVarType(var);
+                    ISerializerDeserializer serde = 
AqlSerializerDeserializerProvider.INSTANCE
+                            .getSerializerDeserializer(type);
+                    serdes.add(serde);
+                }
+                feedDesc = new RecordDescriptor(serdes.toArray(new 
ISerializerDeserializer[serdes.size()]));
+            } else {
+                feedDesc = new RecordDescriptor(new ISerializerDeserializer[] 
{ payloadSerde });
+            }
 
             FeedPolicyEntity feedPolicy = (FeedPolicyEntity) ((AqlDataSource) 
dataSource).getProperties()
                     .get(BuiltinFeedPolicies.CONFIG_FEED_POLICY_KEY);
@@ -402,10 +418,8 @@
             feedCollector = new FeedCollectOperatorDescriptor(jobSpec, 
feedConnectionId,
                     feedDataSource.getSourceFeedId(), feedOutputType, 
feedDesc, feedPolicy.getProperties(),
                     feedDataSource.getLocation());
-
             return new Pair<IOperatorDescriptor, 
AlgebricksPartitionConstraint>(feedCollector,
                     determineLocationConstraint(feedDataSource));
-
         } catch (Exception e) {
             throw new AlgebricksException(e);
         }
@@ -2260,7 +2274,6 @@
         }
         boolean temp = dataset.getDatasetDetails().isTemp();
         isTemporaryDatasetWriteJob = isTemporaryDatasetWriteJob && temp;
-
         int numKeys = primaryKeys.size();
         int numFilterFields = DatasetUtils.getFilterField(dataset) == null ? 0 
: 1;
         // Move key fields to front. {keys, record, filters}
diff --git a/asterix-om/src/main/java/org/apache/asterix/om/types/ARecordType.java b/asterix-om/src/main/java/org/apache/asterix/om/types/ARecordType.java
index c2eae36..7600925 100644
--- a/asterix-om/src/main/java/org/apache/asterix/om/types/ARecordType.java
+++ b/asterix-om/src/main/java/org/apache/asterix/om/types/ARecordType.java
@@ -31,6 +31,7 @@
 import org.apache.asterix.om.base.IAObject;
 import org.apache.asterix.om.util.NonTaggedFormatUtil;
 import org.apache.asterix.om.visitors.IOMVisitor;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.json.JSONArray;
 import org.json.JSONException;
 import org.json.JSONObject;
@@ -177,6 +178,7 @@
         return subRecordType;
     }
 
+    // Note: this method doesn't work for nested fields
     /**
      * Returns the field type of the field name if it exists, otherwise null.
      *
@@ -299,4 +301,14 @@
         return NonTaggedFormatUtil.hasNullableField(rt) ? (int) 
Math.ceil(rt.getFieldNames().length / 8.0) : 0;
     }
 
+    public void getFieldTypes(List<List<String>> fields, List<IAType> 
emptyList) throws AlgebricksException {
+        for (List<String> field : fields) {
+            try {
+                emptyList.add(getSubFieldType(field));
+            } catch (IOException e) {
+                throw new AlgebricksException(e);
+            }
+        }
+    }
+
 }
diff --git a/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AsterixLSMPrimaryUpsertOperatorNodePushable.java b/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AsterixLSMPrimaryUpsertOperatorNodePushable.java
index f35f4d6..ce2ae26 100644
--- a/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AsterixLSMPrimaryUpsertOperatorNodePushable.java
+++ b/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AsterixLSMPrimaryUpsertOperatorNodePushable.java
@@ -27,6 +27,7 @@
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.om.pointables.nonvisitor.ARecordPointable;
 import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.ATypeTag;
 import org.apache.hyracks.api.comm.VSizeFrame;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.value.INullWriter;
@@ -110,7 +111,6 @@
         writer.open();
         indexHelper.open();
         lsmIndex = (LSMBTree) indexHelper.getIndexInstance();
-
         try {
             nullTupleBuilder = new ArrayTupleBuilder(1);
             DataOutput out = nullTupleBuilder.getDataOutput();
@@ -144,14 +144,15 @@
         key.reset(accessor, tupleIndex);
     }
 
-    protected void writeOutput(int tupleIndex) throws Exception {
+    protected void writeOutput(int tupleIndex, boolean insert) throws 
Exception {
+        boolean delete = prevTuple != null;
         tb.reset();
         frameTuple.reset(accessor, tupleIndex);
         for (int i = 0; i < frameTuple.getFieldCount(); i++) {
             dos.write(frameTuple.getFieldData(i), frameTuple.getFieldStart(i), 
frameTuple.getFieldLength(i));
             tb.addFieldEndOffset();
         }
-        if (prevTuple != null) {
+        if (delete) {
             dos.write(prevTuple.getFieldData(numOfPrimaryKeys), 
prevTuple.getFieldStart(numOfPrimaryKeys),
                     prevTuple.getFieldLength(numOfPrimaryKeys));
             tb.addFieldEndOffset();
@@ -168,12 +169,18 @@
                 addNullField();
             }
         }
-        FrameUtils.appendToWriter(writer, appender, tb.getFieldEndOffsets(), 
tb.getByteArray(), 0, tb.getSize());
+        if (insert || delete) {
+            FrameUtils.appendToWriter(writer, appender, 
tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize());
+        }
     }
 
     private void addNullField() throws IOException {
         dos.write(nullTupleBuilder.getByteArray());
         tb.addFieldEndOffset();
+    }
+
+    public static boolean isNull(PermutingFrameTupleReference t1, int field) {
+        return t1.getFieldData(0)[t1.getFieldStart(field)] == 
ATypeTag.SERIALIZED_NULL_TYPE_TAG;
     }
 
     //TODO: use tryDelete/tryInsert in order to prevent deadlocks
@@ -182,12 +189,11 @@
         accessor.reset(buffer);
         LSMTreeIndexAccessor lsmAccessor = (LSMTreeIndexAccessor) 
indexAccessor;
         int tupleCount = accessor.getTupleCount();
-
         try {
             for (int i = 0; i < tupleCount; i++) {
+                boolean insert = false;
                 tuple.reset(accessor, i);
                 resetSearchPredicate(i);
-                cursor.reset();
                 lsmAccessor.search(cursor, searchPred);
                 if (cursor.hasNext()) {
                     cursor.next();
@@ -203,15 +209,19 @@
                         lsmAccessor.forceDelete(prevTuple);
                     }
                 } else {
+                    cursor.reset();
                     prevTuple = null;
                 }
                 modCallback.setOp(Operation.INSERT);
-                if (prevTuple == null && i == 0) {
-                    lsmAccessor.insert(tuple);
-                } else {
-                    lsmAccessor.forceInsert(tuple);
+                if (!isNull(tuple, numOfPrimaryKeys)) {
+                    insert = true;
+                    if (prevTuple == null && i == 0) {
+                        lsmAccessor.insert(tuple);
+                    } else {
+                        lsmAccessor.forceInsert(tuple);
+                    }
                 }
-                writeOutput(i);
+                writeOutput(i, insert);
             }
             if (tupleCount > 0) {
                 // All tuples has to move forward to maintain the correctness 
of the transaction pipeline
@@ -272,4 +282,4 @@
     public void flush() throws HyracksDataException {
         writer.flush();
     }
-}
+}
\ No newline at end of file
diff --git a/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AsterixLSMSecondaryUpsertOperatorNodePushable.java b/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AsterixLSMSecondaryUpsertOperatorNodePushable.java
index 65dc83f..40bd21d 100644
--- a/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AsterixLSMSecondaryUpsertOperatorNodePushable.java
+++ b/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AsterixLSMSecondaryUpsertOperatorNodePushable.java
@@ -88,7 +88,7 @@
         return true;
     }
 
-    private boolean isNull(PermutingFrameTupleReference t1) {
+    public static boolean isNull(PermutingFrameTupleReference t1) {
         return t1.getFieldData(0)[t1.getFieldStart(0)] == 
ATypeTag.SERIALIZED_NULL_TYPE_TAG;
     }
 
@@ -122,7 +122,6 @@
                     modCallback.setOp(Operation.INSERT);
                     lsmAccessor.forceInsert(tuple);
                 }
-
             } catch (HyracksDataException e) {
                 throw e;
             } catch (Exception e) {

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/621
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: If136a03d424970132dfb09f0dda56e160d4c0078
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <[email protected]>
