Xikui Wang has submitted this change and it was merged. Change subject: [ASTERIXDB-2227][ING] Enabling filtering incoming data in feed ......................................................................
[ASTERIXDB-2227][ING] Enabling filtering incoming data in feed - user model changes: yes Add syntax support for specifying a predicate in connect feed - storage format changes: no - interface changes: no Details: In some use cases, a user may want to filter the incoming data by certain attributes. One example is storing only the incoming tweets that have geo locations. This patch enables the <WHERE> clause in the connect feed statement. Users can subset the incoming data using the following syntax: connect feed feeds.TweetFeed to dataset Tweets3 using policy `Basic` WHERE id NOT LIKE 'nc1:10%' OR username = 'BronsonMike'; Change-Id: I0b3cc6fe9d7fb5f5645dd9c759da448bfe1e88f1 Reviewed-on: https://asterix-gerrit.ics.uci.edu/2255 Sonar-Qube: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Tested-by: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Contrib: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Integration-Tests: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Reviewed-by: abdullah alamoudi <bamou...@gmail.com> --- M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java A asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feeds_13/feeds_13.1.ddl.sqlpp A asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feeds_13/feeds_13.2.update.sqlpp A asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feeds_13/feeds_13.3.query.sqlpp A asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/feeds_13/feeds_13.1.adm M asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml M asterixdb/asterix-lang-aql/src/main/javacc/AQL.jj M asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/ConnectFeedStatement.java M asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataTransactionContext.java M 
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataRecordTypes.java M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/FeedConnection.java M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/FeedConnectionTupleTranslator.java 14 files changed, 215 insertions(+), 10 deletions(-) Approvals: Anon. E. Moose #1000171: abdullah alamoudi: Looks good to me, approved Jenkins: Verified; No violations found; ; Verified diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java index b64f828..aabb7c2 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java @@ -2181,6 +2181,7 @@ String feedName = cfs.getFeedName(); String datasetName = cfs.getDatasetName().getValue(); String policyName = cfs.getPolicy(); + String whereClauseBody = cfs.getWhereClauseBody(); MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); metadataProvider.setMetadataTxnContext(mdTxnCtx); // TODO: Check whether we are connecting a change feed to a non-meta dataset @@ -2213,7 +2214,7 @@ if (fc != null) { throw new AlgebricksException("Feed" + feedName + " is already connected dataset " + datasetName); } - fc = new FeedConnection(dataverseName, feedName, datasetName, appliedFunctions, policyName, + fc = new FeedConnection(dataverseName, feedName, datasetName, appliedFunctions, policyName, whereClauseBody, outputType.getTypeName()); MetadataManager.INSTANCE.addFeedConnection(metadataProvider.getMetadataTxnContext(), fc); MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java index b6371dc..424444a 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java @@ -40,6 +40,7 @@ import org.apache.asterix.common.dataflow.LSMTreeInsertDeleteOperatorDescriptor; import org.apache.asterix.common.exceptions.ACIDException; import org.apache.asterix.common.exceptions.AsterixException; +import org.apache.asterix.common.exceptions.CompilationException; import org.apache.asterix.common.functions.FunctionSignature; import org.apache.asterix.common.messaging.api.ICCMessageBroker; import org.apache.asterix.common.transactions.TxnId; @@ -58,8 +59,11 @@ import org.apache.asterix.external.util.FeedUtils.FeedRuntimeType; import org.apache.asterix.file.StorageComponentProvider; import org.apache.asterix.lang.common.base.Expression; +import org.apache.asterix.lang.common.base.IParser; +import org.apache.asterix.lang.common.base.IParserFactory; import org.apache.asterix.lang.common.base.Statement; import org.apache.asterix.lang.common.clause.LetClause; +import org.apache.asterix.lang.common.clause.WhereClause; import org.apache.asterix.lang.common.expression.CallExpr; import org.apache.asterix.lang.common.expression.LiteralExpr; import org.apache.asterix.lang.common.expression.VariableExpr; @@ -78,6 +82,7 @@ import org.apache.asterix.lang.sqlpp.clause.SelectElement; import org.apache.asterix.lang.sqlpp.clause.SelectSetOperation; import org.apache.asterix.lang.sqlpp.expression.SelectExpression; +import org.apache.asterix.lang.sqlpp.parser.SqlppParserFactory; import org.apache.asterix.lang.sqlpp.struct.SetOperationInput; import org.apache.asterix.lang.sqlpp.util.SqlppVariableUtil; import org.apache.asterix.metadata.declared.MetadataProvider; @@ -192,7 +197,7 @@ return argExprs; } - private static Query makeConnectionQuery(FeedConnection feedConnection) { + 
private static Query makeConnectionQuery(FeedConnection feedConnection) throws AlgebricksException { // Construct from clause VarIdentifier fromVarId = SqlppVariableUtil.toInternalVariableIdentifier(feedConnection.getFeedName()); VariableExpr fromTermLeftExpr = new VariableExpr(fromVarId); @@ -204,6 +209,19 @@ CallExpr datasrouceCallFunction = new CallExpr(new FunctionSignature(BuiltinFunctions.FEED_COLLECT), exprList); FromTerm fromterm = new FromTerm(datasrouceCallFunction, fromTermLeftExpr, null, null); FromClause fromClause = new FromClause(Arrays.asList(fromterm)); + WhereClause whereClause = null; + if (feedConnection.getWhereClauseBody().length() != 0) { + String whereClauseExpr = feedConnection.getWhereClauseBody() + ";"; + IParserFactory sqlppParserFactory = new SqlppParserFactory(); + IParser sqlppParser = sqlppParserFactory.createParser(whereClauseExpr); + List<Statement> stmts = sqlppParser.parse(); + if (stmts.size() != 1) { + throw new CompilationException("Exceptions happened in processing where clause."); + } + Query whereClauseQuery = (Query) stmts.get(0); + whereClause = new WhereClause(whereClauseQuery.getBody()); + } + // TODO: This can be the place to add select predicate for ingestion // Attaching functions int varIdx = 1; @@ -222,7 +240,7 @@ // Constructing select clause SelectElement selectElement = new SelectElement(previousVarExpr); SelectClause selectClause = new SelectClause(selectElement, null, false); - SelectBlock selectBlock = new SelectBlock(selectClause, fromClause, letClauses, null, null, null, null); + SelectBlock selectBlock = new SelectBlock(selectClause, fromClause, letClauses, whereClause, null, null, null); SelectSetOperation selectSetOperation = new SelectSetOperation(new SetOperationInput(selectBlock, null), null); SelectExpression body = new SelectExpression(null, selectSetOperation, null, null, true); Query query = new Query(false, true, body, 0); diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feeds_13/feeds_13.1.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feeds_13/feeds_13.1.ddl.sqlpp new file mode 100644 index 0000000..412ce03 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feeds_13/feeds_13.1.ddl.sqlpp @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +drop dataverse feeds if exists; +create dataverse feeds; +use feeds; + +create type feeds.TweetType as + closed { + id : string, + username : string, + location : string, + text : string, + timestamp : string +}; + +create dataset Tweets1(TweetType) primary key id; +create dataset Tweets2(TweetType) primary key id; +create dataset Tweets3(TweetType) primary key id; +create dataset Tweets4(TweetType) primary key id; + +create feed TweetFeed with { + "adapter-name" : "localfs", + "path":"asterix_nc1://data/twitter/obamatweets.adm", + "format":"adm", + "type-name":"TweetType", + "tuple-interval":"10" +}; diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feeds_13/feeds_13.2.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feeds_13/feeds_13.2.update.sqlpp new file mode 100644 index 0000000..dd83b35 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feeds_13/feeds_13.2.update.sqlpp @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +use feeds; +set `wait-for-completion-feed` `true`; +connect feed feeds.TweetFeed to dataset Tweets1 using policy `Basic` WHERE id = 'nc1:115'; +connect feed feeds.TweetFeed to dataset Tweets2 using policy `Basic` WHERE id LIKE 'nc1:11%'; +connect feed feeds.TweetFeed to dataset Tweets3 using policy `Basic` WHERE id NOT LIKE 'nc1:10%' OR username = 'BronsonMike'; +connect feed feeds.TweetFeed to dataset Tweets4 using policy `Basic` WHERE id LIKE 'nc1:11%' AND username = 'thewildpitch'; + +start feed feeds.TweetFeed; \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feeds_13/feeds_13.3.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feeds_13/feeds_13.3.query.sqlpp new file mode 100644 index 0000000..20c6320 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feeds_13/feeds_13.3.query.sqlpp @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +use feeds; + +select value x +from Tweets1 as x +union all +select value x2 +from Tweets2 as x2 +union all +select value x3 +from Tweets3 as x3 +union all +select value x4 +from Tweets4 as x4 +order by id; diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/feeds_13/feeds_13.1.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/feeds_13/feeds_13.1.adm new file mode 100644 index 0000000..2c37364 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/feeds_13/feeds_13.1.adm @@ -0,0 +1,15 @@ +{ "id": "nc1:1", "username": "BronsonMike", "location": "", "text": "@GottaLaff @reutersus Christie and obama just foul weather friends", "timestamp": "Thu Dec 06 16:53:06 PST 2012" } +{ "id": "nc1:11", "username": "magarika", "location": "", "text": "RT @ken24xavier: Obama tells SOROS - our plan is ALMOST finished http://t.co/WvzK0GtU", "timestamp": "Thu Dec 06 16:53:05 PST 2012" } +{ "id": "nc1:11", "username": "magarika", "location": "", "text": "RT @ken24xavier: Obama tells SOROS - our plan is ALMOST finished http://t.co/WvzK0GtU", "timestamp": "Thu Dec 06 16:53:05 PST 2012" } +{ "id": "nc1:111", "username": "ToucanMall", "location": "", "text": "RT @WorldWar3Watch: Michelle Obama Gets More Grammy Nominations Than Justin ... #Obama #WW3 http://t.co/0Wv2GKij", "timestamp": "Thu Dec 06 16:53:13 PST 2012" } +{ "id": "nc1:111", "username": "ToucanMall", "location": "", "text": "RT @WorldWar3Watch: Michelle Obama Gets More Grammy Nominations Than Justin ... #Obama #WW3 http://t.co/0Wv2GKij", "timestamp": "Thu Dec 06 16:53:13 PST 2012" } +{ "id": "nc1:113", "username": "ToucanMall", "location": "", "text": "RT @ObamaPalooza: Tiffany Shared What $2,000 Meant to Her ... 
and the President Stopped by to Talk About It http://t.co/sgT7lsNV #Obama", "timestamp": "Thu Dec 06 16:53:12 PST 2012" } +{ "id": "nc1:113", "username": "ToucanMall", "location": "", "text": "RT @ObamaPalooza: Tiffany Shared What $2,000 Meant to Her ... and the President Stopped by to Talk About It http://t.co/sgT7lsNV #Obama", "timestamp": "Thu Dec 06 16:53:12 PST 2012" } +{ "id": "nc1:115", "username": "thewildpitch", "location": "", "text": "RT @RevkahJC: Dennis Miller: Obama Should Just Say He Wants To Tax Successful People http://t.co/Ihlemy9Y", "timestamp": "Thu Dec 06 16:53:11 PST 2012" } +{ "id": "nc1:115", "username": "thewildpitch", "location": "", "text": "RT @RevkahJC: Dennis Miller: Obama Should Just Say He Wants To Tax Successful People http://t.co/Ihlemy9Y", "timestamp": "Thu Dec 06 16:53:11 PST 2012" } +{ "id": "nc1:115", "username": "thewildpitch", "location": "", "text": "RT @RevkahJC: Dennis Miller: Obama Should Just Say He Wants To Tax Successful People http://t.co/Ihlemy9Y", "timestamp": "Thu Dec 06 16:53:11 PST 2012" } +{ "id": "nc1:115", "username": "thewildpitch", "location": "", "text": "RT @RevkahJC: Dennis Miller: Obama Should Just Say He Wants To Tax Successful People http://t.co/Ihlemy9Y", "timestamp": "Thu Dec 06 16:53:11 PST 2012" } +{ "id": "nc1:117", "username": "Rnugent24", "location": "", "text": "RT @ConservativeQuo: unemployment is above 8% again. I wonder how long it will take for Obama to start blaming Bush? 3-2-1 #tcot #antiobama", "timestamp": "Thu Dec 06 16:53:10 PST 2012" } +{ "id": "nc1:117", "username": "Rnugent24", "location": "", "text": "RT @ConservativeQuo: unemployment is above 8% again. I wonder how long it will take for Obama to start blaming Bush? 3-2-1 #tcot #antiobama", "timestamp": "Thu Dec 06 16:53:10 PST 2012" } +{ "id": "nc1:119", "username": "ToucanMall", "location": "", "text": "RT @Newitrsdotcom: I hope #Obama will win re-election... 
Other four years without meaningless #wars", "timestamp": "Thu Dec 06 16:53:09 PST 2012" } +{ "id": "nc1:119", "username": "ToucanMall", "location": "", "text": "RT @Newitrsdotcom: I hope #Obama will win re-election... Other four years without meaningless #wars", "timestamp": "Thu Dec 06 16:53:09 PST 2012" } diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml index bb0d473..96dbf01 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml +++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml @@ -8483,6 +8483,11 @@ </compilation-unit> </test-case> <test-case FilePath="feeds"> + <compilation-unit name="feeds_13"> + <output-dir compare="Text">feeds_13</output-dir> + </compilation-unit> + </test-case> + <test-case FilePath="feeds"> <compilation-unit name="issue_230_feeds"> <output-dir compare="Text">issue_230_feeds</output-dir> </compilation-unit> diff --git a/asterixdb/asterix-lang-aql/src/main/javacc/AQL.jj b/asterixdb/asterix-lang-aql/src/main/javacc/AQL.jj index 02d2220..74fe907 100644 --- a/asterixdb/asterix-lang-aql/src/main/javacc/AQL.jj +++ b/asterixdb/asterix-lang-aql/src/main/javacc/AQL.jj @@ -1166,7 +1166,7 @@ <CONNECT> <FEED> feedNameComponents = QualifiedName() <TO> <DATASET> datasetNameComponents = QualifiedName() (ApplyFunction(appliedFunctions))? (policy = GetPolicy())? 
{ - stmt = new ConnectFeedStatement(feedNameComponents, datasetNameComponents, appliedFunctions, policy, getVarCounter()); + stmt = new ConnectFeedStatement(feedNameComponents, datasetNameComponents, appliedFunctions, policy, null, getVarCounter()); } | <DISCONNECT> <FEED> feedNameComponents = QualifiedName() <FROM> <DATASET> datasetNameComponents = QualifiedName() { diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/ConnectFeedStatement.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/ConnectFeedStatement.java index 3b6a1c3..b0a3f6e 100644 --- a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/ConnectFeedStatement.java +++ b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/ConnectFeedStatement.java @@ -34,11 +34,12 @@ private final Identifier datasetName; private final String feedName; private final String policy; + private final String whereClauseBody; private int varCounter; private final List<FunctionSignature> appliedFunctions; public ConnectFeedStatement(Pair<Identifier, Identifier> feedNameCmp, Pair<Identifier, Identifier> datasetNameCmp, - List<FunctionSignature> appliedFunctions, String policy, int varCounter) { + List<FunctionSignature> appliedFunctions, String policy, String whereClauseBody, int varCounter) { if (feedNameCmp.first != null && datasetNameCmp.first != null && !feedNameCmp.first.getValue().equals(datasetNameCmp.first.getValue())) { throw new IllegalArgumentException("Dataverse for source feed and target dataset do not match"); @@ -48,6 +49,7 @@ this.datasetName = datasetNameCmp.second; this.feedName = feedNameCmp.second.getValue(); this.policy = policy != null ? 
policy : BuiltinFeedPolicies.DEFAULT_POLICY.getPolicyName(); + this.whereClauseBody = whereClauseBody; this.varCounter = varCounter; this.appliedFunctions = appliedFunctions; } @@ -64,6 +66,10 @@ return varCounter; } + public String getWhereClauseBody() { + return whereClauseBody; + } + @Override public Kind getKind() { return Statement.Kind.CONNECT_FEED; diff --git a/asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj b/asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj index 7a99814..42b8d15 100644 --- a/asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj +++ b/asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj @@ -157,6 +157,7 @@ import org.apache.asterix.lang.sqlpp.clause.SelectRegular; import org.apache.asterix.lang.sqlpp.clause.SelectSetOperation; import org.apache.asterix.lang.sqlpp.clause.UnnestClause; +import org.apache.asterix.lang.common.clause.WhereClause; import org.apache.asterix.lang.sqlpp.expression.CaseExpression; import org.apache.asterix.lang.sqlpp.expression.SelectExpression; import org.apache.asterix.lang.sqlpp.optype.JoinType; @@ -1264,14 +1265,37 @@ List<FunctionSignature> appliedFunctions = new ArrayList<FunctionSignature>(); Statement stmt = null; String policy = null; + String whereClauseBody = null; + WhereClause whereClause = null; + Token beginPos = null; + Token endPos = null; } { ( <FEED> feedNameComponents = QualifiedName() <TO> Dataset() datasetNameComponents = QualifiedName() - (ApplyFunction(appliedFunctions))? (policy = GetPolicy())? + (ApplyFunction(appliedFunctions))? + (policy = GetPolicy())? + ( + <WHERE> + { + beginPos = token; + whereClause = new WhereClause(); + Expression whereExpr; + } + whereExpr = Expression() + { + whereClause.setWhereExpr(whereExpr); + } + )? 
+ { + if (whereClause != null) { + endPos = token; + whereClauseBody = extractFragment(beginPos.endLine, beginPos.endColumn, endPos.endLine, endPos.endColumn + 1); + } + } { stmt = new ConnectFeedStatement(feedNameComponents, datasetNameComponents, appliedFunctions, - policy, getVarCounter()); + policy, whereClauseBody, getVarCounter()); } ) { diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataTransactionContext.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataTransactionContext.java index a1eb425..367f568 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataTransactionContext.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataTransactionContext.java @@ -242,7 +242,8 @@ } public void dropFeedConnection(String dataverseName, String feedName, String datasetName) { - FeedConnection feedConnection = new FeedConnection(dataverseName, feedName, datasetName, null, null, null); + FeedConnection feedConnection = + new FeedConnection(dataverseName, feedName, datasetName, null, null, null, null); droppedCache.addFeedConnectionIfNotExists(feedConnection); logAndApply(new MetadataLogicalOperation(feedConnection, false)); } diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataRecordTypes.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataRecordTypes.java index 54a69eb..ba1ea03 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataRecordTypes.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataRecordTypes.java @@ -100,6 +100,7 @@ public static final String FIELD_NAME_VALUE = "Value"; public static final String FIELD_NAME_WORKING_MEMORY_SIZE = "WorkingMemorySize"; public static final String FIELD_NAME_APPLIED_FUNCTIONS = "AppliedFunctions"; + public static 
final String FIELD_NAME_WHERE_CLAUSE = "WhereClause"; //---------------------------------- Record Types Creation ----------------------------------// //--------------------------------------- Properties ----------------------------------------// diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/FeedConnection.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/FeedConnection.java index 7572a9a..78d6e4e 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/FeedConnection.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/FeedConnection.java @@ -40,17 +40,19 @@ private String feedName; private String datasetName; private String policyName; + private String whereClauseBody; private String outputType; private List<FunctionSignature> appliedFunctions; public FeedConnection(String dataverseName, String feedName, String datasetName, - List<FunctionSignature> appliedFunctions, String policyName, String outputType) { + List<FunctionSignature> appliedFunctions, String policyName, String whereClauseBody, String outputType) { this.dataverseName = dataverseName; this.feedName = feedName; this.datasetName = datasetName; this.appliedFunctions = appliedFunctions; this.connectionId = feedName + ":" + datasetName; this.policyName = policyName; + this.whereClauseBody = whereClauseBody == null ? 
"" : whereClauseBody; this.outputType = outputType; this.feedId = new EntityId(FeedUtils.FEED_EXTENSION_NAME, dataverseName, feedName); } @@ -105,6 +107,10 @@ return policyName; } + public String getWhereClauseBody() { + return whereClauseBody; + } + public String getOutputType() { return outputType; } diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/FeedConnectionTupleTranslator.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/FeedConnectionTupleTranslator.java index 269497b..61a8ab2 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/FeedConnectionTupleTranslator.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/FeedConnectionTupleTranslator.java @@ -22,6 +22,7 @@ import java.io.ByteArrayInputStream; import java.io.DataInput; import java.io.DataInputStream; +import java.io.DataOutput; import java.util.ArrayList; import java.util.List; @@ -32,6 +33,7 @@ import org.apache.asterix.metadata.bootstrap.MetadataPrimaryIndexes; import org.apache.asterix.metadata.bootstrap.MetadataRecordTypes; import org.apache.asterix.metadata.entities.FeedConnection; +import org.apache.asterix.om.base.AInt64; import org.apache.asterix.om.base.AMissing; import org.apache.asterix.om.base.ANull; import org.apache.asterix.om.base.ARecord; @@ -52,6 +54,8 @@ public static final int FEED_CONN_DATASET_NAME_FIELD_INDEX = 2; public static final int FEED_CONN_PAYLOAD_TUPLE_FIELD_INDEX = 3; + + protected final transient ArrayBackedValueStorage fieldName = new ArrayBackedValueStorage(); private ISerializerDeserializer<ARecord> recordSerDes = SerializerDeserializerProvider.INSTANCE .getSerializerDeserializer(MetadataRecordTypes.FEED_CONNECTION_RECORDTYPE); @@ -101,7 +105,12 @@ } } - return new FeedConnection(dataverseName, feedName, datasetName, appliedFunctions, policyName, outputType); + 
int whereClauseIdx = feedConnRecord.getType().getFieldIndex(MetadataRecordTypes.FIELD_NAME_WHERE_CLAUSE); + String whereClauseBody = + whereClauseIdx >= 0 ? ((AString) feedConnRecord.getValueByPos(whereClauseIdx)).getStringValue() : ""; + + return new FeedConnection(dataverseName, feedName, datasetName, appliedFunctions, policyName, whereClauseBody, + outputType); } @Override @@ -159,6 +168,9 @@ stringSerde.serialize(aString, fieldValue.getDataOutput()); recordBuilder.addField(MetadataRecordTypes.FEED_CONN_POLICY_FIELD_INDEX, fieldValue); + // field: whereClauseBody + writeOpenPart(me); + recordBuilder.write(tupleBuilder.getDataOutput(), true); tupleBuilder.addFieldEndOffset(); @@ -166,6 +178,18 @@ return tuple; } + protected void writeOpenPart(FeedConnection fc) throws HyracksDataException { + if (fc.getWhereClauseBody() != null && fc.getWhereClauseBody().length() > 0) { + fieldName.reset(); + aString.setValue(MetadataRecordTypes.FIELD_NAME_WHERE_CLAUSE); + stringSerde.serialize(aString, fieldName.getDataOutput()); + fieldValue.reset(); + aString.setValue(fc.getWhereClauseBody()); + stringSerde.serialize(aString, fieldValue.getDataOutput()); + recordBuilder.addField(fieldName, fieldValue); + } + } + private void writeAppliedFunctionsField(IARecordBuilder rb, FeedConnection fc, ArrayBackedValueStorage buffer) throws HyracksDataException { UnorderedListBuilder listBuilder = new UnorderedListBuilder(); -- To view, visit https://asterix-gerrit.ics.uci.edu/2255 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: merged Gerrit-Change-Id: I0b3cc6fe9d7fb5f5645dd9c759da448bfe1e88f1 Gerrit-PatchSet: 19 Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Owner: Xikui Wang <xkk...@gmail.com> Gerrit-Reviewer: Anon. E. Moose #1000171 Gerrit-Reviewer: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Gerrit-Reviewer: Xikui Wang <xkk...@gmail.com> Gerrit-Reviewer: abdullah alamoudi <bamou...@gmail.com>