Xikui Wang has uploaded a new change for review.
https://asterix-gerrit.ics.uci.edu/2255
Change subject: [WIP] Add filter query to feed connection
......................................................................
[WIP] Add filter query to feed connection
Change-Id: I0b3cc6fe9d7fb5f5645dd9c759da448bfe1e88f1
---
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
M
asterixdb/asterix-app/src/test/resources/metadata/results/basic/metadata_datatype/metadata_datatype.1.adm
A
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feeds_13/feeds_13.1.ddl.sqlpp
A
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feeds_13/feeds_13.2.update.sqlpp
A
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feeds_13/feeds_13.3.query.sqlpp
A
asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/feeds_13/feeds_13.1.adm
M asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
M asterixdb/asterix-lang-aql/src/main/javacc/AQL.jj
M
asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/ConnectFeedStatement.java
M asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj
M
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataTransactionContext.java
M
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataRecordTypes.java
M
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/FeedConnection.java
M
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/FeedConnectionTupleTranslator.java
15 files changed, 204 insertions(+), 14 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/55/2255/1
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index de0c88f..29ca171 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -2191,6 +2191,7 @@
String feedName = cfs.getFeedName();
String datasetName = cfs.getDatasetName().getValue();
String policyName = cfs.getPolicy();
+ String whereClauseBody = cfs.getWhereClauseBody();
MetadataTransactionContext mdTxnCtx =
MetadataManager.INSTANCE.beginTransaction();
metadataProvider.setMetadataTxnContext(mdTxnCtx);
// TODO: Check whether we are connecting a change feed to a non-meta
dataset
@@ -2223,7 +2224,7 @@
if (fc != null) {
throw new AlgebricksException("Feed" + feedName + " is already
connected dataset " + datasetName);
}
- fc = new FeedConnection(dataverseName, feedName, datasetName,
appliedFunctions, policyName,
+ fc = new FeedConnection(dataverseName, feedName, datasetName,
appliedFunctions, policyName, whereClauseBody,
outputType.getTypeName());
MetadataManager.INSTANCE.addFeedConnection(metadataProvider.getMetadataTxnContext(),
fc);
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
index ac249b7..23ca8c0 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
@@ -40,6 +40,7 @@
import
org.apache.asterix.common.dataflow.LSMTreeInsertDeleteOperatorDescriptor;
import org.apache.asterix.common.exceptions.ACIDException;
import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.exceptions.CompilationException;
import org.apache.asterix.common.functions.FunctionSignature;
import org.apache.asterix.common.messaging.api.ICCMessageBroker;
import org.apache.asterix.common.transactions.TxnId;
@@ -58,8 +59,11 @@
import org.apache.asterix.external.util.FeedUtils.FeedRuntimeType;
import org.apache.asterix.file.StorageComponentProvider;
import org.apache.asterix.lang.common.base.Expression;
+import org.apache.asterix.lang.common.base.IParser;
+import org.apache.asterix.lang.common.base.IParserFactory;
import org.apache.asterix.lang.common.base.Statement;
import org.apache.asterix.lang.common.clause.LetClause;
+import org.apache.asterix.lang.common.clause.WhereClause;
import org.apache.asterix.lang.common.expression.CallExpr;
import org.apache.asterix.lang.common.expression.LiteralExpr;
import org.apache.asterix.lang.common.expression.VariableExpr;
@@ -78,6 +82,7 @@
import org.apache.asterix.lang.sqlpp.clause.SelectElement;
import org.apache.asterix.lang.sqlpp.clause.SelectSetOperation;
import org.apache.asterix.lang.sqlpp.expression.SelectExpression;
+import org.apache.asterix.lang.sqlpp.parser.SqlppParserFactory;
import org.apache.asterix.lang.sqlpp.struct.SetOperationInput;
import org.apache.asterix.lang.sqlpp.util.SqlppVariableUtil;
import org.apache.asterix.metadata.declared.MetadataProvider;
@@ -192,7 +197,7 @@
return argExprs;
}
- private static Query makeConnectionQuery(FeedConnection feedConnection) {
+ private static Query makeConnectionQuery(FeedConnection feedConnection)
throws AlgebricksException {
// Construct from clause
VarIdentifier fromVarId =
SqlppVariableUtil.toInternalVariableIdentifier(feedConnection.getFeedName());
VariableExpr fromTermLeftExpr = new VariableExpr(fromVarId);
@@ -205,6 +210,19 @@
CallExpr datasrouceCallFunction = new CallExpr(new
FunctionSignature(BuiltinFunctions.FEED_COLLECT), exprList);
FromTerm fromterm = new FromTerm(datasrouceCallFunction,
fromTermLeftExpr, null, null);
FromClause fromClause = new FromClause(Arrays.asList(fromterm));
+ WhereClause whereClause = null;
+ if(feedConnection.getWhereClauseBody() != null) {
+ String whereClauseExpr = feedConnection.getWhereClauseBody() + ";";
+ IParserFactory sqlppParserFactory = new SqlppParserFactory();
+ IParser sqlppParser =
sqlppParserFactory.createParser(whereClauseExpr);
+ List<Statement> stmts = sqlppParser.parse();
+ if (stmts.size() != 1) {
+ throw new CompilationException("Exceptions happened in
processing where clause.");
+ }
+ Query whereClauseQuery = (Query) stmts.get(0);
+ whereClause = new WhereClause(whereClauseQuery.getBody());
+ }
+
// TODO: This can be the place to add select predicate for ingestion
// Attaching functions
int varIdx = 1;
@@ -223,7 +241,7 @@
// Constructing select clause
SelectElement selectElement = new SelectElement(previousVarExpr);
SelectClause selectClause = new SelectClause(selectElement, null,
false);
- SelectBlock selectBlock = new SelectBlock(selectClause, fromClause,
letClauses, null, null, null, null);
+ SelectBlock selectBlock = new SelectBlock(selectClause, fromClause,
letClauses, whereClause, null, null, null);
SelectSetOperation selectSetOperation = new SelectSetOperation(new
SetOperationInput(selectBlock, null), null);
SelectExpression body = new SelectExpression(null, selectSetOperation,
null, null, true);
Query query = new Query(false, true, body, 0);
diff --git
a/asterixdb/asterix-app/src/test/resources/metadata/results/basic/metadata_datatype/metadata_datatype.1.adm
b/asterixdb/asterix-app/src/test/resources/metadata/results/basic/metadata_datatype/metadata_datatype.1.adm
index 7229aa8..9be7701 100644
---
a/asterixdb/asterix-app/src/test/resources/metadata/results/basic/metadata_datatype/metadata_datatype.1.adm
+++
b/asterixdb/asterix-app/src/test/resources/metadata/results/basic/metadata_datatype/metadata_datatype.1.adm
@@ -21,7 +21,7 @@
{ "DataverseName": "Metadata", "DatatypeName":
"DatatypeRecordType_Derived_Record_Fields_Item", "Derived": { "Tag": "RECORD",
"IsAnonymous": true, "Record": { "IsOpen": true, "Fields": [ { "FieldName":
"FieldName", "FieldType": "string", "IsNullable": false }, { "FieldName":
"FieldType", "FieldType": "string", "IsNullable": false }, { "FieldName":
"IsNullable", "FieldType": "boolean", "IsNullable": false } ] } }, "Timestamp":
"Fri Oct 21 10:29:21 PDT 2016" }
{ "DataverseName": "Metadata", "DatatypeName": "DataverseRecordType",
"Derived": { "Tag": "RECORD", "IsAnonymous": false, "Record": { "IsOpen": true,
"Fields": [ { "FieldName": "DataverseName", "FieldType": "string",
"IsNullable": false }, { "FieldName": "DataFormat", "FieldType": "string",
"IsNullable": false }, { "FieldName": "Timestamp", "FieldType": "string",
"IsNullable": false }, { "FieldName": "PendingOp", "FieldType": "int32",
"IsNullable": false } ] } }, "Timestamp": "Fri Oct 21 10:29:21 PDT 2016" }
{ "DataverseName": "Metadata", "DatatypeName": "ExternalFileRecordType",
"Derived": { "Tag": "RECORD", "IsAnonymous": false, "Record": { "IsOpen": true,
"Fields": [ { "FieldName": "DataverseName", "FieldType": "string",
"IsNullable": false }, { "FieldName": "DatasetName", "FieldType": "string",
"IsNullable": false }, { "FieldName": "FileNumber", "FieldType": "int32",
"IsNullable": false }, { "FieldName": "FileName", "FieldType": "string",
"IsNullable": false }, { "FieldName": "FileSize", "FieldType": "int64",
"IsNullable": false }, { "FieldName": "FileModTime", "FieldType": "datetime",
"IsNullable": false }, { "FieldName": "PendingOp", "FieldType": "int32",
"IsNullable": false } ] } }, "Timestamp": "Fri Oct 21 10:29:22 PDT 2016" }
-{ "DataverseName": "Metadata", "DatatypeName": "FeedConnectionRecordType",
"Derived": { "Tag": "RECORD", "IsAnonymous": false, "Record": { "IsOpen": true,
"Fields": [ { "FieldName": "DataverseName", "FieldType": "string",
"IsNullable": false }, { "FieldName": "FeedName", "FieldType": "string",
"IsNullable": false }, { "FieldName": "DatasetName", "FieldType": "string",
"IsNullable": false }, { "FieldName": "ReturnType", "FieldType": "string",
"IsNullable": false }, { "FieldName": "AppliedFunctions", "FieldType":
"FeedConnectionRecordType_AppliedFunctions", "IsNullable": false }, {
"FieldName": "PolicyName", "FieldType": "string", "IsNullable": false } ] } },
"Timestamp": "Fri Oct 21 10:29:22 PDT 2016" }
+{ "DataverseName": "Metadata", "DatatypeName": "FeedConnectionRecordType",
"Derived": { "Tag": "RECORD", "IsAnonymous": false, "Record": { "IsOpen": true,
"Fields": [ { "FieldName": "DataverseName", "FieldType": "string",
"IsNullable": false }, { "FieldName": "FeedName", "FieldType": "string",
"IsNullable": false }, { "FieldName": "DatasetName", "FieldType": "string",
"IsNullable": false }, { "FieldName": "ReturnType", "FieldType": "string",
"IsNullable": false }, { "FieldName": "AppliedFunctions", "FieldType":
"FeedConnectionRecordType_AppliedFunctions", "IsNullable": false }, {
"FieldName": "WhereClause", "FieldType": "string", "IsNullable": false }, {
"FieldName": "PolicyName", "FieldType": "string", "IsNullable": false } ] } },
"Timestamp": "Tue Jan 02 14:03:26 PST 2018" }
{ "DataverseName": "Metadata", "DatatypeName":
"FeedConnectionRecordType_AppliedFunctions", "Derived": { "Tag":
"UNORDEREDLIST", "IsAnonymous": true, "UnorderedList": "string" }, "Timestamp":
"Fri Oct 21 10:29:22 PDT 2016" }
{ "DataverseName": "Metadata", "DatatypeName": "FeedPolicyRecordType",
"Derived": { "Tag": "RECORD", "IsAnonymous": false, "Record": { "IsOpen": true,
"Fields": [ { "FieldName": "DataverseName", "FieldType": "string",
"IsNullable": false }, { "FieldName": "PolicyName", "FieldType": "string",
"IsNullable": false }, { "FieldName": "Description", "FieldType": "string",
"IsNullable": false }, { "FieldName": "Properties", "FieldType":
"FeedPolicyRecordType_Properties", "IsNullable": false } ] } }, "Timestamp":
"Fri Oct 21 10:29:22 PDT 2016" }
{ "DataverseName": "Metadata", "DatatypeName":
"FeedPolicyRecordType_Properties", "Derived": { "Tag": "UNORDEREDLIST",
"IsAnonymous": true, "UnorderedList": "FeedPolicyRecordType_Properties_Item" },
"Timestamp": "Fri Oct 21 10:29:22 PDT 2016" }
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feeds_13/feeds_13.1.ddl.sqlpp
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feeds_13/feeds_13.1.ddl.sqlpp
new file mode 100644
index 0000000..a4314c9
--- /dev/null
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feeds_13/feeds_13.1.ddl.sqlpp
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+drop dataverse feeds if exists;
+create dataverse feeds;
+use feeds;
+
+create type feeds.TweetType as
+ closed {
+ id : string,
+ username : string,
+ location : string,
+ text : string,
+ timestamp : string
+};
+
+create dataset Tweets1(TweetType) primary key id;
+create dataset Tweets2(TweetType) primary key id;
+create dataset Tweets3(TweetType) primary key id;
+create dataset Tweets4(TweetType) primary key id;
+
+create feed TweetFeed using localfs
+((`path`=`asterix_nc1://data/twitter/obamatweets.adm`),
+(`format`=`adm`),
+(`type-name`=`TweetType`),
+(`tuple-interval`=`10`));
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feeds_13/feeds_13.2.update.sqlpp
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feeds_13/feeds_13.2.update.sqlpp
new file mode 100644
index 0000000..dd83b35
--- /dev/null
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feeds_13/feeds_13.2.update.sqlpp
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use feeds;
+set `wait-for-completion-feed` `true`;
+connect feed feeds.TweetFeed to dataset Tweets1 using policy `Basic` WHERE id
= 'nc1:115';
+connect feed feeds.TweetFeed to dataset Tweets2 using policy `Basic` WHERE id
LIKE 'nc1:11%';
+connect feed feeds.TweetFeed to dataset Tweets3 using policy `Basic` WHERE id
NOT LIKE 'nc1:10%' OR username = 'BronsonMike';
+connect feed feeds.TweetFeed to dataset Tweets4 using policy `Basic` WHERE id
LIKE 'nc1:11%' AND username = 'thewildpitch';
+
+start feed feeds.TweetFeed;
\ No newline at end of file
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feeds_13/feeds_13.3.query.sqlpp
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feeds_13/feeds_13.3.query.sqlpp
new file mode 100644
index 0000000..20c6320
--- /dev/null
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feeds_13/feeds_13.3.query.sqlpp
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use feeds;
+
+select value x
+from Tweets1 as x
+union all
+select value x2
+from Tweets2 as x2
+union all
+select value x3
+from Tweets3 as x3
+union all
+select value x4
+from Tweets4 as x4
+order by id;
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/feeds_13/feeds_13.1.adm
b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/feeds_13/feeds_13.1.adm
new file mode 100644
index 0000000..2c37364
--- /dev/null
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/feeds_13/feeds_13.1.adm
@@ -0,0 +1,15 @@
+{ "id": "nc1:1", "username": "BronsonMike", "location": "", "text":
"@GottaLaff @reutersus Christie and obama just foul weather friends",
"timestamp": "Thu Dec 06 16:53:06 PST 2012" }
+{ "id": "nc1:11", "username": "magarika", "location": "", "text": "RT
@ken24xavier: Obama tells SOROS - our plan is ALMOST finished
http://t.co/WvzK0GtU", "timestamp": "Thu Dec 06 16:53:05 PST 2012" }
+{ "id": "nc1:11", "username": "magarika", "location": "", "text": "RT
@ken24xavier: Obama tells SOROS - our plan is ALMOST finished
http://t.co/WvzK0GtU", "timestamp": "Thu Dec 06 16:53:05 PST 2012" }
+{ "id": "nc1:111", "username": "ToucanMall", "location": "", "text": "RT
@WorldWar3Watch: Michelle Obama Gets More Grammy Nominations Than Justin ...
#Obama #WW3 http://t.co/0Wv2GKij", "timestamp": "Thu Dec 06 16:53:13 PST 2012" }
+{ "id": "nc1:111", "username": "ToucanMall", "location": "", "text": "RT
@WorldWar3Watch: Michelle Obama Gets More Grammy Nominations Than Justin ...
#Obama #WW3 http://t.co/0Wv2GKij", "timestamp": "Thu Dec 06 16:53:13 PST 2012" }
+{ "id": "nc1:113", "username": "ToucanMall", "location": "", "text": "RT
@ObamaPalooza: Tiffany Shared What $2,000 Meant to Her ... and the President
Stopped by to Talk About It http://t.co/sgT7lsNV #Obama", "timestamp": "Thu Dec
06 16:53:12 PST 2012" }
+{ "id": "nc1:113", "username": "ToucanMall", "location": "", "text": "RT
@ObamaPalooza: Tiffany Shared What $2,000 Meant to Her ... and the President
Stopped by to Talk About It http://t.co/sgT7lsNV #Obama", "timestamp": "Thu Dec
06 16:53:12 PST 2012" }
+{ "id": "nc1:115", "username": "thewildpitch", "location": "", "text": "RT
@RevkahJC: Dennis Miller: Obama Should Just Say He Wants To Tax Successful
People http://t.co/Ihlemy9Y", "timestamp": "Thu Dec 06 16:53:11 PST 2012" }
+{ "id": "nc1:115", "username": "thewildpitch", "location": "", "text": "RT
@RevkahJC: Dennis Miller: Obama Should Just Say He Wants To Tax Successful
People http://t.co/Ihlemy9Y", "timestamp": "Thu Dec 06 16:53:11 PST 2012" }
+{ "id": "nc1:115", "username": "thewildpitch", "location": "", "text": "RT
@RevkahJC: Dennis Miller: Obama Should Just Say He Wants To Tax Successful
People http://t.co/Ihlemy9Y", "timestamp": "Thu Dec 06 16:53:11 PST 2012" }
+{ "id": "nc1:115", "username": "thewildpitch", "location": "", "text": "RT
@RevkahJC: Dennis Miller: Obama Should Just Say He Wants To Tax Successful
People http://t.co/Ihlemy9Y", "timestamp": "Thu Dec 06 16:53:11 PST 2012" }
+{ "id": "nc1:117", "username": "Rnugent24", "location": "", "text": "RT
@ConservativeQuo: unemployment is above 8% again. I wonder how long it will
take for Obama to start blaming Bush? 3-2-1 #tcot #antiobama", "timestamp":
"Thu Dec 06 16:53:10 PST 2012" }
+{ "id": "nc1:117", "username": "Rnugent24", "location": "", "text": "RT
@ConservativeQuo: unemployment is above 8% again. I wonder how long it will
take for Obama to start blaming Bush? 3-2-1 #tcot #antiobama", "timestamp":
"Thu Dec 06 16:53:10 PST 2012" }
+{ "id": "nc1:119", "username": "ToucanMall", "location": "", "text": "RT
@Newitrsdotcom: I hope #Obama will win re-election... Other four years without
meaningless #wars", "timestamp": "Thu Dec 06 16:53:09 PST 2012" }
+{ "id": "nc1:119", "username": "ToucanMall", "location": "", "text": "RT
@Newitrsdotcom: I hope #Obama will win re-election... Other four years without
meaningless #wars", "timestamp": "Thu Dec 06 16:53:09 PST 2012" }
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
index 2c3d855..05425ff 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
@@ -8335,6 +8335,11 @@
</compilation-unit>
</test-case>
<test-case FilePath="feeds">
+ <compilation-unit name="feeds_13">
+ <output-dir compare="Text">feeds_13</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="feeds">
<compilation-unit name="issue_230_feeds">
<output-dir compare="Text">issue_230_feeds</output-dir>
</compilation-unit>
diff --git a/asterixdb/asterix-lang-aql/src/main/javacc/AQL.jj
b/asterixdb/asterix-lang-aql/src/main/javacc/AQL.jj
index d55db87..b384119 100644
--- a/asterixdb/asterix-lang-aql/src/main/javacc/AQL.jj
+++ b/asterixdb/asterix-lang-aql/src/main/javacc/AQL.jj
@@ -1186,7 +1186,7 @@
<CONNECT> <FEED> feedNameComponents = QualifiedName() <TO> <DATASET>
datasetNameComponents = QualifiedName()
(ApplyFunction(appliedFunctions))? (policy = GetPolicy())?
{
- stmt = new ConnectFeedStatement(feedNameComponents,
datasetNameComponents, appliedFunctions, policy, getVarCounter());
+ stmt = new ConnectFeedStatement(feedNameComponents,
datasetNameComponents, appliedFunctions, policy, null, getVarCounter());
}
| <DISCONNECT> <FEED> feedNameComponents = QualifiedName() <FROM>
<DATASET> datasetNameComponents = QualifiedName()
{
diff --git
a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/ConnectFeedStatement.java
b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/ConnectFeedStatement.java
index 3d8fc68..95f8f47 100644
---
a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/ConnectFeedStatement.java
+++
b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/ConnectFeedStatement.java
@@ -34,11 +34,12 @@
private final Identifier datasetName;
private final String feedName;
private final String policy;
+ private final String whereClauseBody;
private int varCounter;
private final List<FunctionSignature> appliedFunctions;
public ConnectFeedStatement(Pair<Identifier, Identifier> feedNameCmp,
Pair<Identifier, Identifier> datasetNameCmp,
- List<FunctionSignature> appliedFunctions, String policy, int
varCounter) {
+ List<FunctionSignature> appliedFunctions, String policy, String
whereClauseBody, int varCounter) {
if (feedNameCmp.first != null && datasetNameCmp.first != null
&&
!feedNameCmp.first.getValue().equals(datasetNameCmp.first.getValue())) {
throw new IllegalArgumentException("Dataverse for source feed and
target dataset do not match");
@@ -48,6 +49,7 @@
this.datasetName = datasetNameCmp.second;
this.feedName = feedNameCmp.second.getValue();
this.policy = policy != null ? policy :
BuiltinFeedPolicies.DEFAULT_POLICY.getPolicyName();
+ this.whereClauseBody = whereClauseBody;
this.varCounter = varCounter;
this.appliedFunctions = appliedFunctions;
}
@@ -64,6 +66,10 @@
return varCounter;
}
+ public String getWhereClauseBody() {
+ return whereClauseBody;
+ }
+
@Override
public byte getKind() {
return Statement.Kind.CONNECT_FEED;
diff --git a/asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj
b/asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj
index b68ffd6..81c97a5 100644
--- a/asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj
+++ b/asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj
@@ -158,6 +158,7 @@
import org.apache.asterix.lang.sqlpp.clause.SelectRegular;
import org.apache.asterix.lang.sqlpp.clause.SelectSetOperation;
import org.apache.asterix.lang.sqlpp.clause.UnnestClause;
+import org.apache.asterix.lang.common.clause.WhereClause;
import org.apache.asterix.lang.sqlpp.expression.CaseExpression;
import org.apache.asterix.lang.sqlpp.expression.SelectExpression;
import org.apache.asterix.lang.sqlpp.optype.JoinType;
@@ -1281,14 +1282,37 @@
List<FunctionSignature> appliedFunctions = new
ArrayList<FunctionSignature>();
Statement stmt = null;
String policy = null;
+ String whereClauseBody = null;
+ WhereClause whereClause = null;
+ Token beginPos = null;
+ Token endPos = null;
}
{
(
<FEED> feedNameComponents = QualifiedName() <TO> Dataset()
datasetNameComponents = QualifiedName()
- (ApplyFunction(appliedFunctions))? (policy = GetPolicy())?
+ (ApplyFunction(appliedFunctions))?
+ (policy = GetPolicy())?
+ (
+ <WHERE>
+ {
+ beginPos = token;
+ whereClause = new WhereClause();
+ Expression whereExpr;
+ }
+ whereExpr = Expression()
+ {
+ whereClause.setWhereExpr(whereExpr);
+ }
+ )?
+ {
+ if (whereClause != null) {
+ endPos = token;
+ whereClauseBody = extractFragment(beginPos.endLine,
beginPos.endColumn, endPos.endLine, endPos.endColumn + 1);
+ }
+ }
{
stmt = new ConnectFeedStatement(feedNameComponents,
datasetNameComponents, appliedFunctions,
- policy, getVarCounter());
+ policy, whereClauseBody, getVarCounter());
}
)
{
diff --git
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataTransactionContext.java
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataTransactionContext.java
index 406b3d6..1f18bc9 100644
---
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataTransactionContext.java
+++
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataTransactionContext.java
@@ -245,7 +245,8 @@
}
public void dropFeedConnection(String dataverseName, String feedName,
String datasetName) {
- FeedConnection feedConnection = new FeedConnection(dataverseName,
feedName, datasetName, null, null, null);
+ FeedConnection feedConnection = new FeedConnection(dataverseName,
feedName, datasetName, null, null, null,
+ null);
droppedCache.addFeedConnectionIfNotExists(feedConnection);
logAndApply(new MetadataLogicalOperation(feedConnection, false));
}
diff --git
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataRecordTypes.java
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataRecordTypes.java
index 2a04b58..6b7532b 100644
---
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataRecordTypes.java
+++
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataRecordTypes.java
@@ -99,6 +99,7 @@
public static final String FIELD_NAME_VALUE = "Value";
public static final String FIELD_NAME_WORKING_MEMORY_SIZE =
"WorkingMemorySize";
public static final String FIELD_NAME_APPLIED_FUNCTIONS =
"AppliedFunctions";
+ public static final String FIELD_NAME_WHERE_CLAUSE = "WhereClause";
//---------------------------------- Record Types Creation
----------------------------------//
//--------------------------------------- Properties
----------------------------------------//
@@ -378,7 +379,8 @@
public static final int FEED_CONN_DATASET_NAME_FIELD_INDEX = 2;
public static final int FEED_CONN_OUTPUT_TYPE_INDEX = 3;
public static final int FEED_CONN_APPLIED_FUNCTIONS_FIELD_INDEX = 4;
- public static final int FEED_CONN_POLICY_FIELD_INDEX = 5;
+ public static final int FEED_CONN_WHERE_CLAUSE_BODY_INDEX = 5;
+ public static final int FEED_CONN_POLICY_FIELD_INDEX = 6;
public static final ARecordType FEED_CONNECTION_RECORDTYPE =
createRecordType(
@@ -386,10 +388,11 @@
RECORD_NAME_FEED_CONNECTION,
// FieldNames
new String[] { FIELD_NAME_DATAVERSE_NAME, FIELD_NAME_FEED_NAME,
FIELD_NAME_DATASET_NAME,
- FIELD_NAME_RETURN_TYPE, FIELD_NAME_APPLIED_FUNCTIONS,
FIELD_NAME_POLICY_NAME },
+ FIELD_NAME_RETURN_TYPE, FIELD_NAME_APPLIED_FUNCTIONS,
FIELD_NAME_WHERE_CLAUSE,
+ FIELD_NAME_POLICY_NAME },
// FieldTypes
new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING,
BuiltinType.ASTRING, BuiltinType.ASTRING,
- new AUnorderedListType(BuiltinType.ASTRING, null),
BuiltinType.ASTRING},
+ new AUnorderedListType(BuiltinType.ASTRING, null),
BuiltinType.ASTRING, BuiltinType.ASTRING },
//IsOpen?
true);
diff --git
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/FeedConnection.java
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/FeedConnection.java
index 8e14ee5..5745eb5 100644
---
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/FeedConnection.java
+++
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/FeedConnection.java
@@ -40,17 +40,19 @@
private String feedName;
private String datasetName;
private String policyName;
+ private String whereClauseBody;
private String outputType;
private List<FunctionSignature> appliedFunctions;
public FeedConnection(String dataverseName, String feedName, String
datasetName,
- List<FunctionSignature> appliedFunctions, String policyName,
String outputType) {
+ List<FunctionSignature> appliedFunctions, String policyName,
String whereClauseBody, String outputType) {
this.dataverseName = dataverseName;
this.feedName = feedName;
this.datasetName = datasetName;
this.appliedFunctions = appliedFunctions;
this.connectionId = feedName + ":" + datasetName;
this.policyName = policyName;
+ this.whereClauseBody = whereClauseBody;
this.outputType = outputType;
this.feedId = new EntityId(FeedUtils.FEED_EXTENSION_NAME,
dataverseName, feedName);
}
@@ -105,6 +107,10 @@
return policyName;
}
+ public String getWhereClauseBody() {
+ return whereClauseBody;
+ }
+
public String getOutputType() {
return outputType;
}
diff --git
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/FeedConnectionTupleTranslator.java
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/FeedConnectionTupleTranslator.java
index 269497b..7adcd1d 100644
---
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/FeedConnectionTupleTranslator.java
+++
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/FeedConnectionTupleTranslator.java
@@ -83,6 +83,8 @@
.getStringValue();
String outputType = ((AString)
feedConnRecord.getValueByPos(MetadataRecordTypes.FEED_CONN_OUTPUT_TYPE_INDEX))
.getStringValue();
+ String whereClauseBody = ((AString) feedConnRecord
+
.getValueByPos(MetadataRecordTypes.FEED_CONN_WHERE_CLAUSE_BODY_INDEX)).getStringValue();
String policyName = ((AString)
feedConnRecord.getValueByPos(MetadataRecordTypes.FEED_CONN_POLICY_FIELD_INDEX))
.getStringValue();
ArrayList<FunctionSignature> appliedFunctions = null;
@@ -101,7 +103,8 @@
}
}
- return new FeedConnection(dataverseName, feedName, datasetName,
appliedFunctions, policyName, outputType);
+ return new FeedConnection(dataverseName, feedName, datasetName,
appliedFunctions, policyName, whereClauseBody,
+ outputType);
}
@Override
@@ -153,6 +156,12 @@
fieldValue.reset();
writeAppliedFunctionsField(recordBuilder, me, fieldValue);
+ // field: whereClauseBody
+ fieldValue.reset();
+ aString.setValue(me.getWhereClauseBody());
+ stringSerde.serialize(aString, fieldValue.getDataOutput());
+
recordBuilder.addField(MetadataRecordTypes.FEED_CONN_WHERE_CLAUSE_BODY_INDEX,
fieldValue);
+
// field: policyName
fieldValue.reset();
aString.setValue(me.getPolicyName());
--
To view, visit https://asterix-gerrit.ics.uci.edu/2255
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I0b3cc6fe9d7fb5f5645dd9c759da448bfe1e88f1
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Xikui Wang <[email protected]>