Steven Jacobs has uploaded a new change for review. https://asterix-gerrit.ics.uci.edu/1227
Change subject: Updated to match code changes to asterix ...................................................................... Updated to match code changes to asterix Change-Id: I010b81776543e127f09f046a8601bb7184f7de9a --- M src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java M src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java M src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java M src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerOperator.java M src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerPOperator.java 5 files changed, 40 insertions(+), 20 deletions(-) git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb-bad refs/changes/27/1227/1 diff --git a/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java b/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java index 7d0cb1a..21a3ef0 100644 --- a/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java +++ b/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java @@ -29,16 +29,22 @@ import org.apache.asterix.bad.metadata.Channel; import org.apache.asterix.common.exceptions.AsterixException; import org.apache.asterix.common.functions.FunctionSignature; +import org.apache.asterix.lang.aql.expression.FLWOGRExpression; +import org.apache.asterix.lang.common.base.Clause; import org.apache.asterix.lang.common.base.Expression; +import org.apache.asterix.lang.common.clause.LetClause; import org.apache.asterix.lang.common.expression.CallExpr; +import org.apache.asterix.lang.common.expression.FieldAccessor; import org.apache.asterix.lang.common.expression.FieldBinding; import org.apache.asterix.lang.common.expression.LiteralExpr; import org.apache.asterix.lang.common.expression.RecordConstructor; +import org.apache.asterix.lang.common.expression.VariableExpr; import org.apache.asterix.lang.common.literal.StringLiteral; import org.apache.asterix.lang.common.statement.InsertStatement; import org.apache.asterix.lang.common.statement.Query; import org.apache.asterix.lang.common.statement.UpsertStatement; import org.apache.asterix.lang.common.struct.Identifier; +import org.apache.asterix.lang.common.struct.VarIdentifier; import org.apache.asterix.lang.common.visitor.base.ILangVisitor; import org.apache.asterix.metadata.MetadataManager; import org.apache.asterix.metadata.MetadataTransactionContext; @@ -182,18 +188,32 @@ subscriptionTuple.setVarCounter(varCounter); if (subscriptionId == null) { - List<String> returnField = new ArrayList<>(); - returnField.add(BADConstants.SubscriptionId); + + VariableExpr subscriptionVar = new VariableExpr(new VarIdentifier("$sub", 1)); + VariableExpr useSubscriptionVar = new VariableExpr(new VarIdentifier("$sub", 1)); + VariableExpr resultVar = new VariableExpr(new VarIdentifier("$result", 0)); + VariableExpr useResultVar = new VariableExpr(new VarIdentifier("$result", 0)); + useResultVar.setIsNewVar(false); + useSubscriptionVar.setIsNewVar(false); + Query returnQuery = new Query(false); + List<Clause> clauseList = new ArrayList<>(); + LetClause let = new LetClause(subscriptionVar, + new FieldAccessor(useResultVar, new Identifier(BADConstants.SubscriptionId))); + clauseList.add(let); + FLWOGRExpression body = new FLWOGRExpression(clauseList, useSubscriptionVar); + returnQuery.setBody(body); + metadataProvider.setResultSetId(new ResultSetId(resultSetIdCounter++)); metadataProvider.setResultAsyncMode( resultDelivery == ResultDelivery.ASYNC || resultDelivery == ResultDelivery.ASYNC_DEFERRED); InsertStatement insert = new InsertStatement(new Identifier(dataverse), - new Identifier(subscriptionsDatasetName), subscriptionTuple, varCounter, false, returnField); + new Identifier(subscriptionsDatasetName), subscriptionTuple, varCounter, resultVar, + returnQuery); ((QueryTranslator) statementExecutor).handleInsertUpsertStatement(metadataProvider, insert, hcc, hdc, resultDelivery, stats, false); } else { UpsertStatement upsert = new UpsertStatement(new Identifier(dataverse), - new Identifier(subscriptionsDatasetName), subscriptionTuple, varCounter); + new Identifier(subscriptionsDatasetName), subscriptionTuple, varCounter, null, null); ((QueryTranslator) statementExecutor).handleInsertUpsertStatement(metadataProvider, upsert, hcc, hdc, resultDelivery, stats, false); } diff --git a/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java b/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java index 824e725..9c99a9f 100644 --- a/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java +++ b/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java @@ -67,7 +67,7 @@ import org.apache.asterix.metadata.entities.Function; import org.apache.asterix.om.base.temporal.ADurationParserFactory; import org.apache.asterix.runtime.util.AsterixAppContextInfo; -import org.apache.asterix.runtime.util.AsterixClusterProperties; +import org.apache.asterix.runtime.util.ClusterStateManager; import org.apache.asterix.translator.IStatementExecutor; import org.apache.asterix.translator.IStatementExecutor.ResultDelivery; import org.apache.asterix.translator.IStatementExecutor.Stats; @@ -199,7 +199,7 @@ RepetitiveChannelOperatorDescriptor channelOp = new RepetitiveChannelOperatorDescriptor(jobSpec, dataverse, channelName, duration, channeljobSpec, strIP, port); - String partition = AsterixClusterProperties.INSTANCE.getClusterLocations().getLocations()[0]; + String partition = ClusterStateManager.INSTANCE.getClusterLocations().getLocations()[0]; Set<String> ncs = new HashSet<>(Arrays.asList(partition)); AlgebricksAbsolutePartitionConstraint partitionConstraint = new AlgebricksAbsolutePartitionConstraint( ncs.toArray(new String[ncs.size()])); @@ -246,7 +246,7 @@ IHyracksDataset hdc, Stats stats, String dataverse) throws Exception { StringBuilder builder = new StringBuilder(); builder.append("insert into dataset " + dataverse + "." + resultsName + " "); - builder.append(" (" + " let $" + BADConstants.ChannelExecutionTime + " := current-datetime() \n"); + builder.append(" as $a (" + " let $" + BADConstants.ChannelExecutionTime + " := current-datetime() \n"); builder.append("for $sub in dataset " + dataverse + "." + subscriptionsName + "\n"); builder.append( @@ -266,7 +266,7 @@ builder.append("\"result\":$result"); builder.append("}"); builder.append(")"); - builder.append(" return records"); + builder.append(" returning $a"); builder.append(";"); AQLParserFactory aqlFact = new AQLParserFactory(); List<Statement> fStatements = aqlFact.createParser(new StringReader(builder.toString())).parse(); diff --git a/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java b/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java index 8e19fc0..374bae2 100644 --- a/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java +++ b/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java @@ -51,7 +51,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator; -import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.DelegateOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator; @@ -75,10 +75,10 @@ return false; } AbstractLogicalOperator op = (AbstractLogicalOperator) op1.getInputs().get(0).getValue(); - if (op.getOperatorTag() != LogicalOperatorTag.EXTENSION_OPERATOR) { + if (op.getOperatorTag() != LogicalOperatorTag.DELEGATE_OPERATOR) { return false; } - ExtensionOperator eOp = (ExtensionOperator) op; + DelegateOperator eOp = (DelegateOperator) op; if (!(eOp.getDelegate() instanceof CommitOperator)) { return false; } @@ -140,7 +140,7 @@ context.computeAndSetTypeEnvironmentForOperator(badProject); //Create my brokerNotify plan above the extension Operator - ExtensionOperator dOp = createNotifyBrokerPlan(brokerEndpointVar, subscriptionIdVar, channelExecutionVar, + DelegateOperator dOp = createNotifyBrokerPlan(brokerEndpointVar, subscriptionIdVar, channelExecutionVar, context, eOp, (DistributeResultOperator) op1, channelDataverse, channelName); opRef.setValue(dOp); @@ -148,7 +148,7 @@ return true; } - private ExtensionOperator createNotifyBrokerPlan(LogicalVariable brokerEndpointVar, + private DelegateOperator createNotifyBrokerPlan(LogicalVariable brokerEndpointVar, LogicalVariable subscriptionIdVar, LogicalVariable channelExecutionVar, IOptimizationContext context, ILogicalOperator eOp, DistributeResultOperator distributeOp, String channelDataverse, String channelName) throws AlgebricksException { @@ -196,7 +196,7 @@ EntityId activeId = new EntityId(BADConstants.CHANNEL_EXTENSION_NAME, channelDataverse, channelName); NotifyBrokerPOperator notifyBrokerPOp = new NotifyBrokerPOperator(activeId); notifyBrokerOp.setPhysicalOperator(notifyBrokerPOp); - ExtensionOperator extensionOp = new ExtensionOperator(notifyBrokerOp); + DelegateOperator extensionOp = new DelegateOperator(notifyBrokerOp); extensionOp.setPhysicalOperator(notifyBrokerPOp); extensionOp.getInputs().add(new MutableObject<ILogicalOperator>(groupbyOp)); diff --git a/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerOperator.java b/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerOperator.java index c680988..d281b49 100644 --- a/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerOperator.java +++ b/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerOperator.java @@ -22,14 +22,14 @@ import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable; -import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractExtensibleLogicalOperator; -import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorExtension; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractDelegatedLogicalOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorDelegate; import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionReferenceTransform; /** * A repetitive channel operator, which uses a Java timer to run a given query periodically */ -public class NotifyBrokerOperator extends AbstractExtensibleLogicalOperator { +public class NotifyBrokerOperator extends AbstractDelegatedLogicalOperator { private final LogicalVariable subscriptionIdVar; private final LogicalVariable brokerEndpointVar; private final LogicalVariable channelExecutionVar; @@ -64,7 +64,7 @@ } @Override - public IOperatorExtension newInstance() { + public IOperatorDelegate newInstance() { return new NotifyBrokerOperator(brokerEndpointVar, subscriptionIdVar, channelExecutionVar); } diff --git a/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerPOperator.java b/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerPOperator.java index 753ece7..7d0e044 100644 --- a/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerPOperator.java +++ b/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerPOperator.java @@ -27,7 +27,7 @@ import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable; import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag; import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator; -import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.DelegateOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema; import org.apache.hyracks.algebricks.core.algebra.operators.physical.AbstractPhysicalOperator; import org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector; @@ -73,7 +73,7 @@ public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op, IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema) throws AlgebricksException { - ExtensionOperator notify = (ExtensionOperator) op; + DelegateOperator notify = (DelegateOperator) op; LogicalVariable subVar = ((NotifyBrokerOperator) notify.getDelegate()).getSubscriptionVariable(); LogicalVariable brokerVar = ((NotifyBrokerOperator) notify.getDelegate()).getBrokerEndpointVariable(); LogicalVariable executionVar = ((NotifyBrokerOperator) notify.getDelegate()).getChannelExecutionVariable(); -- To view, visit https://asterix-gerrit.ics.uci.edu/1227 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: newchange Gerrit-Change-Id: I010b81776543e127f09f046a8601bb7184f7de9a Gerrit-PatchSet: 1 Gerrit-Project: asterixdb-bad Gerrit-Branch: master Gerrit-Owner: Steven Jacobs <sjaco...@ucr.edu>