Xikui Wang has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/2255

Change subject: [WIP] Add filter query to feed connection
......................................................................

[WIP] Add filter query to feed connection

Change-Id: I0b3cc6fe9d7fb5f5645dd9c759da448bfe1e88f1
---
M 
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
M 
asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
M 
asterixdb/asterix-app/src/test/resources/metadata/results/basic/metadata_datatype/metadata_datatype.1.adm
A 
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feeds_13/feeds_13.1.ddl.sqlpp
A 
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feeds_13/feeds_13.2.update.sqlpp
A 
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feeds_13/feeds_13.3.query.sqlpp
A 
asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/feeds_13/feeds_13.1.adm
M asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
M asterixdb/asterix-lang-aql/src/main/javacc/AQL.jj
M 
asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/ConnectFeedStatement.java
M asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj
M 
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataTransactionContext.java
M 
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataRecordTypes.java
M 
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/FeedConnection.java
M 
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/FeedConnectionTupleTranslator.java
15 files changed, 204 insertions(+), 14 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb 
refs/changes/55/2255/1

diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index de0c88f..29ca171 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -2191,6 +2191,7 @@
         String feedName = cfs.getFeedName();
         String datasetName = cfs.getDatasetName().getValue();
         String policyName = cfs.getPolicy();
+        String whereClauseBody = cfs.getWhereClauseBody();
         MetadataTransactionContext mdTxnCtx = 
MetadataManager.INSTANCE.beginTransaction();
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
         // TODO: Check whether we are connecting a change feed to a non-meta 
dataset
@@ -2223,7 +2224,7 @@
             if (fc != null) {
                 throw new AlgebricksException("Feed" + feedName + " is already 
connected dataset " + datasetName);
             }
-            fc = new FeedConnection(dataverseName, feedName, datasetName, 
appliedFunctions, policyName,
+            fc = new FeedConnection(dataverseName, feedName, datasetName, 
appliedFunctions, policyName, whereClauseBody,
                     outputType.getTypeName());
             
MetadataManager.INSTANCE.addFeedConnection(metadataProvider.getMetadataTxnContext(),
 fc);
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
index ac249b7..23ca8c0 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
@@ -40,6 +40,7 @@
 import 
org.apache.asterix.common.dataflow.LSMTreeInsertDeleteOperatorDescriptor;
 import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.exceptions.CompilationException;
 import org.apache.asterix.common.functions.FunctionSignature;
 import org.apache.asterix.common.messaging.api.ICCMessageBroker;
 import org.apache.asterix.common.transactions.TxnId;
@@ -58,8 +59,11 @@
 import org.apache.asterix.external.util.FeedUtils.FeedRuntimeType;
 import org.apache.asterix.file.StorageComponentProvider;
 import org.apache.asterix.lang.common.base.Expression;
+import org.apache.asterix.lang.common.base.IParser;
+import org.apache.asterix.lang.common.base.IParserFactory;
 import org.apache.asterix.lang.common.base.Statement;
 import org.apache.asterix.lang.common.clause.LetClause;
+import org.apache.asterix.lang.common.clause.WhereClause;
 import org.apache.asterix.lang.common.expression.CallExpr;
 import org.apache.asterix.lang.common.expression.LiteralExpr;
 import org.apache.asterix.lang.common.expression.VariableExpr;
@@ -78,6 +82,7 @@
 import org.apache.asterix.lang.sqlpp.clause.SelectElement;
 import org.apache.asterix.lang.sqlpp.clause.SelectSetOperation;
 import org.apache.asterix.lang.sqlpp.expression.SelectExpression;
+import org.apache.asterix.lang.sqlpp.parser.SqlppParserFactory;
 import org.apache.asterix.lang.sqlpp.struct.SetOperationInput;
 import org.apache.asterix.lang.sqlpp.util.SqlppVariableUtil;
 import org.apache.asterix.metadata.declared.MetadataProvider;
@@ -192,7 +197,7 @@
         return argExprs;
     }
 
-    private static Query makeConnectionQuery(FeedConnection feedConnection) {
+    private static Query makeConnectionQuery(FeedConnection feedConnection) 
throws AlgebricksException {
         // Construct from clause
         VarIdentifier fromVarId = 
SqlppVariableUtil.toInternalVariableIdentifier(feedConnection.getFeedName());
         VariableExpr fromTermLeftExpr = new VariableExpr(fromVarId);
@@ -205,6 +210,19 @@
         CallExpr datasrouceCallFunction = new CallExpr(new 
FunctionSignature(BuiltinFunctions.FEED_COLLECT), exprList);
         FromTerm fromterm = new FromTerm(datasrouceCallFunction, 
fromTermLeftExpr, null, null);
         FromClause fromClause = new FromClause(Arrays.asList(fromterm));
+        WhereClause whereClause = null;
+        if(feedConnection.getWhereClauseBody() != null) {
+            String whereClauseExpr = feedConnection.getWhereClauseBody() + ";";
+            IParserFactory sqlppParserFactory = new SqlppParserFactory();
+            IParser sqlppParser = 
sqlppParserFactory.createParser(whereClauseExpr);
+            List<Statement> stmts = sqlppParser.parse();
+            if (stmts.size() != 1) {
+                throw new CompilationException("Exceptions happened in 
processing where clause.");
+            }
+            Query whereClauseQuery = (Query) stmts.get(0);
+            whereClause = new WhereClause(whereClauseQuery.getBody());
+        }
+
         // TODO: This can be the place to add select predicate for ingestion
         // Attaching functions
         int varIdx = 1;
@@ -223,7 +241,7 @@
         // Constructing select clause
         SelectElement selectElement = new SelectElement(previousVarExpr);
         SelectClause selectClause = new SelectClause(selectElement, null, 
false);
-        SelectBlock selectBlock = new SelectBlock(selectClause, fromClause, 
letClauses, null, null, null, null);
+        SelectBlock selectBlock = new SelectBlock(selectClause, fromClause, 
letClauses, whereClause, null, null, null);
         SelectSetOperation selectSetOperation = new SelectSetOperation(new 
SetOperationInput(selectBlock, null), null);
         SelectExpression body = new SelectExpression(null, selectSetOperation, 
null, null, true);
         Query query = new Query(false, true, body, 0);
diff --git 
a/asterixdb/asterix-app/src/test/resources/metadata/results/basic/metadata_datatype/metadata_datatype.1.adm
 
b/asterixdb/asterix-app/src/test/resources/metadata/results/basic/metadata_datatype/metadata_datatype.1.adm
index 7229aa8..9be7701 100644
--- 
a/asterixdb/asterix-app/src/test/resources/metadata/results/basic/metadata_datatype/metadata_datatype.1.adm
+++ 
b/asterixdb/asterix-app/src/test/resources/metadata/results/basic/metadata_datatype/metadata_datatype.1.adm
@@ -21,7 +21,7 @@
 { "DataverseName": "Metadata", "DatatypeName": 
"DatatypeRecordType_Derived_Record_Fields_Item", "Derived": { "Tag": "RECORD", 
"IsAnonymous": true, "Record": { "IsOpen": true, "Fields": [ { "FieldName": 
"FieldName", "FieldType": "string", "IsNullable": false }, { "FieldName": 
"FieldType", "FieldType": "string", "IsNullable": false }, { "FieldName": 
"IsNullable", "FieldType": "boolean", "IsNullable": false } ] } }, "Timestamp": 
"Fri Oct 21 10:29:21 PDT 2016" }
 { "DataverseName": "Metadata", "DatatypeName": "DataverseRecordType", 
"Derived": { "Tag": "RECORD", "IsAnonymous": false, "Record": { "IsOpen": true, 
"Fields": [ { "FieldName": "DataverseName", "FieldType": "string", 
"IsNullable": false }, { "FieldName": "DataFormat", "FieldType": "string", 
"IsNullable": false }, { "FieldName": "Timestamp", "FieldType": "string", 
"IsNullable": false }, { "FieldName": "PendingOp", "FieldType": "int32", 
"IsNullable": false } ] } }, "Timestamp": "Fri Oct 21 10:29:21 PDT 2016" }
 { "DataverseName": "Metadata", "DatatypeName": "ExternalFileRecordType", 
"Derived": { "Tag": "RECORD", "IsAnonymous": false, "Record": { "IsOpen": true, 
"Fields": [ { "FieldName": "DataverseName", "FieldType": "string", 
"IsNullable": false }, { "FieldName": "DatasetName", "FieldType": "string", 
"IsNullable": false }, { "FieldName": "FileNumber", "FieldType": "int32", 
"IsNullable": false }, { "FieldName": "FileName", "FieldType": "string", 
"IsNullable": false }, { "FieldName": "FileSize", "FieldType": "int64", 
"IsNullable": false }, { "FieldName": "FileModTime", "FieldType": "datetime", 
"IsNullable": false }, { "FieldName": "PendingOp", "FieldType": "int32", 
"IsNullable": false } ] } }, "Timestamp": "Fri Oct 21 10:29:22 PDT 2016" }
-{ "DataverseName": "Metadata", "DatatypeName": "FeedConnectionRecordType", 
"Derived": { "Tag": "RECORD", "IsAnonymous": false, "Record": { "IsOpen": true, 
"Fields": [ { "FieldName": "DataverseName", "FieldType": "string", 
"IsNullable": false }, { "FieldName": "FeedName", "FieldType": "string", 
"IsNullable": false }, { "FieldName": "DatasetName", "FieldType": "string", 
"IsNullable": false }, { "FieldName": "ReturnType", "FieldType": "string", 
"IsNullable": false }, { "FieldName": "AppliedFunctions", "FieldType": 
"FeedConnectionRecordType_AppliedFunctions", "IsNullable": false }, { 
"FieldName": "PolicyName", "FieldType": "string", "IsNullable": false } ] } }, 
"Timestamp": "Fri Oct 21 10:29:22 PDT 2016" }
+{ "DataverseName": "Metadata", "DatatypeName": "FeedConnectionRecordType", 
"Derived": { "Tag": "RECORD", "IsAnonymous": false, "Record": { "IsOpen": true, 
"Fields": [ { "FieldName": "DataverseName", "FieldType": "string", 
"IsNullable": false }, { "FieldName": "FeedName", "FieldType": "string", 
"IsNullable": false }, { "FieldName": "DatasetName", "FieldType": "string", 
"IsNullable": false }, { "FieldName": "ReturnType", "FieldType": "string", 
"IsNullable": false }, { "FieldName": "AppliedFunctions", "FieldType": 
"FeedConnectionRecordType_AppliedFunctions", "IsNullable": false }, { 
"FieldName": "WhereClause", "FieldType": "string", "IsNullable": false }, { 
"FieldName": "PolicyName", "FieldType": "string", "IsNullable": false } ] } }, 
"Timestamp": "Tue Jan 02 14:03:26 PST 2018" }
 { "DataverseName": "Metadata", "DatatypeName": 
"FeedConnectionRecordType_AppliedFunctions", "Derived": { "Tag": 
"UNORDEREDLIST", "IsAnonymous": true, "UnorderedList": "string" }, "Timestamp": 
"Fri Oct 21 10:29:22 PDT 2016" }
 { "DataverseName": "Metadata", "DatatypeName": "FeedPolicyRecordType", 
"Derived": { "Tag": "RECORD", "IsAnonymous": false, "Record": { "IsOpen": true, 
"Fields": [ { "FieldName": "DataverseName", "FieldType": "string", 
"IsNullable": false }, { "FieldName": "PolicyName", "FieldType": "string", 
"IsNullable": false }, { "FieldName": "Description", "FieldType": "string", 
"IsNullable": false }, { "FieldName": "Properties", "FieldType": 
"FeedPolicyRecordType_Properties", "IsNullable": false } ] } }, "Timestamp": 
"Fri Oct 21 10:29:22 PDT 2016" }
 { "DataverseName": "Metadata", "DatatypeName": 
"FeedPolicyRecordType_Properties", "Derived": { "Tag": "UNORDEREDLIST", 
"IsAnonymous": true, "UnorderedList": "FeedPolicyRecordType_Properties_Item" }, 
"Timestamp": "Fri Oct 21 10:29:22 PDT 2016" }
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feeds_13/feeds_13.1.ddl.sqlpp
 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feeds_13/feeds_13.1.ddl.sqlpp
new file mode 100644
index 0000000..a4314c9
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feeds_13/feeds_13.1.ddl.sqlpp
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+drop  dataverse feeds if exists;
+create  dataverse feeds;
+use feeds;
+
+create type feeds.TweetType as
+ closed {
+  id : string,
+  username : string,
+  location : string,
+  text : string,
+  timestamp : string
+};
+
+create dataset Tweets1(TweetType) primary key id;
+create dataset Tweets2(TweetType) primary key id;
+create dataset Tweets3(TweetType) primary key id;
+create dataset Tweets4(TweetType) primary key id;
+
+create feed TweetFeed using localfs
+((`path`=`asterix_nc1://data/twitter/obamatweets.adm`),
+(`format`=`adm`),
+(`type-name`=`TweetType`),
+(`tuple-interval`=`10`));
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feeds_13/feeds_13.2.update.sqlpp
 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feeds_13/feeds_13.2.update.sqlpp
new file mode 100644
index 0000000..dd83b35
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feeds_13/feeds_13.2.update.sqlpp
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use feeds;
+set `wait-for-completion-feed` `true`;
+connect feed feeds.TweetFeed to  dataset Tweets1 using policy `Basic` WHERE id 
= 'nc1:115';
+connect feed feeds.TweetFeed to  dataset Tweets2 using policy `Basic` WHERE id 
LIKE 'nc1:11%';
+connect feed feeds.TweetFeed to  dataset Tweets3 using policy `Basic` WHERE id 
NOT LIKE 'nc1:10%' OR username = 'BronsonMike';
+connect feed feeds.TweetFeed to  dataset Tweets4 using policy `Basic` WHERE id 
LIKE 'nc1:11%' AND username = 'thewildpitch';
+
+start feed feeds.TweetFeed;
\ No newline at end of file
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feeds_13/feeds_13.3.query.sqlpp
 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feeds_13/feeds_13.3.query.sqlpp
new file mode 100644
index 0000000..20c6320
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feeds_13/feeds_13.3.query.sqlpp
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use feeds;
+
+select value x
+from  Tweets1 as x
+union all
+select value x2
+from  Tweets2 as x2
+union all
+select value x3
+from  Tweets3 as x3
+union all
+select value x4
+from  Tweets4 as x4
+order by id;
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/feeds_13/feeds_13.1.adm
 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/feeds_13/feeds_13.1.adm
new file mode 100644
index 0000000..2c37364
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/feeds_13/feeds_13.1.adm
@@ -0,0 +1,15 @@
+{ "id": "nc1:1", "username": "BronsonMike", "location": "", "text": 
"@GottaLaff @reutersus Christie and obama just foul weather friends", 
"timestamp": "Thu Dec 06 16:53:06 PST 2012" }
+{ "id": "nc1:11", "username": "magarika", "location": "", "text": "RT 
@ken24xavier: Obama tells SOROS - our plan is ALMOST finished 
http://t.co/WvzK0GtU", "timestamp": "Thu Dec 06 16:53:05 PST 2012" }
+{ "id": "nc1:11", "username": "magarika", "location": "", "text": "RT 
@ken24xavier: Obama tells SOROS - our plan is ALMOST finished 
http://t.co/WvzK0GtU", "timestamp": "Thu Dec 06 16:53:05 PST 2012" }
+{ "id": "nc1:111", "username": "ToucanMall", "location": "", "text": "RT 
@WorldWar3Watch: Michelle Obama Gets More Grammy Nominations Than Justin ...  
#Obama #WW3 http://t.co/0Wv2GKij", "timestamp": "Thu Dec 06 16:53:13 PST 2012" }
+{ "id": "nc1:111", "username": "ToucanMall", "location": "", "text": "RT 
@WorldWar3Watch: Michelle Obama Gets More Grammy Nominations Than Justin ...  
#Obama #WW3 http://t.co/0Wv2GKij", "timestamp": "Thu Dec 06 16:53:13 PST 2012" }
+{ "id": "nc1:113", "username": "ToucanMall", "location": "", "text": "RT 
@ObamaPalooza: Tiffany Shared What $2,000 Meant to Her ... and the President 
Stopped by to Talk About It http://t.co/sgT7lsNV #Obama", "timestamp": "Thu Dec 
06 16:53:12 PST 2012" }
+{ "id": "nc1:113", "username": "ToucanMall", "location": "", "text": "RT 
@ObamaPalooza: Tiffany Shared What $2,000 Meant to Her ... and the President 
Stopped by to Talk About It http://t.co/sgT7lsNV #Obama", "timestamp": "Thu Dec 
06 16:53:12 PST 2012" }
+{ "id": "nc1:115", "username": "thewildpitch", "location": "", "text": "RT 
@RevkahJC: Dennis Miller: Obama Should Just Say He Wants To Tax Successful 
People http://t.co/Ihlemy9Y", "timestamp": "Thu Dec 06 16:53:11 PST 2012" }
+{ "id": "nc1:115", "username": "thewildpitch", "location": "", "text": "RT 
@RevkahJC: Dennis Miller: Obama Should Just Say He Wants To Tax Successful 
People http://t.co/Ihlemy9Y", "timestamp": "Thu Dec 06 16:53:11 PST 2012" }
+{ "id": "nc1:115", "username": "thewildpitch", "location": "", "text": "RT 
@RevkahJC: Dennis Miller: Obama Should Just Say He Wants To Tax Successful 
People http://t.co/Ihlemy9Y", "timestamp": "Thu Dec 06 16:53:11 PST 2012" }
+{ "id": "nc1:115", "username": "thewildpitch", "location": "", "text": "RT 
@RevkahJC: Dennis Miller: Obama Should Just Say He Wants To Tax Successful 
People http://t.co/Ihlemy9Y", "timestamp": "Thu Dec 06 16:53:11 PST 2012" }
+{ "id": "nc1:117", "username": "Rnugent24", "location": "", "text": "RT 
@ConservativeQuo: unemployment is above 8% again. I wonder how long it will 
take for Obama to start blaming Bush? 3-2-1 #tcot #antiobama", "timestamp": 
"Thu Dec 06 16:53:10 PST 2012" }
+{ "id": "nc1:117", "username": "Rnugent24", "location": "", "text": "RT 
@ConservativeQuo: unemployment is above 8% again. I wonder how long it will 
take for Obama to start blaming Bush? 3-2-1 #tcot #antiobama", "timestamp": 
"Thu Dec 06 16:53:10 PST 2012" }
+{ "id": "nc1:119", "username": "ToucanMall", "location": "", "text": "RT 
@Newitrsdotcom: I hope #Obama will win re-election... Other four years without 
meaningless #wars", "timestamp": "Thu Dec 06 16:53:09 PST 2012" }
+{ "id": "nc1:119", "username": "ToucanMall", "location": "", "text": "RT 
@Newitrsdotcom: I hope #Obama will win re-election... Other four years without 
meaningless #wars", "timestamp": "Thu Dec 06 16:53:09 PST 2012" }
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml 
b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
index 2c3d855..05425ff 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
@@ -8335,6 +8335,11 @@
       </compilation-unit>
     </test-case>
     <test-case FilePath="feeds">
+      <compilation-unit name="feeds_13">
+        <output-dir compare="Text">feeds_13</output-dir>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="feeds">
       <compilation-unit name="issue_230_feeds">
         <output-dir compare="Text">issue_230_feeds</output-dir>
       </compilation-unit>
diff --git a/asterixdb/asterix-lang-aql/src/main/javacc/AQL.jj 
b/asterixdb/asterix-lang-aql/src/main/javacc/AQL.jj
index d55db87..b384119 100644
--- a/asterixdb/asterix-lang-aql/src/main/javacc/AQL.jj
+++ b/asterixdb/asterix-lang-aql/src/main/javacc/AQL.jj
@@ -1186,7 +1186,7 @@
     <CONNECT> <FEED> feedNameComponents = QualifiedName() <TO> <DATASET> 
datasetNameComponents = QualifiedName()
     (ApplyFunction(appliedFunctions))? (policy = GetPolicy())?
       {
-        stmt = new ConnectFeedStatement(feedNameComponents, 
datasetNameComponents, appliedFunctions, policy, getVarCounter());
+        stmt = new ConnectFeedStatement(feedNameComponents, 
datasetNameComponents, appliedFunctions, policy, null, getVarCounter());
       }
     | <DISCONNECT> <FEED> feedNameComponents = QualifiedName() <FROM> 
<DATASET> datasetNameComponents = QualifiedName()
       {
diff --git 
a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/ConnectFeedStatement.java
 
b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/ConnectFeedStatement.java
index 3d8fc68..95f8f47 100644
--- 
a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/ConnectFeedStatement.java
+++ 
b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/ConnectFeedStatement.java
@@ -34,11 +34,12 @@
     private final Identifier datasetName;
     private final String feedName;
     private final String policy;
+    private final String whereClauseBody;
     private int varCounter;
     private final List<FunctionSignature> appliedFunctions;
 
     public ConnectFeedStatement(Pair<Identifier, Identifier> feedNameCmp, 
Pair<Identifier, Identifier> datasetNameCmp,
-            List<FunctionSignature> appliedFunctions, String policy, int 
varCounter) {
+            List<FunctionSignature> appliedFunctions, String policy, String 
whereClauseBody, int varCounter) {
         if (feedNameCmp.first != null && datasetNameCmp.first != null
                 && 
!feedNameCmp.first.getValue().equals(datasetNameCmp.first.getValue())) {
             throw new IllegalArgumentException("Dataverse for source feed and 
target dataset do not match");
@@ -48,6 +49,7 @@
         this.datasetName = datasetNameCmp.second;
         this.feedName = feedNameCmp.second.getValue();
         this.policy = policy != null ? policy : 
BuiltinFeedPolicies.DEFAULT_POLICY.getPolicyName();
+        this.whereClauseBody = whereClauseBody;
         this.varCounter = varCounter;
         this.appliedFunctions = appliedFunctions;
     }
@@ -64,6 +66,10 @@
         return varCounter;
     }
 
+    public String getWhereClauseBody() {
+        return whereClauseBody;
+    }
+
     @Override
     public byte getKind() {
         return Statement.Kind.CONNECT_FEED;
diff --git a/asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj 
b/asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj
index b68ffd6..81c97a5 100644
--- a/asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj
+++ b/asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj
@@ -158,6 +158,7 @@
 import org.apache.asterix.lang.sqlpp.clause.SelectRegular;
 import org.apache.asterix.lang.sqlpp.clause.SelectSetOperation;
 import org.apache.asterix.lang.sqlpp.clause.UnnestClause;
+import org.apache.asterix.lang.common.clause.WhereClause;
 import org.apache.asterix.lang.sqlpp.expression.CaseExpression;
 import org.apache.asterix.lang.sqlpp.expression.SelectExpression;
 import org.apache.asterix.lang.sqlpp.optype.JoinType;
@@ -1281,14 +1282,37 @@
   List<FunctionSignature> appliedFunctions = new 
ArrayList<FunctionSignature>();
   Statement stmt = null;
   String policy = null;
+  String whereClauseBody = null;
+  WhereClause whereClause = null;
+  Token beginPos = null;
+  Token endPos = null;
 }
 {
   (
     <FEED> feedNameComponents = QualifiedName() <TO> Dataset() 
datasetNameComponents = QualifiedName()
-    (ApplyFunction(appliedFunctions))?  (policy = GetPolicy())?
+    (ApplyFunction(appliedFunctions))?
+    (policy = GetPolicy())?
+    (
+      <WHERE>
+      {
+        beginPos = token;
+        whereClause = new WhereClause();
+        Expression whereExpr;
+      }
+      whereExpr = Expression()
+      {
+        whereClause.setWhereExpr(whereExpr);
+      }
+    )?
+    {
+      if (whereClause != null) {
+        endPos = token;
+        whereClauseBody = extractFragment(beginPos.endLine, 
beginPos.endColumn, endPos.endLine, endPos.endColumn + 1);
+      }
+    }
       {
         stmt = new ConnectFeedStatement(feedNameComponents, 
datasetNameComponents, appliedFunctions,
-         policy, getVarCounter());
+         policy, whereClauseBody, getVarCounter());
       }
   )
   {
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataTransactionContext.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataTransactionContext.java
index 406b3d6..1f18bc9 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataTransactionContext.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataTransactionContext.java
@@ -245,7 +245,8 @@
     }
 
     public void dropFeedConnection(String dataverseName, String feedName, 
String datasetName) {
-        FeedConnection feedConnection = new FeedConnection(dataverseName, 
feedName, datasetName, null, null, null);
+        FeedConnection feedConnection = new FeedConnection(dataverseName, 
feedName, datasetName, null, null, null,
+                null);
         droppedCache.addFeedConnectionIfNotExists(feedConnection);
         logAndApply(new MetadataLogicalOperation(feedConnection, false));
     }
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataRecordTypes.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataRecordTypes.java
index 2a04b58..6b7532b 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataRecordTypes.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataRecordTypes.java
@@ -99,6 +99,7 @@
     public static final String FIELD_NAME_VALUE = "Value";
     public static final String FIELD_NAME_WORKING_MEMORY_SIZE = 
"WorkingMemorySize";
     public static final String FIELD_NAME_APPLIED_FUNCTIONS = 
"AppliedFunctions";
+    public static final String FIELD_NAME_WHERE_CLAUSE = "WhereClause";
 
     //---------------------------------- Record Types Creation 
----------------------------------//
     //--------------------------------------- Properties 
----------------------------------------//
@@ -378,7 +379,8 @@
     public static final int FEED_CONN_DATASET_NAME_FIELD_INDEX = 2;
     public static final int FEED_CONN_OUTPUT_TYPE_INDEX = 3;
     public static final int FEED_CONN_APPLIED_FUNCTIONS_FIELD_INDEX = 4;
-    public static final int FEED_CONN_POLICY_FIELD_INDEX = 5;
+    public static final int FEED_CONN_WHERE_CLAUSE_BODY_INDEX = 5;
+    public static final int FEED_CONN_POLICY_FIELD_INDEX = 6;
 
 
     public static final ARecordType FEED_CONNECTION_RECORDTYPE = 
createRecordType(
@@ -386,10 +388,11 @@
             RECORD_NAME_FEED_CONNECTION,
             // FieldNames
             new String[] { FIELD_NAME_DATAVERSE_NAME, FIELD_NAME_FEED_NAME, 
FIELD_NAME_DATASET_NAME,
-                    FIELD_NAME_RETURN_TYPE, FIELD_NAME_APPLIED_FUNCTIONS, 
FIELD_NAME_POLICY_NAME },
+                    FIELD_NAME_RETURN_TYPE, FIELD_NAME_APPLIED_FUNCTIONS, 
FIELD_NAME_WHERE_CLAUSE,
+                    FIELD_NAME_POLICY_NAME },
             // FieldTypes
             new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING, 
BuiltinType.ASTRING, BuiltinType.ASTRING,
-                    new AUnorderedListType(BuiltinType.ASTRING, null), 
BuiltinType.ASTRING},
+                    new AUnorderedListType(BuiltinType.ASTRING, null), 
BuiltinType.ASTRING, BuiltinType.ASTRING },
             //IsOpen?
             true);
 
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/FeedConnection.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/FeedConnection.java
index 8e14ee5..5745eb5 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/FeedConnection.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/FeedConnection.java
@@ -40,17 +40,19 @@
     private String feedName;
     private String datasetName;
     private String policyName;
+    private String whereClauseBody;
     private String outputType;
     private List<FunctionSignature> appliedFunctions;
 
     public FeedConnection(String dataverseName, String feedName, String 
datasetName,
-            List<FunctionSignature> appliedFunctions, String policyName, 
String outputType) {
+            List<FunctionSignature> appliedFunctions, String policyName, 
String whereClauseBody, String outputType) {
         this.dataverseName = dataverseName;
         this.feedName = feedName;
         this.datasetName = datasetName;
         this.appliedFunctions = appliedFunctions;
         this.connectionId = feedName + ":" + datasetName;
         this.policyName = policyName;
+        this.whereClauseBody = whereClauseBody;
         this.outputType = outputType;
         this.feedId = new EntityId(FeedUtils.FEED_EXTENSION_NAME, 
dataverseName, feedName);
     }
@@ -105,6 +107,10 @@
         return policyName;
     }
 
+    public String getWhereClauseBody() {
+        return whereClauseBody;
+    }
+
     public String getOutputType() {
         return outputType;
     }
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/FeedConnectionTupleTranslator.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/FeedConnectionTupleTranslator.java
index 269497b..7adcd1d 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/FeedConnectionTupleTranslator.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/FeedConnectionTupleTranslator.java
@@ -83,6 +83,8 @@
                         .getStringValue();
         String outputType = ((AString) 
feedConnRecord.getValueByPos(MetadataRecordTypes.FEED_CONN_OUTPUT_TYPE_INDEX))
                 .getStringValue();
+        String whereClauseBody = ((AString) feedConnRecord
+                
.getValueByPos(MetadataRecordTypes.FEED_CONN_WHERE_CLAUSE_BODY_INDEX)).getStringValue();
         String policyName = ((AString) 
feedConnRecord.getValueByPos(MetadataRecordTypes.FEED_CONN_POLICY_FIELD_INDEX))
                 .getStringValue();
         ArrayList<FunctionSignature> appliedFunctions = null;
@@ -101,7 +103,8 @@
             }
         }
 
-        return new FeedConnection(dataverseName, feedName, datasetName, 
appliedFunctions, policyName, outputType);
+        return new FeedConnection(dataverseName, feedName, datasetName, 
appliedFunctions, policyName, whereClauseBody,
+                outputType);
     }
 
     @Override
@@ -153,6 +156,12 @@
         fieldValue.reset();
         writeAppliedFunctionsField(recordBuilder, me, fieldValue);
 
+        // field: whereClauseBody
+        fieldValue.reset();
+        aString.setValue(me.getWhereClauseBody());
+        stringSerde.serialize(aString, fieldValue.getDataOutput());
+        
recordBuilder.addField(MetadataRecordTypes.FEED_CONN_WHERE_CLAUSE_BODY_INDEX, 
fieldValue);
+
         // field: policyName
         fieldValue.reset();
         aString.setValue(me.getPolicyName());

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/2255
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I0b3cc6fe9d7fb5f5645dd9c759da448bfe1e88f1
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Xikui Wang <[email protected]>

Reply via email to