Xikui Wang has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/1068

Change subject: Add upsert option for feed
......................................................................

Add upsert option for feed

For ASTERIXDB-1567. Add an "upsert-feed" option to the feed
configuration. Setting "upsert-feed"="true" makes the feed
upsert incoming records instead of using the default insert.

Change-Id: Ic5133e7c6941fea4110cc9983f99502f364dc810
---
M 
asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
M 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
M 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
M 
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetadataUtil.java
4 files changed, 175 insertions(+), 186 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb 
refs/changes/68/1068/1

diff --git 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
index 5081587..8521c01 100644
--- 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
+++ 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
@@ -81,6 +81,7 @@
 import org.apache.asterix.metadata.declared.ResultSetDataSink;
 import org.apache.asterix.metadata.declared.ResultSetSinkId;
 import org.apache.asterix.metadata.entities.Dataset;
+import org.apache.asterix.metadata.entities.Feed;
 import org.apache.asterix.metadata.entities.Function;
 import org.apache.asterix.metadata.entities.InternalDatasetDetails;
 import org.apache.asterix.metadata.feeds.FeedMetadataUtil;
@@ -181,13 +182,11 @@
         
FormatUtils.getDefaultFormat().registerRuntimeFunctions(FunctionCollection.getFunctionDescriptorFactories());
     }
 
-    @Override
-    public int getVarCounter() {
+    @Override public int getVarCounter() {
         return context.getVarCounter();
     }
 
-    @Override
-    public ILogicalPlan translateLoad(ICompiledDmlStatement stmt) throws 
AlgebricksException {
+    @Override public ILogicalPlan translateLoad(ICompiledDmlStatement stmt) 
throws AlgebricksException {
         CompiledLoadFromFileStatement clffs = (CompiledLoadFromFileStatement) 
stmt;
         Dataset dataset = 
metadataProvider.findDataset(clffs.getDataverseName(), clffs.getDatasetName());
         if (dataset == null) {
@@ -196,10 +195,10 @@
                     "Unable to load dataset " + clffs.getDatasetName() + " 
since it does not exist");
         }
         IAType itemType = 
metadataProvider.findType(dataset.getItemTypeDataverseName(), 
dataset.getItemTypeName());
-        IAType metaItemType =
-                
metadataProvider.findType(dataset.getMetaItemTypeDataverseName(), 
dataset.getMetaItemTypeName());
-        DatasetDataSource targetDatasource =
-                validateDatasetInfo(metadataProvider, stmt.getDataverseName(), 
stmt.getDatasetName());
+        IAType metaItemType = metadataProvider
+                .findType(dataset.getMetaItemTypeDataverseName(), 
dataset.getMetaItemTypeName());
+        DatasetDataSource targetDatasource = 
validateDatasetInfo(metadataProvider, stmt.getDataverseName(),
+                stmt.getDatasetName());
         List<List<String>> partitionKeys = 
DatasetUtils.getPartitioningKeys(targetDatasource.getDataset());
         if (dataset.hasMetaPart()) {
             throw new AlgebricksException(
@@ -235,8 +234,8 @@
         List<Mutable<ILogicalExpression>> varRefsForLoading = new 
ArrayList<>();
         LogicalVariable payloadVar = payloadVars.get(0);
         for (List<String> keyFieldName : partitionKeys) {
-            PlanTranslationUtil.prepareVarAndExpression(keyFieldName, 
payloadVar, pkVars, pkExprs, varRefsForLoading,
-                    context);
+            PlanTranslationUtil
+                    .prepareVarAndExpression(keyFieldName, payloadVar, pkVars, 
pkExprs, varRefsForLoading, context);
         }
 
         AssignOperator assign = new AssignOperator(pkVars, pkExprs);
@@ -262,8 +261,8 @@
             additionalFilteringExpressions = new ArrayList<>();
             
PlanTranslationUtil.prepareVarAndExpression(additionalFilteringField, 
payloadVar, additionalFilteringVars,
                     additionalFilteringAssignExpressions, 
additionalFilteringExpressions, context);
-            additionalFilteringAssign =
-                    new AssignOperator(additionalFilteringVars, 
additionalFilteringAssignExpressions);
+            additionalFilteringAssign = new 
AssignOperator(additionalFilteringVars,
+                    additionalFilteringAssignExpressions);
         }
 
         InsertDeleteUpsertOperator insertOp = new 
InsertDeleteUpsertOperator(targetDatasource, payloadRef,
@@ -282,12 +281,10 @@
         return new ALogicalPlanImpl(new MutableObject<>(leafOperator));
     }
 
-    @SuppressWarnings("unchecked")
-    @Override
-    public ILogicalPlan translate(Query expr, String outputDatasetName, 
ICompiledDmlStatement stmt)
-            throws AlgebricksException {
-        Pair<ILogicalOperator, LogicalVariable> p =
-                expr.accept(this, new MutableObject<>(new 
EmptyTupleSourceOperator()));
+    @SuppressWarnings("unchecked") @Override public ILogicalPlan 
translate(Query expr, String outputDatasetName,
+            ICompiledDmlStatement stmt) throws AlgebricksException {
+        Pair<ILogicalOperator, LogicalVariable> p = expr
+                .accept(this, new MutableObject<>(new 
EmptyTupleSourceOperator()));
         ArrayList<Mutable<ILogicalOperator>> globalPlanRoots = new 
ArrayList<>();
         ILogicalOperator topOp = p.first;
         ProjectOperator project = (ProjectOperator) topOp;
@@ -321,19 +318,18 @@
              */
             LogicalVariable seqVar = context.newVar();
             /** This assign adds a marker function collection-to-sequence: if 
the input is a singleton collection, unnest it; otherwise do nothing. */
-            AssignOperator assignCollectionToSequence = new 
AssignOperator(seqVar,
-                    new MutableObject<>(new ScalarFunctionCallExpression(
+            AssignOperator assignCollectionToSequence = new 
AssignOperator(seqVar, new MutableObject<>(
+                    new ScalarFunctionCallExpression(
                             
FunctionUtil.getFunctionInfo(AsterixBuiltinFunctions.COLLECTION_TO_SEQUENCE),
                             new MutableObject<>(new 
VariableReferenceExpression(resVar)))));
             assignCollectionToSequence.getInputs().add(new 
MutableObject<>(project.getInputs().get(0).getValue()));
             project.getInputs().get(0).setValue(assignCollectionToSequence);
             project.getVariables().set(0, seqVar);
             resVar = seqVar;
-            DatasetDataSource targetDatasource =
-                    validateDatasetInfo(metadataProvider, 
stmt.getDataverseName(), stmt.getDatasetName());
-            List<Integer> keySourceIndicator =
-                    ((InternalDatasetDetails) 
targetDatasource.getDataset().getDatasetDetails())
-                            .getKeySourceIndicator();
+            DatasetDataSource targetDatasource = 
validateDatasetInfo(metadataProvider, stmt.getDataverseName(),
+                    stmt.getDatasetName());
+            List<Integer> keySourceIndicator = ((InternalDatasetDetails) 
targetDatasource.getDataset()
+                    .getDatasetDetails()).getKeySourceIndicator();
             ArrayList<LogicalVariable> vars = new ArrayList<>();
             ArrayList<Mutable<ILogicalExpression>> exprs = new ArrayList<>();
             List<Mutable<ILogicalExpression>> varRefsForLoading = new 
ArrayList<>();
@@ -342,8 +338,9 @@
             for (int i = 0; i < numOfPrimaryKeys; i++) {
                 if (keySourceIndicator == null || 
keySourceIndicator.get(i).intValue() == 0) {
                     // record part
-                    
PlanTranslationUtil.prepareVarAndExpression(partitionKeys.get(i), resVar, vars, 
exprs,
-                            varRefsForLoading, context);
+                    PlanTranslationUtil
+                            .prepareVarAndExpression(partitionKeys.get(i), 
resVar, vars, exprs, varRefsForLoading,
+                                    context);
                 } else {
                     // meta part
                     
PlanTranslationUtil.prepareMetaKeyAccessExpression(partitionKeys.get(i), 
unnestVar, exprs, vars,
@@ -365,8 +362,8 @@
                 
PlanTranslationUtil.prepareVarAndExpression(additionalFilteringField, resVar, 
additionalFilteringVars,
                         additionalFilteringAssignExpressions, 
additionalFilteringExpressions, context);
 
-                additionalFilteringAssign =
-                        new AssignOperator(additionalFilteringVars, 
additionalFilteringAssignExpressions);
+                additionalFilteringAssign = new 
AssignOperator(additionalFilteringVars,
+                        additionalFilteringAssignExpressions);
                 additionalFilteringAssign.getInputs().add(new 
MutableObject<>(project));
                 assign.getInputs().add(new 
MutableObject<>(additionalFilteringAssign));
             } else {
@@ -449,8 +446,11 @@
         List<LogicalVariable> metaAndKeysVars = null;
         List<Mutable<ILogicalExpression>> metaAndKeysExprs = null;
         List<Mutable<ILogicalExpression>> metaExpSingletonList = null;
-        boolean isChangeFeed =
-                FeedMetadataUtil.isChangeFeed(metadataProvider, 
sfs.getDataverseName(), sfs.getFeedName());
+        Feed feed = metadataProvider.findFeed(sfs.getDataverseName(), 
sfs.getFeedName());
+
+        boolean isChangeFeed = FeedMetadataUtil.isChangeFeed(feed, 
sfs.getDataverseName(), sfs.getFeedName());
+        boolean isUpsertFeed = FeedMetadataUtil.isUpsertFeed(feed, 
sfs.getDataverseName(), sfs.getFeedName());
+
         if (targetDatasource.getDataset().hasMetaPart() || isChangeFeed) {
             metaAndKeysVars = new ArrayList<>();
             metaAndKeysExprs = new ArrayList<>();
@@ -507,8 +507,13 @@
                 feedModificationOp.getInputs().add(assign.getInputs().get(0));
             }
         } else {
-            feedModificationOp = new 
InsertDeleteUpsertOperator(targetDatasource, varRef, varRefsForLoading,
-                    metaExpSingletonList, 
InsertDeleteUpsertOperator.Kind.INSERT, false);
+            if (isUpsertFeed) {
+                feedModificationOp = new 
InsertDeleteUpsertOperator(targetDatasource, varRef, varRefsForLoading,
+                        metaExpSingletonList, 
InsertDeleteUpsertOperator.Kind.UPSERT, false);
+            } else {
+                feedModificationOp = new 
InsertDeleteUpsertOperator(targetDatasource, varRef, varRefsForLoading,
+                        metaExpSingletonList, 
InsertDeleteUpsertOperator.Kind.INSERT, false);
+            }
             feedModificationOp.getInputs().add(new MutableObject<>(assign));
         }
         if (targetDatasource.getDataset().hasMetaPart() || isChangeFeed) {
@@ -575,8 +580,8 @@
         }
         AqlSourceId sourceId = new AqlSourceId(dataverseName, datasetName);
         IAType itemType = 
metadataProvider.findType(dataset.getItemTypeDataverseName(), 
dataset.getItemTypeName());
-        IAType metaItemType =
-                
metadataProvider.findType(dataset.getMetaItemTypeDataverseName(), 
dataset.getMetaItemTypeName());
+        IAType metaItemType = metadataProvider
+                .findType(dataset.getMetaItemTypeDataverseName(), 
dataset.getMetaItemTypeName());
         INodeDomain domain = 
metadataProvider.findNodeDomain(dataset.getNodeGroupName());
         return new DatasetDataSource(sourceId, dataset, itemType, 
metaItemType, AqlDataSourceType.INTERNAL_DATASET,
                 dataset.getDatasetDetails(), domain);
@@ -590,8 +595,7 @@
         return new FileSplit(metadataProperties.getMetadataNodeName(), new 
FileReference(new File(filePath)));
     }
 
-    @Override
-    public Pair<ILogicalOperator, LogicalVariable> visit(LetClause lc, 
Mutable<ILogicalOperator> tupSource)
+    @Override public Pair<ILogicalOperator, LogicalVariable> visit(LetClause 
lc, Mutable<ILogicalOperator> tupSource)
             throws AsterixException {
         LogicalVariable v;
         ILogicalOperator returnedOp;
@@ -602,33 +606,31 @@
             returnedOp.getInputs().add(tupSource);
         } else {
             v = context.newVar(lc.getVarExpr());
-            Pair<ILogicalExpression, Mutable<ILogicalOperator>> eo =
-                    langExprToAlgExpression(lc.getBindingExpr(), tupSource);
+            Pair<ILogicalExpression, Mutable<ILogicalOperator>> eo = 
langExprToAlgExpression(lc.getBindingExpr(),
+                    tupSource);
             returnedOp = new AssignOperator(v, new MutableObject<>(eo.first));
             returnedOp.getInputs().add(eo.second);
         }
         return new Pair<>(returnedOp, v);
     }
 
-    @Override
-    public Pair<ILogicalOperator, LogicalVariable> visit(FieldAccessor fa, 
Mutable<ILogicalOperator> tupSource)
-            throws AsterixException {
+    @Override public Pair<ILogicalOperator, LogicalVariable> 
visit(FieldAccessor fa,
+            Mutable<ILogicalOperator> tupSource) throws AsterixException {
         Pair<ILogicalExpression, Mutable<ILogicalOperator>> p = 
langExprToAlgExpression(fa.getExpr(), tupSource);
         LogicalVariable v = context.newVar();
         AbstractFunctionCallExpression fldAccess = new 
ScalarFunctionCallExpression(
                 
FunctionUtil.getFunctionInfo(AsterixBuiltinFunctions.FIELD_ACCESS_BY_NAME));
         fldAccess.getArguments().add(new MutableObject<>(p.first));
-        ILogicalExpression faExpr =
-                new ConstantExpression(new AsterixConstantValue(new 
AString(fa.getIdent().getValue())));
+        ILogicalExpression faExpr = new ConstantExpression(
+                new AsterixConstantValue(new 
AString(fa.getIdent().getValue())));
         fldAccess.getArguments().add(new MutableObject<>(faExpr));
         AssignOperator a = new AssignOperator(v, new 
MutableObject<>(fldAccess));
         a.getInputs().add(p.second);
         return new Pair<>(a, v);
     }
 
-    @Override
-    public Pair<ILogicalOperator, LogicalVariable> visit(IndexAccessor ia, 
Mutable<ILogicalOperator> tupSource)
-            throws AsterixException {
+    @Override public Pair<ILogicalOperator, LogicalVariable> 
visit(IndexAccessor ia,
+            Mutable<ILogicalOperator> tupSource) throws AsterixException {
         Pair<ILogicalExpression, Mutable<ILogicalOperator>> p = 
langExprToAlgExpression(ia.getExpr(), tupSource);
         LogicalVariable v = context.newVar();
         AbstractFunctionCallExpression f;
@@ -637,8 +639,8 @@
                     
FunctionUtil.getFunctionInfo(AsterixBuiltinFunctions.ANY_COLLECTION_MEMBER));
             f.getArguments().add(new MutableObject<>(p.first));
         } else {
-            Pair<ILogicalExpression, Mutable<ILogicalOperator>> indexPair =
-                    langExprToAlgExpression(ia.getIndexExpr(), tupSource);
+            Pair<ILogicalExpression, Mutable<ILogicalOperator>> indexPair = 
langExprToAlgExpression(ia.getIndexExpr(),
+                    tupSource);
             f = new 
ScalarFunctionCallExpression(FunctionUtil.getFunctionInfo(AsterixBuiltinFunctions.GET_ITEM));
             f.getArguments().add(new MutableObject<>(p.first));
             f.getArguments().add(new MutableObject<>(indexPair.first));
@@ -648,8 +650,7 @@
         return new Pair<>(a, v);
     }
 
-    @Override
-    public Pair<ILogicalOperator, LogicalVariable> visit(CallExpr fcall, 
Mutable<ILogicalOperator> tupSource)
+    @Override public Pair<ILogicalOperator, LogicalVariable> visit(CallExpr 
fcall, Mutable<ILogicalOperator> tupSource)
             throws AsterixException {
         LogicalVariable v = context.newVar();
         FunctionSignature signature = fcall.getFunctionSignature();
@@ -745,8 +746,8 @@
         if (AsterixBuiltinFunctions.isBuiltinAggregateFunction(fi)) {
             f = AsterixBuiltinFunctions.makeAggregateFunctionExpression(fi, 
args);
         } else if (AsterixBuiltinFunctions.isBuiltinUnnestingFunction(fi)) {
-            UnnestingFunctionCallExpression ufce =
-                    new 
UnnestingFunctionCallExpression(FunctionUtil.getFunctionInfo(fi), args);
+            UnnestingFunctionCallExpression ufce = new 
UnnestingFunctionCallExpression(FunctionUtil.getFunctionInfo(fi),
+                    args);
             
ufce.setReturnsUniqueValues(AsterixBuiltinFunctions.returnsUniqueValues(fi));
             f = ufce;
         } else {
@@ -755,30 +756,27 @@
         return f;
     }
 
-    @Override
-    public Pair<ILogicalOperator, LogicalVariable> visit(FunctionDecl fd, 
Mutable<ILogicalOperator> tupSource) {
+    @Override public Pair<ILogicalOperator, LogicalVariable> 
visit(FunctionDecl fd,
+            Mutable<ILogicalOperator> tupSource) {
         throw new IllegalStateException("Function declarations should be 
inlined at AST rewriting phase.");
     }
 
-    @SuppressWarnings("unchecked")
-    @Override
-    public Pair<ILogicalOperator, LogicalVariable> visit(GroupbyClause gc, 
Mutable<ILogicalOperator> tupSource)
-            throws AsterixException {
+    @SuppressWarnings("unchecked") @Override public Pair<ILogicalOperator, 
LogicalVariable> visit(GroupbyClause gc,
+            Mutable<ILogicalOperator> tupSource) throws AsterixException {
         Mutable<ILogicalOperator> topOp = tupSource;
         if (gc.hasGroupVar()) {
             List<Pair<Expression, Identifier>> groupFieldList = 
gc.getGroupFieldList();
             List<Mutable<ILogicalExpression>> groupRecordConstructorArgList = 
new ArrayList<>();
             for (Pair<Expression, Identifier> groupField : groupFieldList) {
-                ILogicalExpression groupFieldNameExpr =
-                        langExprToAlgExpression(new LiteralExpr(new 
StringLiteral(groupField.second.getValue())),
-                                topOp).first;
+                ILogicalExpression groupFieldNameExpr = 
langExprToAlgExpression(
+                        new LiteralExpr(new 
StringLiteral(groupField.second.getValue())), topOp).first;
                 groupRecordConstructorArgList.add(new 
MutableObject<>(groupFieldNameExpr));
                 ILogicalExpression groupFieldExpr = 
langExprToAlgExpression(groupField.first, topOp).first;
                 groupRecordConstructorArgList.add(new 
MutableObject<>(groupFieldExpr));
             }
             LogicalVariable groupVar = context.newVar(gc.getGroupVar());
-            AssignOperator groupVarAssignOp = new AssignOperator(groupVar,
-                    new MutableObject<>(new ScalarFunctionCallExpression(
+            AssignOperator groupVarAssignOp = new AssignOperator(groupVar, new 
MutableObject<>(
+                    new ScalarFunctionCallExpression(
                             
FunctionUtil.getFunctionInfo(AsterixBuiltinFunctions.OPEN_RECORD_CONSTRUCTOR),
                             groupRecordConstructorArgList)));
             groupVarAssignOp.getInputs().add(topOp);
@@ -854,8 +852,7 @@
 
     }
 
-    @Override
-    public Pair<ILogicalOperator, LogicalVariable> visit(IfExpr ifexpr, 
Mutable<ILogicalOperator> tupSource)
+    @Override public Pair<ILogicalOperator, LogicalVariable> visit(IfExpr 
ifexpr, Mutable<ILogicalOperator> tupSource)
             throws AsterixException {
         // In the most general case, IfThenElse is translated in the following
         // way.
@@ -894,22 +891,20 @@
 
         // Unnests the selected ("if" or "else") result.
         LogicalVariable unnestVar = context.newVar();
-        UnnestOperator unnestOp = new UnnestOperator(unnestVar,
-                new MutableObject<>(new UnnestingFunctionCallExpression(
-                        
FunctionUtil.getFunctionInfo(AsterixBuiltinFunctions.SCAN_COLLECTION), 
Collections
-                                .singletonList(new MutableObject<>(new 
VariableReferenceExpression(selectVar))))));
+        UnnestOperator unnestOp = new UnnestOperator(unnestVar, new 
MutableObject<>(new UnnestingFunctionCallExpression(
+                
FunctionUtil.getFunctionInfo(AsterixBuiltinFunctions.SCAN_COLLECTION),
+                Collections.singletonList(new MutableObject<>(new 
VariableReferenceExpression(selectVar))))));
         unnestOp.getInputs().add(new MutableObject<>(assignOp));
 
         // Produces the final result.
         LogicalVariable resultVar = context.newVar();
-        AssignOperator finalAssignOp =
-                new AssignOperator(resultVar, new MutableObject<>(new 
VariableReferenceExpression(unnestVar)));
+        AssignOperator finalAssignOp = new AssignOperator(resultVar,
+                new MutableObject<>(new 
VariableReferenceExpression(unnestVar)));
         finalAssignOp.getInputs().add(new MutableObject<>(unnestOp));
         return new Pair<>(finalAssignOp, resultVar);
     }
 
-    @Override
-    public Pair<ILogicalOperator, LogicalVariable> visit(LiteralExpr l, 
Mutable<ILogicalOperator> tupSource) {
+    @Override public Pair<ILogicalOperator, LogicalVariable> visit(LiteralExpr 
l, Mutable<ILogicalOperator> tupSource) {
         LogicalVariable var = context.newVar();
         AssignOperator a = new AssignOperator(var, new MutableObject<>(
                 new ConstantExpression(new 
AsterixConstantValue(ConstantHelper.objectFromLiteral(l.getValue())))));
@@ -919,8 +914,7 @@
         return new Pair<>(a, var);
     }
 
-    @Override
-    public Pair<ILogicalOperator, LogicalVariable> visit(OperatorExpr op, 
Mutable<ILogicalOperator> tupSource)
+    @Override public Pair<ILogicalOperator, LogicalVariable> 
visit(OperatorExpr op, Mutable<ILogicalOperator> tupSource)
             throws AsterixException {
         List<OperatorType> ops = op.getOpList();
         int nOps = ops.size();
@@ -1001,9 +995,8 @@
         return new Pair<>(a, assignedVar);
     }
 
-    @Override
-    public Pair<ILogicalOperator, LogicalVariable> visit(OrderbyClause oc, 
Mutable<ILogicalOperator> tupSource)
-            throws AsterixException {
+    @Override public Pair<ILogicalOperator, LogicalVariable> 
visit(OrderbyClause oc,
+            Mutable<ILogicalOperator> tupSource) throws AsterixException {
         OrderOperator ord = new OrderOperator();
         Iterator<OrderModifier> modifIter = oc.getModifierList().iterator();
         Mutable<ILogicalOperator> topOp = tupSource;
@@ -1030,9 +1023,8 @@
         return new Pair<>(ord, null);
     }
 
-    @Override
-    public Pair<ILogicalOperator, LogicalVariable> visit(QuantifiedExpression 
qe, Mutable<ILogicalOperator> tupSource)
-            throws AsterixException {
+    @Override public Pair<ILogicalOperator, LogicalVariable> 
visit(QuantifiedExpression qe,
+            Mutable<ILogicalOperator> tupSource) throws AsterixException {
         Mutable<ILogicalOperator> topOp = tupSource;
 
         ILogicalOperator firstOp = null;
@@ -1065,16 +1057,17 @@
         if (qe.getQuantifier() == Quantifier.SOME) {
             s = new SelectOperator(new MutableObject<>(eo2.first), false, 
null);
             s.getInputs().add(eo2.second);
-            fAgg = 
AsterixBuiltinFunctions.makeAggregateFunctionExpression(AsterixBuiltinFunctions.NON_EMPTY_STREAM,
-                    new ArrayList<>());
+            fAgg = AsterixBuiltinFunctions
+                    
.makeAggregateFunctionExpression(AsterixBuiltinFunctions.NON_EMPTY_STREAM, new 
ArrayList<>());
         } else { // EVERY
             List<Mutable<ILogicalExpression>> satExprList = new ArrayList<>(1);
             satExprList.add(new MutableObject<>(eo2.first));
-            s = new SelectOperator(new MutableObject<>(new 
ScalarFunctionCallExpression(
-                    
FunctionUtil.getFunctionInfo(AlgebricksBuiltinFunctions.NOT), satExprList)), 
false, null);
+            s = new SelectOperator(new MutableObject<>(
+                    new 
ScalarFunctionCallExpression(FunctionUtil.getFunctionInfo(AlgebricksBuiltinFunctions.NOT),
+                            satExprList)), false, null);
             s.getInputs().add(eo2.second);
-            fAgg = 
AsterixBuiltinFunctions.makeAggregateFunctionExpression(AsterixBuiltinFunctions.EMPTY_STREAM,
-                    new ArrayList<>());
+            fAgg = AsterixBuiltinFunctions
+                    
.makeAggregateFunctionExpression(AsterixBuiltinFunctions.EMPTY_STREAM, new 
ArrayList<>());
         }
         LogicalVariable qeVar = context.newVar();
         AggregateOperator a = new 
AggregateOperator(mkSingletonArrayList(qeVar),
@@ -1083,15 +1076,13 @@
         return new Pair<>(a, qeVar);
     }
 
-    @Override
-    public Pair<ILogicalOperator, LogicalVariable> visit(Query q, 
Mutable<ILogicalOperator> tupSource)
+    @Override public Pair<ILogicalOperator, LogicalVariable> visit(Query q, 
Mutable<ILogicalOperator> tupSource)
             throws AsterixException {
         return q.getBody().accept(this, tupSource);
     }
 
-    @Override
-    public Pair<ILogicalOperator, LogicalVariable> visit(RecordConstructor rc, 
Mutable<ILogicalOperator> tupSource)
-            throws AsterixException {
+    @Override public Pair<ILogicalOperator, LogicalVariable> 
visit(RecordConstructor rc,
+            Mutable<ILogicalOperator> tupSource) throws AsterixException {
         AbstractFunctionCallExpression f = new ScalarFunctionCallExpression(
                 
FunctionUtil.getFunctionInfo(AsterixBuiltinFunctions.OPEN_RECORD_CONSTRUCTOR));
         LogicalVariable v1 = context.newVar();
@@ -1109,11 +1100,11 @@
         return new Pair<>(a, v1);
     }
 
-    @Override
-    public Pair<ILogicalOperator, LogicalVariable> visit(ListConstructor lc, 
Mutable<ILogicalOperator> tupSource)
-            throws AsterixException {
-        FunctionIdentifier fid = (lc.getType() == 
Type.ORDERED_LIST_CONSTRUCTOR)
-                ? AsterixBuiltinFunctions.ORDERED_LIST_CONSTRUCTOR : 
AsterixBuiltinFunctions.UNORDERED_LIST_CONSTRUCTOR;
+    @Override public Pair<ILogicalOperator, LogicalVariable> 
visit(ListConstructor lc,
+            Mutable<ILogicalOperator> tupSource) throws AsterixException {
+        FunctionIdentifier fid = (lc.getType() == 
Type.ORDERED_LIST_CONSTRUCTOR) ?
+                AsterixBuiltinFunctions.ORDERED_LIST_CONSTRUCTOR :
+                AsterixBuiltinFunctions.UNORDERED_LIST_CONSTRUCTOR;
         AbstractFunctionCallExpression f = new 
ScalarFunctionCallExpression(FunctionUtil.getFunctionInfo(fid));
         LogicalVariable v1 = context.newVar();
         AssignOperator a = new AssignOperator(v1, new MutableObject<>(f));
@@ -1127,8 +1118,7 @@
         return new Pair<>(a, v1);
     }
 
-    @Override
-    public Pair<ILogicalOperator, LogicalVariable> visit(UnaryExpr u, 
Mutable<ILogicalOperator> tupSource)
+    @Override public Pair<ILogicalOperator, LogicalVariable> visit(UnaryExpr 
u, Mutable<ILogicalOperator> tupSource)
             throws AsterixException {
         Expression expr = u.getExpr();
         Pair<ILogicalExpression, Mutable<ILogicalOperator>> eo = 
langExprToAlgExpression(expr, tupSource);
@@ -1157,8 +1147,8 @@
         return new Pair<>(a, v1);
     }
 
-    @Override
-    public Pair<ILogicalOperator, LogicalVariable> visit(VariableExpr v, 
Mutable<ILogicalOperator> tupSource) {
+    @Override public Pair<ILogicalOperator, LogicalVariable> 
visit(VariableExpr v,
+            Mutable<ILogicalOperator> tupSource) {
         // Should we ever get to this method?
         LogicalVariable var = context.newVar();
         LogicalVariable oldV = context.getVar(v.getVar().getId());
@@ -1167,8 +1157,7 @@
         return new Pair<>(a, var);
     }
 
-    @Override
-    public Pair<ILogicalOperator, LogicalVariable> visit(WhereClause w, 
Mutable<ILogicalOperator> tupSource)
+    @Override public Pair<ILogicalOperator, LogicalVariable> visit(WhereClause 
w, Mutable<ILogicalOperator> tupSource)
             throws AsterixException {
         Pair<ILogicalExpression, Mutable<ILogicalOperator>> p = 
langExprToAlgExpression(w.getWhereExpr(), tupSource);
         SelectOperator s = new SelectOperator(new MutableObject<>(p.first), 
false, null);
@@ -1176,8 +1165,7 @@
         return new Pair<>(s, null);
     }
 
-    @Override
-    public Pair<ILogicalOperator, LogicalVariable> visit(LimitClause lc, 
Mutable<ILogicalOperator> tupSource)
+    @Override public Pair<ILogicalOperator, LogicalVariable> visit(LimitClause 
lc, Mutable<ILogicalOperator> tupSource)
             throws AsterixException {
         Pair<ILogicalExpression, Mutable<ILogicalOperator>> p1 = 
langExprToAlgExpression(lc.getLimitExpr(), tupSource);
         LimitOperator opLim;
@@ -1277,8 +1265,8 @@
             Mutable<ILogicalOperator> topOpRef) throws AsterixException {
         switch (expr.getKind()) {
             case VARIABLE_EXPRESSION:
-                VariableReferenceExpression ve =
-                        new 
VariableReferenceExpression(context.getVar(((VariableExpr) 
expr).getVar().getId()));
+                VariableReferenceExpression ve = new 
VariableReferenceExpression(
+                        context.getVar(((VariableExpr) 
expr).getVar().getId()));
                 return new Pair<>(ve, topOpRef);
             case LITERAL_EXPRESSION:
                 LiteralExpr val = (LiteralExpr) expr;
@@ -1393,8 +1381,9 @@
                         
FunctionUtil.getFunctionInfo(AsterixBuiltinFunctions.SCAN_COLLECTION), argRefs);
             case FUNCTION_CALL:
                 AbstractFunctionCallExpression fce = 
(AbstractFunctionCallExpression) expr;
-                return (fce.getKind() == FunctionKind.UNNEST) ? expr
-                        : new UnnestingFunctionCallExpression(
+                return (fce.getKind() == FunctionKind.UNNEST) ?
+                        expr :
+                        new UnnestingFunctionCallExpression(
                                 
FunctionUtil.getFunctionInfo(AsterixBuiltinFunctions.SCAN_COLLECTION), argRefs);
             default:
                 return expr;
@@ -1422,8 +1411,7 @@
      * Eliminate shared operator references in a query plan.
      * Deep copy a new query plan subtree whenever there is a shared operator 
reference.
      *
-     * @param plan,
-     *            the query plan.
+     * @param plan, the query plan.
      * @throws AsterixException
      */
     private void eliminateSharedOperatorReferenceForPlan(ILogicalPlan plan) 
throws AsterixException {
@@ -1437,12 +1425,10 @@
      * Eliminate shared operator references in a query plan rooted at 
<code>currentOpRef.getValue()</code>.
      * Deep copy a new query plan subtree whenever there is a shared operator 
reference.
      *
-     * @param currentOpRef,
-     *            the operator reference to consider
-     * @param opRefSet,
-     *            the set storing seen operator references so far.
+     * @param currentOpRef, the operator reference to consider
+     * @param opRefSet,     the set storing seen operator references so far.
      * @return a mapping that maps old variables to new variables, for the 
ancestors of
-     *         <code>currentOpRef</code> to replace variables properly.
+     * <code>currentOpRef</code> to replace variables properly.
      * @throws AsterixException
      */
     private Map<LogicalVariable, LogicalVariable> 
eliminateSharedOperatorReference(
@@ -1471,8 +1457,8 @@
                 if (opRefSet.contains(childRef)) {
                     // There is a shared operator reference in the query plan.
                     // Deep copies the child plan.
-                    LogicalOperatorDeepCopyWithNewVariablesVisitor visitor =
-                            new 
LogicalOperatorDeepCopyWithNewVariablesVisitor(context, null);
+                    LogicalOperatorDeepCopyWithNewVariablesVisitor visitor = 
new LogicalOperatorDeepCopyWithNewVariablesVisitor(
+                            context, null);
                     ILogicalOperator newChild = 
childRef.getValue().accept(visitor, null);
                     Map<LogicalVariable, LogicalVariable> cloneVarMap = 
visitor.getInputToOutputVariableMapping();
 
@@ -1487,8 +1473,8 @@
 
                 // Recursively eliminate shared operator reference for the 
operator subtree,
                 // even if it is a deep copy of some other one.
-                Map<LogicalVariable, LogicalVariable> childVarMap =
-                        eliminateSharedOperatorReference(childRef, opRefSet);
+                Map<LogicalVariable, LogicalVariable> childVarMap = 
eliminateSharedOperatorReference(childRef,
+                        opRefSet);
                 // Substitute variables according to the new subtree.
                 VariableUtilities.substituteVariables(currentOperator, 
childVarMap, null);
 
@@ -1517,12 +1503,9 @@
     /**
      * Constructs a subplan operator for a branch in a if-else (or case) 
expression.
      *
-     * @param inputOp,
-     *            the input operator.
-     * @param selectExpr,
-     *            the expression to select tuples that are processed by this 
branch.
-     * @param branchExpression,
-     *            the expression to be evaluated in this branch.
+     * @param inputOp,          the input operator.
+     * @param selectExpr,       the expression to select tuples that are 
processed by this branch.
+     * @param branchExpression, the expression to be evaluated in this branch.
      * @return a pair of the constructed subplan operator and the output 
variable for the branch.
      * @throws AsterixException
      */
@@ -1531,18 +1514,18 @@
         context.enterSubplan();
         SubplanOperator subplanOp = new SubplanOperator();
         subplanOp.getInputs().add(new MutableObject<>(inputOp));
-        Mutable<ILogicalOperator> nestedSource =
-                new MutableObject<>(new NestedTupleSourceOperator(new 
MutableObject<>(subplanOp)));
+        Mutable<ILogicalOperator> nestedSource = new MutableObject<>(
+                new NestedTupleSourceOperator(new MutableObject<>(subplanOp)));
         SelectOperator select = new SelectOperator(selectExpr, false, null);
         // The select operator cannot be moved up and down, otherwise it will 
cause typing issues (ASTERIXDB-1203).
         OperatorPropertiesUtil.markMovable(select, false);
         select.getInputs().add(nestedSource);
         Pair<ILogicalOperator, LogicalVariable> pBranch = 
branchExpression.accept(this, new MutableObject<>(select));
         LogicalVariable branchVar = context.newVar();
-        AggregateOperator aggOp = new 
AggregateOperator(Collections.singletonList(branchVar),
-                Collections.singletonList(new MutableObject<>(new 
AggregateFunctionCallExpression(
-                        
FunctionUtil.getFunctionInfo(AsterixBuiltinFunctions.LISTIFY), false, 
Collections.singletonList(
-                                new MutableObject<>(new 
VariableReferenceExpression(pBranch.second)))))));
+        AggregateOperator aggOp = new 
AggregateOperator(Collections.singletonList(branchVar), 
Collections.singletonList(
+                new MutableObject<>(new AggregateFunctionCallExpression(
+                        
FunctionUtil.getFunctionInfo(AsterixBuiltinFunctions.LISTIFY), false, 
Collections
+                        .singletonList(new MutableObject<>(new 
VariableReferenceExpression(pBranch.second)))))));
         aggOp.getInputs().add(new MutableObject<>(pBranch.first));
         ILogicalPlan planForBranch = new ALogicalPlanImpl(new 
MutableObject<>(aggOp));
         subplanOp.getNestedPlans().add(planForBranch);
@@ -1552,8 +1535,8 @@
 
     // Processes EXISTS and NOT EXISTS.
     private AssignOperator processExists(ILogicalExpression inputExpr, 
LogicalVariable v1, boolean not) {
-        AbstractFunctionCallExpression count =
-                new 
ScalarFunctionCallExpression(FunctionUtil.getFunctionInfo(AsterixBuiltinFunctions.SCALAR_COUNT));
+        AbstractFunctionCallExpression count = new 
ScalarFunctionCallExpression(
+                
FunctionUtil.getFunctionInfo(AsterixBuiltinFunctions.SCALAR_COUNT));
         count.getArguments().add(new MutableObject<>(inputExpr));
         AbstractFunctionCallExpression comparison = new 
ScalarFunctionCallExpression(
                 FunctionUtil.getFunctionInfo(not ? AsterixBuiltinFunctions.EQ 
: AsterixBuiltinFunctions.NEQ));
@@ -1606,8 +1589,8 @@
             LogicalVariable unnestVar = context.newVar();
             List<Mutable<ILogicalExpression>> args = new ArrayList<>();
             args.add(new MutableObject<ILogicalExpression>(new 
VariableReferenceExpression(opAndVar.second)));
-            UnnestOperator unnestOp = new UnnestOperator(unnestVar,
-                    new MutableObject<ILogicalExpression>(new 
UnnestingFunctionCallExpression(
+            UnnestOperator unnestOp = new UnnestOperator(unnestVar, new 
MutableObject<ILogicalExpression>(
+                    new UnnestingFunctionCallExpression(
                             
FunctionUtil.getFunctionInfo(AsterixBuiltinFunctions.SCAN_COLLECTION), args)));
             unnestOp.getInputs().add(new 
MutableObject<ILogicalOperator>(opAndVar.first));
             inputOpRefsToUnion.add(new 
MutableObject<ILogicalOperator>(unnestOp));
@@ -1625,8 +1608,8 @@
         while (inputOpRefIterator.hasNext()) {
             // Generates the variable triple <leftVar, rightVar, outputVar> .
             topUnionVar = context.newVar();
-            Triple<LogicalVariable, LogicalVariable, LogicalVariable> 
varTriple =
-                    new Triple<>(leftInputVar, inputVarIterator.next(), 
topUnionVar);
+            Triple<LogicalVariable, LogicalVariable, LogicalVariable> 
varTriple = new Triple<>(leftInputVar,
+                    inputVarIterator.next(), topUnionVar);
             List<Triple<LogicalVariable, LogicalVariable, LogicalVariable>> 
varTriples = new ArrayList<>();
             varTriples.add(varTriple);
 
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
index e251f32..b76858e 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
@@ -93,6 +93,8 @@
     public static final String KEY_VALUE_FORMAT = "value-format";
     // a boolean indicating whether the feed is a change feed
     public static final String KEY_IS_CHANGE_FEED = "change-feed";
+    // a boolean indicating whether the feed uses upsert
+    public static final String KEY_IS_UPSERT_FEED = "upsert-feed";
     // an integer representing the number of keys in a change feed
     public static final String KEY_KEY_SIZE = "key-size";
     // a boolean indicating whether the feed produces records with metadata
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
index 23cd39c..8eb8815 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
@@ -296,6 +296,10 @@
         return 
Boolean.parseBoolean(configuration.get(ExternalDataConstants.KEY_IS_CHANGE_FEED));
     }
 
+    public static boolean isUpsertFeed(Map<String, String> configuration) {
+        return 
Boolean.parseBoolean(configuration.get(ExternalDataConstants.KEY_IS_UPSERT_FEED));
+    }
+
     public static int getNumberOfKeys(Map<String, String> configuration) 
throws AsterixException {
         String keyIndexes = 
configuration.get(ExternalDataConstants.KEY_KEY_INDEXES);
         if (keyIndexes == null) {
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetadataUtil.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetadataUtil.java
index 12ab634..17ddb70 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetadataUtil.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetadataUtil.java
@@ -128,8 +128,8 @@
             MetadataTransactionContext ctx) throws AsterixException {
         FeedPolicyEntity feedPolicy = 
MetadataManager.INSTANCE.getFeedPolicy(ctx, dataverse, policyName);
         if (feedPolicy == null) {
-            feedPolicy =
-                    MetadataManager.INSTANCE.getFeedPolicy(ctx, 
MetadataConstants.METADATA_DATAVERSE_NAME, policyName);
+            feedPolicy = MetadataManager.INSTANCE
+                    .getFeedPolicy(ctx, 
MetadataConstants.METADATA_DATAVERSE_NAME, policyName);
             if (feedPolicy == null) {
                 throw new AsterixException("Unknown feed policy" + policyName);
             }
@@ -170,11 +170,11 @@
                 boolean enableSubscriptionMode;
                 OperatorDescriptorId opId = null;
                 if (opDesc instanceof AlgebricksMetaOperatorDescriptor) {
-                    IPushRuntimeFactory[] runtimeFactories =
-                            ((AlgebricksMetaOperatorDescriptor) 
opDesc).getPipeline().getRuntimeFactories();
+                    IPushRuntimeFactory[] runtimeFactories = 
((AlgebricksMetaOperatorDescriptor) opDesc).getPipeline()
+                            .getRuntimeFactories();
                     if (runtimeFactories[0] instanceof AssignRuntimeFactory && 
runtimeFactories.length > 1) {
-                        IConnectorDescriptor connectorDesc =
-                                
spec.getOperatorInputMap().get(opDesc.getOperatorId()).get(0);
+                        IConnectorDescriptor connectorDesc = 
spec.getOperatorInputMap().get(opDesc.getOperatorId())
+                                .get(0);
                         IOperatorDescriptor sourceOp = 
spec.getProducer(connectorDesc);
                         if (sourceOp instanceof FeedCollectOperatorDescriptor) 
{
                             runtimeType = FeedRuntimeType.COMPUTE;
@@ -209,16 +209,16 @@
         }
 
         // make connections between operators
-        for (Entry<ConnectorDescriptorId, Pair<Pair<IOperatorDescriptor, 
Integer>, Pair<IOperatorDescriptor, Integer>>>
-        entry : spec.getConnectorOperatorMap().entrySet()) {
+        for (Entry<ConnectorDescriptorId, Pair<Pair<IOperatorDescriptor, 
Integer>, Pair<IOperatorDescriptor, Integer>>> entry : spec
+                .getConnectorOperatorMap().entrySet()) {
             IConnectorDescriptor connDesc = 
altered.getConnectorMap().get(connectorMapping.get(entry.getKey()));
             Pair<IOperatorDescriptor, Integer> leftOp = 
entry.getValue().getLeft();
             Pair<IOperatorDescriptor, Integer> rightOp = 
entry.getValue().getRight();
 
-            IOperatorDescriptor leftOpDesc =
-                    
altered.getOperatorMap().get(oldNewOID.get(leftOp.getLeft().getOperatorId()));
-            IOperatorDescriptor rightOpDesc =
-                    
altered.getOperatorMap().get(oldNewOID.get(rightOp.getLeft().getOperatorId()));
+            IOperatorDescriptor leftOpDesc = altered.getOperatorMap()
+                    .get(oldNewOID.get(leftOp.getLeft().getOperatorId()));
+            IOperatorDescriptor rightOpDesc = altered.getOperatorMap()
+                    .get(oldNewOID.get(rightOp.getLeft().getOperatorId()));
 
             altered.connect(connDesc, leftOpDesc, leftOp.getRight(), 
rightOpDesc, rightOp.getRight());
         }
@@ -302,8 +302,8 @@
     public static void increaseCardinality(JobSpecification spec, 
FeedRuntimeType compute, int requiredCardinality,
             List<String> newLocations) throws AsterixException {
         IOperatorDescriptor changingOpDesc = 
alterJobSpecForComputeCardinality(spec, requiredCardinality);
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, 
changingOpDesc,
-                nChooseK(requiredCardinality, newLocations));
+        PartitionConstraintHelper
+                .addAbsoluteLocationConstraint(spec, changingOpDesc, 
nChooseK(requiredCardinality, newLocations));
 
     }
 
@@ -314,23 +314,22 @@
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, 
changingOpDesc, chosenLocations);
     }
 
-    private static IOperatorDescriptor alterJobSpecForComputeCardinality(
-            JobSpecification spec, int requiredCardinality) throws 
AsterixException {
+    private static IOperatorDescriptor 
alterJobSpecForComputeCardinality(JobSpecification spec, int 
requiredCardinality)
+            throws AsterixException {
         Map<ConnectorDescriptorId, IConnectorDescriptor> connectors = 
spec.getConnectorMap();
-        Map<ConnectorDescriptorId, Pair<Pair<IOperatorDescriptor, Integer>, 
Pair<IOperatorDescriptor, Integer>>> 
-        connectorOpMap = spec.getConnectorOperatorMap();
+        Map<ConnectorDescriptorId, Pair<Pair<IOperatorDescriptor, Integer>, 
Pair<IOperatorDescriptor, Integer>>> connectorOpMap = spec
+                .getConnectorOperatorMap();
         IOperatorDescriptor sourceOp = null;
         IOperatorDescriptor targetOp = null;
         IConnectorDescriptor connDesc = null;
-        for (Entry<ConnectorDescriptorId, Pair<Pair<IOperatorDescriptor, 
Integer>, Pair<IOperatorDescriptor, Integer>>>
-        entry : connectorOpMap.entrySet()) {
+        for (Entry<ConnectorDescriptorId, Pair<Pair<IOperatorDescriptor, 
Integer>, Pair<IOperatorDescriptor, Integer>>> entry : connectorOpMap
+                .entrySet()) {
             ConnectorDescriptorId cid = entry.getKey();
             sourceOp = entry.getValue().getKey().getKey();
             if (sourceOp instanceof FeedCollectOperatorDescriptor) {
                 targetOp = entry.getValue().getValue().getKey();
-                if ((targetOp instanceof FeedMetaOperatorDescriptor)
-                        && (((FeedMetaOperatorDescriptor) 
targetOp).getRuntimeType()
-                                .equals(FeedRuntimeType.COMPUTE))) {
+                if ((targetOp instanceof FeedMetaOperatorDescriptor) && 
(((FeedMetaOperatorDescriptor) targetOp)
+                        .getRuntimeType().equals(FeedRuntimeType.COMPUTE))) {
                     connDesc = connectors.get(cid);
                     break;
                 } else {
@@ -425,8 +424,8 @@
         try {
             MetadataManager.INSTANCE.acquireReadLatch();
             ctx = MetadataManager.INSTANCE.beginTransaction();
-            feed = MetadataManager.INSTANCE.getFeed(ctx, 
connectionId.getFeedId().getDataverse(),
-                    connectionId.getFeedId().getEntityName());
+            feed = MetadataManager.INSTANCE
+                    .getFeed(ctx, connectionId.getFeedId().getDataverse(), 
connectionId.getFeedId().getEntityName());
             preProcessingRequired = feed.getAppliedFunction() != null;
             MetadataManager.INSTANCE.commitTransaction(ctx);
         } catch (Exception e) {
@@ -454,8 +453,8 @@
             ExternalDataUtils.prepareFeed(configuration, 
feed.getDataverseName(), feed.getFeedName());
             ExternalDataUtils.prepareFeed(configuration, 
feed.getDataverseName(), feed.getFeedName());
             // Get adapter from metadata dataset <Metadata dataverse>
-            DatasourceAdapter adapterEntity = 
MetadataManager.INSTANCE.getAdapter(mdTxnCtx,
-                    MetadataConstants.METADATA_DATAVERSE_NAME, adapterName);
+            DatasourceAdapter adapterEntity = MetadataManager.INSTANCE
+                    .getAdapter(mdTxnCtx, 
MetadataConstants.METADATA_DATAVERSE_NAME, adapterName);
             // Get adapter from metadata dataset <The feed dataverse>
             if (adapterEntity == null) {
                 adapterEntity = MetadataManager.INSTANCE.getAdapter(mdTxnCtx, 
feed.getDataverseName(), adapterName);
@@ -482,14 +481,14 @@
                 adapterFactory.setMetaType(metaType);
                 adapterFactory.configure(null, configuration);
             } else {
-                AdapterFactoryProvider.getAdapterFactory(libraryManager, 
adapterName, configuration, adapterOutputType,
-                        metaType);
+                AdapterFactoryProvider
+                        .getAdapterFactory(libraryManager, adapterName, 
configuration, adapterOutputType, metaType);
             }
             if (metaType == null && 
configuration.containsKey(ExternalDataConstants.KEY_META_TYPE_NAME)) {
                 metaType = getOutputType(feed, configuration, 
ExternalDataConstants.KEY_META_TYPE_NAME);
                 if (metaType == null) {
-                    throw new AsterixException("Unknown specified feed meta 
output data type "
-                            + 
configuration.get(ExternalDataConstants.KEY_META_TYPE_NAME));
+                    throw new AsterixException("Unknown specified feed meta 
output data type " + configuration
+                            .get(ExternalDataConstants.KEY_META_TYPE_NAME));
                 }
             }
             if (adapterOutputType == null) {
@@ -498,8 +497,8 @@
                 }
                 adapterOutputType = getOutputType(feed, configuration, 
ExternalDataConstants.KEY_TYPE_NAME);
                 if (adapterOutputType == null) {
-                    throw new AsterixException("Unknown specified feed output 
data type "
-                            + 
configuration.get(ExternalDataConstants.KEY_TYPE_NAME));
+                    throw new AsterixException("Unknown specified feed output 
data type " + configuration
+                            .get(ExternalDataConstants.KEY_TYPE_NAME));
                 }
             }
         } catch (Exception e) {
@@ -507,10 +506,9 @@
         }
     }
 
-    @SuppressWarnings("rawtypes")
-    public static Triple<IAdapterFactory, RecordDescriptor, AdapterType> 
getPrimaryFeedFactoryAndOutput(Feed feed,
-            FeedPolicyAccessor policyAccessor, MetadataTransactionContext 
mdTxnCtx, ILibraryManager libraryManager)
-            throws AlgebricksException {
+    @SuppressWarnings("rawtypes") public static Triple<IAdapterFactory, 
RecordDescriptor, AdapterType> getPrimaryFeedFactoryAndOutput(
+            Feed feed, FeedPolicyAccessor policyAccessor, 
MetadataTransactionContext mdTxnCtx,
+            ILibraryManager libraryManager) throws AlgebricksException {
         // This method needs to be re-visited
         String adapterName = null;
         DatasourceAdapter adapterEntity = null;
@@ -528,8 +526,8 @@
             metaType = getOutputType(feed, configuration, 
ExternalDataConstants.KEY_META_TYPE_NAME);
             ExternalDataUtils.prepareFeed(configuration, 
feed.getDataverseName(), feed.getFeedName());
             // Get adapter from metadata dataset <Metadata dataverse>
-            adapterEntity = MetadataManager.INSTANCE.getAdapter(mdTxnCtx, 
MetadataConstants.METADATA_DATAVERSE_NAME,
-                    adapterName);
+            adapterEntity = MetadataManager.INSTANCE
+                    .getAdapter(mdTxnCtx, 
MetadataConstants.METADATA_DATAVERSE_NAME, adapterName);
             // Get adapter from metadata dataset <The feed dataverse>
             if (adapterEntity == null) {
                 adapterEntity = MetadataManager.INSTANCE.getAdapter(mdTxnCtx, 
feed.getDataverseName(), adapterName);
@@ -554,8 +552,8 @@
                 adapterFactory.setMetaType(metaType);
                 adapterFactory.configure(null, configuration);
             } else {
-                adapterFactory = 
AdapterFactoryProvider.getAdapterFactory(libraryManager, adapterName, 
configuration,
-                        adapterOutputType, metaType);
+                adapterFactory = AdapterFactoryProvider
+                        .getAdapterFactory(libraryManager, adapterName, 
configuration, adapterOutputType, metaType);
                 adapterType = IDataSourceAdapter.AdapterType.INTERNAL;
             }
             if (metaType == null) {
@@ -592,9 +590,9 @@
         return feedProps;
     }
 
-    @SuppressWarnings("rawtypes")
-    private static void getSerdesForPKs(ISerializerDeserializer[] serdes, 
Map<String, String> configuration,
-            ARecordType metaType, ARecordType adapterOutputType, int index) 
throws AlgebricksException {
+    @SuppressWarnings("rawtypes") private static void 
getSerdesForPKs(ISerializerDeserializer[] serdes,
+            Map<String, String> configuration, ARecordType metaType, 
ARecordType adapterOutputType, int index)
+            throws AlgebricksException {
         int[] pkIndexes = ExternalDataUtils.getPKIndexes(configuration);
         if (metaType != null) {
             int[] pkIndicators = 
ExternalDataUtils.getPKSourceIndicators(configuration);
@@ -694,10 +692,12 @@
         return outputType;
     }
 
-    public static boolean isChangeFeed(AqlMetadataProvider mdProvider, String 
dataverse, String feedName)
-            throws AlgebricksException {
-        Feed feed = mdProvider.findFeed(dataverse, feedName);
+    public static boolean isChangeFeed(Feed feed, String dataverse, String 
feedName) throws AlgebricksException {
         return ExternalDataUtils.isChangeFeed(feed.getAdapterConfiguration());
     }
 
+    public static boolean isUpsertFeed(Feed feed, String dataverse, String 
feedName) throws AlgebricksException {
+        return ExternalDataUtils.isUpsertFeed(feed.getAdapterConfiguration());
+    }
+
 }

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/1068
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: Ic5133e7c6941fea4110cc9983f99502f364dc810
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Xikui Wang <[email protected]>

Reply via email to