Steven Jacobs has uploaded a new change for review.
https://asterix-gerrit.ics.uci.edu/1227
Change subject: Updated to match code changes to asterix
......................................................................
Updated to match code changes to asterix
Change-Id: I010b81776543e127f09f046a8601bb7184f7de9a
---
M src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java
M src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
M src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java
M src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerOperator.java
M src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerPOperator.java
5 files changed, 40 insertions(+), 20 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb-bad refs/changes/27/1227/1
diff --git a/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java b/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java
index 7d0cb1a..21a3ef0 100644
--- a/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java
+++ b/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java
@@ -29,16 +29,22 @@
import org.apache.asterix.bad.metadata.Channel;
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.common.functions.FunctionSignature;
+import org.apache.asterix.lang.aql.expression.FLWOGRExpression;
+import org.apache.asterix.lang.common.base.Clause;
import org.apache.asterix.lang.common.base.Expression;
+import org.apache.asterix.lang.common.clause.LetClause;
import org.apache.asterix.lang.common.expression.CallExpr;
+import org.apache.asterix.lang.common.expression.FieldAccessor;
import org.apache.asterix.lang.common.expression.FieldBinding;
import org.apache.asterix.lang.common.expression.LiteralExpr;
import org.apache.asterix.lang.common.expression.RecordConstructor;
+import org.apache.asterix.lang.common.expression.VariableExpr;
import org.apache.asterix.lang.common.literal.StringLiteral;
import org.apache.asterix.lang.common.statement.InsertStatement;
import org.apache.asterix.lang.common.statement.Query;
import org.apache.asterix.lang.common.statement.UpsertStatement;
import org.apache.asterix.lang.common.struct.Identifier;
+import org.apache.asterix.lang.common.struct.VarIdentifier;
import org.apache.asterix.lang.common.visitor.base.ILangVisitor;
import org.apache.asterix.metadata.MetadataManager;
import org.apache.asterix.metadata.MetadataTransactionContext;
@@ -182,18 +188,32 @@
subscriptionTuple.setVarCounter(varCounter);
if (subscriptionId == null) {
- List<String> returnField = new ArrayList<>();
- returnField.add(BADConstants.SubscriptionId);
+
+ VariableExpr subscriptionVar = new VariableExpr(new
VarIdentifier("$sub", 1));
+ VariableExpr useSubscriptionVar = new VariableExpr(new
VarIdentifier("$sub", 1));
+ VariableExpr resultVar = new VariableExpr(new
VarIdentifier("$result", 0));
+ VariableExpr useResultVar = new VariableExpr(new
VarIdentifier("$result", 0));
+ useResultVar.setIsNewVar(false);
+ useSubscriptionVar.setIsNewVar(false);
+ Query returnQuery = new Query(false);
+ List<Clause> clauseList = new ArrayList<>();
+ LetClause let = new LetClause(subscriptionVar,
+ new FieldAccessor(useResultVar, new
Identifier(BADConstants.SubscriptionId)));
+ clauseList.add(let);
+ FLWOGRExpression body = new FLWOGRExpression(clauseList,
useSubscriptionVar);
+ returnQuery.setBody(body);
+
metadataProvider.setResultSetId(new
ResultSetId(resultSetIdCounter++));
metadataProvider.setResultAsyncMode(
resultDelivery == ResultDelivery.ASYNC ||
resultDelivery == ResultDelivery.ASYNC_DEFERRED);
InsertStatement insert = new InsertStatement(new
Identifier(dataverse),
- new Identifier(subscriptionsDatasetName),
subscriptionTuple, varCounter, false, returnField);
+ new Identifier(subscriptionsDatasetName),
subscriptionTuple, varCounter, resultVar,
+ returnQuery);
((QueryTranslator)
statementExecutor).handleInsertUpsertStatement(metadataProvider, insert, hcc,
hdc,
resultDelivery, stats, false);
} else {
UpsertStatement upsert = new UpsertStatement(new
Identifier(dataverse),
- new Identifier(subscriptionsDatasetName),
subscriptionTuple, varCounter);
+ new Identifier(subscriptionsDatasetName),
subscriptionTuple, varCounter, null, null);
((QueryTranslator)
statementExecutor).handleInsertUpsertStatement(metadataProvider, upsert, hcc,
hdc,
resultDelivery, stats, false);
}
diff --git a/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java b/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
index 824e725..9c99a9f 100644
--- a/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
+++ b/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
@@ -67,7 +67,7 @@
import org.apache.asterix.metadata.entities.Function;
import org.apache.asterix.om.base.temporal.ADurationParserFactory;
import org.apache.asterix.runtime.util.AsterixAppContextInfo;
-import org.apache.asterix.runtime.util.AsterixClusterProperties;
+import org.apache.asterix.runtime.util.ClusterStateManager;
import org.apache.asterix.translator.IStatementExecutor;
import org.apache.asterix.translator.IStatementExecutor.ResultDelivery;
import org.apache.asterix.translator.IStatementExecutor.Stats;
@@ -199,7 +199,7 @@
RepetitiveChannelOperatorDescriptor channelOp = new
RepetitiveChannelOperatorDescriptor(jobSpec, dataverse,
channelName, duration, channeljobSpec, strIP, port);
- String partition =
AsterixClusterProperties.INSTANCE.getClusterLocations().getLocations()[0];
+ String partition =
ClusterStateManager.INSTANCE.getClusterLocations().getLocations()[0];
Set<String> ncs = new HashSet<>(Arrays.asList(partition));
AlgebricksAbsolutePartitionConstraint partitionConstraint = new
AlgebricksAbsolutePartitionConstraint(
ncs.toArray(new String[ncs.size()]));
@@ -246,7 +246,7 @@
IHyracksDataset hdc, Stats stats, String dataverse) throws
Exception {
StringBuilder builder = new StringBuilder();
builder.append("insert into dataset " + dataverse + "." + resultsName
+ " ");
- builder.append(" (" + " let $" + BADConstants.ChannelExecutionTime + "
:= current-datetime() \n");
+ builder.append(" as $a (" + " let $" +
BADConstants.ChannelExecutionTime + " := current-datetime() \n");
builder.append("for $sub in dataset " + dataverse + "." +
subscriptionsName + "\n");
builder.append(
@@ -266,7 +266,7 @@
builder.append("\"result\":$result");
builder.append("}");
builder.append(")");
- builder.append(" return records");
+ builder.append(" returning $a");
builder.append(";");
AQLParserFactory aqlFact = new AQLParserFactory();
List<Statement> fStatements = aqlFact.createParser(new
StringReader(builder.toString())).parse();
diff --git a/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java b/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java
index 8e19fc0..374bae2 100644
--- a/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java
+++ b/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java
@@ -51,7 +51,7 @@
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
-import
org.apache.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
+import
org.apache.hyracks.algebricks.core.algebra.operators.logical.DelegateOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
@@ -75,10 +75,10 @@
return false;
}
AbstractLogicalOperator op = (AbstractLogicalOperator)
op1.getInputs().get(0).getValue();
- if (op.getOperatorTag() != LogicalOperatorTag.EXTENSION_OPERATOR) {
+ if (op.getOperatorTag() != LogicalOperatorTag.DELEGATE_OPERATOR) {
return false;
}
- ExtensionOperator eOp = (ExtensionOperator) op;
+ DelegateOperator eOp = (DelegateOperator) op;
if (!(eOp.getDelegate() instanceof CommitOperator)) {
return false;
}
@@ -140,7 +140,7 @@
context.computeAndSetTypeEnvironmentForOperator(badProject);
//Create my brokerNotify plan above the extension Operator
- ExtensionOperator dOp = createNotifyBrokerPlan(brokerEndpointVar,
subscriptionIdVar, channelExecutionVar,
+ DelegateOperator dOp = createNotifyBrokerPlan(brokerEndpointVar,
subscriptionIdVar, channelExecutionVar,
context, eOp, (DistributeResultOperator) op1,
channelDataverse, channelName);
opRef.setValue(dOp);
@@ -148,7 +148,7 @@
return true;
}
- private ExtensionOperator createNotifyBrokerPlan(LogicalVariable
brokerEndpointVar,
+ private DelegateOperator createNotifyBrokerPlan(LogicalVariable
brokerEndpointVar,
LogicalVariable subscriptionIdVar, LogicalVariable
channelExecutionVar, IOptimizationContext context,
ILogicalOperator eOp, DistributeResultOperator distributeOp,
String channelDataverse, String channelName)
throws AlgebricksException {
@@ -196,7 +196,7 @@
EntityId activeId = new EntityId(BADConstants.CHANNEL_EXTENSION_NAME,
channelDataverse, channelName);
NotifyBrokerPOperator notifyBrokerPOp = new
NotifyBrokerPOperator(activeId);
notifyBrokerOp.setPhysicalOperator(notifyBrokerPOp);
- ExtensionOperator extensionOp = new ExtensionOperator(notifyBrokerOp);
+ DelegateOperator extensionOp = new DelegateOperator(notifyBrokerOp);
extensionOp.setPhysicalOperator(notifyBrokerPOp);
extensionOp.getInputs().add(new
MutableObject<ILogicalOperator>(groupbyOp));
diff --git a/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerOperator.java b/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerOperator.java
index c680988..d281b49 100644
--- a/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerOperator.java
+++ b/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerOperator.java
@@ -22,14 +22,14 @@
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
-import
org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractExtensibleLogicalOperator;
-import
org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorExtension;
+import
org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractDelegatedLogicalOperator;
+import
org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorDelegate;
import
org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionReferenceTransform;
/**
* A repetitive channel operator, which uses a Java timer to run a given query
periodically
*/
-public class NotifyBrokerOperator extends AbstractExtensibleLogicalOperator {
+public class NotifyBrokerOperator extends AbstractDelegatedLogicalOperator {
private final LogicalVariable subscriptionIdVar;
private final LogicalVariable brokerEndpointVar;
private final LogicalVariable channelExecutionVar;
@@ -64,7 +64,7 @@
}
@Override
- public IOperatorExtension newInstance() {
+ public IOperatorDelegate newInstance() {
return new NotifyBrokerOperator(brokerEndpointVar, subscriptionIdVar,
channelExecutionVar);
}
diff --git a/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerPOperator.java b/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerPOperator.java
index 753ece7..7d0e044 100644
--- a/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerPOperator.java
+++ b/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerPOperator.java
@@ -27,7 +27,7 @@
import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
-import
org.apache.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
+import
org.apache.hyracks.algebricks.core.algebra.operators.logical.DelegateOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
import
org.apache.hyracks.algebricks.core.algebra.operators.physical.AbstractPhysicalOperator;
import
org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
@@ -73,7 +73,7 @@
public void contributeRuntimeOperator(IHyracksJobBuilder builder,
JobGenContext context, ILogicalOperator op,
IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas,
IOperatorSchema outerPlanSchema)
throws AlgebricksException {
- ExtensionOperator notify = (ExtensionOperator) op;
+ DelegateOperator notify = (DelegateOperator) op;
LogicalVariable subVar = ((NotifyBrokerOperator)
notify.getDelegate()).getSubscriptionVariable();
LogicalVariable brokerVar = ((NotifyBrokerOperator)
notify.getDelegate()).getBrokerEndpointVariable();
LogicalVariable executionVar = ((NotifyBrokerOperator)
notify.getDelegate()).getChannelExecutionVariable();
--
To view, visit https://asterix-gerrit.ics.uci.edu/1227
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I010b81776543e127f09f046a8601bb7184f7de9a
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb-bad
Gerrit-Branch: master
Gerrit-Owner: Steven Jacobs <[email protected]>