Steven Jacobs has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/1227

Change subject: Updated to match code changes to asterix
......................................................................

Updated to match code changes to asterix

Change-Id: I010b81776543e127f09f046a8601bb7184f7de9a
---
M 
src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java
M 
src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
M 
src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java
M src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerOperator.java
M src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerPOperator.java
5 files changed, 40 insertions(+), 20 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb-bad 
refs/changes/27/1227/1

diff --git 
a/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java
 
b/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java
index 7d0cb1a..21a3ef0 100644
--- 
a/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java
+++ 
b/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java
@@ -29,16 +29,22 @@
 import org.apache.asterix.bad.metadata.Channel;
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.functions.FunctionSignature;
+import org.apache.asterix.lang.aql.expression.FLWOGRExpression;
+import org.apache.asterix.lang.common.base.Clause;
 import org.apache.asterix.lang.common.base.Expression;
+import org.apache.asterix.lang.common.clause.LetClause;
 import org.apache.asterix.lang.common.expression.CallExpr;
+import org.apache.asterix.lang.common.expression.FieldAccessor;
 import org.apache.asterix.lang.common.expression.FieldBinding;
 import org.apache.asterix.lang.common.expression.LiteralExpr;
 import org.apache.asterix.lang.common.expression.RecordConstructor;
+import org.apache.asterix.lang.common.expression.VariableExpr;
 import org.apache.asterix.lang.common.literal.StringLiteral;
 import org.apache.asterix.lang.common.statement.InsertStatement;
 import org.apache.asterix.lang.common.statement.Query;
 import org.apache.asterix.lang.common.statement.UpsertStatement;
 import org.apache.asterix.lang.common.struct.Identifier;
+import org.apache.asterix.lang.common.struct.VarIdentifier;
 import org.apache.asterix.lang.common.visitor.base.ILangVisitor;
 import org.apache.asterix.metadata.MetadataManager;
 import org.apache.asterix.metadata.MetadataTransactionContext;
@@ -182,18 +188,32 @@
             subscriptionTuple.setVarCounter(varCounter);
 
             if (subscriptionId == null) {
-                List<String> returnField = new ArrayList<>();
-                returnField.add(BADConstants.SubscriptionId);
+
+                VariableExpr subscriptionVar = new VariableExpr(new 
VarIdentifier("$sub", 1));
+                VariableExpr useSubscriptionVar = new VariableExpr(new 
VarIdentifier("$sub", 1));
+                VariableExpr resultVar = new VariableExpr(new 
VarIdentifier("$result", 0));
+                VariableExpr useResultVar = new VariableExpr(new 
VarIdentifier("$result", 0));
+                useResultVar.setIsNewVar(false);
+                useSubscriptionVar.setIsNewVar(false);
+                Query returnQuery = new Query(false);
+                List<Clause> clauseList = new ArrayList<>();
+                LetClause let = new LetClause(subscriptionVar,
+                        new FieldAccessor(useResultVar, new 
Identifier(BADConstants.SubscriptionId)));
+                clauseList.add(let);
+                FLWOGRExpression body = new FLWOGRExpression(clauseList, 
useSubscriptionVar);
+                returnQuery.setBody(body);
+
                 metadataProvider.setResultSetId(new 
ResultSetId(resultSetIdCounter++));
                 metadataProvider.setResultAsyncMode(
                         resultDelivery == ResultDelivery.ASYNC || 
resultDelivery == ResultDelivery.ASYNC_DEFERRED);
                 InsertStatement insert = new InsertStatement(new 
Identifier(dataverse),
-                        new Identifier(subscriptionsDatasetName), 
subscriptionTuple, varCounter, false, returnField);
+                        new Identifier(subscriptionsDatasetName), 
subscriptionTuple, varCounter, resultVar,
+                        returnQuery);
                 ((QueryTranslator) 
statementExecutor).handleInsertUpsertStatement(metadataProvider, insert, hcc, 
hdc,
                         resultDelivery, stats, false);
             } else {
                 UpsertStatement upsert = new UpsertStatement(new 
Identifier(dataverse),
-                        new Identifier(subscriptionsDatasetName), 
subscriptionTuple, varCounter);
+                        new Identifier(subscriptionsDatasetName), 
subscriptionTuple, varCounter, null, null);
                 ((QueryTranslator) 
statementExecutor).handleInsertUpsertStatement(metadataProvider, upsert, hcc, 
hdc,
                         resultDelivery, stats, false);
             }
diff --git 
a/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
 
b/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
index 824e725..9c99a9f 100644
--- 
a/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
+++ 
b/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
@@ -67,7 +67,7 @@
 import org.apache.asterix.metadata.entities.Function;
 import org.apache.asterix.om.base.temporal.ADurationParserFactory;
 import org.apache.asterix.runtime.util.AsterixAppContextInfo;
-import org.apache.asterix.runtime.util.AsterixClusterProperties;
+import org.apache.asterix.runtime.util.ClusterStateManager;
 import org.apache.asterix.translator.IStatementExecutor;
 import org.apache.asterix.translator.IStatementExecutor.ResultDelivery;
 import org.apache.asterix.translator.IStatementExecutor.Stats;
@@ -199,7 +199,7 @@
         RepetitiveChannelOperatorDescriptor channelOp = new 
RepetitiveChannelOperatorDescriptor(jobSpec, dataverse,
                 channelName, duration, channeljobSpec, strIP, port);
 
-        String partition = 
AsterixClusterProperties.INSTANCE.getClusterLocations().getLocations()[0];
+        String partition = 
ClusterStateManager.INSTANCE.getClusterLocations().getLocations()[0];
         Set<String> ncs = new HashSet<>(Arrays.asList(partition));
         AlgebricksAbsolutePartitionConstraint partitionConstraint = new 
AlgebricksAbsolutePartitionConstraint(
                 ncs.toArray(new String[ncs.size()]));
@@ -246,7 +246,7 @@
             IHyracksDataset hdc, Stats stats, String dataverse) throws 
Exception {
         StringBuilder builder = new StringBuilder();
         builder.append("insert into dataset " + dataverse + "." + resultsName 
+ " ");
-        builder.append(" (" + " let $" + BADConstants.ChannelExecutionTime + " 
:= current-datetime() \n");
+        builder.append(" as $a (" + " let $" + 
BADConstants.ChannelExecutionTime + " := current-datetime() \n");
 
         builder.append("for $sub in dataset " + dataverse + "." + 
subscriptionsName + "\n");
         builder.append(
@@ -266,7 +266,7 @@
         builder.append("\"result\":$result");
         builder.append("}");
         builder.append(")");
-        builder.append(" return records");
+        builder.append(" returning $a");
         builder.append(";");
         AQLParserFactory aqlFact = new AQLParserFactory();
         List<Statement> fStatements = aqlFact.createParser(new 
StringReader(builder.toString())).parse();
diff --git 
a/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java
 
b/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java
index 8e19fc0..374bae2 100644
--- 
a/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java
+++ 
b/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java
@@ -51,7 +51,7 @@
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
-import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.DelegateOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
@@ -75,10 +75,10 @@
             return false;
         }
         AbstractLogicalOperator op = (AbstractLogicalOperator) 
op1.getInputs().get(0).getValue();
-        if (op.getOperatorTag() != LogicalOperatorTag.EXTENSION_OPERATOR) {
+        if (op.getOperatorTag() != LogicalOperatorTag.DELEGATE_OPERATOR) {
             return false;
         }
-        ExtensionOperator eOp = (ExtensionOperator) op;
+        DelegateOperator eOp = (DelegateOperator) op;
         if (!(eOp.getDelegate() instanceof CommitOperator)) {
             return false;
         }
@@ -140,7 +140,7 @@
         context.computeAndSetTypeEnvironmentForOperator(badProject);
 
         //Create my brokerNotify plan above the extension Operator
-        ExtensionOperator dOp = createNotifyBrokerPlan(brokerEndpointVar, 
subscriptionIdVar, channelExecutionVar,
+        DelegateOperator dOp = createNotifyBrokerPlan(brokerEndpointVar, 
subscriptionIdVar, channelExecutionVar,
                 context, eOp, (DistributeResultOperator) op1, 
channelDataverse, channelName);
 
         opRef.setValue(dOp);
@@ -148,7 +148,7 @@
         return true;
     }
 
-    private ExtensionOperator createNotifyBrokerPlan(LogicalVariable 
brokerEndpointVar,
+    private DelegateOperator createNotifyBrokerPlan(LogicalVariable 
brokerEndpointVar,
             LogicalVariable subscriptionIdVar, LogicalVariable 
channelExecutionVar, IOptimizationContext context,
             ILogicalOperator eOp, DistributeResultOperator distributeOp, 
String channelDataverse, String channelName)
                     throws AlgebricksException {
@@ -196,7 +196,7 @@
         EntityId activeId = new EntityId(BADConstants.CHANNEL_EXTENSION_NAME, 
channelDataverse, channelName);
         NotifyBrokerPOperator notifyBrokerPOp = new 
NotifyBrokerPOperator(activeId);
         notifyBrokerOp.setPhysicalOperator(notifyBrokerPOp);
-        ExtensionOperator extensionOp = new ExtensionOperator(notifyBrokerOp);
+        DelegateOperator extensionOp = new DelegateOperator(notifyBrokerOp);
         extensionOp.setPhysicalOperator(notifyBrokerPOp);
         extensionOp.getInputs().add(new 
MutableObject<ILogicalOperator>(groupbyOp));
 
diff --git 
a/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerOperator.java 
b/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerOperator.java
index c680988..d281b49 100644
--- a/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerOperator.java
+++ b/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerOperator.java
@@ -22,14 +22,14 @@
 
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
-import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractExtensibleLogicalOperator;
-import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorExtension;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractDelegatedLogicalOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorDelegate;
 import 
org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionReferenceTransform;
 
 /**
  * A repetitive channel operator, which uses a Java timer to run a given query 
periodically
  */
-public class NotifyBrokerOperator extends AbstractExtensibleLogicalOperator {
+public class NotifyBrokerOperator extends AbstractDelegatedLogicalOperator {
     private final LogicalVariable subscriptionIdVar;
     private final LogicalVariable brokerEndpointVar;
     private final LogicalVariable channelExecutionVar;
@@ -64,7 +64,7 @@
     }
 
     @Override
-    public IOperatorExtension newInstance() {
+    public IOperatorDelegate newInstance() {
         return new NotifyBrokerOperator(brokerEndpointVar, subscriptionIdVar, 
channelExecutionVar);
     }
 
diff --git 
a/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerPOperator.java 
b/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerPOperator.java
index 753ece7..7d0e044 100644
--- a/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerPOperator.java
+++ b/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerPOperator.java
@@ -27,7 +27,7 @@
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
 import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
-import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.DelegateOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.physical.AbstractPhysicalOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
@@ -73,7 +73,7 @@
     public void contributeRuntimeOperator(IHyracksJobBuilder builder, 
JobGenContext context, ILogicalOperator op,
             IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, 
IOperatorSchema outerPlanSchema)
                     throws AlgebricksException {
-        ExtensionOperator notify = (ExtensionOperator) op;
+        DelegateOperator notify = (DelegateOperator) op;
         LogicalVariable subVar = ((NotifyBrokerOperator) 
notify.getDelegate()).getSubscriptionVariable();
         LogicalVariable brokerVar = ((NotifyBrokerOperator) 
notify.getDelegate()).getBrokerEndpointVariable();
         LogicalVariable executionVar = ((NotifyBrokerOperator) 
notify.getDelegate()).getChannelExecutionVariable();

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/1227
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I010b81776543e127f09f046a8601bb7184f7de9a
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb-bad
Gerrit-Branch: master
Gerrit-Owner: Steven Jacobs <sjaco...@ucr.edu>

Reply via email to