Xikui Wang has submitted this change and it was merged.

Change subject: [ASTERIXDB-2227][ING] Enabling filitering incoming data in feed
......................................................................


[ASTERIXDB-2227][ING] Enabling filitering incoming data in feed

- user model changes: yes
  Add syntax support for specifying predicate in connect feed
- storage format changes: no
- interface changes: no

Details:
In some use cases, a user may want to filter the incombing data with
certain attributes. One example can be only store the incoming tweets
with geo locations. This patch enables the <WHERE> clause in connect
feed statement. User can subset the incoming data using following
syntax:

 connect feed feeds.TweetFeed to  dataset Tweets3 using policy `Basic`
 WHERE id NOT LIKE 'nc1:10%' OR username = 'BronsonMike';

Change-Id: I0b3cc6fe9d7fb5f5645dd9c759da448bfe1e88f1
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2255
Sonar-Qube: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
Reviewed-by: abdullah alamoudi <bamou...@gmail.com>
---
M 
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
M 
asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
A 
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feeds_13/feeds_13.1.ddl.sqlpp
A 
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feeds_13/feeds_13.2.update.sqlpp
A 
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feeds_13/feeds_13.3.query.sqlpp
A 
asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/feeds_13/feeds_13.1.adm
M asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
M asterixdb/asterix-lang-aql/src/main/javacc/AQL.jj
M 
asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/ConnectFeedStatement.java
M asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj
M 
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataTransactionContext.java
M 
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataRecordTypes.java
M 
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/FeedConnection.java
M 
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/FeedConnectionTupleTranslator.java
14 files changed, 215 insertions(+), 10 deletions(-)

Approvals:
  Anon. E. Moose #1000171: 
  abdullah alamoudi: Looks good to me, approved
  Jenkins: Verified; No violations found; ; Verified



diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index b64f828..aabb7c2 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -2181,6 +2181,7 @@
         String feedName = cfs.getFeedName();
         String datasetName = cfs.getDatasetName().getValue();
         String policyName = cfs.getPolicy();
+        String whereClauseBody = cfs.getWhereClauseBody();
         MetadataTransactionContext mdTxnCtx = 
MetadataManager.INSTANCE.beginTransaction();
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
         // TODO: Check whether we are connecting a change feed to a non-meta 
dataset
@@ -2213,7 +2214,7 @@
             if (fc != null) {
                 throw new AlgebricksException("Feed" + feedName + " is already 
connected dataset " + datasetName);
             }
-            fc = new FeedConnection(dataverseName, feedName, datasetName, 
appliedFunctions, policyName,
+            fc = new FeedConnection(dataverseName, feedName, datasetName, 
appliedFunctions, policyName, whereClauseBody,
                     outputType.getTypeName());
             
MetadataManager.INSTANCE.addFeedConnection(metadataProvider.getMetadataTxnContext(),
 fc);
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
index b6371dc..424444a 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
@@ -40,6 +40,7 @@
 import 
org.apache.asterix.common.dataflow.LSMTreeInsertDeleteOperatorDescriptor;
 import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.exceptions.CompilationException;
 import org.apache.asterix.common.functions.FunctionSignature;
 import org.apache.asterix.common.messaging.api.ICCMessageBroker;
 import org.apache.asterix.common.transactions.TxnId;
@@ -58,8 +59,11 @@
 import org.apache.asterix.external.util.FeedUtils.FeedRuntimeType;
 import org.apache.asterix.file.StorageComponentProvider;
 import org.apache.asterix.lang.common.base.Expression;
+import org.apache.asterix.lang.common.base.IParser;
+import org.apache.asterix.lang.common.base.IParserFactory;
 import org.apache.asterix.lang.common.base.Statement;
 import org.apache.asterix.lang.common.clause.LetClause;
+import org.apache.asterix.lang.common.clause.WhereClause;
 import org.apache.asterix.lang.common.expression.CallExpr;
 import org.apache.asterix.lang.common.expression.LiteralExpr;
 import org.apache.asterix.lang.common.expression.VariableExpr;
@@ -78,6 +82,7 @@
 import org.apache.asterix.lang.sqlpp.clause.SelectElement;
 import org.apache.asterix.lang.sqlpp.clause.SelectSetOperation;
 import org.apache.asterix.lang.sqlpp.expression.SelectExpression;
+import org.apache.asterix.lang.sqlpp.parser.SqlppParserFactory;
 import org.apache.asterix.lang.sqlpp.struct.SetOperationInput;
 import org.apache.asterix.lang.sqlpp.util.SqlppVariableUtil;
 import org.apache.asterix.metadata.declared.MetadataProvider;
@@ -192,7 +197,7 @@
         return argExprs;
     }
 
-    private static Query makeConnectionQuery(FeedConnection feedConnection) {
+    private static Query makeConnectionQuery(FeedConnection feedConnection) 
throws AlgebricksException {
         // Construct from clause
         VarIdentifier fromVarId = 
SqlppVariableUtil.toInternalVariableIdentifier(feedConnection.getFeedName());
         VariableExpr fromTermLeftExpr = new VariableExpr(fromVarId);
@@ -204,6 +209,19 @@
         CallExpr datasrouceCallFunction = new CallExpr(new 
FunctionSignature(BuiltinFunctions.FEED_COLLECT), exprList);
         FromTerm fromterm = new FromTerm(datasrouceCallFunction, 
fromTermLeftExpr, null, null);
         FromClause fromClause = new FromClause(Arrays.asList(fromterm));
+        WhereClause whereClause = null;
+        if (feedConnection.getWhereClauseBody().length() != 0) {
+            String whereClauseExpr = feedConnection.getWhereClauseBody() + ";";
+            IParserFactory sqlppParserFactory = new SqlppParserFactory();
+            IParser sqlppParser = 
sqlppParserFactory.createParser(whereClauseExpr);
+            List<Statement> stmts = sqlppParser.parse();
+            if (stmts.size() != 1) {
+                throw new CompilationException("Exceptions happened in 
processing where clause.");
+            }
+            Query whereClauseQuery = (Query) stmts.get(0);
+            whereClause = new WhereClause(whereClauseQuery.getBody());
+        }
+
         // TODO: This can be the place to add select predicate for ingestion
         // Attaching functions
         int varIdx = 1;
@@ -222,7 +240,7 @@
         // Constructing select clause
         SelectElement selectElement = new SelectElement(previousVarExpr);
         SelectClause selectClause = new SelectClause(selectElement, null, 
false);
-        SelectBlock selectBlock = new SelectBlock(selectClause, fromClause, 
letClauses, null, null, null, null);
+        SelectBlock selectBlock = new SelectBlock(selectClause, fromClause, 
letClauses, whereClause, null, null, null);
         SelectSetOperation selectSetOperation = new SelectSetOperation(new 
SetOperationInput(selectBlock, null), null);
         SelectExpression body = new SelectExpression(null, selectSetOperation, 
null, null, true);
         Query query = new Query(false, true, body, 0);
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feeds_13/feeds_13.1.ddl.sqlpp
 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feeds_13/feeds_13.1.ddl.sqlpp
new file mode 100644
index 0000000..412ce03
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feeds_13/feeds_13.1.ddl.sqlpp
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+drop  dataverse feeds if exists;
+create  dataverse feeds;
+use feeds;
+
+create type feeds.TweetType as
+ closed {
+  id : string,
+  username : string,
+  location : string,
+  text : string,
+  timestamp : string
+};
+
+create dataset Tweets1(TweetType) primary key id;
+create dataset Tweets2(TweetType) primary key id;
+create dataset Tweets3(TweetType) primary key id;
+create dataset Tweets4(TweetType) primary key id;
+
+create feed TweetFeed with {
+  "adapter-name" : "localfs",
+  "path":"asterix_nc1://data/twitter/obamatweets.adm",
+  "format":"adm",
+  "type-name":"TweetType",
+  "tuple-interval":"10"
+};
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feeds_13/feeds_13.2.update.sqlpp
 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feeds_13/feeds_13.2.update.sqlpp
new file mode 100644
index 0000000..dd83b35
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feeds_13/feeds_13.2.update.sqlpp
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use feeds;
+set `wait-for-completion-feed` `true`;
+connect feed feeds.TweetFeed to  dataset Tweets1 using policy `Basic` WHERE id 
= 'nc1:115';
+connect feed feeds.TweetFeed to  dataset Tweets2 using policy `Basic` WHERE id 
LIKE 'nc1:11%';
+connect feed feeds.TweetFeed to  dataset Tweets3 using policy `Basic` WHERE id 
NOT LIKE 'nc1:10%' OR username = 'BronsonMike';
+connect feed feeds.TweetFeed to  dataset Tweets4 using policy `Basic` WHERE id 
LIKE 'nc1:11%' AND username = 'thewildpitch';
+
+start feed feeds.TweetFeed;
\ No newline at end of file
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feeds_13/feeds_13.3.query.sqlpp
 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feeds_13/feeds_13.3.query.sqlpp
new file mode 100644
index 0000000..20c6320
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feeds_13/feeds_13.3.query.sqlpp
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use feeds;
+
+select value x
+from  Tweets1 as x
+union all
+select value x2
+from  Tweets2 as x2
+union all
+select value x3
+from  Tweets3 as x3
+union all
+select value x4
+from  Tweets4 as x4
+order by id;
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/feeds_13/feeds_13.1.adm
 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/feeds_13/feeds_13.1.adm
new file mode 100644
index 0000000..2c37364
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/feeds_13/feeds_13.1.adm
@@ -0,0 +1,15 @@
+{ "id": "nc1:1", "username": "BronsonMike", "location": "", "text": 
"@GottaLaff @reutersus Christie and obama just foul weather friends", 
"timestamp": "Thu Dec 06 16:53:06 PST 2012" }
+{ "id": "nc1:11", "username": "magarika", "location": "", "text": "RT 
@ken24xavier: Obama tells SOROS - our plan is ALMOST finished 
http://t.co/WvzK0GtU";, "timestamp": "Thu Dec 06 16:53:05 PST 2012" }
+{ "id": "nc1:11", "username": "magarika", "location": "", "text": "RT 
@ken24xavier: Obama tells SOROS - our plan is ALMOST finished 
http://t.co/WvzK0GtU";, "timestamp": "Thu Dec 06 16:53:05 PST 2012" }
+{ "id": "nc1:111", "username": "ToucanMall", "location": "", "text": "RT 
@WorldWar3Watch: Michelle Obama Gets More Grammy Nominations Than Justin ...  
#Obama #WW3 http://t.co/0Wv2GKij";, "timestamp": "Thu Dec 06 16:53:13 PST 2012" }
+{ "id": "nc1:111", "username": "ToucanMall", "location": "", "text": "RT 
@WorldWar3Watch: Michelle Obama Gets More Grammy Nominations Than Justin ...  
#Obama #WW3 http://t.co/0Wv2GKij";, "timestamp": "Thu Dec 06 16:53:13 PST 2012" }
+{ "id": "nc1:113", "username": "ToucanMall", "location": "", "text": "RT 
@ObamaPalooza: Tiffany Shared What $2,000 Meant to Her ... and the President 
Stopped by to Talk About It http://t.co/sgT7lsNV #Obama", "timestamp": "Thu Dec 
06 16:53:12 PST 2012" }
+{ "id": "nc1:113", "username": "ToucanMall", "location": "", "text": "RT 
@ObamaPalooza: Tiffany Shared What $2,000 Meant to Her ... and the President 
Stopped by to Talk About It http://t.co/sgT7lsNV #Obama", "timestamp": "Thu Dec 
06 16:53:12 PST 2012" }
+{ "id": "nc1:115", "username": "thewildpitch", "location": "", "text": "RT 
@RevkahJC: Dennis Miller: Obama Should Just Say He Wants To Tax Successful 
People http://t.co/Ihlemy9Y";, "timestamp": "Thu Dec 06 16:53:11 PST 2012" }
+{ "id": "nc1:115", "username": "thewildpitch", "location": "", "text": "RT 
@RevkahJC: Dennis Miller: Obama Should Just Say He Wants To Tax Successful 
People http://t.co/Ihlemy9Y";, "timestamp": "Thu Dec 06 16:53:11 PST 2012" }
+{ "id": "nc1:115", "username": "thewildpitch", "location": "", "text": "RT 
@RevkahJC: Dennis Miller: Obama Should Just Say He Wants To Tax Successful 
People http://t.co/Ihlemy9Y";, "timestamp": "Thu Dec 06 16:53:11 PST 2012" }
+{ "id": "nc1:115", "username": "thewildpitch", "location": "", "text": "RT 
@RevkahJC: Dennis Miller: Obama Should Just Say He Wants To Tax Successful 
People http://t.co/Ihlemy9Y";, "timestamp": "Thu Dec 06 16:53:11 PST 2012" }
+{ "id": "nc1:117", "username": "Rnugent24", "location": "", "text": "RT 
@ConservativeQuo: unemployment is above 8% again. I wonder how long it will 
take for Obama to start blaming Bush? 3-2-1 #tcot #antiobama", "timestamp": 
"Thu Dec 06 16:53:10 PST 2012" }
+{ "id": "nc1:117", "username": "Rnugent24", "location": "", "text": "RT 
@ConservativeQuo: unemployment is above 8% again. I wonder how long it will 
take for Obama to start blaming Bush? 3-2-1 #tcot #antiobama", "timestamp": 
"Thu Dec 06 16:53:10 PST 2012" }
+{ "id": "nc1:119", "username": "ToucanMall", "location": "", "text": "RT 
@Newitrsdotcom: I hope #Obama will win re-election... Other four years without 
meaningless #wars", "timestamp": "Thu Dec 06 16:53:09 PST 2012" }
+{ "id": "nc1:119", "username": "ToucanMall", "location": "", "text": "RT 
@Newitrsdotcom: I hope #Obama will win re-election... Other four years without 
meaningless #wars", "timestamp": "Thu Dec 06 16:53:09 PST 2012" }
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml 
b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
index bb0d473..96dbf01 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
@@ -8483,6 +8483,11 @@
       </compilation-unit>
     </test-case>
     <test-case FilePath="feeds">
+      <compilation-unit name="feeds_13">
+        <output-dir compare="Text">feeds_13</output-dir>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="feeds">
       <compilation-unit name="issue_230_feeds">
         <output-dir compare="Text">issue_230_feeds</output-dir>
       </compilation-unit>
diff --git a/asterixdb/asterix-lang-aql/src/main/javacc/AQL.jj 
b/asterixdb/asterix-lang-aql/src/main/javacc/AQL.jj
index 02d2220..74fe907 100644
--- a/asterixdb/asterix-lang-aql/src/main/javacc/AQL.jj
+++ b/asterixdb/asterix-lang-aql/src/main/javacc/AQL.jj
@@ -1166,7 +1166,7 @@
     <CONNECT> <FEED> feedNameComponents = QualifiedName() <TO> <DATASET> 
datasetNameComponents = QualifiedName()
     (ApplyFunction(appliedFunctions))? (policy = GetPolicy())?
       {
-        stmt = new ConnectFeedStatement(feedNameComponents, 
datasetNameComponents, appliedFunctions, policy, getVarCounter());
+        stmt = new ConnectFeedStatement(feedNameComponents, 
datasetNameComponents, appliedFunctions, policy, null, getVarCounter());
       }
     | <DISCONNECT> <FEED> feedNameComponents = QualifiedName() <FROM> 
<DATASET> datasetNameComponents = QualifiedName()
       {
diff --git 
a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/ConnectFeedStatement.java
 
b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/ConnectFeedStatement.java
index 3b6a1c3..b0a3f6e 100644
--- 
a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/ConnectFeedStatement.java
+++ 
b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/ConnectFeedStatement.java
@@ -34,11 +34,12 @@
     private final Identifier datasetName;
     private final String feedName;
     private final String policy;
+    private final String whereClauseBody;
     private int varCounter;
     private final List<FunctionSignature> appliedFunctions;
 
     public ConnectFeedStatement(Pair<Identifier, Identifier> feedNameCmp, 
Pair<Identifier, Identifier> datasetNameCmp,
-            List<FunctionSignature> appliedFunctions, String policy, int 
varCounter) {
+            List<FunctionSignature> appliedFunctions, String policy, String 
whereClauseBody, int varCounter) {
         if (feedNameCmp.first != null && datasetNameCmp.first != null
                 && 
!feedNameCmp.first.getValue().equals(datasetNameCmp.first.getValue())) {
             throw new IllegalArgumentException("Dataverse for source feed and 
target dataset do not match");
@@ -48,6 +49,7 @@
         this.datasetName = datasetNameCmp.second;
         this.feedName = feedNameCmp.second.getValue();
         this.policy = policy != null ? policy : 
BuiltinFeedPolicies.DEFAULT_POLICY.getPolicyName();
+        this.whereClauseBody = whereClauseBody;
         this.varCounter = varCounter;
         this.appliedFunctions = appliedFunctions;
     }
@@ -64,6 +66,10 @@
         return varCounter;
     }
 
+    public String getWhereClauseBody() {
+        return whereClauseBody;
+    }
+
     @Override
     public Kind getKind() {
         return Statement.Kind.CONNECT_FEED;
diff --git a/asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj 
b/asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj
index 7a99814..42b8d15 100644
--- a/asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj
+++ b/asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj
@@ -157,6 +157,7 @@
 import org.apache.asterix.lang.sqlpp.clause.SelectRegular;
 import org.apache.asterix.lang.sqlpp.clause.SelectSetOperation;
 import org.apache.asterix.lang.sqlpp.clause.UnnestClause;
+import org.apache.asterix.lang.common.clause.WhereClause;
 import org.apache.asterix.lang.sqlpp.expression.CaseExpression;
 import org.apache.asterix.lang.sqlpp.expression.SelectExpression;
 import org.apache.asterix.lang.sqlpp.optype.JoinType;
@@ -1264,14 +1265,37 @@
   List<FunctionSignature> appliedFunctions = new 
ArrayList<FunctionSignature>();
   Statement stmt = null;
   String policy = null;
+  String whereClauseBody = null;
+  WhereClause whereClause = null;
+  Token beginPos = null;
+  Token endPos = null;
 }
 {
   (
     <FEED> feedNameComponents = QualifiedName() <TO> Dataset() 
datasetNameComponents = QualifiedName()
-    (ApplyFunction(appliedFunctions))?  (policy = GetPolicy())?
+    (ApplyFunction(appliedFunctions))?
+    (policy = GetPolicy())?
+    (
+      <WHERE>
+      {
+        beginPos = token;
+        whereClause = new WhereClause();
+        Expression whereExpr;
+      }
+      whereExpr = Expression()
+      {
+        whereClause.setWhereExpr(whereExpr);
+      }
+    )?
+    {
+      if (whereClause != null) {
+        endPos = token;
+        whereClauseBody = extractFragment(beginPos.endLine, 
beginPos.endColumn, endPos.endLine, endPos.endColumn + 1);
+      }
+    }
       {
         stmt = new ConnectFeedStatement(feedNameComponents, 
datasetNameComponents, appliedFunctions,
-         policy, getVarCounter());
+         policy, whereClauseBody, getVarCounter());
       }
   )
   {
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataTransactionContext.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataTransactionContext.java
index a1eb425..367f568 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataTransactionContext.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataTransactionContext.java
@@ -242,7 +242,8 @@
     }
 
     public void dropFeedConnection(String dataverseName, String feedName, 
String datasetName) {
-        FeedConnection feedConnection = new FeedConnection(dataverseName, 
feedName, datasetName, null, null, null);
+        FeedConnection feedConnection =
+                new FeedConnection(dataverseName, feedName, datasetName, null, 
null, null, null);
         droppedCache.addFeedConnectionIfNotExists(feedConnection);
         logAndApply(new MetadataLogicalOperation(feedConnection, false));
     }
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataRecordTypes.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataRecordTypes.java
index 54a69eb..ba1ea03 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataRecordTypes.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataRecordTypes.java
@@ -100,6 +100,7 @@
     public static final String FIELD_NAME_VALUE = "Value";
     public static final String FIELD_NAME_WORKING_MEMORY_SIZE = 
"WorkingMemorySize";
     public static final String FIELD_NAME_APPLIED_FUNCTIONS = 
"AppliedFunctions";
+    public static final String FIELD_NAME_WHERE_CLAUSE = "WhereClause";
 
     //---------------------------------- Record Types Creation 
----------------------------------//
     //--------------------------------------- Properties 
----------------------------------------//
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/FeedConnection.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/FeedConnection.java
index 7572a9a..78d6e4e 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/FeedConnection.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/FeedConnection.java
@@ -40,17 +40,19 @@
     private String feedName;
     private String datasetName;
     private String policyName;
+    private String whereClauseBody;
     private String outputType;
     private List<FunctionSignature> appliedFunctions;
 
     public FeedConnection(String dataverseName, String feedName, String 
datasetName,
-            List<FunctionSignature> appliedFunctions, String policyName, 
String outputType) {
+            List<FunctionSignature> appliedFunctions, String policyName, 
String whereClauseBody, String outputType) {
         this.dataverseName = dataverseName;
         this.feedName = feedName;
         this.datasetName = datasetName;
         this.appliedFunctions = appliedFunctions;
         this.connectionId = feedName + ":" + datasetName;
         this.policyName = policyName;
+        this.whereClauseBody = whereClauseBody == null ? "" : whereClauseBody;
         this.outputType = outputType;
         this.feedId = new EntityId(FeedUtils.FEED_EXTENSION_NAME, 
dataverseName, feedName);
     }
@@ -105,6 +107,10 @@
         return policyName;
     }
 
+    public String getWhereClauseBody() {
+        return whereClauseBody;
+    }
+
     public String getOutputType() {
         return outputType;
     }
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/FeedConnectionTupleTranslator.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/FeedConnectionTupleTranslator.java
index 269497b..61a8ab2 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/FeedConnectionTupleTranslator.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/FeedConnectionTupleTranslator.java
@@ -22,6 +22,7 @@
 import java.io.ByteArrayInputStream;
 import java.io.DataInput;
 import java.io.DataInputStream;
+import java.io.DataOutput;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -32,6 +33,7 @@
 import org.apache.asterix.metadata.bootstrap.MetadataPrimaryIndexes;
 import org.apache.asterix.metadata.bootstrap.MetadataRecordTypes;
 import org.apache.asterix.metadata.entities.FeedConnection;
+import org.apache.asterix.om.base.AInt64;
 import org.apache.asterix.om.base.AMissing;
 import org.apache.asterix.om.base.ANull;
 import org.apache.asterix.om.base.ARecord;
@@ -52,6 +54,8 @@
     public static final int FEED_CONN_DATASET_NAME_FIELD_INDEX = 2;
 
     public static final int FEED_CONN_PAYLOAD_TUPLE_FIELD_INDEX = 3;
+
+    protected final transient ArrayBackedValueStorage fieldName = new 
ArrayBackedValueStorage();
 
     private ISerializerDeserializer<ARecord> recordSerDes = 
SerializerDeserializerProvider.INSTANCE
             
.getSerializerDeserializer(MetadataRecordTypes.FEED_CONNECTION_RECORDTYPE);
@@ -101,7 +105,12 @@
             }
         }
 
-        return new FeedConnection(dataverseName, feedName, datasetName, 
appliedFunctions, policyName, outputType);
+        int whereClauseIdx = 
feedConnRecord.getType().getFieldIndex(MetadataRecordTypes.FIELD_NAME_WHERE_CLAUSE);
+        String whereClauseBody =
+                whereClauseIdx >= 0 ? ((AString) 
feedConnRecord.getValueByPos(whereClauseIdx)).getStringValue() : "";
+
+        return new FeedConnection(dataverseName, feedName, datasetName, 
appliedFunctions, policyName, whereClauseBody,
+                outputType);
     }
 
     @Override
@@ -159,6 +168,9 @@
         stringSerde.serialize(aString, fieldValue.getDataOutput());
         
recordBuilder.addField(MetadataRecordTypes.FEED_CONN_POLICY_FIELD_INDEX, 
fieldValue);
 
+        // field: whereClauseBody
+        writeOpenPart(me);
+
         recordBuilder.write(tupleBuilder.getDataOutput(), true);
         tupleBuilder.addFieldEndOffset();
 
@@ -166,6 +178,18 @@
         return tuple;
     }
 
+    protected void writeOpenPart(FeedConnection fc) throws 
HyracksDataException {
+        if (fc.getWhereClauseBody() != null && 
fc.getWhereClauseBody().length() > 0) {
+            fieldName.reset();
+            aString.setValue(MetadataRecordTypes.FIELD_NAME_WHERE_CLAUSE);
+            stringSerde.serialize(aString, fieldName.getDataOutput());
+            fieldValue.reset();
+            aString.setValue(fc.getWhereClauseBody());
+            stringSerde.serialize(aString, fieldValue.getDataOutput());
+            recordBuilder.addField(fieldName, fieldValue);
+        }
+    }
+
     private void writeAppliedFunctionsField(IARecordBuilder rb, FeedConnection 
fc, ArrayBackedValueStorage buffer)
             throws HyracksDataException {
         UnorderedListBuilder listBuilder = new UnorderedListBuilder();

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/2255
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: merged
Gerrit-Change-Id: I0b3cc6fe9d7fb5f5645dd9c759da448bfe1e88f1
Gerrit-PatchSet: 19
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Xikui Wang <xkk...@gmail.com>
Gerrit-Reviewer: Anon. E. Moose #1000171
Gerrit-Reviewer: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
Gerrit-Reviewer: Xikui Wang <xkk...@gmail.com>
Gerrit-Reviewer: abdullah alamoudi <bamou...@gmail.com>

Reply via email to