Xikui Wang has uploaded a new change for review.
https://asterix-gerrit.ics.uci.edu/2059
Change subject: [WIP] Refactoring for Feed pipeline
......................................................................
[WIP] Refactoring for Feed pipeline
Change-Id: I0ae5a837613780a4d2c90c98139fdc6d5e040cc9
---
M
asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/CompiledStatements.java
M
asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
M asterixdb/asterix-app/src/test/resources/runtimets/only_sqlpp.xml
D
asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/connect-feed-with-aql-function/connect-feed-with-aql-function.5.update.aql
D
asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/connect-feed-with-aql-function/connect-feed-with-aql-function.6.query.aql
R
asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/connect-feed-with-function/connect-feed-with-function.1.ddl.aql
R
asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/connect-feed-with-function/connect-feed-with-function.2.update.aql
R
asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/connect-feed-with-function/connect-feed-with-function.3.ddl.aql
A
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/connect-feed-with-function/connect-feed-with-function.1.ddl.sqlpp
C
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/connect-feed-with-function/connect-feed-with-function.2.update.sqlpp
R
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/connect-feed-with-function/connect-feed-with-function.3.server.sqlpp
R
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/connect-feed-with-function/connect-feed-with-function.4.sleep.sqlpp
C
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/connect-feed-with-function/connect-feed-with-function.5.update.sqlpp
C
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/connect-feed-with-function/connect-feed-with-function.6.query.sqlpp
C
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/connect-feed-with-function/connect-feed-with-function.7.server.sqlpp
C
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/connect-feed-with-function/connect-feed-with-function.8.ddl.sqlpp
A
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-external-function/feed-with-external-function.1.ddl.sqlpp
C
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-external-function/feed-with-external-function.2.lib.sqlpp
R
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-external-function/feed-with-external-function.3.update.sqlpp
C
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-external-function/feed-with-external-function.5.pollquery.sqlpp
C
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-external-function/feed-with-external-function.6.lib.sqlpp
C
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-external-function/feed-with-external-function.7.ddl.sqlpp
R
asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/connect-feed-with-function/connect-feed-with-function.1.adm
M
asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/feeds_03/feeds_03.1.adm
M asterixdb/asterix-app/src/test/resources/runtimets/testsuite.xml
M asterixdb/asterix-app/src/test/resources/runtimets/testsuite_it_sqlpp.xml
M asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
M
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
M asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedConstants.java
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java
M
asterixdb/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/parser/FunctionParser.java
M
asterixdb/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/statement/SubscribeFeedStatement.java
M
asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/parser/FunctionParser.java
M
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
M
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Function.java
40 files changed, 336 insertions(+), 417 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/59/2059/1
diff --git
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/CompiledStatements.java
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/CompiledStatements.java
index 403c26b..068aa29 100644
---
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/CompiledStatements.java
+++
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/CompiledStatements.java
@@ -320,56 +320,6 @@
}
}
- public static class CompiledConnectFeedStatement implements
ICompiledDmlStatement {
- private final String dataverseName;
- private final String feedName;
- private final String datasetName;
- private final String policyName;
- private final Query query;
- private final int varCounter;
-
- public CompiledConnectFeedStatement(String dataverseName, String
feedName, String datasetName,
- String policyName, Query query, int varCounter) {
- this.dataverseName = dataverseName;
- this.feedName = feedName;
- this.datasetName = datasetName;
- this.policyName = policyName;
- this.query = query;
- this.varCounter = varCounter;
- }
-
- @Override
- public String getDataverseName() {
- return dataverseName;
- }
-
- public String getFeedName() {
- return feedName;
- }
-
- @Override
- public String getDatasetName() {
- return datasetName;
- }
-
- public int getVarCounter() {
- return varCounter;
- }
-
- public Query getQuery() {
- return query;
- }
-
- @Override
- public byte getKind() {
- return Statement.Kind.CONNECT_FEED;
- }
-
- public String getPolicyName() {
- return policyName;
- }
- }
-
public static class CompiledSubscribeFeedStatement implements
ICompiledDmlStatement {
private FeedConnectionRequest request;
@@ -402,38 +352,6 @@
public byte getKind() {
return Statement.Kind.SUBSCRIBE_FEED;
}
- }
-
- public static class CompiledDisconnectFeedStatement implements
ICompiledDmlStatement {
- private final String dataverseName;
- private final String datasetName;
- private final String feedName;
-
- public CompiledDisconnectFeedStatement(String dataverseName, String
feedName, String datasetName) {
- this.dataverseName = dataverseName;
- this.feedName = feedName;
- this.datasetName = datasetName;
- }
-
- @Override
- public String getDataverseName() {
- return dataverseName;
- }
-
- @Override
- public String getDatasetName() {
- return datasetName;
- }
-
- public String getFeedName() {
- return feedName;
- }
-
- @Override
- public byte getKind() {
- return Statement.Kind.DISCONNECT_FEED;
- }
-
}
public static class CompiledDeleteStatement implements
ICompiledDmlStatement {
diff --git
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
index a1c5cf4..794bf9b 100644
---
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
+++
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
@@ -409,15 +409,6 @@
leafOperator = translateDelete(targetDatasource, varRef,
varRefsForLoading,
additionalFilteringExpressions, assign);
break;
- case Statement.Kind.CONNECT_FEED:
- leafOperator = translateConnectFeed(targetDatasource,
varRef, varRefsForLoading,
- additionalFilteringExpressions, assign);
- break;
- case Statement.Kind.SUBSCRIBE_FEED:
- leafOperator =
translateSubscribeFeed((CompiledSubscribeFeedStatement) stmt, targetDatasource,
- unnestVar, topOp, exprs, resVar,
varRefsForLoading, varRef, assign,
- additionalFilteringField,
additionalFilteringAssign, additionalFilteringExpressions);
- break;
default:
throw new AlgebricksException("Unsupported statement kind
" + stmt.getKind());
}
@@ -427,18 +418,6 @@
ILogicalPlan plan = new ALogicalPlanImpl(globalPlanRoots);
eliminateSharedOperatorReferenceForPlan(plan);
return plan;
- }
-
- private ILogicalOperator translateConnectFeed(DatasetDataSource
targetDatasource,
- Mutable<ILogicalExpression> varRef,
List<Mutable<ILogicalExpression>> varRefsForLoading,
- List<Mutable<ILogicalExpression>> additionalFilteringExpressions,
ILogicalOperator assign) {
- InsertDeleteUpsertOperator insertOp = new
InsertDeleteUpsertOperator(targetDatasource, varRef,
- varRefsForLoading, InsertDeleteUpsertOperator.Kind.INSERT,
false);
-
insertOp.setAdditionalFilteringExpressions(additionalFilteringExpressions);
- insertOp.getInputs().add(new MutableObject<>(assign));
- ILogicalOperator leafOperator = new DelegateOperator(new
CommitOperator(true));
- leafOperator.getInputs().add(new MutableObject<>(insertOp));
- return leafOperator;
}
private ILogicalOperator translateDelete(DatasetDataSource
targetDatasource, Mutable<ILogicalExpression> varRef,
@@ -455,100 +434,6 @@
deleteOp.getInputs().add(new MutableObject<>(assign));
ILogicalOperator leafOperator = new DelegateOperator(new
CommitOperator(true));
leafOperator.getInputs().add(new MutableObject<>(deleteOp));
- return leafOperator;
- }
-
- private ILogicalOperator
translateSubscribeFeed(CompiledSubscribeFeedStatement sfs,
- DatasetDataSource targetDatasource, LogicalVariable unnestVar,
ILogicalOperator topOp,
- ArrayList<Mutable<ILogicalExpression>> exprs, LogicalVariable
resVar,
- List<Mutable<ILogicalExpression>> varRefsForLoading,
Mutable<ILogicalExpression> varRef,
- ILogicalOperator assign, List<String> additionalFilteringField,
AssignOperator additionalFilteringAssign,
- List<Mutable<ILogicalExpression>> additionalFilteringExpressions)
throws AlgebricksException {
- // if the feed is a change feed (i.e, performs different operations),
we need to project op variable
- InsertDeleteUpsertOperator feedModificationOp;
- AssignOperator metaAndKeysAssign;
- List<LogicalVariable> metaAndKeysVars = null;
- List<Mutable<ILogicalExpression>> metaAndKeysExprs = null;
- List<Mutable<ILogicalExpression>> metaExpSingletonList = null;
- Feed feed = metadataProvider.findFeed(sfs.getDataverseName(),
sfs.getFeedName());
- boolean isChangeFeed =
ExternalDataUtils.isChangeFeed(feed.getAdapterConfiguration());
- boolean isUpsertFeed =
ExternalDataUtils.isUpsertFeed(feed.getAdapterConfiguration());
-
- ProjectOperator project = (ProjectOperator) topOp;
- if (targetDatasource.getDataset().hasMetaPart() || isChangeFeed) {
- metaAndKeysVars = new ArrayList<>();
- metaAndKeysExprs = new ArrayList<>();
- if (targetDatasource.getDataset().hasMetaPart()) {
- // add the meta function
- IFunctionInfo finfoMeta =
FunctionUtil.getFunctionInfo(BuiltinFunctions.META);
- ScalarFunctionCallExpression metaFunction = new
ScalarFunctionCallExpression(finfoMeta,
- new MutableObject<>(new
VariableReferenceExpression(unnestVar)));
- // create assign for the meta part
- LogicalVariable metaVar = context.newVar();
- metaExpSingletonList = new ArrayList<>(1);
- metaExpSingletonList.add(new MutableObject<>(new
VariableReferenceExpression(metaVar)));
- metaAndKeysVars.add(metaVar);
- metaAndKeysExprs.add(new MutableObject<>(metaFunction));
- project.getVariables().add(metaVar);
- }
- }
- if (isChangeFeed) {
- varRefsForLoading.clear();
- for (Mutable<ILogicalExpression> assignExpr : exprs) {
- if (assignExpr.getValue().getExpressionTag() ==
LogicalExpressionTag.FUNCTION_CALL) {
- AbstractFunctionCallExpression funcCall =
(AbstractFunctionCallExpression) assignExpr.getValue();
- funcCall.substituteVar(resVar, unnestVar);
- LogicalVariable pkVar = context.newVar();
- metaAndKeysVars.add(pkVar);
- metaAndKeysExprs.add(new
MutableObject<>(assignExpr.getValue()));
- project.getVariables().add(pkVar);
- varRefsForLoading.add(new MutableObject<>(new
VariableReferenceExpression(pkVar)));
- }
- }
- // A change feed, we don't need the assign to access PKs
- feedModificationOp = new
InsertDeleteUpsertOperator(targetDatasource, varRef, varRefsForLoading,
- metaExpSingletonList,
InsertDeleteUpsertOperator.Kind.UPSERT, false);
- // Create and add a new variable used for representing the
original record
- feedModificationOp.setPrevRecordVar(context.newVar());
-
feedModificationOp.setPrevRecordType(targetDatasource.getItemType());
- if (targetDatasource.getDataset().hasMetaPart()) {
- List<LogicalVariable> metaVars = new ArrayList<>();
- metaVars.add(context.newVar());
- feedModificationOp.setPrevAdditionalNonFilteringVars(metaVars);
- List<Object> metaTypes = new ArrayList<>();
- metaTypes.add(targetDatasource.getMetaItemType());
-
feedModificationOp.setPrevAdditionalNonFilteringTypes(metaTypes);
- }
-
- if (additionalFilteringField != null) {
- feedModificationOp.setPrevFilterVar(context.newVar());
- feedModificationOp.setPrevFilterType(
- ((ARecordType)
targetDatasource.getItemType()).getFieldType(additionalFilteringField.get(0)));
- additionalFilteringAssign.getInputs().clear();
-
additionalFilteringAssign.getInputs().add(assign.getInputs().get(0));
- feedModificationOp.getInputs().add(new
MutableObject<>(additionalFilteringAssign));
- } else {
- feedModificationOp.getInputs().add(assign.getInputs().get(0));
- }
- } else {
- final InsertDeleteUpsertOperator.Kind opKind =
- isUpsertFeed ? InsertDeleteUpsertOperator.Kind.UPSERT :
InsertDeleteUpsertOperator.Kind.INSERT;
- feedModificationOp = new
InsertDeleteUpsertOperator(targetDatasource, varRef, varRefsForLoading,
- metaExpSingletonList, opKind, false);
- if (isUpsertFeed) {
- feedModificationOp.setPrevRecordVar(context.newVar());
-
feedModificationOp.setPrevRecordType(targetDatasource.getItemType());
- }
- feedModificationOp.getInputs().add(new MutableObject<>(assign));
- }
- if (targetDatasource.getDataset().hasMetaPart() || isChangeFeed) {
- metaAndKeysAssign = new AssignOperator(metaAndKeysVars,
metaAndKeysExprs);
- metaAndKeysAssign.getInputs().add(topOp.getInputs().get(0));
- topOp.getInputs().set(0, new MutableObject<>(metaAndKeysAssign));
- }
-
feedModificationOp.setAdditionalFilteringExpressions(additionalFilteringExpressions);
- ILogicalOperator leafOperator = new DelegateOperator(new
CommitOperator(true));
- leafOperator.getInputs().add(new MutableObject<>(feedModificationOp));
return leafOperator;
}
@@ -570,7 +455,6 @@
if (targetDatasource.getDataset().hasMetaPart()) {
if (returnExpression != null) {
throw new AlgebricksException("Returning not allowed on
datasets with Meta records");
-
}
AssignOperator metaAndKeysAssign;
List<LogicalVariable> metaAndKeysVars;
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
index 583302b..4b78b93 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
@@ -50,7 +50,6 @@
import org.apache.asterix.external.feed.watch.FeedActivityDetails;
import org.apache.asterix.formats.base.IDataFormat;
import org.apache.asterix.jobgen.QueryLogicalExpressionJobGen;
-import org.apache.asterix.lang.aql.statement.SubscribeFeedStatement;
import org.apache.asterix.lang.common.base.IAstPrintVisitorFactory;
import org.apache.asterix.lang.common.base.IQueryRewriter;
import org.apache.asterix.lang.common.base.IReturningStatement;
@@ -59,6 +58,7 @@
import org.apache.asterix.lang.common.rewrites.LangRewritingContext;
import org.apache.asterix.lang.common.statement.FunctionDecl;
import org.apache.asterix.lang.common.statement.Query;
+import org.apache.asterix.lang.common.statement.StartFeedStatement;
import org.apache.asterix.lang.common.util.FunctionUtil;
import org.apache.asterix.metadata.declared.MetadataProvider;
import org.apache.asterix.optimizer.base.FuzzyUtils;
@@ -117,7 +117,7 @@
ImmutableSet.of(CompilerProperties.COMPILER_JOINMEMORY_KEY,
CompilerProperties.COMPILER_GROUPMEMORY_KEY,
CompilerProperties.COMPILER_SORTMEMORY_KEY,
CompilerProperties.COMPILER_PARALLELISM_KEY,
FunctionUtil.IMPORT_PRIVATE_FUNCTIONS,
FuzzyUtils.SIM_FUNCTION_PROP_NAME,
- FuzzyUtils.SIM_THRESHOLD_PROP_NAME,
SubscribeFeedStatement.WAIT_FOR_COMPLETION,
+ FuzzyUtils.SIM_THRESHOLD_PROP_NAME,
StartFeedStatement.WAIT_FOR_COMPLETION,
FeedActivityDetails.FEED_POLICY_NAME,
FeedActivityDetails.COLLECT_LOCATIONS, "inline_with",
"hash_merge", "output-record-type");
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java
index 38e8a21..c0ce6ec 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java
@@ -67,8 +67,8 @@
@Override
public synchronized void remove(Dataset dataset) throws
HyracksDataException {
super.remove(dataset);
- feedConnections.removeIf(o ->
o.getDataverseName().equals(dataset.getDataverseName())
- && o.getDatasetName().equals(dataset.getDatasetName()));
+ feedConnections.removeIf(o ->
o.getDataverseName().equals(dataset.getDataverseName()) && o.getDatasetName()
+ .equals(dataset.getDatasetName()));
}
public synchronized void addFeedConnection(FeedConnection feedConnection) {
@@ -82,12 +82,8 @@
@Override
protected void doStart(MetadataProvider mdProvider) throws
HyracksDataException {
try {
- ILangCompilationProvider compilationProvider = new
AqlCompilationProvider();
- IStorageComponentProvider storageComponentProvider = new
StorageComponentProvider();
- DefaultStatementExecutorFactory statementExecutorFactory = new
DefaultStatementExecutorFactory();
- Pair<JobSpecification, AlgebricksAbsolutePartitionConstraint>
jobInfo = FeedOperations.buildStartFeedJob(
- ((QueryTranslator) statementExecutor).getSessionOutput(),
mdProvider, feed, feedConnections,
- compilationProvider, storageComponentProvider,
statementExecutorFactory, hcc);
+ Pair<JobSpecification, AlgebricksAbsolutePartitionConstraint>
jobInfo =
+ FeedOperations.buildStartFeedJob(mdProvider, feed,
feedConnections, statementExecutor, hcc);
JobSpecification feedJob = jobInfo.getLeft();
WaitForStateSubscriber eventSubscriber = new
WaitForStateSubscriber(this, EnumSet.of(ActivityState.RUNNING,
ActivityState.TEMPORARILY_FAILED,
ActivityState.PERMANENTLY_FAILED));
@@ -119,8 +115,8 @@
// Construct ActiveMessage
for (int i = 0; i < getLocations().getLocations().length; i++) {
String intakeLocation = getLocations().getLocations()[i];
-
FeedOperations.SendStopMessageToNode(metadataProvider.getApplicationContext(),
entityId, intakeLocation,
- i);
+ FeedOperations
+
.SendStopMessageToNode(metadataProvider.getApplicationContext(), entityId,
intakeLocation, i);
}
eventSubscriber.sync();
} catch (Exception e) {
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index 6b4483c..91d3e46 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -122,6 +122,7 @@
import org.apache.asterix.lang.common.statement.TypeDropStatement;
import org.apache.asterix.lang.common.statement.WriteStatement;
import org.apache.asterix.lang.common.struct.Identifier;
+import org.apache.asterix.lang.sqlpp.rewrites.SqlppRewriterFactory;
import org.apache.asterix.metadata.IDatasetDetails;
import org.apache.asterix.metadata.MetadataManager;
import org.apache.asterix.metadata.MetadataTransactionContext;
@@ -1669,7 +1670,8 @@
throw new AlgebricksException("There is no dataverse with this
name " + dataverse + ".");
}
Function function = new Function(dataverse, functionName,
cfs.getaAterixFunction().getArity(),
- cfs.getParamList(), Function.RETURNTYPE_VOID,
cfs.getFunctionBody(), Function.LANGUAGE_AQL,
+ cfs.getParamList(), Function.RETURNTYPE_VOID,
cfs.getFunctionBody(),
+ rewriterFactory instanceof SqlppRewriterFactory ?
Function.LANGUAGE_SQLPP : Function.LANGUAGE_AQL,
FunctionKind.SCALAR.toString());
MetadataManager.INSTANCE.addFunction(mdTxnCtx, function);
@@ -2186,7 +2188,7 @@
throw new AlgebricksException("Feed" + feedName + " is already
connected dataset " + datasetName);
}
fc = new FeedConnection(dataverseName, feedName, datasetName,
appliedFunctions, policyName,
- outputType.toString());
+ outputType.getTypeName());
MetadataManager.INSTANCE.addFeedConnection(metadataProvider.getMetadataTxnContext(),
fc);
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
if (listener != null) {
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
index cc95770..6944920 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
@@ -20,6 +20,7 @@
import java.rmi.RemoteException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -33,32 +34,53 @@
import org.apache.asterix.active.message.ActiveManagerMessage;
import org.apache.asterix.active.message.ActiveManagerMessage.Kind;
import org.apache.asterix.app.translator.DefaultStatementExecutorFactory;
+import org.apache.asterix.app.translator.QueryTranslator;
import org.apache.asterix.common.cluster.IClusterStateManager;
-import org.apache.asterix.common.context.IStorageComponentProvider;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
import
org.apache.asterix.common.dataflow.LSMTreeInsertDeleteOperatorDescriptor;
import org.apache.asterix.common.exceptions.ACIDException;
import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.common.functions.FunctionSignature;
import org.apache.asterix.common.messaging.api.ICCMessageBroker;
import org.apache.asterix.common.transactions.JobId;
import org.apache.asterix.common.utils.StoragePathUtil;
-import org.apache.asterix.compiler.provider.ILangCompilationProvider;
+import org.apache.asterix.compiler.provider.SqlppCompilationProvider;
import org.apache.asterix.external.api.IAdapterFactory;
import org.apache.asterix.external.feed.management.FeedConnectionId;
-import org.apache.asterix.external.feed.management.FeedConnectionRequest;
import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
import org.apache.asterix.external.feed.watch.FeedActivityDetails;
import org.apache.asterix.external.operators.FeedCollectOperatorDescriptor;
import org.apache.asterix.external.operators.FeedIntakeOperatorDescriptor;
import org.apache.asterix.external.operators.FeedIntakeOperatorNodePushable;
import org.apache.asterix.external.operators.FeedMetaOperatorDescriptor;
+import org.apache.asterix.external.util.FeedConstants;
import org.apache.asterix.external.util.FeedUtils;
import org.apache.asterix.external.util.FeedUtils.FeedRuntimeType;
-import org.apache.asterix.lang.aql.statement.SubscribeFeedStatement;
+import org.apache.asterix.file.StorageComponentProvider;
+import org.apache.asterix.lang.common.base.Expression;
import org.apache.asterix.lang.common.base.Statement;
-import org.apache.asterix.lang.common.statement.DataverseDecl;
+import org.apache.asterix.lang.common.clause.LetClause;
+import org.apache.asterix.lang.common.expression.CallExpr;
+import org.apache.asterix.lang.common.expression.LiteralExpr;
+import org.apache.asterix.lang.common.expression.VariableExpr;
+import org.apache.asterix.lang.common.literal.IntegerLiteral;
+import org.apache.asterix.lang.common.literal.StringLiteral;
+import org.apache.asterix.lang.common.statement.InsertStatement;
+import org.apache.asterix.lang.common.statement.Query;
+import org.apache.asterix.lang.common.statement.UpsertStatement;
import org.apache.asterix.lang.common.struct.Identifier;
+import org.apache.asterix.lang.common.struct.VarIdentifier;
import org.apache.asterix.lang.common.util.FunctionUtil;
+import org.apache.asterix.lang.sqlpp.clause.FromClause;
+import org.apache.asterix.lang.sqlpp.clause.FromTerm;
+import org.apache.asterix.lang.sqlpp.clause.SelectBlock;
+import org.apache.asterix.lang.sqlpp.clause.SelectClause;
+import org.apache.asterix.lang.sqlpp.clause.SelectElement;
+import org.apache.asterix.lang.sqlpp.clause.SelectSetOperation;
+import org.apache.asterix.lang.sqlpp.expression.SelectExpression;
+import org.apache.asterix.lang.sqlpp.struct.SetOperationInput;
+import org.apache.asterix.lang.sqlpp.util.SqlppVariableUtil;
import org.apache.asterix.metadata.declared.MetadataProvider;
import org.apache.asterix.metadata.entities.Feed;
import org.apache.asterix.metadata.entities.FeedConnection;
@@ -109,6 +131,8 @@
*/
public class FeedOperations {
+ public static final String FEED_DATAFLOW_INTERMEIDATE_VAL_PREFIX =
"int_val_for_feed_fun";
+
private FeedOperations() {
}
@@ -154,30 +178,69 @@
return spec;
}
- private static JobSpecification getConnectionJob(SessionOutput
sessionOutput, MetadataProvider metadataProvider,
- FeedConnection feedConnection, String[] locations,
ILangCompilationProvider compilationProvider,
- IStorageComponentProvider storageComponentProvider,
DefaultStatementExecutorFactory qtFactory,
- IHyracksClientConnection hcc) throws AlgebricksException,
RemoteException, ACIDException {
- DataverseDecl dataverseDecl = new DataverseDecl(new
Identifier(feedConnection.getDataverseName()));
- FeedConnectionRequest fcr =
- new FeedConnectionRequest(FeedRuntimeType.INTAKE,
feedConnection.getAppliedFunctions(),
- feedConnection.getDatasetName(),
feedConnection.getPolicyName(), feedConnection.getFeedId());
- SubscribeFeedStatement subscribeStmt = new
SubscribeFeedStatement(locations, fcr);
- subscribeStmt.initialize(metadataProvider.getMetadataTxnContext());
- List<Statement> statements = new ArrayList<>();
- statements.add(dataverseDecl);
- statements.add(subscribeStmt);
- IStatementExecutor translator =
qtFactory.create(metadataProvider.getApplicationContext(), statements,
- sessionOutput, compilationProvider, storageComponentProvider);
- // configure the metadata provider
-
metadataProvider.getConfig().put(FunctionUtil.IMPORT_PRIVATE_FUNCTIONS, "" +
Boolean.TRUE);
- metadataProvider.getConfig().put(FeedActivityDetails.FEED_POLICY_NAME,
"" + subscribeStmt.getPolicy());
- metadataProvider.getConfig().put(FeedActivityDetails.COLLECT_LOCATIONS,
- StringUtils.join(subscribeStmt.getLocations(), ','));
+ private static List<Expression> addArgs(Object... args) {
+ List<Expression> argExprs = new ArrayList<>();
+ for (Object arg : args) {
+ if (arg instanceof Integer) {
+ argExprs.add(new LiteralExpr(new IntegerLiteral((Integer)
arg)));
+ } else if (arg instanceof String) {
+ argExprs.add(new LiteralExpr(new StringLiteral((String) arg)));
+ } else if (arg instanceof VariableExpr){
+ argExprs.add((VariableExpr) arg);
+ }
+ }
+ return argExprs;
+ }
- CompiledStatements.CompiledSubscribeFeedStatement csfs = new
CompiledStatements.CompiledSubscribeFeedStatement(
- subscribeStmt.getSubscriptionRequest(),
subscribeStmt.getVarCounter());
- return translator.rewriteCompileQuery(hcc, metadataProvider,
subscribeStmt.getQuery(), csfs);
+ private static Query makeConnectionQuery(FeedConnection feedConnection) {
+ // Construct from clause
+ VarIdentifier fromVarId =
SqlppVariableUtil.toInternalVariableIdentifier(feedConnection.getFeedName());
+ VariableExpr fromTermLeftExpr = new VariableExpr(fromVarId);
+ // TODO: remove target feedid from args list (xikui)
+ // TODO: Get rid of this INTAKE thing
+ List<Expression> exprList =
+ addArgs(feedConnection.getDataverseName(),
feedConnection.getFeedId().getEntityName(),
+ feedConnection.getFeedId().getEntityName(),
FeedRuntimeType.INTAKE.toString(),
+ feedConnection.getDatasetName(),
feedConnection.getOutputType());
+ CallExpr datasrouceCallFunction = new
CallExpr(FeedConstants.FEED_COLLECT_FUN_SIGN, exprList);
+ FromTerm fromterm = new FromTerm(datasrouceCallFunction,
fromTermLeftExpr, null, null);
+ FromClause fromClause = new FromClause(Arrays.asList(fromterm));
+ // TODO: This can be the place to add select predicate for ingestion
+ // Attaching functions
+ int varIdx = 1;
+ VariableExpr previousVarExpr = fromTermLeftExpr;
+ ArrayList<LetClause> letClauses = new ArrayList<>();
+ for (FunctionSignature funcSig : feedConnection.getAppliedFunctions())
{
+ VarIdentifier intermediateVar = SqlppVariableUtil
+
.toInternalVariableIdentifier(FEED_DATAFLOW_INTERMEIDATE_VAL_PREFIX +
String.valueOf(varIdx));
+ VariableExpr intermediateVarExpr = new
VariableExpr(intermediateVar);
+ CallExpr functionCallExpr = new CallExpr(funcSig,
addArgs(previousVarExpr));
+ previousVarExpr = intermediateVarExpr;
+ LetClause letClause = new LetClause(intermediateVarExpr,
functionCallExpr);
+ letClauses.add(letClause);
+ varIdx++;
+ }
+ // Constructing select clause
+ SelectElement selectElement = new SelectElement(previousVarExpr);
+ SelectClause selectClause = new SelectClause(selectElement, null,
false);
+ SelectBlock selectBlock = new SelectBlock(selectClause, fromClause,
letClauses, null, null, null, null);
+ SelectSetOperation selectSetOperation = new SelectSetOperation(new
SetOperationInput(selectBlock, null), null);
+ SelectExpression body = new SelectExpression(null, selectSetOperation,
null, null, true);
+ Query query = new Query(false, true, body, 0);
+ return query;
+ }
+
+ private static JobSpecification getConnectionJob(MetadataProvider
metadataProvider, FeedConnection feedConn,
+ IStatementExecutor statementExecutor, IHyracksClientConnection hcc)
+ throws AlgebricksException, RemoteException, ACIDException {
+ metadataProvider.getConfig().put(FeedActivityDetails.FEED_POLICY_NAME,
feedConn.getPolicyName());
+ Query feedConnQuery = makeConnectionQuery(feedConn);
+ UpsertStatement stmtUpsert = new UpsertStatement(new
Identifier(feedConn.getDataverseName()),
+ new Identifier(feedConn.getDatasetName()), feedConnQuery, -1,
null, null);
+ CompiledStatements.CompiledUpsertStatement clfrqs =
+ new
CompiledStatements.CompiledUpsertStatement(feedConn.getDataverseName(),
feedConn.getDatasetName(),
+ feedConnQuery, stmtUpsert.getVarCounter(), null, null);
+ return statementExecutor.rewriteCompileQuery(hcc, metadataProvider,
feedConnQuery, clfrqs);
}
private static JobSpecification combineIntakeCollectJobs(MetadataProvider
metadataProvider, Feed feed,
@@ -220,9 +283,9 @@
String datasetName = feedConnections.get(iter1).getDatasetName();
FeedConnectionId feedConnectionId = new
FeedConnectionId(ingestionOp.getEntityId(), datasetName);
- FeedPolicyEntity feedPolicyEntity =
-
FeedMetadataUtil.validateIfPolicyExists(curFeedConnection.getDataverseName(),
- curFeedConnection.getPolicyName(),
metadataProvider.getMetadataTxnContext());
+ FeedPolicyEntity feedPolicyEntity = FeedMetadataUtil
+
.validateIfPolicyExists(curFeedConnection.getDataverseName(),
curFeedConnection.getPolicyName(),
+ metadataProvider.getMetadataTxnContext());
for (Map.Entry<OperatorDescriptorId, IOperatorDescriptor> entry :
operatorsMap.entrySet()) {
IOperatorDescriptor opDesc = entry.getValue();
@@ -275,8 +338,8 @@
}
// make connections between operators
- for (Entry<ConnectorDescriptorId, Pair<Pair<IOperatorDescriptor,
Integer>,
- Pair<IOperatorDescriptor, Integer>>> entry :
subJob.getConnectorOperatorMap().entrySet()) {
+ for (Entry<ConnectorDescriptorId, Pair<Pair<IOperatorDescriptor,
Integer>, Pair<IOperatorDescriptor, Integer>>> entry : subJob
+ .getConnectorOperatorMap().entrySet()) {
ConnectorDescriptorId newId =
connectorIdMapping.get(entry.getKey());
IConnectorDescriptor connDesc =
jobSpec.getConnectorMap().get(newId);
Pair<IOperatorDescriptor, Integer> leftOp =
entry.getValue().getLeft();
@@ -358,24 +421,36 @@
return jobSpec;
}
+ private static IStatementExecutor getSQLPPTranslator(MetadataProvider
metadataProvider,
+ SessionOutput sessionOutput) {
+ List<Statement> stmts = new ArrayList<>();
+ DefaultStatementExecutorFactory qtFactory = new
DefaultStatementExecutorFactory();
+ IStatementExecutor translator = qtFactory
+ .create(metadataProvider.getApplicationContext(), stmts,
sessionOutput, new SqlppCompilationProvider(),
+ new StorageComponentProvider());
+ return translator;
+ }
+
public static Pair<JobSpecification,
AlgebricksAbsolutePartitionConstraint> buildStartFeedJob(
- SessionOutput sessionOutput, MetadataProvider metadataProvider,
Feed feed,
- List<FeedConnection> feedConnections, ILangCompilationProvider
compilationProvider,
- IStorageComponentProvider storageComponentProvider,
DefaultStatementExecutorFactory qtFactory,
- IHyracksClientConnection hcc) throws Exception {
+ MetadataProvider metadataProvider, Feed feed, List<FeedConnection>
feedConnections,
+ IStatementExecutor statementExecutor, IHyracksClientConnection
hcc) throws Exception {
FeedPolicyAccessor fpa = new FeedPolicyAccessor(new HashMap<>());
- // TODO: Change the default Datasource to use all possible partitions
Pair<JobSpecification, IAdapterFactory> intakeInfo =
buildFeedIntakeJobSpec(feed, metadataProvider, fpa);
- //TODO: Add feed policy accessor
List<JobSpecification> jobsList = new ArrayList<>();
// Construct the ingestion Job
JobSpecification intakeJob = intakeInfo.getLeft();
IAdapterFactory ingestionAdaptorFactory = intakeInfo.getRight();
String[] ingestionLocations =
ingestionAdaptorFactory.getPartitionConstraint().getLocations();
+ // Add metadata configs
+
metadataProvider.getConfig().put(FunctionUtil.IMPORT_PRIVATE_FUNCTIONS,
Boolean.TRUE.toString());
+ metadataProvider.getConfig()
+ .put(FeedActivityDetails.COLLECT_LOCATIONS,
StringUtils.join(ingestionLocations, ','));
+ // TODO: Once we deprecated AQL, this extra queryTranslator can be
removed.
+ IStatementExecutor translator =
+ getSQLPPTranslator(metadataProvider, ((QueryTranslator)
statementExecutor).getSessionOutput());
// Add connection job
for (FeedConnection feedConnection : feedConnections) {
- JobSpecification connectionJob = getConnectionJob(sessionOutput,
metadataProvider, feedConnection,
- ingestionLocations, compilationProvider,
storageComponentProvider, qtFactory, hcc);
+ JobSpecification connectionJob =
getConnectionJob(metadataProvider, feedConnection, translator, hcc);
jobsList.add(connectionJob);
}
return Pair.of(combineIntakeCollectJobs(metadataProvider, feed,
intakeJob, jobsList, feedConnections,
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/only_sqlpp.xml
b/asterixdb/asterix-app/src/test/resources/runtimets/only_sqlpp.xml
index 334dd52..17a4d4b 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/only_sqlpp.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/only_sqlpp.xml
@@ -19,5 +19,10 @@
!-->
<test-suite xmlns="urn:xml.testframework.asterix.apache.org"
ResultOffsetPath="results" QueryOffsetPath="queries_sqlpp"
QueryFileExtension=".sqlpp">
<test-group name="failed">
+ <test-case FilePath="feeds">
+ <compilation-unit name="connect-feed-with-function">
+ <output-dir compare="Text">connect-feed-with-function</output-dir>
+ </compilation-unit>
+ </test-case>
</test-group>
</test-suite>
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/connect-feed-with-aql-function/connect-feed-with-aql-function.5.update.aql
b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/connect-feed-with-aql-function/connect-feed-with-aql-function.5.update.aql
deleted file mode 100644
index dcf2278..0000000
---
a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/connect-feed-with-aql-function/connect-feed-with-aql-function.5.update.aql
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-/*
- * Description : Create a feed and apply two functions in the
- * workflow. The output of the first function can be used in
- * the second function. The function parameter can have any
- * name.
- * Expected Res : Success
- * Date : 29th Mar 2017
- */
-use dataverse experiments;
-stop feed UserFeed;
-disconnect feed UserFeed from dataset TwitterUsers;
\ No newline at end of file
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/connect-feed-with-aql-function/connect-feed-with-aql-function.6.query.aql
b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/connect-feed-with-aql-function/connect-feed-with-aql-function.6.query.aql
deleted file mode 100644
index 1a06334..0000000
---
a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/connect-feed-with-aql-function/connect-feed-with-aql-function.6.query.aql
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-/*
- * Description : Create a feed and apply two functions in the
- * workflow. The output of the first function can be used in
- * the second function. The function parameter can have any
- * name.
- * Expected Res : Success
- * Date : 29th Mar 2017
- */
-use dataverse experiments;
-
-for $x in dataset TwitterUsers
-order by $x.screen-name
-return $x.true_popularity;
\ No newline at end of file
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/connect-feed-with-aql-function/connect-feed-with-aql-function.1.ddl.aql
b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/connect-feed-with-function/connect-feed-with-function.1.ddl.aql
similarity index 100%
rename from
asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/connect-feed-with-aql-function/connect-feed-with-aql-function.1.ddl.aql
rename to
asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/connect-feed-with-function/connect-feed-with-function.1.ddl.aql
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/connect-feed-with-aql-function/connect-feed-with-aql-function.2.update.aql
b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/connect-feed-with-function/connect-feed-with-function.2.update.aql
similarity index 100%
rename from
asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/connect-feed-with-aql-function/connect-feed-with-aql-function.2.update.aql
rename to
asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/connect-feed-with-function/connect-feed-with-function.2.update.aql
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/connect-feed-with-aql-function/connect-feed-with-aql-function.8.ddl.aql
b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/connect-feed-with-function/connect-feed-with-function.3.ddl.aql
similarity index 96%
rename from
asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/connect-feed-with-aql-function/connect-feed-with-aql-function.8.ddl.aql
rename to
asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/connect-feed-with-function/connect-feed-with-function.3.ddl.aql
index 7722945..46056b1 100644
---
a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/connect-feed-with-aql-function/connect-feed-with-aql-function.8.ddl.aql
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/connect-feed-with-function/connect-feed-with-function.3.ddl.aql
@@ -24,5 +24,4 @@
* Expected Res : Success
* Date : 29th Mar 2017
*/
-use dataverse experiments;
drop dataverse experiments;
\ No newline at end of file
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/connect-feed-with-function/connect-feed-with-function.1.ddl.sqlpp
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/connect-feed-with-function/connect-feed-with-function.1.ddl.sqlpp
new file mode 100644
index 0000000..4de5415
--- /dev/null
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/connect-feed-with-function/connect-feed-with-function.1.ddl.sqlpp
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description : Create a feed and apply two functions in the
+ * workflow. The output of the first function can be used in
+ * the second function. The function parameter can have any
+ * name.
+ * Expected Res : Success
+ * Date : 4th Oct 2017
+ */
+drop dataverse experiments if exists;
+create dataverse experiments;
+use experiments;
+
+create type TwitterUser if not exists as open{
+ `screen-name`: string,
+ friends_count: int32,
+ name: string,
+ followers_count: int32
+};
+
+create dataset TwitterUsers(TwitterUser) primary key `screen-name`;
+
+create function test_func0(xyz) {
+ object_merge((case (xyz.followers_count > 25000) when true then
{"popularity":"Good!"} else {"popularity":"Bad!"} end), xyz)
+};
+
+create function test_func1(anyname) {
+ object_merge((case (anyname.popularity = "Good!") when true then
{"true_popularity":"Indeed Good!"} else {"true_popularity":"Indeed Bad!"} end),
anyname)
+};
+
+create feed UserFeed using socket_adapter
+(
+ ("sockets"="127.0.0.1:10001"),
+ ("address-type"="IP"),
+ ("type-name"="TwitterUser"),
+ ("format"="adm"),
+ ("upsert-feed"="true")
+);
\ No newline at end of file
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/connect-feed-with-aql-function/connect-feed-with-aql-function.7.server.aql
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/connect-feed-with-function/connect-feed-with-function.2.update.sqlpp
similarity index 74%
copy from
asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/connect-feed-with-aql-function/connect-feed-with-aql-function.7.server.aql
copy to
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/connect-feed-with-function/connect-feed-with-function.2.update.sqlpp
index 4ba1c81..9606396 100644
---
a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/connect-feed-with-aql-function/connect-feed-with-aql-function.7.server.aql
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/connect-feed-with-function/connect-feed-with-function.2.update.sqlpp
@@ -16,13 +16,8 @@
* specific language governing permissions and limitations
* under the License.
*/
-/*
- * Description : Create a feed and apply two functions in the
- * workflow. The output of the first function can be used in
- * the second function. The function parameter can have any
- * name.
- * Expected Res : Success
- * Date : 29th Mar 2017
- */
+use experiments;
-stop 10001
\ No newline at end of file
+connect feed UserFeed to dataset TwitterUsers apply function
test_func0,test_func1;
+
+start feed UserFeed;
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/connect-feed-with-aql-function/connect-feed-with-aql-function.3.server.aql
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/connect-feed-with-function/connect-feed-with-function.3.server.sqlpp
similarity index 100%
rename from
asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/connect-feed-with-aql-function/connect-feed-with-aql-function.3.server.aql
rename to
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/connect-feed-with-function/connect-feed-with-function.3.server.sqlpp
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/connect-feed-with-aql-function/connect-feed-with-aql-function.4.sleep.aql
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/connect-feed-with-function/connect-feed-with-function.4.sleep.sqlpp
similarity index 75%
rename from
asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/connect-feed-with-aql-function/connect-feed-with-aql-function.4.sleep.aql
rename to
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/connect-feed-with-function/connect-feed-with-function.4.sleep.sqlpp
index dc5dae0..e50b429 100644
---
a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/connect-feed-with-aql-function/connect-feed-with-aql-function.4.sleep.aql
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/connect-feed-with-function/connect-feed-with-function.4.sleep.sqlpp
@@ -16,12 +16,5 @@
* specific language governing permissions and limitations
* under the License.
*/
-/*
- * Description : Create a feed and apply two functions in the
- * workflow. The output of the first function can be used in
- * the second function. The function parameter can have any
- * name.
- * Expected Res : Success
- * Date : 29th Mar 2017
- */
+
2000
\ No newline at end of file
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/connect-feed-with-aql-function/connect-feed-with-aql-function.7.server.aql
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/connect-feed-with-function/connect-feed-with-function.5.update.sqlpp
similarity index 74%
copy from
asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/connect-feed-with-aql-function/connect-feed-with-aql-function.7.server.aql
copy to
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/connect-feed-with-function/connect-feed-with-function.5.update.sqlpp
index 4ba1c81..136d142 100644
---
a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/connect-feed-with-aql-function/connect-feed-with-aql-function.7.server.aql
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/connect-feed-with-function/connect-feed-with-function.5.update.sqlpp
@@ -16,13 +16,6 @@
* specific language governing permissions and limitations
* under the License.
*/
-/*
- * Description : Create a feed and apply two functions in the
- * workflow. The output of the first function can be used in
- * the second function. The function parameter can have any
- * name.
- * Expected Res : Success
- * Date : 29th Mar 2017
- */
-
-stop 10001
\ No newline at end of file
+use experiments;
+stop feed UserFeed;
+disconnect feed UserFeed from dataset TwitterUsers;
\ No newline at end of file
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/connect-feed-with-aql-function/connect-feed-with-aql-function.7.server.aql
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/connect-feed-with-function/connect-feed-with-function.6.query.sqlpp
similarity index 74%
copy from
asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/connect-feed-with-aql-function/connect-feed-with-aql-function.7.server.aql
copy to
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/connect-feed-with-function/connect-feed-with-function.6.query.sqlpp
index 4ba1c81..4b1dba7 100644
---
a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/connect-feed-with-aql-function/connect-feed-with-aql-function.7.server.aql
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/connect-feed-with-function/connect-feed-with-function.6.query.sqlpp
@@ -16,13 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-/*
- * Description : Create a feed and apply two functions in the
- * workflow. The output of the first function can be used in
- * the second function. The function parameter can have any
- * name.
- * Expected Res : Success
- * Date : 29th Mar 2017
- */
+use experiments;
-stop 10001
\ No newline at end of file
+select value t.true_popularity from TwitterUsers t
+order by t.`screen-name`;
\ No newline at end of file
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/connect-feed-with-aql-function/connect-feed-with-aql-function.7.server.aql
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/connect-feed-with-function/connect-feed-with-function.7.server.sqlpp
similarity index 75%
copy from
asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/connect-feed-with-aql-function/connect-feed-with-aql-function.7.server.aql
copy to
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/connect-feed-with-function/connect-feed-with-function.7.server.sqlpp
index 4ba1c81..c3ba795 100644
---
a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/connect-feed-with-aql-function/connect-feed-with-aql-function.7.server.aql
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/connect-feed-with-function/connect-feed-with-function.7.server.sqlpp
@@ -16,13 +16,4 @@
* specific language governing permissions and limitations
* under the License.
*/
-/*
- * Description : Create a feed and apply two functions in the
- * workflow. The output of the first function can be used in
- * the second function. The function parameter can have any
- * name.
- * Expected Res : Success
- * Date : 29th Mar 2017
- */
-
stop 10001
\ No newline at end of file
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/connect-feed-with-aql-function/connect-feed-with-aql-function.7.server.aql
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/connect-feed-with-function/connect-feed-with-function.8.ddl.sqlpp
similarity index 74%
copy from
asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/connect-feed-with-aql-function/connect-feed-with-aql-function.7.server.aql
copy to
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/connect-feed-with-function/connect-feed-with-function.8.ddl.sqlpp
index 4ba1c81..ec08f08 100644
---
a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/connect-feed-with-aql-function/connect-feed-with-aql-function.7.server.aql
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/connect-feed-with-function/connect-feed-with-function.8.ddl.sqlpp
@@ -16,13 +16,4 @@
* specific language governing permissions and limitations
* under the License.
*/
-/*
- * Description : Create a feed and apply two functions in the
- * workflow. The output of the first function can be used in
- * the second function. The function parameter can have any
- * name.
- * Expected Res : Success
- * Date : 29th Mar 2017
- */
-
-stop 10001
\ No newline at end of file
+drop dataverse experiments;
\ No newline at end of file
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-external-function/feed-with-external-function.1.ddl.sqlpp
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-external-function/feed-with-external-function.1.ddl.sqlpp
new file mode 100644
index 0000000..9b0f2a0
--- /dev/null
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-external-function/feed-with-external-function.1.ddl.sqlpp
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description : Apply user defined function to feed.
+ * Expected Res : Success
+ * Date : 4th Oct 2017
+ */
+
+drop dataverse externallibtest if exists;
+create dataverse externallibtest;
+use externallibtest;
+
+create type TweetInputType as open {
+ id: string,
+ username : string,
+ location : string,
+ text : string,
+ timestamp : string
+};
+
+create type TweetOutputType as open {
+ id: string,
+ username : string,
+ location : string,
+ text : string,
+ timestamp : string,
+ topics : {{string}}
+};
+
+create feed TweetFeed
+using localfs
+(("type-name"="TweetInputType"),
+("path"="asterix_nc1://data/twitter/obamatweets.adm"),
+("format"="adm"));
+
+create dataset TweetsFeedIngest(TweetOutputType)
+primary key id;
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/connect-feed-with-aql-function/connect-feed-with-aql-function.7.server.aql
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-external-function/feed-with-external-function.2.lib.sqlpp
similarity index 74%
copy from
asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/connect-feed-with-aql-function/connect-feed-with-aql-function.7.server.aql
copy to
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-external-function/feed-with-external-function.2.lib.sqlpp
index 4ba1c81..d1e0e87 100644
---
a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/connect-feed-with-aql-function/connect-feed-with-aql-function.7.server.aql
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-external-function/feed-with-external-function.2.lib.sqlpp
@@ -16,13 +16,4 @@
* specific language governing permissions and limitations
* under the License.
*/
-/*
- * Description : Create a feed and apply two functions in the
- * workflow. The output of the first function can be used in
- * the second function. The function parameter can have any
- * name.
- * Expected Res : Success
- * Date : 29th Mar 2017
- */
-
-stop 10001
\ No newline at end of file
+install externallibtest testlib
target/data/externallib/asterix-external-data-testlib.zip
\ No newline at end of file
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/connect-feed-with-aql-function/connect-feed-with-aql-function.7.server.aql
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-external-function/feed-with-external-function.3.update.sqlpp
similarity index 77%
rename from
asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/connect-feed-with-aql-function/connect-feed-with-aql-function.7.server.aql
rename to
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-external-function/feed-with-external-function.3.update.sqlpp
index 4ba1c81..883cd7a 100644
---
a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/connect-feed-with-aql-function/connect-feed-with-aql-function.7.server.aql
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-external-function/feed-with-external-function.3.update.sqlpp
@@ -17,12 +17,12 @@
* under the License.
*/
/*
- * Description : Create a feed and apply two functions in the
- * workflow. The output of the first function can be used in
- * the second function. The function parameter can have any
- * name.
+ * Description : Apply user defined function to feed.
* Expected Res : Success
- * Date : 29th Mar 2017
+ * Date : 4th Oct 2017
*/
+use externallibtest;
-stop 10001
\ No newline at end of file
+connect feed TweetFeed to dataset TweetsFeedIngest apply function
`testlib#parseTweet`;
+
+start feed TweetFeed;
\ No newline at end of file
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/connect-feed-with-aql-function/connect-feed-with-aql-function.7.server.aql
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-external-function/feed-with-external-function.5.pollquery.sqlpp
similarity index 77%
copy from
asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/connect-feed-with-aql-function/connect-feed-with-aql-function.7.server.aql
copy to
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-external-function/feed-with-external-function.5.pollquery.sqlpp
index 4ba1c81..607e5bd 100644
---
a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/connect-feed-with-aql-function/connect-feed-with-aql-function.7.server.aql
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-external-function/feed-with-external-function.5.pollquery.sqlpp
@@ -17,12 +17,13 @@
* under the License.
*/
/*
- * Description : Create a feed and apply two functions in the
- * workflow. The output of the first function can be used in
- * the second function. The function parameter can have any
- * name.
+ * Description : Apply user defined function to feed.
* Expected Res : Success
- * Date : 29th Mar 2017
+ * Date : 4th Oct 2017
*/
+// polltimeoutsecs=5
+use externallibtest;
-stop 10001
\ No newline at end of file
+select value t from TweetsFeedIngest t
+ORDER BY t.id;
+
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/connect-feed-with-aql-function/connect-feed-with-aql-function.7.server.aql
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-external-function/feed-with-external-function.6.lib.sqlpp
similarity index 74%
copy from
asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/connect-feed-with-aql-function/connect-feed-with-aql-function.7.server.aql
copy to
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-external-function/feed-with-external-function.6.lib.sqlpp
index 4ba1c81..86af80f 100644
---
a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/connect-feed-with-aql-function/connect-feed-with-aql-function.7.server.aql
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-external-function/feed-with-external-function.6.lib.sqlpp
@@ -16,13 +16,4 @@
* specific language governing permissions and limitations
* under the License.
*/
-/*
- * Description : Create a feed and apply two functions in the
- * workflow. The output of the first function can be used in
- * the second function. The function parameter can have any
- * name.
- * Expected Res : Success
- * Date : 29th Mar 2017
- */
-
-stop 10001
\ No newline at end of file
+uninstall externallibtest testlib
\ No newline at end of file
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/connect-feed-with-aql-function/connect-feed-with-aql-function.7.server.aql
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-external-function/feed-with-external-function.7.ddl.sqlpp
similarity index 74%
copy from
asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/connect-feed-with-aql-function/connect-feed-with-aql-function.7.server.aql
copy to
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-external-function/feed-with-external-function.7.ddl.sqlpp
index 4ba1c81..2a7acef 100644
---
a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/connect-feed-with-aql-function/connect-feed-with-aql-function.7.server.aql
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-external-function/feed-with-external-function.7.ddl.sqlpp
@@ -16,13 +16,4 @@
* specific language governing permissions and limitations
* under the License.
*/
-/*
- * Description : Create a feed and apply two functions in the
- * workflow. The output of the first function can be used in
- * the second function. The function parameter can have any
- * name.
- * Expected Res : Success
- * Date : 29th Mar 2017
- */
-
-stop 10001
\ No newline at end of file
+drop dataverse externallibtest if exists;
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/connect-feed-with-aql-function/connect-feed-with-aql-function.1.adm
b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/connect-feed-with-function/connect-feed-with-function.1.adm
similarity index 100%
rename from
asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/connect-feed-with-aql-function/connect-feed-with-aql-function.1.adm
rename to
asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/connect-feed-with-function/connect-feed-with-function.1.adm
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/feeds_03/feeds_03.1.adm
b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/feeds_03/feeds_03.1.adm
index fbd87b6..e8fba07 100644
---
a/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/feeds_03/feeds_03.1.adm
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/feeds_03/feeds_03.1.adm
@@ -1 +1 @@
-{ "DataverseName": "feeds", "FeedName": "TweetFeed", "DatasetName": "Tweets",
"ReturnType": "TweetType: closed {\n id: string,\n username: string,\n
location: string,\n text: string,\n timestamp: string\n}\n",
"AppliedFunctions": {{ "feeds.feed_processor" }}, "PolicyName": "Basic" }
+{ "DataverseName": "feeds", "FeedName": "TweetFeed", "DatasetName": "Tweets",
"ReturnType": "TweetType", "AppliedFunctions": {{ "feeds.feed_processor" }},
"PolicyName": "Basic" }
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite.xml
b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite.xml
index abbcaaa..47560f6 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite.xml
@@ -252,8 +252,9 @@
</compilation-unit>
</test-case>
<test-case FilePath="feeds">
- <compilation-unit name="connect-feed-with-aql-function">
- <output-dir compare="Text">connect-feed-with-aql-function</output-dir>
+ <compilation-unit name="connect-feed-with-function">
+ <output-dir compare="Text">connect-feed-with-function</output-dir>
+ <expected-error>Incompatible function language</expected-error>
</compilation-unit>
</test-case>
<test-case FilePath="feeds">
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_it_sqlpp.xml
b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_it_sqlpp.xml
index 83cdd82..8f4455f 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_it_sqlpp.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_it_sqlpp.xml
@@ -43,5 +43,10 @@
<output-dir compare="Text">upperCase</output-dir>
</compilation-unit>
</test-case>
+ <test-case FilePath="feeds">
+ <compilation-unit name="feed-with-external-function">
+ <output-dir compare="Text">feed-with-external-function</output-dir>
+ </compilation-unit>
+ </test-case>
</test-group>
</test-suite>
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
index c87c44b..0d7ef36 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
@@ -8248,6 +8248,11 @@
<output-dir compare="Text">upsert-feed</output-dir>
</compilation-unit>
</test-case>
+ <test-case FilePath="feeds">
+ <compilation-unit name="connect-feed-with-function">
+ <output-dir compare="Text">connect-feed-with-function</output-dir>
+ </compilation-unit>
+ </test-case>
</test-group>
<test-group name="hdfs">
<test-case FilePath="hdfs">
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
index f960ce5..279624d 100644
---
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
@@ -129,6 +129,7 @@
public static final int INDEX_ILLEGAL_REPETITIVE_FIELD = 1052;
public static final int CANNOT_CREATE_SEC_PRIMARY_IDX_ON_EXT_DATASET =
1053;
public static final int COMPILATION_FAILED_DUE_TO_REPLICATE_OP = 1054;
+ public static final int COMPILATION_INCOMPATIBLE_FUNCTION_LANGUAGE = 1055;
// Feed errors
public static final int DATAFLOW_ILLEGAL_STATE = 3001;
diff --git
a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
index 7362181..6ce78f0 100644
--- a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
+++ b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
@@ -115,6 +115,7 @@
1052 = Cannot create index with the same field \"%1$s\" specified more than
once.
1053 = Cannot create primary index on external dataset.
1054 = Compilation failed due to some problem in the query plan.
+1055 = Incompatible function language. Expect %1$s, but %2$s found.
# Feed Errors
3001 = Illegal state.
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedConstants.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedConstants.java
index 9538711..4f5c9b2 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedConstants.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedConstants.java
@@ -18,7 +18,13 @@
*/
package org.apache.asterix.external.util;
+import org.apache.asterix.common.functions.FunctionConstants;
+import org.apache.asterix.common.functions.FunctionSignature;
+
public class FeedConstants {
+
+ public static final FunctionSignature FEED_COLLECT_FUN_SIGN =
+ new FunctionSignature(FunctionConstants.ASTERIX_NS,
"feed_collect", 6);
public final static String FEEDS_METADATA_DV = "feeds_metadata";
public final static String FAILED_TUPLE_DATASET = "failed_tuple";
@@ -31,7 +37,6 @@
public static final String INTAKE_TIMESTAMP = "intake-timestamp";
public static final String COMPUTE_TIMESTAMP = "compute-timestamp";
public static final String STORE_TIMESTAMP = "store-timestamp";
-
}
public static final class MessageConstants {
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java
index dad0d51..1097a07 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java
@@ -28,6 +28,8 @@
import org.apache.asterix.common.config.ClusterProperties;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.functions.FunctionConstants;
+import org.apache.asterix.common.functions.FunctionSignature;
import org.apache.asterix.common.utils.StoragePathUtil;
import
org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
import
org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
diff --git
a/asterixdb/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/parser/FunctionParser.java
b/asterixdb/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/parser/FunctionParser.java
index 47a9580..332dd57 100644
---
a/asterixdb/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/parser/FunctionParser.java
+++
b/asterixdb/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/parser/FunctionParser.java
@@ -23,6 +23,7 @@
import java.util.List;
import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.lang.common.base.IParser;
import org.apache.asterix.lang.common.base.IParserFactory;
import org.apache.asterix.lang.common.base.Statement;
@@ -40,6 +41,10 @@
}
public FunctionDecl getFunctionDecl(Function function) throws
CompilationException {
+ if (function.getLanguage() != Function.LANGUAGE_AQL) {
+ throw new
CompilationException(ErrorCode.COMPILATION_INCOMPATIBLE_FUNCTION_LANGUAGE,
+ Function.LANGUAGE_AQL, function.getLanguage());
+ }
String functionBody = function.getFunctionBody();
List<String> params = function.getParams();
List<VarIdentifier> varIdentifiers = new ArrayList<VarIdentifier>();
diff --git
a/asterixdb/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/statement/SubscribeFeedStatement.java
b/asterixdb/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/statement/SubscribeFeedStatement.java
index f0539c6..a28c196 100644
---
a/asterixdb/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/statement/SubscribeFeedStatement.java
+++
b/asterixdb/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/statement/SubscribeFeedStatement.java
@@ -47,7 +47,9 @@
/**
* Represents the AQL statement for subscribing to a feed.
* This AQL statement is private and may not be used by the end-user.
+ *
*/
+@Deprecated
public class SubscribeFeedStatement implements Statement {
public static final String WAIT_FOR_COMPLETION =
"wait-for-completion-feed";
diff --git
a/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/parser/FunctionParser.java
b/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/parser/FunctionParser.java
index 2b24ea1..3c1fe5a 100644
---
a/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/parser/FunctionParser.java
+++
b/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/parser/FunctionParser.java
@@ -23,6 +23,7 @@
import java.util.List;
import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.lang.common.base.IParser;
import org.apache.asterix.lang.common.base.IParserFactory;
import org.apache.asterix.lang.common.base.Statement;
@@ -40,6 +41,10 @@
}
public FunctionDecl getFunctionDecl(Function function) throws
CompilationException {
+ if (function.getLanguage() != Function.LANGUAGE_SQLPP) {
+ throw new
CompilationException(ErrorCode.COMPILATION_INCOMPATIBLE_FUNCTION_LANGUAGE,
+ Function.LANGUAGE_SQLPP, function.getLanguage());
+ }
String functionBody = function.getFunctionBody();
List<String> params = function.getParams();
diff --git
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
index fa60bba..9156b0f 100644
---
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
+++
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
@@ -63,6 +63,7 @@
import org.apache.asterix.metadata.utils.InvertedIndexResourceFactoryProvider;
import org.apache.asterix.metadata.utils.MetadataUtil;
import org.apache.asterix.metadata.utils.RTreeResourceFactoryProvider;
+import org.apache.asterix.om.functions.BuiltinFunctions;
import org.apache.asterix.om.types.ARecordType;
import org.apache.asterix.om.types.IAType;
import org.apache.asterix.om.utils.RecordUtil;
@@ -85,6 +86,10 @@
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.common.utils.Pair;
import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import
org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
+import
org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
import org.apache.hyracks.algebricks.data.IBinaryComparatorFactoryProvider;
import org.apache.hyracks.algebricks.data.ISerializerDeserializerProvider;
import org.apache.hyracks.algebricks.data.ITypeTraitProvider;
@@ -289,8 +294,31 @@
&& Objects.equals(datasetName, otherDataset.datasetName);
}
- public boolean allow(ILogicalOperator topOp, byte operation) {//NOSONAR:
this method is meant to be extended
- return !hasMetaPart();
+ public boolean allow(ILogicalOperator topOp, byte operation) {
+ if (!hasMetaPart()) {
+ return true;
+ }
+ if (topOp.getInputs().get(0).getValue().getOperatorTag() !=
LogicalOperatorTag.ASSIGN) {
+ return false;
+ }
+ ILogicalOperator op = topOp.getInputs().get(0).getValue();
+ while ((!op.getInputs().isEmpty())
+ && op.getInputs().get(0).getValue().getOperatorTag() !=
LogicalOperatorTag.UNNEST) {
+ op = op.getInputs().get(0).getValue();
+ }
+ if (op.getInputs().isEmpty()) {
+ return false;
+ }
+ UnnestOperator unnestOp = (UnnestOperator)
op.getInputs().get(0).getValue();
+ if (unnestOp.getExpressionRef().getValue().getExpressionTag() !=
LogicalExpressionTag.FUNCTION_CALL) {
+ return false;
+ }
+ AbstractFunctionCallExpression functionCall =
+ (AbstractFunctionCallExpression)
unnestOp.getExpressionRef().getValue();
+ if (functionCall.getFunctionIdentifier() !=
BuiltinFunctions.FEED_COLLECT) {
+ return false;
+ }
+ return operation == DatasetUtil.OP_UPSERT;
}
/**
diff --git
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Function.java
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Function.java
index 7ff423c..1d1db37 100644
---
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Function.java
+++
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Function.java
@@ -26,6 +26,7 @@
public class Function implements IMetadataEntity<Function> {
private static final long serialVersionUID = 1L;
public static final String LANGUAGE_AQL = "AQL";
+ public static final String LANGUAGE_SQLPP = "SQLPP";
public static final String LANGUAGE_JAVA = "JAVA";
public static final String RETURNTYPE_VOID = "VOID";
--
To view, visit https://asterix-gerrit.ics.uci.edu/2059
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I0ae5a837613780a4d2c90c98139fdc6d5e040cc9
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Xikui Wang <[email protected]>