Xikui Wang has uploaded a new change for review.
https://asterix-gerrit.ics.uci.edu/1068
Change subject: Add upsert option for feed
......................................................................
Add upsert option for feed
For ASTERIXDB-1567. Add an "upsert-feed"="true" option to the
feed configuration; when set, incoming records are upserted
instead of inserted (insert remains the default).
Change-Id: Ic5133e7c6941fea4110cc9983f99502f364dc810
---
M
asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
M
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetadataUtil.java
4 files changed, 175 insertions(+), 186 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/68/1068/1
diff --git
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
index 5081587..8521c01 100644
---
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
+++
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
@@ -81,6 +81,7 @@
import org.apache.asterix.metadata.declared.ResultSetDataSink;
import org.apache.asterix.metadata.declared.ResultSetSinkId;
import org.apache.asterix.metadata.entities.Dataset;
+import org.apache.asterix.metadata.entities.Feed;
import org.apache.asterix.metadata.entities.Function;
import org.apache.asterix.metadata.entities.InternalDatasetDetails;
import org.apache.asterix.metadata.feeds.FeedMetadataUtil;
@@ -181,13 +182,11 @@
FormatUtils.getDefaultFormat().registerRuntimeFunctions(FunctionCollection.getFunctionDescriptorFactories());
}
- @Override
- public int getVarCounter() {
+ @Override public int getVarCounter() {
return context.getVarCounter();
}
- @Override
- public ILogicalPlan translateLoad(ICompiledDmlStatement stmt) throws
AlgebricksException {
+ @Override public ILogicalPlan translateLoad(ICompiledDmlStatement stmt)
throws AlgebricksException {
CompiledLoadFromFileStatement clffs = (CompiledLoadFromFileStatement)
stmt;
Dataset dataset =
metadataProvider.findDataset(clffs.getDataverseName(), clffs.getDatasetName());
if (dataset == null) {
@@ -196,10 +195,10 @@
"Unable to load dataset " + clffs.getDatasetName() + "
since it does not exist");
}
IAType itemType =
metadataProvider.findType(dataset.getItemTypeDataverseName(),
dataset.getItemTypeName());
- IAType metaItemType =
-
metadataProvider.findType(dataset.getMetaItemTypeDataverseName(),
dataset.getMetaItemTypeName());
- DatasetDataSource targetDatasource =
- validateDatasetInfo(metadataProvider, stmt.getDataverseName(),
stmt.getDatasetName());
+ IAType metaItemType = metadataProvider
+ .findType(dataset.getMetaItemTypeDataverseName(),
dataset.getMetaItemTypeName());
+ DatasetDataSource targetDatasource =
validateDatasetInfo(metadataProvider, stmt.getDataverseName(),
+ stmt.getDatasetName());
List<List<String>> partitionKeys =
DatasetUtils.getPartitioningKeys(targetDatasource.getDataset());
if (dataset.hasMetaPart()) {
throw new AlgebricksException(
@@ -235,8 +234,8 @@
List<Mutable<ILogicalExpression>> varRefsForLoading = new
ArrayList<>();
LogicalVariable payloadVar = payloadVars.get(0);
for (List<String> keyFieldName : partitionKeys) {
- PlanTranslationUtil.prepareVarAndExpression(keyFieldName,
payloadVar, pkVars, pkExprs, varRefsForLoading,
- context);
+ PlanTranslationUtil
+ .prepareVarAndExpression(keyFieldName, payloadVar, pkVars,
pkExprs, varRefsForLoading, context);
}
AssignOperator assign = new AssignOperator(pkVars, pkExprs);
@@ -262,8 +261,8 @@
additionalFilteringExpressions = new ArrayList<>();
PlanTranslationUtil.prepareVarAndExpression(additionalFilteringField,
payloadVar, additionalFilteringVars,
additionalFilteringAssignExpressions,
additionalFilteringExpressions, context);
- additionalFilteringAssign =
- new AssignOperator(additionalFilteringVars,
additionalFilteringAssignExpressions);
+ additionalFilteringAssign = new
AssignOperator(additionalFilteringVars,
+ additionalFilteringAssignExpressions);
}
InsertDeleteUpsertOperator insertOp = new
InsertDeleteUpsertOperator(targetDatasource, payloadRef,
@@ -282,12 +281,10 @@
return new ALogicalPlanImpl(new MutableObject<>(leafOperator));
}
- @SuppressWarnings("unchecked")
- @Override
- public ILogicalPlan translate(Query expr, String outputDatasetName,
ICompiledDmlStatement stmt)
- throws AlgebricksException {
- Pair<ILogicalOperator, LogicalVariable> p =
- expr.accept(this, new MutableObject<>(new
EmptyTupleSourceOperator()));
+ @SuppressWarnings("unchecked") @Override public ILogicalPlan
translate(Query expr, String outputDatasetName,
+ ICompiledDmlStatement stmt) throws AlgebricksException {
+ Pair<ILogicalOperator, LogicalVariable> p = expr
+ .accept(this, new MutableObject<>(new
EmptyTupleSourceOperator()));
ArrayList<Mutable<ILogicalOperator>> globalPlanRoots = new
ArrayList<>();
ILogicalOperator topOp = p.first;
ProjectOperator project = (ProjectOperator) topOp;
@@ -321,19 +318,18 @@
*/
LogicalVariable seqVar = context.newVar();
/** This assign adds a marker function collection-to-sequence: if
the input is a singleton collection, unnest it; otherwise do nothing. */
- AssignOperator assignCollectionToSequence = new
AssignOperator(seqVar,
- new MutableObject<>(new ScalarFunctionCallExpression(
+ AssignOperator assignCollectionToSequence = new
AssignOperator(seqVar, new MutableObject<>(
+ new ScalarFunctionCallExpression(
FunctionUtil.getFunctionInfo(AsterixBuiltinFunctions.COLLECTION_TO_SEQUENCE),
new MutableObject<>(new
VariableReferenceExpression(resVar)))));
assignCollectionToSequence.getInputs().add(new
MutableObject<>(project.getInputs().get(0).getValue()));
project.getInputs().get(0).setValue(assignCollectionToSequence);
project.getVariables().set(0, seqVar);
resVar = seqVar;
- DatasetDataSource targetDatasource =
- validateDatasetInfo(metadataProvider,
stmt.getDataverseName(), stmt.getDatasetName());
- List<Integer> keySourceIndicator =
- ((InternalDatasetDetails)
targetDatasource.getDataset().getDatasetDetails())
- .getKeySourceIndicator();
+ DatasetDataSource targetDatasource =
validateDatasetInfo(metadataProvider, stmt.getDataverseName(),
+ stmt.getDatasetName());
+ List<Integer> keySourceIndicator = ((InternalDatasetDetails)
targetDatasource.getDataset()
+ .getDatasetDetails()).getKeySourceIndicator();
ArrayList<LogicalVariable> vars = new ArrayList<>();
ArrayList<Mutable<ILogicalExpression>> exprs = new ArrayList<>();
List<Mutable<ILogicalExpression>> varRefsForLoading = new
ArrayList<>();
@@ -342,8 +338,9 @@
for (int i = 0; i < numOfPrimaryKeys; i++) {
if (keySourceIndicator == null ||
keySourceIndicator.get(i).intValue() == 0) {
// record part
-
PlanTranslationUtil.prepareVarAndExpression(partitionKeys.get(i), resVar, vars,
exprs,
- varRefsForLoading, context);
+ PlanTranslationUtil
+ .prepareVarAndExpression(partitionKeys.get(i),
resVar, vars, exprs, varRefsForLoading,
+ context);
} else {
// meta part
PlanTranslationUtil.prepareMetaKeyAccessExpression(partitionKeys.get(i),
unnestVar, exprs, vars,
@@ -365,8 +362,8 @@
PlanTranslationUtil.prepareVarAndExpression(additionalFilteringField, resVar,
additionalFilteringVars,
additionalFilteringAssignExpressions,
additionalFilteringExpressions, context);
- additionalFilteringAssign =
- new AssignOperator(additionalFilteringVars,
additionalFilteringAssignExpressions);
+ additionalFilteringAssign = new
AssignOperator(additionalFilteringVars,
+ additionalFilteringAssignExpressions);
additionalFilteringAssign.getInputs().add(new
MutableObject<>(project));
assign.getInputs().add(new
MutableObject<>(additionalFilteringAssign));
} else {
@@ -449,8 +446,11 @@
List<LogicalVariable> metaAndKeysVars = null;
List<Mutable<ILogicalExpression>> metaAndKeysExprs = null;
List<Mutable<ILogicalExpression>> metaExpSingletonList = null;
- boolean isChangeFeed =
- FeedMetadataUtil.isChangeFeed(metadataProvider,
sfs.getDataverseName(), sfs.getFeedName());
+ Feed feed = metadataProvider.findFeed(sfs.getDataverseName(),
sfs.getFeedName());
+
+ boolean isChangeFeed = FeedMetadataUtil.isChangeFeed(feed,
sfs.getDataverseName(), sfs.getFeedName());
+ boolean isUpsertFeed = FeedMetadataUtil.isUpsertFeed(feed,
sfs.getDataverseName(), sfs.getFeedName());
+
if (targetDatasource.getDataset().hasMetaPart() || isChangeFeed) {
metaAndKeysVars = new ArrayList<>();
metaAndKeysExprs = new ArrayList<>();
@@ -507,8 +507,13 @@
feedModificationOp.getInputs().add(assign.getInputs().get(0));
}
} else {
- feedModificationOp = new
InsertDeleteUpsertOperator(targetDatasource, varRef, varRefsForLoading,
- metaExpSingletonList,
InsertDeleteUpsertOperator.Kind.INSERT, false);
+ if (isUpsertFeed) {
+ feedModificationOp = new
InsertDeleteUpsertOperator(targetDatasource, varRef, varRefsForLoading,
+ metaExpSingletonList,
InsertDeleteUpsertOperator.Kind.UPSERT, false);
+ } else {
+ feedModificationOp = new
InsertDeleteUpsertOperator(targetDatasource, varRef, varRefsForLoading,
+ metaExpSingletonList,
InsertDeleteUpsertOperator.Kind.INSERT, false);
+ }
feedModificationOp.getInputs().add(new MutableObject<>(assign));
}
if (targetDatasource.getDataset().hasMetaPart() || isChangeFeed) {
@@ -575,8 +580,8 @@
}
AqlSourceId sourceId = new AqlSourceId(dataverseName, datasetName);
IAType itemType =
metadataProvider.findType(dataset.getItemTypeDataverseName(),
dataset.getItemTypeName());
- IAType metaItemType =
-
metadataProvider.findType(dataset.getMetaItemTypeDataverseName(),
dataset.getMetaItemTypeName());
+ IAType metaItemType = metadataProvider
+ .findType(dataset.getMetaItemTypeDataverseName(),
dataset.getMetaItemTypeName());
INodeDomain domain =
metadataProvider.findNodeDomain(dataset.getNodeGroupName());
return new DatasetDataSource(sourceId, dataset, itemType,
metaItemType, AqlDataSourceType.INTERNAL_DATASET,
dataset.getDatasetDetails(), domain);
@@ -590,8 +595,7 @@
return new FileSplit(metadataProperties.getMetadataNodeName(), new
FileReference(new File(filePath)));
}
- @Override
- public Pair<ILogicalOperator, LogicalVariable> visit(LetClause lc,
Mutable<ILogicalOperator> tupSource)
+ @Override public Pair<ILogicalOperator, LogicalVariable> visit(LetClause
lc, Mutable<ILogicalOperator> tupSource)
throws AsterixException {
LogicalVariable v;
ILogicalOperator returnedOp;
@@ -602,33 +606,31 @@
returnedOp.getInputs().add(tupSource);
} else {
v = context.newVar(lc.getVarExpr());
- Pair<ILogicalExpression, Mutable<ILogicalOperator>> eo =
- langExprToAlgExpression(lc.getBindingExpr(), tupSource);
+ Pair<ILogicalExpression, Mutable<ILogicalOperator>> eo =
langExprToAlgExpression(lc.getBindingExpr(),
+ tupSource);
returnedOp = new AssignOperator(v, new MutableObject<>(eo.first));
returnedOp.getInputs().add(eo.second);
}
return new Pair<>(returnedOp, v);
}
- @Override
- public Pair<ILogicalOperator, LogicalVariable> visit(FieldAccessor fa,
Mutable<ILogicalOperator> tupSource)
- throws AsterixException {
+ @Override public Pair<ILogicalOperator, LogicalVariable>
visit(FieldAccessor fa,
+ Mutable<ILogicalOperator> tupSource) throws AsterixException {
Pair<ILogicalExpression, Mutable<ILogicalOperator>> p =
langExprToAlgExpression(fa.getExpr(), tupSource);
LogicalVariable v = context.newVar();
AbstractFunctionCallExpression fldAccess = new
ScalarFunctionCallExpression(
FunctionUtil.getFunctionInfo(AsterixBuiltinFunctions.FIELD_ACCESS_BY_NAME));
fldAccess.getArguments().add(new MutableObject<>(p.first));
- ILogicalExpression faExpr =
- new ConstantExpression(new AsterixConstantValue(new
AString(fa.getIdent().getValue())));
+ ILogicalExpression faExpr = new ConstantExpression(
+ new AsterixConstantValue(new
AString(fa.getIdent().getValue())));
fldAccess.getArguments().add(new MutableObject<>(faExpr));
AssignOperator a = new AssignOperator(v, new
MutableObject<>(fldAccess));
a.getInputs().add(p.second);
return new Pair<>(a, v);
}
- @Override
- public Pair<ILogicalOperator, LogicalVariable> visit(IndexAccessor ia,
Mutable<ILogicalOperator> tupSource)
- throws AsterixException {
+ @Override public Pair<ILogicalOperator, LogicalVariable>
visit(IndexAccessor ia,
+ Mutable<ILogicalOperator> tupSource) throws AsterixException {
Pair<ILogicalExpression, Mutable<ILogicalOperator>> p =
langExprToAlgExpression(ia.getExpr(), tupSource);
LogicalVariable v = context.newVar();
AbstractFunctionCallExpression f;
@@ -637,8 +639,8 @@
FunctionUtil.getFunctionInfo(AsterixBuiltinFunctions.ANY_COLLECTION_MEMBER));
f.getArguments().add(new MutableObject<>(p.first));
} else {
- Pair<ILogicalExpression, Mutable<ILogicalOperator>> indexPair =
- langExprToAlgExpression(ia.getIndexExpr(), tupSource);
+ Pair<ILogicalExpression, Mutable<ILogicalOperator>> indexPair =
langExprToAlgExpression(ia.getIndexExpr(),
+ tupSource);
f = new
ScalarFunctionCallExpression(FunctionUtil.getFunctionInfo(AsterixBuiltinFunctions.GET_ITEM));
f.getArguments().add(new MutableObject<>(p.first));
f.getArguments().add(new MutableObject<>(indexPair.first));
@@ -648,8 +650,7 @@
return new Pair<>(a, v);
}
- @Override
- public Pair<ILogicalOperator, LogicalVariable> visit(CallExpr fcall,
Mutable<ILogicalOperator> tupSource)
+ @Override public Pair<ILogicalOperator, LogicalVariable> visit(CallExpr
fcall, Mutable<ILogicalOperator> tupSource)
throws AsterixException {
LogicalVariable v = context.newVar();
FunctionSignature signature = fcall.getFunctionSignature();
@@ -745,8 +746,8 @@
if (AsterixBuiltinFunctions.isBuiltinAggregateFunction(fi)) {
f = AsterixBuiltinFunctions.makeAggregateFunctionExpression(fi,
args);
} else if (AsterixBuiltinFunctions.isBuiltinUnnestingFunction(fi)) {
- UnnestingFunctionCallExpression ufce =
- new
UnnestingFunctionCallExpression(FunctionUtil.getFunctionInfo(fi), args);
+ UnnestingFunctionCallExpression ufce = new
UnnestingFunctionCallExpression(FunctionUtil.getFunctionInfo(fi),
+ args);
ufce.setReturnsUniqueValues(AsterixBuiltinFunctions.returnsUniqueValues(fi));
f = ufce;
} else {
@@ -755,30 +756,27 @@
return f;
}
- @Override
- public Pair<ILogicalOperator, LogicalVariable> visit(FunctionDecl fd,
Mutable<ILogicalOperator> tupSource) {
+ @Override public Pair<ILogicalOperator, LogicalVariable>
visit(FunctionDecl fd,
+ Mutable<ILogicalOperator> tupSource) {
throw new IllegalStateException("Function declarations should be
inlined at AST rewriting phase.");
}
- @SuppressWarnings("unchecked")
- @Override
- public Pair<ILogicalOperator, LogicalVariable> visit(GroupbyClause gc,
Mutable<ILogicalOperator> tupSource)
- throws AsterixException {
+ @SuppressWarnings("unchecked") @Override public Pair<ILogicalOperator,
LogicalVariable> visit(GroupbyClause gc,
+ Mutable<ILogicalOperator> tupSource) throws AsterixException {
Mutable<ILogicalOperator> topOp = tupSource;
if (gc.hasGroupVar()) {
List<Pair<Expression, Identifier>> groupFieldList =
gc.getGroupFieldList();
List<Mutable<ILogicalExpression>> groupRecordConstructorArgList =
new ArrayList<>();
for (Pair<Expression, Identifier> groupField : groupFieldList) {
- ILogicalExpression groupFieldNameExpr =
- langExprToAlgExpression(new LiteralExpr(new
StringLiteral(groupField.second.getValue())),
- topOp).first;
+ ILogicalExpression groupFieldNameExpr =
langExprToAlgExpression(
+ new LiteralExpr(new
StringLiteral(groupField.second.getValue())), topOp).first;
groupRecordConstructorArgList.add(new
MutableObject<>(groupFieldNameExpr));
ILogicalExpression groupFieldExpr =
langExprToAlgExpression(groupField.first, topOp).first;
groupRecordConstructorArgList.add(new
MutableObject<>(groupFieldExpr));
}
LogicalVariable groupVar = context.newVar(gc.getGroupVar());
- AssignOperator groupVarAssignOp = new AssignOperator(groupVar,
- new MutableObject<>(new ScalarFunctionCallExpression(
+ AssignOperator groupVarAssignOp = new AssignOperator(groupVar, new
MutableObject<>(
+ new ScalarFunctionCallExpression(
FunctionUtil.getFunctionInfo(AsterixBuiltinFunctions.OPEN_RECORD_CONSTRUCTOR),
groupRecordConstructorArgList)));
groupVarAssignOp.getInputs().add(topOp);
@@ -854,8 +852,7 @@
}
- @Override
- public Pair<ILogicalOperator, LogicalVariable> visit(IfExpr ifexpr,
Mutable<ILogicalOperator> tupSource)
+ @Override public Pair<ILogicalOperator, LogicalVariable> visit(IfExpr
ifexpr, Mutable<ILogicalOperator> tupSource)
throws AsterixException {
// In the most general case, IfThenElse is translated in the following
// way.
@@ -894,22 +891,20 @@
// Unnests the selected ("if" or "else") result.
LogicalVariable unnestVar = context.newVar();
- UnnestOperator unnestOp = new UnnestOperator(unnestVar,
- new MutableObject<>(new UnnestingFunctionCallExpression(
-
FunctionUtil.getFunctionInfo(AsterixBuiltinFunctions.SCAN_COLLECTION),
Collections
- .singletonList(new MutableObject<>(new
VariableReferenceExpression(selectVar))))));
+ UnnestOperator unnestOp = new UnnestOperator(unnestVar, new
MutableObject<>(new UnnestingFunctionCallExpression(
+
FunctionUtil.getFunctionInfo(AsterixBuiltinFunctions.SCAN_COLLECTION),
+ Collections.singletonList(new MutableObject<>(new
VariableReferenceExpression(selectVar))))));
unnestOp.getInputs().add(new MutableObject<>(assignOp));
// Produces the final result.
LogicalVariable resultVar = context.newVar();
- AssignOperator finalAssignOp =
- new AssignOperator(resultVar, new MutableObject<>(new
VariableReferenceExpression(unnestVar)));
+ AssignOperator finalAssignOp = new AssignOperator(resultVar,
+ new MutableObject<>(new
VariableReferenceExpression(unnestVar)));
finalAssignOp.getInputs().add(new MutableObject<>(unnestOp));
return new Pair<>(finalAssignOp, resultVar);
}
- @Override
- public Pair<ILogicalOperator, LogicalVariable> visit(LiteralExpr l,
Mutable<ILogicalOperator> tupSource) {
+ @Override public Pair<ILogicalOperator, LogicalVariable> visit(LiteralExpr
l, Mutable<ILogicalOperator> tupSource) {
LogicalVariable var = context.newVar();
AssignOperator a = new AssignOperator(var, new MutableObject<>(
new ConstantExpression(new
AsterixConstantValue(ConstantHelper.objectFromLiteral(l.getValue())))));
@@ -919,8 +914,7 @@
return new Pair<>(a, var);
}
- @Override
- public Pair<ILogicalOperator, LogicalVariable> visit(OperatorExpr op,
Mutable<ILogicalOperator> tupSource)
+ @Override public Pair<ILogicalOperator, LogicalVariable>
visit(OperatorExpr op, Mutable<ILogicalOperator> tupSource)
throws AsterixException {
List<OperatorType> ops = op.getOpList();
int nOps = ops.size();
@@ -1001,9 +995,8 @@
return new Pair<>(a, assignedVar);
}
- @Override
- public Pair<ILogicalOperator, LogicalVariable> visit(OrderbyClause oc,
Mutable<ILogicalOperator> tupSource)
- throws AsterixException {
+ @Override public Pair<ILogicalOperator, LogicalVariable>
visit(OrderbyClause oc,
+ Mutable<ILogicalOperator> tupSource) throws AsterixException {
OrderOperator ord = new OrderOperator();
Iterator<OrderModifier> modifIter = oc.getModifierList().iterator();
Mutable<ILogicalOperator> topOp = tupSource;
@@ -1030,9 +1023,8 @@
return new Pair<>(ord, null);
}
- @Override
- public Pair<ILogicalOperator, LogicalVariable> visit(QuantifiedExpression
qe, Mutable<ILogicalOperator> tupSource)
- throws AsterixException {
+ @Override public Pair<ILogicalOperator, LogicalVariable>
visit(QuantifiedExpression qe,
+ Mutable<ILogicalOperator> tupSource) throws AsterixException {
Mutable<ILogicalOperator> topOp = tupSource;
ILogicalOperator firstOp = null;
@@ -1065,16 +1057,17 @@
if (qe.getQuantifier() == Quantifier.SOME) {
s = new SelectOperator(new MutableObject<>(eo2.first), false,
null);
s.getInputs().add(eo2.second);
- fAgg =
AsterixBuiltinFunctions.makeAggregateFunctionExpression(AsterixBuiltinFunctions.NON_EMPTY_STREAM,
- new ArrayList<>());
+ fAgg = AsterixBuiltinFunctions
+
.makeAggregateFunctionExpression(AsterixBuiltinFunctions.NON_EMPTY_STREAM, new
ArrayList<>());
} else { // EVERY
List<Mutable<ILogicalExpression>> satExprList = new ArrayList<>(1);
satExprList.add(new MutableObject<>(eo2.first));
- s = new SelectOperator(new MutableObject<>(new
ScalarFunctionCallExpression(
-
FunctionUtil.getFunctionInfo(AlgebricksBuiltinFunctions.NOT), satExprList)),
false, null);
+ s = new SelectOperator(new MutableObject<>(
+ new
ScalarFunctionCallExpression(FunctionUtil.getFunctionInfo(AlgebricksBuiltinFunctions.NOT),
+ satExprList)), false, null);
s.getInputs().add(eo2.second);
- fAgg =
AsterixBuiltinFunctions.makeAggregateFunctionExpression(AsterixBuiltinFunctions.EMPTY_STREAM,
- new ArrayList<>());
+ fAgg = AsterixBuiltinFunctions
+
.makeAggregateFunctionExpression(AsterixBuiltinFunctions.EMPTY_STREAM, new
ArrayList<>());
}
LogicalVariable qeVar = context.newVar();
AggregateOperator a = new
AggregateOperator(mkSingletonArrayList(qeVar),
@@ -1083,15 +1076,13 @@
return new Pair<>(a, qeVar);
}
- @Override
- public Pair<ILogicalOperator, LogicalVariable> visit(Query q,
Mutable<ILogicalOperator> tupSource)
+ @Override public Pair<ILogicalOperator, LogicalVariable> visit(Query q,
Mutable<ILogicalOperator> tupSource)
throws AsterixException {
return q.getBody().accept(this, tupSource);
}
- @Override
- public Pair<ILogicalOperator, LogicalVariable> visit(RecordConstructor rc,
Mutable<ILogicalOperator> tupSource)
- throws AsterixException {
+ @Override public Pair<ILogicalOperator, LogicalVariable>
visit(RecordConstructor rc,
+ Mutable<ILogicalOperator> tupSource) throws AsterixException {
AbstractFunctionCallExpression f = new ScalarFunctionCallExpression(
FunctionUtil.getFunctionInfo(AsterixBuiltinFunctions.OPEN_RECORD_CONSTRUCTOR));
LogicalVariable v1 = context.newVar();
@@ -1109,11 +1100,11 @@
return new Pair<>(a, v1);
}
- @Override
- public Pair<ILogicalOperator, LogicalVariable> visit(ListConstructor lc,
Mutable<ILogicalOperator> tupSource)
- throws AsterixException {
- FunctionIdentifier fid = (lc.getType() ==
Type.ORDERED_LIST_CONSTRUCTOR)
- ? AsterixBuiltinFunctions.ORDERED_LIST_CONSTRUCTOR :
AsterixBuiltinFunctions.UNORDERED_LIST_CONSTRUCTOR;
+ @Override public Pair<ILogicalOperator, LogicalVariable>
visit(ListConstructor lc,
+ Mutable<ILogicalOperator> tupSource) throws AsterixException {
+ FunctionIdentifier fid = (lc.getType() ==
Type.ORDERED_LIST_CONSTRUCTOR) ?
+ AsterixBuiltinFunctions.ORDERED_LIST_CONSTRUCTOR :
+ AsterixBuiltinFunctions.UNORDERED_LIST_CONSTRUCTOR;
AbstractFunctionCallExpression f = new
ScalarFunctionCallExpression(FunctionUtil.getFunctionInfo(fid));
LogicalVariable v1 = context.newVar();
AssignOperator a = new AssignOperator(v1, new MutableObject<>(f));
@@ -1127,8 +1118,7 @@
return new Pair<>(a, v1);
}
- @Override
- public Pair<ILogicalOperator, LogicalVariable> visit(UnaryExpr u,
Mutable<ILogicalOperator> tupSource)
+ @Override public Pair<ILogicalOperator, LogicalVariable> visit(UnaryExpr
u, Mutable<ILogicalOperator> tupSource)
throws AsterixException {
Expression expr = u.getExpr();
Pair<ILogicalExpression, Mutable<ILogicalOperator>> eo =
langExprToAlgExpression(expr, tupSource);
@@ -1157,8 +1147,8 @@
return new Pair<>(a, v1);
}
- @Override
- public Pair<ILogicalOperator, LogicalVariable> visit(VariableExpr v,
Mutable<ILogicalOperator> tupSource) {
+ @Override public Pair<ILogicalOperator, LogicalVariable>
visit(VariableExpr v,
+ Mutable<ILogicalOperator> tupSource) {
// Should we ever get to this method?
LogicalVariable var = context.newVar();
LogicalVariable oldV = context.getVar(v.getVar().getId());
@@ -1167,8 +1157,7 @@
return new Pair<>(a, var);
}
- @Override
- public Pair<ILogicalOperator, LogicalVariable> visit(WhereClause w,
Mutable<ILogicalOperator> tupSource)
+ @Override public Pair<ILogicalOperator, LogicalVariable> visit(WhereClause
w, Mutable<ILogicalOperator> tupSource)
throws AsterixException {
Pair<ILogicalExpression, Mutable<ILogicalOperator>> p =
langExprToAlgExpression(w.getWhereExpr(), tupSource);
SelectOperator s = new SelectOperator(new MutableObject<>(p.first),
false, null);
@@ -1176,8 +1165,7 @@
return new Pair<>(s, null);
}
- @Override
- public Pair<ILogicalOperator, LogicalVariable> visit(LimitClause lc,
Mutable<ILogicalOperator> tupSource)
+ @Override public Pair<ILogicalOperator, LogicalVariable> visit(LimitClause
lc, Mutable<ILogicalOperator> tupSource)
throws AsterixException {
Pair<ILogicalExpression, Mutable<ILogicalOperator>> p1 =
langExprToAlgExpression(lc.getLimitExpr(), tupSource);
LimitOperator opLim;
@@ -1277,8 +1265,8 @@
Mutable<ILogicalOperator> topOpRef) throws AsterixException {
switch (expr.getKind()) {
case VARIABLE_EXPRESSION:
- VariableReferenceExpression ve =
- new
VariableReferenceExpression(context.getVar(((VariableExpr)
expr).getVar().getId()));
+ VariableReferenceExpression ve = new
VariableReferenceExpression(
+ context.getVar(((VariableExpr)
expr).getVar().getId()));
return new Pair<>(ve, topOpRef);
case LITERAL_EXPRESSION:
LiteralExpr val = (LiteralExpr) expr;
@@ -1393,8 +1381,9 @@
FunctionUtil.getFunctionInfo(AsterixBuiltinFunctions.SCAN_COLLECTION), argRefs);
case FUNCTION_CALL:
AbstractFunctionCallExpression fce =
(AbstractFunctionCallExpression) expr;
- return (fce.getKind() == FunctionKind.UNNEST) ? expr
- : new UnnestingFunctionCallExpression(
+ return (fce.getKind() == FunctionKind.UNNEST) ?
+ expr :
+ new UnnestingFunctionCallExpression(
FunctionUtil.getFunctionInfo(AsterixBuiltinFunctions.SCAN_COLLECTION), argRefs);
default:
return expr;
@@ -1422,8 +1411,7 @@
* Eliminate shared operator references in a query plan.
* Deep copy a new query plan subtree whenever there is a shared operator
reference.
*
- * @param plan,
- * the query plan.
+ * @param plan, the query plan.
* @throws AsterixException
*/
private void eliminateSharedOperatorReferenceForPlan(ILogicalPlan plan)
throws AsterixException {
@@ -1437,12 +1425,10 @@
* Eliminate shared operator references in a query plan rooted at
<code>currentOpRef.getValue()</code>.
* Deep copy a new query plan subtree whenever there is a shared operator
reference.
*
- * @param currentOpRef,
- * the operator reference to consider
- * @param opRefSet,
- * the set storing seen operator references so far.
+ * @param currentOpRef, the operator reference to consider
+ * @param opRefSet, the set storing seen operator references so far.
* @return a mapping that maps old variables to new variables, for the
ancestors of
- * <code>currentOpRef</code> to replace variables properly.
+ * <code>currentOpRef</code> to replace variables properly.
* @throws AsterixException
*/
private Map<LogicalVariable, LogicalVariable>
eliminateSharedOperatorReference(
@@ -1471,8 +1457,8 @@
if (opRefSet.contains(childRef)) {
// There is a shared operator reference in the query plan.
// Deep copies the child plan.
- LogicalOperatorDeepCopyWithNewVariablesVisitor visitor =
- new
LogicalOperatorDeepCopyWithNewVariablesVisitor(context, null);
+ LogicalOperatorDeepCopyWithNewVariablesVisitor visitor =
new LogicalOperatorDeepCopyWithNewVariablesVisitor(
+ context, null);
ILogicalOperator newChild =
childRef.getValue().accept(visitor, null);
Map<LogicalVariable, LogicalVariable> cloneVarMap =
visitor.getInputToOutputVariableMapping();
@@ -1487,8 +1473,8 @@
// Recursively eliminate shared operator reference for the
operator subtree,
// even if it is a deep copy of some other one.
- Map<LogicalVariable, LogicalVariable> childVarMap =
- eliminateSharedOperatorReference(childRef, opRefSet);
+ Map<LogicalVariable, LogicalVariable> childVarMap =
eliminateSharedOperatorReference(childRef,
+ opRefSet);
// Substitute variables according to the new subtree.
VariableUtilities.substituteVariables(currentOperator,
childVarMap, null);
@@ -1517,12 +1503,9 @@
/**
* Constructs a subplan operator for a branch in a if-else (or case)
expression.
*
- * @param inputOp,
- * the input operator.
- * @param selectExpr,
- * the expression to select tuples that are processed by this
branch.
- * @param branchExpression,
- * the expression to be evaluated in this branch.
+ * @param inputOp, the input operator.
+ * @param selectExpr, the expression to select tuples that are
processed by this branch.
+ * @param branchExpression, the expression to be evaluated in this branch.
* @return a pair of the constructed subplan operator and the output
variable for the branch.
* @throws AsterixException
*/
@@ -1531,18 +1514,18 @@
context.enterSubplan();
SubplanOperator subplanOp = new SubplanOperator();
subplanOp.getInputs().add(new MutableObject<>(inputOp));
- Mutable<ILogicalOperator> nestedSource =
- new MutableObject<>(new NestedTupleSourceOperator(new
MutableObject<>(subplanOp)));
+ Mutable<ILogicalOperator> nestedSource = new MutableObject<>(
+ new NestedTupleSourceOperator(new MutableObject<>(subplanOp)));
SelectOperator select = new SelectOperator(selectExpr, false, null);
// The select operator cannot be moved up and down, otherwise it will
cause typing issues (ASTERIXDB-1203).
OperatorPropertiesUtil.markMovable(select, false);
select.getInputs().add(nestedSource);
Pair<ILogicalOperator, LogicalVariable> pBranch =
branchExpression.accept(this, new MutableObject<>(select));
LogicalVariable branchVar = context.newVar();
- AggregateOperator aggOp = new
AggregateOperator(Collections.singletonList(branchVar),
- Collections.singletonList(new MutableObject<>(new
AggregateFunctionCallExpression(
-
FunctionUtil.getFunctionInfo(AsterixBuiltinFunctions.LISTIFY), false,
Collections.singletonList(
- new MutableObject<>(new
VariableReferenceExpression(pBranch.second)))))));
+ AggregateOperator aggOp = new
AggregateOperator(Collections.singletonList(branchVar),
Collections.singletonList(
+ new MutableObject<>(new AggregateFunctionCallExpression(
+
FunctionUtil.getFunctionInfo(AsterixBuiltinFunctions.LISTIFY), false,
Collections
+ .singletonList(new MutableObject<>(new
VariableReferenceExpression(pBranch.second)))))));
aggOp.getInputs().add(new MutableObject<>(pBranch.first));
ILogicalPlan planForBranch = new ALogicalPlanImpl(new
MutableObject<>(aggOp));
subplanOp.getNestedPlans().add(planForBranch);
@@ -1552,8 +1535,8 @@
// Processes EXISTS and NOT EXISTS.
private AssignOperator processExists(ILogicalExpression inputExpr,
LogicalVariable v1, boolean not) {
- AbstractFunctionCallExpression count =
- new
ScalarFunctionCallExpression(FunctionUtil.getFunctionInfo(AsterixBuiltinFunctions.SCALAR_COUNT));
+ AbstractFunctionCallExpression count = new
ScalarFunctionCallExpression(
+
FunctionUtil.getFunctionInfo(AsterixBuiltinFunctions.SCALAR_COUNT));
count.getArguments().add(new MutableObject<>(inputExpr));
AbstractFunctionCallExpression comparison = new
ScalarFunctionCallExpression(
FunctionUtil.getFunctionInfo(not ? AsterixBuiltinFunctions.EQ
: AsterixBuiltinFunctions.NEQ));
@@ -1606,8 +1589,8 @@
LogicalVariable unnestVar = context.newVar();
List<Mutable<ILogicalExpression>> args = new ArrayList<>();
args.add(new MutableObject<ILogicalExpression>(new
VariableReferenceExpression(opAndVar.second)));
- UnnestOperator unnestOp = new UnnestOperator(unnestVar,
- new MutableObject<ILogicalExpression>(new
UnnestingFunctionCallExpression(
+ UnnestOperator unnestOp = new UnnestOperator(unnestVar, new
MutableObject<ILogicalExpression>(
+ new UnnestingFunctionCallExpression(
FunctionUtil.getFunctionInfo(AsterixBuiltinFunctions.SCAN_COLLECTION), args)));
unnestOp.getInputs().add(new
MutableObject<ILogicalOperator>(opAndVar.first));
inputOpRefsToUnion.add(new
MutableObject<ILogicalOperator>(unnestOp));
@@ -1625,8 +1608,8 @@
while (inputOpRefIterator.hasNext()) {
// Generates the variable triple <leftVar, rightVar, outputVar> .
topUnionVar = context.newVar();
- Triple<LogicalVariable, LogicalVariable, LogicalVariable>
varTriple =
- new Triple<>(leftInputVar, inputVarIterator.next(),
topUnionVar);
+ Triple<LogicalVariable, LogicalVariable, LogicalVariable>
varTriple = new Triple<>(leftInputVar,
+ inputVarIterator.next(), topUnionVar);
List<Triple<LogicalVariable, LogicalVariable, LogicalVariable>>
varTriples = new ArrayList<>();
varTriples.add(varTriple);
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
index e251f32..b76858e 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
@@ -93,6 +93,8 @@
public static final String KEY_VALUE_FORMAT = "value-format";
// a boolean indicating whether the feed is a change feed
public static final String KEY_IS_CHANGE_FEED = "change-feed";
+ // a boolean indicating whether the feed uses upsert
+ public static final String KEY_IS_UPSERT_FEED = "upsert-feed";
// an integer representing the number of keys in a change feed
public static final String KEY_KEY_SIZE = "key-size";
// a boolean indicating whether the feed produces records with metadata
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
index 23cd39c..8eb8815 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
@@ -296,6 +296,10 @@
return
Boolean.parseBoolean(configuration.get(ExternalDataConstants.KEY_IS_CHANGE_FEED));
}
+ public static boolean isUpsertFeed(Map<String, String> configuration) {
+ return
Boolean.parseBoolean(configuration.get(ExternalDataConstants.KEY_IS_UPSERT_FEED));
+ }
+
public static int getNumberOfKeys(Map<String, String> configuration)
throws AsterixException {
String keyIndexes =
configuration.get(ExternalDataConstants.KEY_KEY_INDEXES);
if (keyIndexes == null) {
diff --git
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetadataUtil.java
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetadataUtil.java
index 12ab634..17ddb70 100644
---
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetadataUtil.java
+++
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetadataUtil.java
@@ -128,8 +128,8 @@
MetadataTransactionContext ctx) throws AsterixException {
FeedPolicyEntity feedPolicy =
MetadataManager.INSTANCE.getFeedPolicy(ctx, dataverse, policyName);
if (feedPolicy == null) {
- feedPolicy =
- MetadataManager.INSTANCE.getFeedPolicy(ctx,
MetadataConstants.METADATA_DATAVERSE_NAME, policyName);
+ feedPolicy = MetadataManager.INSTANCE
+ .getFeedPolicy(ctx,
MetadataConstants.METADATA_DATAVERSE_NAME, policyName);
if (feedPolicy == null) {
throw new AsterixException("Unknown feed policy" + policyName);
}
@@ -170,11 +170,11 @@
boolean enableSubscriptionMode;
OperatorDescriptorId opId = null;
if (opDesc instanceof AlgebricksMetaOperatorDescriptor) {
- IPushRuntimeFactory[] runtimeFactories =
- ((AlgebricksMetaOperatorDescriptor)
opDesc).getPipeline().getRuntimeFactories();
+ IPushRuntimeFactory[] runtimeFactories =
((AlgebricksMetaOperatorDescriptor) opDesc).getPipeline()
+ .getRuntimeFactories();
if (runtimeFactories[0] instanceof AssignRuntimeFactory &&
runtimeFactories.length > 1) {
- IConnectorDescriptor connectorDesc =
-
spec.getOperatorInputMap().get(opDesc.getOperatorId()).get(0);
+ IConnectorDescriptor connectorDesc =
spec.getOperatorInputMap().get(opDesc.getOperatorId())
+ .get(0);
IOperatorDescriptor sourceOp =
spec.getProducer(connectorDesc);
if (sourceOp instanceof FeedCollectOperatorDescriptor)
{
runtimeType = FeedRuntimeType.COMPUTE;
@@ -209,16 +209,16 @@
}
// make connections between operators
- for (Entry<ConnectorDescriptorId, Pair<Pair<IOperatorDescriptor,
Integer>, Pair<IOperatorDescriptor, Integer>>>
- entry : spec.getConnectorOperatorMap().entrySet()) {
+ for (Entry<ConnectorDescriptorId, Pair<Pair<IOperatorDescriptor,
Integer>, Pair<IOperatorDescriptor, Integer>>> entry : spec
+ .getConnectorOperatorMap().entrySet()) {
IConnectorDescriptor connDesc =
altered.getConnectorMap().get(connectorMapping.get(entry.getKey()));
Pair<IOperatorDescriptor, Integer> leftOp =
entry.getValue().getLeft();
Pair<IOperatorDescriptor, Integer> rightOp =
entry.getValue().getRight();
- IOperatorDescriptor leftOpDesc =
-
altered.getOperatorMap().get(oldNewOID.get(leftOp.getLeft().getOperatorId()));
- IOperatorDescriptor rightOpDesc =
-
altered.getOperatorMap().get(oldNewOID.get(rightOp.getLeft().getOperatorId()));
+ IOperatorDescriptor leftOpDesc = altered.getOperatorMap()
+ .get(oldNewOID.get(leftOp.getLeft().getOperatorId()));
+ IOperatorDescriptor rightOpDesc = altered.getOperatorMap()
+ .get(oldNewOID.get(rightOp.getLeft().getOperatorId()));
altered.connect(connDesc, leftOpDesc, leftOp.getRight(),
rightOpDesc, rightOp.getRight());
}
@@ -302,8 +302,8 @@
public static void increaseCardinality(JobSpecification spec,
FeedRuntimeType compute, int requiredCardinality,
List<String> newLocations) throws AsterixException {
IOperatorDescriptor changingOpDesc =
alterJobSpecForComputeCardinality(spec, requiredCardinality);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
changingOpDesc,
- nChooseK(requiredCardinality, newLocations));
+ PartitionConstraintHelper
+ .addAbsoluteLocationConstraint(spec, changingOpDesc,
nChooseK(requiredCardinality, newLocations));
}
@@ -314,23 +314,22 @@
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
changingOpDesc, chosenLocations);
}
- private static IOperatorDescriptor alterJobSpecForComputeCardinality(
- JobSpecification spec, int requiredCardinality) throws
AsterixException {
+ private static IOperatorDescriptor
alterJobSpecForComputeCardinality(JobSpecification spec, int
requiredCardinality)
+ throws AsterixException {
Map<ConnectorDescriptorId, IConnectorDescriptor> connectors =
spec.getConnectorMap();
- Map<ConnectorDescriptorId, Pair<Pair<IOperatorDescriptor, Integer>,
Pair<IOperatorDescriptor, Integer>>>
- connectorOpMap = spec.getConnectorOperatorMap();
+ Map<ConnectorDescriptorId, Pair<Pair<IOperatorDescriptor, Integer>,
Pair<IOperatorDescriptor, Integer>>> connectorOpMap = spec
+ .getConnectorOperatorMap();
IOperatorDescriptor sourceOp = null;
IOperatorDescriptor targetOp = null;
IConnectorDescriptor connDesc = null;
- for (Entry<ConnectorDescriptorId, Pair<Pair<IOperatorDescriptor,
Integer>, Pair<IOperatorDescriptor, Integer>>>
- entry : connectorOpMap.entrySet()) {
+ for (Entry<ConnectorDescriptorId, Pair<Pair<IOperatorDescriptor,
Integer>, Pair<IOperatorDescriptor, Integer>>> entry : connectorOpMap
+ .entrySet()) {
ConnectorDescriptorId cid = entry.getKey();
sourceOp = entry.getValue().getKey().getKey();
if (sourceOp instanceof FeedCollectOperatorDescriptor) {
targetOp = entry.getValue().getValue().getKey();
- if ((targetOp instanceof FeedMetaOperatorDescriptor)
- && (((FeedMetaOperatorDescriptor)
targetOp).getRuntimeType()
- .equals(FeedRuntimeType.COMPUTE))) {
+ if ((targetOp instanceof FeedMetaOperatorDescriptor) &&
(((FeedMetaOperatorDescriptor) targetOp)
+ .getRuntimeType().equals(FeedRuntimeType.COMPUTE))) {
connDesc = connectors.get(cid);
break;
} else {
@@ -425,8 +424,8 @@
try {
MetadataManager.INSTANCE.acquireReadLatch();
ctx = MetadataManager.INSTANCE.beginTransaction();
- feed = MetadataManager.INSTANCE.getFeed(ctx,
connectionId.getFeedId().getDataverse(),
- connectionId.getFeedId().getEntityName());
+ feed = MetadataManager.INSTANCE
+ .getFeed(ctx, connectionId.getFeedId().getDataverse(),
connectionId.getFeedId().getEntityName());
preProcessingRequired = feed.getAppliedFunction() != null;
MetadataManager.INSTANCE.commitTransaction(ctx);
} catch (Exception e) {
@@ -454,8 +453,8 @@
ExternalDataUtils.prepareFeed(configuration,
feed.getDataverseName(), feed.getFeedName());
ExternalDataUtils.prepareFeed(configuration,
feed.getDataverseName(), feed.getFeedName());
// Get adapter from metadata dataset <Metadata dataverse>
- DatasourceAdapter adapterEntity =
MetadataManager.INSTANCE.getAdapter(mdTxnCtx,
- MetadataConstants.METADATA_DATAVERSE_NAME, adapterName);
+ DatasourceAdapter adapterEntity = MetadataManager.INSTANCE
+ .getAdapter(mdTxnCtx,
MetadataConstants.METADATA_DATAVERSE_NAME, adapterName);
// Get adapter from metadata dataset <The feed dataverse>
if (adapterEntity == null) {
adapterEntity = MetadataManager.INSTANCE.getAdapter(mdTxnCtx,
feed.getDataverseName(), adapterName);
@@ -482,14 +481,14 @@
adapterFactory.setMetaType(metaType);
adapterFactory.configure(null, configuration);
} else {
- AdapterFactoryProvider.getAdapterFactory(libraryManager,
adapterName, configuration, adapterOutputType,
- metaType);
+ AdapterFactoryProvider
+ .getAdapterFactory(libraryManager, adapterName,
configuration, adapterOutputType, metaType);
}
if (metaType == null &&
configuration.containsKey(ExternalDataConstants.KEY_META_TYPE_NAME)) {
metaType = getOutputType(feed, configuration,
ExternalDataConstants.KEY_META_TYPE_NAME);
if (metaType == null) {
- throw new AsterixException("Unknown specified feed meta
output data type "
- +
configuration.get(ExternalDataConstants.KEY_META_TYPE_NAME));
+ throw new AsterixException("Unknown specified feed meta
output data type " + configuration
+ .get(ExternalDataConstants.KEY_META_TYPE_NAME));
}
}
if (adapterOutputType == null) {
@@ -498,8 +497,8 @@
}
adapterOutputType = getOutputType(feed, configuration,
ExternalDataConstants.KEY_TYPE_NAME);
if (adapterOutputType == null) {
- throw new AsterixException("Unknown specified feed output
data type "
- +
configuration.get(ExternalDataConstants.KEY_TYPE_NAME));
+ throw new AsterixException("Unknown specified feed output
data type " + configuration
+ .get(ExternalDataConstants.KEY_TYPE_NAME));
}
}
} catch (Exception e) {
@@ -507,10 +506,9 @@
}
}
- @SuppressWarnings("rawtypes")
- public static Triple<IAdapterFactory, RecordDescriptor, AdapterType>
getPrimaryFeedFactoryAndOutput(Feed feed,
- FeedPolicyAccessor policyAccessor, MetadataTransactionContext
mdTxnCtx, ILibraryManager libraryManager)
- throws AlgebricksException {
+ @SuppressWarnings("rawtypes") public static Triple<IAdapterFactory,
RecordDescriptor, AdapterType> getPrimaryFeedFactoryAndOutput(
+ Feed feed, FeedPolicyAccessor policyAccessor,
MetadataTransactionContext mdTxnCtx,
+ ILibraryManager libraryManager) throws AlgebricksException {
// This method needs to be re-visited
String adapterName = null;
DatasourceAdapter adapterEntity = null;
@@ -528,8 +526,8 @@
metaType = getOutputType(feed, configuration,
ExternalDataConstants.KEY_META_TYPE_NAME);
ExternalDataUtils.prepareFeed(configuration,
feed.getDataverseName(), feed.getFeedName());
// Get adapter from metadata dataset <Metadata dataverse>
- adapterEntity = MetadataManager.INSTANCE.getAdapter(mdTxnCtx,
MetadataConstants.METADATA_DATAVERSE_NAME,
- adapterName);
+ adapterEntity = MetadataManager.INSTANCE
+ .getAdapter(mdTxnCtx,
MetadataConstants.METADATA_DATAVERSE_NAME, adapterName);
// Get adapter from metadata dataset <The feed dataverse>
if (adapterEntity == null) {
adapterEntity = MetadataManager.INSTANCE.getAdapter(mdTxnCtx,
feed.getDataverseName(), adapterName);
@@ -554,8 +552,8 @@
adapterFactory.setMetaType(metaType);
adapterFactory.configure(null, configuration);
} else {
- adapterFactory =
AdapterFactoryProvider.getAdapterFactory(libraryManager, adapterName,
configuration,
- adapterOutputType, metaType);
+ adapterFactory = AdapterFactoryProvider
+ .getAdapterFactory(libraryManager, adapterName,
configuration, adapterOutputType, metaType);
adapterType = IDataSourceAdapter.AdapterType.INTERNAL;
}
if (metaType == null) {
@@ -592,9 +590,9 @@
return feedProps;
}
- @SuppressWarnings("rawtypes")
- private static void getSerdesForPKs(ISerializerDeserializer[] serdes,
Map<String, String> configuration,
- ARecordType metaType, ARecordType adapterOutputType, int index)
throws AlgebricksException {
+ @SuppressWarnings("rawtypes") private static void
getSerdesForPKs(ISerializerDeserializer[] serdes,
+ Map<String, String> configuration, ARecordType metaType,
ARecordType adapterOutputType, int index)
+ throws AlgebricksException {
int[] pkIndexes = ExternalDataUtils.getPKIndexes(configuration);
if (metaType != null) {
int[] pkIndicators =
ExternalDataUtils.getPKSourceIndicators(configuration);
@@ -694,10 +692,12 @@
return outputType;
}
- public static boolean isChangeFeed(AqlMetadataProvider mdProvider, String
dataverse, String feedName)
- throws AlgebricksException {
- Feed feed = mdProvider.findFeed(dataverse, feedName);
+ public static boolean isChangeFeed(Feed feed, String dataverse, String
feedName) throws AlgebricksException {
return ExternalDataUtils.isChangeFeed(feed.getAdapterConfiguration());
}
+ public static boolean isUpsertFeed(Feed feed, String dataverse, String
feedName) throws AlgebricksException {
+ return ExternalDataUtils.isUpsertFeed(feed.getAdapterConfiguration());
+ }
+
}
--
To view, visit https://asterix-gerrit.ics.uci.edu/1068
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: Ic5133e7c6941fea4110cc9983f99502f364dc810
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Xikui Wang <[email protected]>