Repository: asterixdb-bad
Updated Branches:
  refs/heads/master 9e13d7255 -> 345b0f572


Add push-based channels and improve broker notifications

Change-Id: Ie3c7cae0f015d6bc01dd912499565bb12c15abc3


Project: http://git-wip-us.apache.org/repos/asf/asterixdb-bad/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb-bad/commit/345b0f57
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb-bad/tree/345b0f57
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb-bad/diff/345b0f57

Branch: refs/heads/master
Commit: 345b0f5729d3a6ed0564707ec25b56750c5366ec
Parents: 9e13d72
Author: Steven Glenn Jacobs <sjaco...@ucr.edu>
Authored: Thu Apr 12 13:19:50 2018 -0700
Committer: Steven Glenn Jacobs <sjaco...@ucr.edu>
Committed: Thu Apr 12 13:19:50 2018 -0700

----------------------------------------------------------------------
 .../apache/asterix/bad/ChannelJobService.java   |  92 +-------
 .../lang/statement/CreateChannelStatement.java  |  46 +++-
 .../InsertBrokerNotifierForChannelRule.java     | 232 ++++++++++++-------
 .../bad/runtime/NotifyBrokerOperator.java       |  34 ++-
 .../bad/runtime/NotifyBrokerPOperator.java      |  13 +-
 .../bad/runtime/NotifyBrokerRuntime.java        | 141 +++++++++--
 .../bad/runtime/NotifyBrokerRuntimeFactory.java |  18 +-
 .../src/main/resources/lang-extension/lang.txt  |   7 +-
 .../queries/channel/channel-push.sqlpp          |  85 +++++++
 .../results/channel/channel-push.plan           |  64 +++++
 10 files changed, 505 insertions(+), 227 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/345b0f57/asterix-bad/src/main/java/org/apache/asterix/bad/ChannelJobService.java
----------------------------------------------------------------------
diff --git 
a/asterix-bad/src/main/java/org/apache/asterix/bad/ChannelJobService.java 
b/asterix-bad/src/main/java/org/apache/asterix/bad/ChannelJobService.java
index 41853b9..3df9a76 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/ChannelJobService.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/ChannelJobService.java
@@ -18,51 +18,16 @@
  */
 package org.apache.asterix.bad;
 
-import java.io.BufferedReader;
-import java.io.DataOutputStream;
-import java.io.InputStreamReader;
-import java.net.HttpURLConnection;
-import java.net.URL;
-import java.util.logging.Level;
 import java.util.logging.Logger;
 
-import org.apache.asterix.active.EntityId;
-import org.apache.asterix.om.base.AOrderedList;
-import org.apache.asterix.om.base.AUUID;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 /**
- * Provides functionality for channel jobs and communicating with Brokers
+ * Provides functionality for channel jobs
  */
 public class ChannelJobService {
 
     private static final Logger LOGGER = 
Logger.getLogger(ChannelJobService.class.getName());
 
-    public static void sendBrokerNotificationsForChannel(EntityId activeJobId, 
String brokerEndpoint,
-            AOrderedList subscriptionIds, String channelExecutionTime) throws 
HyracksDataException {
-        String formattedString;
-        formattedString = formatJSON(activeJobId, subscriptionIds, 
channelExecutionTime);
-        sendMessage(brokerEndpoint, formattedString);
-    }
-
-    public static String formatJSON(EntityId activeJobId, AOrderedList 
subscriptionIds, String channelExecutionTime) {
-        String JSON = "{ \"dataverseName\":\"" + activeJobId.getDataverse() + 
"\", \"channelName\":\""
-                + activeJobId.getEntityName() + "\", \"" + 
BADConstants.ChannelExecutionTime + "\":\""
-                + channelExecutionTime + "\", \"subscriptionIds\":[";
-        for (int i = 0; i < subscriptionIds.size(); i++) {
-            AUUID subId = (AUUID) subscriptionIds.getItem(i);
-            String subscriptionString = subId.toString();
-            //Broker code currently cannot handle the "uuid {}" part of the 
string, so we parse just the value
-            subscriptionString = subscriptionString.substring(8, 
subscriptionString.length() - 2);
-            JSON += "\"" + subscriptionString + "\"";
-            if (i < subscriptionIds.size() - 1) {
-                JSON += ",";
-            }
-        }
-        JSON += "]}";
-        return JSON;
-
-    }
 
     public static long findPeriod(String duration) {
         //TODO: Allow Repetitive Channels to use YMD durations
@@ -92,61 +57,6 @@ public class ChannelJobService {
         return (long) (seconds * 1000);
     }
 
-    public static void sendMessage(String targetURL, String urlParameters) {
-        HttpURLConnection connection = null;
-        try {
-            //Create connection
-            URL url = new URL(targetURL);
-            connection = (HttpURLConnection) url.openConnection();
-            connection.setRequestMethod("POST");
-            connection.setRequestProperty("Content-Type", 
"application/x-www-form-urlencoded");
-
-            connection.setRequestProperty("Content-Length", 
Integer.toString(urlParameters.getBytes().length));
-            connection.setRequestProperty("Content-Language", "en-US");
-
-            connection.setUseCaches(false);
-            connection.setDoOutput(true);
-            connection.setConnectTimeout(500);
-
-            if (connection.getOutputStream() != null) {
-                //Send message
-                DataOutputStream wr = new 
DataOutputStream(connection.getOutputStream());
-                wr.writeBytes(urlParameters);
-                wr.close();
-            } else {
-                throw new Exception();
-            }
-
-            if (LOGGER.isLoggable(Level.INFO)) {
-                int responseCode = connection.getResponseCode();
-                LOGGER.info("\nSending 'POST' request to URL : " + url);
-                LOGGER.info("Post parameters : " + urlParameters);
-                LOGGER.info("Response Code : " + responseCode);
-            }
-
-            if (connection.getInputStream() != null) {
-                BufferedReader in = new BufferedReader(new 
InputStreamReader(connection.getInputStream()));
-                String inputLine;
-                StringBuffer response = new StringBuffer();
-                while ((inputLine = in.readLine()) != null) {
-                    response.append(inputLine);
-                }
-                in.close();
-                if (LOGGER.isLoggable(Level.INFO)) {
-                    LOGGER.log(Level.INFO, response.toString());
-                }
-            } else {
-                LOGGER.log(Level.WARNING, "Channel Failed to get response from 
Broker.");
-            }
-
-        } catch (Exception e) {
-            LOGGER.log(Level.WARNING, "Channel Failed to connect to Broker.");
-        } finally {
-            if (connection != null) {
-                connection.disconnect();
-            }
-        }
-    }
 
     @Override
     public String toString() {

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/345b0f57/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
----------------------------------------------------------------------
diff --git 
a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
 
b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
index 161f093..87ac320 100644
--- 
a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
+++ 
b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
@@ -40,7 +40,6 @@ import org.apache.asterix.bad.lang.BADParserFactory;
 import org.apache.asterix.bad.metadata.Channel;
 import org.apache.asterix.bad.metadata.DeployedJobSpecEventListener;
 import 
org.apache.asterix.bad.metadata.DeployedJobSpecEventListener.PrecompiledType;
-import org.apache.asterix.common.transactions.ITxnIdFactory;
 import org.apache.asterix.common.config.DatasetConfig.DatasetType;
 import org.apache.asterix.common.config.DatasetConfig.IndexType;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
@@ -48,6 +47,7 @@ import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.exceptions.CompilationException;
 import org.apache.asterix.common.exceptions.MetadataException;
 import org.apache.asterix.common.functions.FunctionSignature;
+import org.apache.asterix.common.transactions.ITxnIdFactory;
 import org.apache.asterix.lang.common.base.Expression;
 import org.apache.asterix.lang.common.base.Statement;
 import org.apache.asterix.lang.common.expression.CallExpr;
@@ -59,6 +59,7 @@ import org.apache.asterix.lang.common.statement.DatasetDecl;
 import org.apache.asterix.lang.common.statement.IDatasetDetailsDecl;
 import org.apache.asterix.lang.common.statement.InsertStatement;
 import org.apache.asterix.lang.common.statement.InternalDetailsDecl;
+import org.apache.asterix.lang.common.statement.Query;
 import org.apache.asterix.lang.common.statement.SetStatement;
 import org.apache.asterix.lang.common.struct.Identifier;
 import org.apache.asterix.lang.common.visitor.base.ILangVisitor;
@@ -92,14 +93,16 @@ public class CreateChannelStatement extends 
ExtensionStatement {
     private String subscriptionsTableName;
     private String resultsTableName;
     private String dataverse;
+    private final boolean push;
 
     public CreateChannelStatement(Identifier dataverseName, Identifier 
channelName, FunctionSignature function,
-            Expression period) {
+            Expression period, boolean push) {
         this.channelName = channelName;
         this.dataverseName = dataverseName;
         this.function = function;
         this.period = (CallExpr) period;
         this.duration = "";
+        this.push = push;
     }
 
     public Identifier getDataverseName() {
@@ -218,12 +221,37 @@ public class CreateChannelStatement extends 
ExtensionStatement {
 
     }
 
+    private JobSpecification compilePushChannel(IStatementExecutor 
statementExecutor, MetadataProvider metadataProvider,
+            IHyracksClientConnection hcc, Query q) throws Exception {
+        MetadataTransactionContext mdTxnCtx = 
MetadataManager.INSTANCE.beginTransaction();
+        boolean bActiveTxn = true;
+        metadataProvider.setMetadataTxnContext(mdTxnCtx);
+        JobSpecification jobSpec = null;
+        try {
+            jobSpec = ((QueryTranslator) 
statementExecutor).rewriteCompileQuery(hcc, metadataProvider, q, null);
+            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+            bActiveTxn = false;
+        } catch (Exception e) {
+            LOGGER.log(Level.INFO, e.getMessage(), e);
+            if (bActiveTxn) {
+                ((QueryTranslator) statementExecutor).abort(e, e, mdTxnCtx);
+            }
+            throw e;
+        } finally {
+            metadataProvider.getLocks().unlock();
+        }
+        return jobSpec;
+    }
+
     private JobSpecification createChannelJob(IStatementExecutor 
statementExecutor, MetadataProvider metadataProvider,
             IHyracksClientConnection hcc, IHyracksDataset hdc, Stats stats) 
throws Exception {
         StringBuilder builder = new StringBuilder();
         builder.append("SET inline_with \"false\";\n");
-        builder.append("insert into " + dataverse + "." + resultsTableName);
-        builder.append(" as a (\n" + "with " + 
BADConstants.ChannelExecutionTime + " as current_datetime() \n");
+        if (!push) {
+            builder.append("insert into " + dataverse + "." + 
resultsTableName);
+            builder.append(" as a (\n");
+        }
+        builder.append("with " + BADConstants.ChannelExecutionTime + " as 
current_datetime() \n");
         builder.append("select result, ");
         builder.append(BADConstants.ChannelExecutionTime + ", ");
         builder.append("sub." + BADConstants.SubscriptionId + " as " + 
BADConstants.SubscriptionId + ",");
@@ -238,15 +266,19 @@ public class CreateChannelStatement extends 
ExtensionStatement {
         builder.append("sub.param" + i + ") result \n");
         builder.append("where b." + BADConstants.BrokerName + " = sub." + 
BADConstants.BrokerName + "\n");
         builder.append("and b." + BADConstants.DataverseName + " = sub." + 
BADConstants.DataverseName + "\n");
-        builder.append(")");
-        builder.append(" returning a");
+        if (!push) {
+            builder.append(")");
+            builder.append(" returning a");
+        }
         builder.append(";");
         BADParserFactory factory = new BADParserFactory();
         List<Statement> fStatements = factory.createParser(new 
StringReader(builder.toString())).parse();
 
         SetStatement ss = (SetStatement) fStatements.get(0);
         metadataProvider.getConfig().put(ss.getPropName(), ss.getPropValue());
-
+        if (push) {
+            return compilePushChannel(statementExecutor, metadataProvider, 
hcc, (Query) fStatements.get(1));
+        }
         return ((QueryTranslator) 
statementExecutor).handleInsertUpsertStatement(metadataProvider, 
fStatements.get(1),
                 hcc, hdc, ResultDelivery.ASYNC, null, stats, true, null);
     }

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/345b0f57/asterix-bad/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java
----------------------------------------------------------------------
diff --git 
a/asterix-bad/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java
 
b/asterix-bad/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java
index d83b606..9ead7f0 100644
--- 
a/asterix-bad/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java
+++ 
b/asterix-bad/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java
@@ -31,6 +31,7 @@ import org.apache.asterix.metadata.declared.DatasetDataSource;
 import org.apache.asterix.om.base.AString;
 import org.apache.asterix.om.constants.AsterixConstantValue;
 import org.apache.asterix.om.functions.BuiltinFunctions;
+import org.apache.asterix.om.types.IAType;
 import org.apache.commons.lang3.mutable.Mutable;
 import org.apache.commons.lang3.mutable.MutableObject;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
@@ -43,6 +44,7 @@ import 
org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
 import 
org.apache.hyracks.algebricks.core.algebra.expressions.AggregateFunctionCallExpression;
 import 
org.apache.hyracks.algebricks.core.algebra.expressions.ConstantExpression;
+import 
org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
 import 
org.apache.hyracks.algebricks.core.algebra.expressions.ScalarFunctionCallExpression;
 import 
org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
@@ -74,133 +76,197 @@ public class InsertBrokerNotifierForChannelRule 
implements IAlgebraicRewriteRule
         if (op1.getOperatorTag() != LogicalOperatorTag.DISTRIBUTE_RESULT) {
             return false;
         }
+        boolean push = false;
+
         AbstractLogicalOperator op = (AbstractLogicalOperator) 
op1.getInputs().get(0).getValue();
         if (op.getOperatorTag() != LogicalOperatorTag.DELEGATE_OPERATOR) {
-            return false;
+            if (op.getOperatorTag() != LogicalOperatorTag.PROJECT) {
+                return false;
+            }
+            push = true;
         }
-        DelegateOperator eOp = (DelegateOperator) op;
-        if (!(eOp.getDelegate() instanceof CommitOperator)) {
-            return false;
+        DataSourceScanOperator subscriptionsScan;
+        String channelDataverse;
+        String channelName;
+
+        if (!push) {
+            DelegateOperator eOp = (DelegateOperator) op;
+            if (!(eOp.getDelegate() instanceof CommitOperator)) {
+                return false;
+            }
+            AbstractLogicalOperator descendantOp = (AbstractLogicalOperator) 
eOp.getInputs().get(0).getValue();
+            if (descendantOp.getOperatorTag() != 
LogicalOperatorTag.INSERT_DELETE_UPSERT) {
+                return false;
+            }
+            InsertDeleteUpsertOperator insertOp = (InsertDeleteUpsertOperator) 
descendantOp;
+            if (insertOp.getOperation() != 
InsertDeleteUpsertOperator.Kind.INSERT) {
+                return false;
+            }
+            DatasetDataSource dds = (DatasetDataSource) 
insertOp.getDataSource();
+            String datasetName = dds.getDataset().getDatasetName();
+            if (!dds.getDataset().getItemTypeDataverseName().equals("Metadata")
+                    || 
!dds.getDataset().getItemTypeName().equals("ChannelResultsType")
+                    || !datasetName.endsWith("Results")) {
+                return false;
+            }
+            channelDataverse = dds.getDataset().getDataverseName();
+            //Now we know that we are inserting into results
+
+            channelName = datasetName.substring(0, datasetName.length() - 7);
+            String subscriptionsName = channelName + "Subscriptions";
+            subscriptionsScan = (DataSourceScanOperator) findOp(op, 
subscriptionsName);
+            if (subscriptionsScan == null) {
+                return false;
+            }
+
+        } else {
+            //if push, get the channel name here instead
+            subscriptionsScan = (DataSourceScanOperator) findOp(op, "");
+            if (subscriptionsScan == null) {
+                return false;
+            }
+            DatasetDataSource dds = (DatasetDataSource) 
subscriptionsScan.getDataSource();
+            String datasetName = dds.getDataset().getDatasetName();
+            channelDataverse = dds.getDataset().getDataverseName();
+            channelName = datasetName.substring(0, datasetName.length() - 13);
         }
-        AbstractLogicalOperator descendantOp = (AbstractLogicalOperator) 
eOp.getInputs().get(0).getValue();
-        if (descendantOp.getOperatorTag() != 
LogicalOperatorTag.INSERT_DELETE_UPSERT) {
+
+        //Now we need to get the broker EndPoint
+        LogicalVariable brokerEndpointVar = context.newVar();
+        AbstractLogicalOperator opAboveBrokersScan = findOp(op, "brokers");
+        if (opAboveBrokersScan == null) {
             return false;
         }
-        InsertDeleteUpsertOperator insertOp = (InsertDeleteUpsertOperator) 
descendantOp;
-        if (insertOp.getOperation() != InsertDeleteUpsertOperator.Kind.INSERT) 
{
+
+        //get subscriptionIdVar
+        LogicalVariable subscriptionIdVar = 
subscriptionsScan.getVariables().get(0);
+
+        //The channelExecutionTime is created just before the scan
+        ILogicalOperator channelExecutionAssign = 
subscriptionsScan.getInputs().get(0).getValue();
+        if (channelExecutionAssign.getOperatorTag() != 
LogicalOperatorTag.ASSIGN) {
             return false;
         }
-        DatasetDataSource dds = (DatasetDataSource) insertOp.getDataSource();
-        String datasetName = dds.getDataset().getDatasetName();
-        if (!dds.getDataset().getItemTypeDataverseName().equals("Metadata")
-                || 
!dds.getDataset().getItemTypeName().equals("ChannelResultsType")
-                || !datasetName.endsWith("Results")) {
+        LogicalVariable channelExecutionVar = ((AssignOperator) 
channelExecutionAssign).getVariables().get(0);
+        if (!channelExecutionVar.toString().equals("$$" + 
BADConstants.ChannelExecutionTime)) {
             return false;
         }
-        String channelDataverse = dds.getDataset().getDataverseName();
-        //Now we know that we are inserting into results
 
-        String channelName = datasetName.substring(0, datasetName.length() - 
7);
-        String subscriptionsName = channelName + "Subscriptions";
-        //TODO: Can we check here to see if there is a channel with such a 
name?
-
-        DataSourceScanOperator subscriptionsScan = (DataSourceScanOperator) 
findOp(op, subscriptionsName);
-        if (subscriptionsScan == null) {
-            return false;
+        if (!push) {
+            ((CommitOperator) ((DelegateOperator) 
op).getDelegate()).setSink(false);
         }
 
-        //Now we want to make sure and set the commit to be a nonsink commit
-        ((CommitOperator) eOp.getDelegate()).setSink(false);
-
-        //Now we need to get the broker EndPoint 
-        LogicalVariable brokerEndpointVar = context.newVar();
-        AbstractLogicalOperator opAboveBrokersScan = findOp(op, "brokers");
         AssignOperator assignOp = 
createbrokerEndPointAssignOperator(brokerEndpointVar, opAboveBrokersScan);
         //now brokerNameVar holds the brokerName for use farther up in the plan
 
         context.computeAndSetTypeEnvironmentForOperator(assignOp);
         context.computeAndSetTypeEnvironmentForOperator(opAboveBrokersScan);
-        context.computeAndSetTypeEnvironmentForOperator(eOp);
-
-        //get subscriptionIdVar
-        LogicalVariable subscriptionIdVar = 
subscriptionsScan.getVariables().get(0);
+        context.computeAndSetTypeEnvironmentForOperator(op);
 
-        //The channelExecutionTime is created just before the scan
-        LogicalVariable channelExecutionVar = ((AssignOperator) 
subscriptionsScan.getInputs().get(0).getValue())
-                .getVariables().get(0);
-
-        ProjectOperator badProject = (ProjectOperator) findOp(op, "project");
+        ProjectOperator badProject = (ProjectOperator) findOp(op1, "project");
         badProject.getVariables().add(subscriptionIdVar);
         badProject.getVariables().add(brokerEndpointVar);
         badProject.getVariables().add(channelExecutionVar);
         context.computeAndSetTypeEnvironmentForOperator(badProject);
 
+
         //Create my brokerNotify plan above the extension Operator
-        DelegateOperator dOp = createNotifyBrokerPlan(brokerEndpointVar, 
subscriptionIdVar, channelExecutionVar,
-                context, eOp, (DistributeResultOperator) op1, 
channelDataverse, channelName);
+        DelegateOperator dOp = push
+                ? createNotifyBrokerPushPlan(brokerEndpointVar, 
badProject.getVariables().get(0), channelExecutionVar,
+                        context, op, (DistributeResultOperator) op1, 
channelDataverse, channelName)
+                : createNotifyBrokerPullPlan(brokerEndpointVar, 
subscriptionIdVar, channelExecutionVar, context, op,
+                        (DistributeResultOperator) op1, channelDataverse, 
channelName);
 
         opRef.setValue(dOp);
 
         return true;
     }
 
-    private DelegateOperator createNotifyBrokerPlan(LogicalVariable 
brokerEndpointVar,
-            LogicalVariable subscriptionIdVar, LogicalVariable 
channelExecutionVar, IOptimizationContext context,
+    private DelegateOperator createBrokerOp(LogicalVariable brokerEndpointVar, 
LogicalVariable sendVar,
+            LogicalVariable channelExecutionVar, String channelDataverse, 
String channelName, boolean push,
+            IAType resultType) {
+        NotifyBrokerOperator notifyBrokerOp =
+                new NotifyBrokerOperator(brokerEndpointVar, sendVar, 
channelExecutionVar, push, resultType);
+        EntityId activeId = new EntityId(BADConstants.CHANNEL_EXTENSION_NAME, 
channelDataverse, channelName);
+        NotifyBrokerPOperator notifyBrokerPOp = new 
NotifyBrokerPOperator(activeId);
+        notifyBrokerOp.setPhysicalOperator(notifyBrokerPOp);
+        DelegateOperator extensionOp = new DelegateOperator(notifyBrokerOp);
+        extensionOp.setPhysicalOperator(notifyBrokerPOp);
+        return extensionOp;
+    }
+
+    private DelegateOperator createNotifyBrokerPushPlan(LogicalVariable 
brokerEndpointVar, LogicalVariable sendVar,
+            LogicalVariable channelExecutionVar, IOptimizationContext context, 
ILogicalOperator eOp,
+            DistributeResultOperator distributeOp, String channelDataverse, 
String channelName)
+            throws AlgebricksException {
+        //Find the assign operator to get the result type that we need
+        AbstractLogicalOperator assign = (AbstractLogicalOperator) 
eOp.getInputs().get(0).getValue();
+        while (assign.getOperatorTag() != LogicalOperatorTag.ASSIGN) {
+            assign = (AbstractLogicalOperator) 
assign.getInputs().get(0).getValue();
+        }
+        IVariableTypeEnvironment env = 
assign.computeOutputTypeEnvironment(context);
+        IAType resultType = (IAType) env.getVarType(sendVar);
+
+        //Create the NotifyBrokerOperator
+        DelegateOperator extensionOp = createBrokerOp(brokerEndpointVar, 
sendVar, channelExecutionVar, channelDataverse,
+                channelName, true, resultType);
+
+        extensionOp.getInputs().add(new MutableObject<>(eOp));
+        context.computeAndSetTypeEnvironmentForOperator(extensionOp);
+
+        return extensionOp;
+
+    }
+
+    private DelegateOperator createNotifyBrokerPullPlan(LogicalVariable 
brokerEndpointVar,
+            LogicalVariable sendVar, LogicalVariable channelExecutionVar, 
IOptimizationContext context,
             ILogicalOperator eOp, DistributeResultOperator distributeOp, 
String channelDataverse, String channelName)
                     throws AlgebricksException {
-        //create the Distinct Op
-        ArrayList<Mutable<ILogicalExpression>> expressions = new 
ArrayList<Mutable<ILogicalExpression>>();
-        VariableReferenceExpression vExpr = new 
VariableReferenceExpression(subscriptionIdVar);
-        expressions.add(new MutableObject<ILogicalExpression>(vExpr));
+
+        //Create the Distinct Op
+        ArrayList<Mutable<ILogicalExpression>> expressions = new ArrayList<>();
+        VariableReferenceExpression vExpr = new 
VariableReferenceExpression(sendVar);
+        expressions.add(new MutableObject<>(vExpr));
         DistinctOperator distinctOp = new DistinctOperator(expressions);
 
-        //create the GroupBy Op
-        //And set the distinct as input
-        List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> groupByList = 
new ArrayList<Pair<LogicalVariable, Mutable<ILogicalExpression>>>();
-        List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> 
groupByDecorList = new ArrayList<Pair<LogicalVariable, 
Mutable<ILogicalExpression>>>();
-        List<ILogicalPlan> nestedPlans = new ArrayList<ILogicalPlan>();
 
-        //create group by operator
+        List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> groupByList = 
new ArrayList<>();
+        List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> 
groupByDecorList = new ArrayList<>();
+        List<ILogicalPlan> nestedPlans = new ArrayList<>();
+
+        //Create GroupBy operator
         GroupByOperator groupbyOp = new GroupByOperator(groupByList, 
groupByDecorList, nestedPlans);
         groupbyOp.addGbyExpression(null, new 
VariableReferenceExpression(brokerEndpointVar));
         groupbyOp.addGbyExpression(null, new 
VariableReferenceExpression(channelExecutionVar));
-        groupbyOp.getInputs().add(new 
MutableObject<ILogicalOperator>(distinctOp));
+
+        //Set the distinct as input
+        groupbyOp.getInputs().add(new MutableObject<>(distinctOp));
 
         //create nested plan for subscription ids in group by
-        NestedTupleSourceOperator nestedTupleSourceOp = new 
NestedTupleSourceOperator(
-                new MutableObject<ILogicalOperator>(groupbyOp));
-        //TODO: This is from translationcontext. It might be needed to make 
the variable exist outside of the subplan
-        //LogicalVariable subscriptionListVar = context.newSubplanOutputVar();
-        LogicalVariable subscriptionListVar = context.newVar();
-        List<LogicalVariable> aggVars = new ArrayList<LogicalVariable>();
-        aggVars.add(subscriptionListVar);
-        AggregateFunctionCallExpression funAgg = 
BuiltinFunctions.makeAggregateFunctionExpression(
-                BuiltinFunctions.LISTIFY, new 
ArrayList<Mutable<ILogicalExpression>>());
-        funAgg.getArguments()
-                .add(new MutableObject<ILogicalExpression>(new 
VariableReferenceExpression(subscriptionIdVar)));
-        List<Mutable<ILogicalExpression>> aggExpressions = new 
ArrayList<Mutable<ILogicalExpression>>();
-        aggExpressions.add(new MutableObject<ILogicalExpression>(funAgg));
+        NestedTupleSourceOperator nestedTupleSourceOp = new 
NestedTupleSourceOperator(new MutableObject<>(groupbyOp));
+        LogicalVariable sendListVar = context.newVar();
+        List<LogicalVariable> aggVars = new ArrayList<>();
+        aggVars.add(sendListVar);
+        AggregateFunctionCallExpression funAgg =
+                
BuiltinFunctions.makeAggregateFunctionExpression(BuiltinFunctions.LISTIFY, new 
ArrayList<>());
+        funAgg.getArguments().add(new MutableObject<>(new 
VariableReferenceExpression(sendVar)));
+        List<Mutable<ILogicalExpression>> aggExpressions = new ArrayList<>();
+        aggExpressions.add(new MutableObject<>(funAgg));
         AggregateOperator listifyOp = new AggregateOperator(aggVars, 
aggExpressions);
-        listifyOp.getInputs().add(new 
MutableObject<ILogicalOperator>(nestedTupleSourceOp));
+        listifyOp.getInputs().add(new MutableObject<>(nestedTupleSourceOp));
 
         //add nested plans
-        nestedPlans.add(new ALogicalPlanImpl(new 
MutableObject<ILogicalOperator>(listifyOp)));
+        nestedPlans.add(new ALogicalPlanImpl(new MutableObject<>(listifyOp)));
+
 
         //Create the NotifyBrokerOperator
-        NotifyBrokerOperator notifyBrokerOp = new 
NotifyBrokerOperator(brokerEndpointVar, subscriptionListVar,
-                channelExecutionVar);
-        EntityId activeId = new EntityId(BADConstants.CHANNEL_EXTENSION_NAME, 
channelDataverse, channelName);
-        NotifyBrokerPOperator notifyBrokerPOp = new 
NotifyBrokerPOperator(activeId);
-        notifyBrokerOp.setPhysicalOperator(notifyBrokerPOp);
-        DelegateOperator extensionOp = new DelegateOperator(notifyBrokerOp);
-        extensionOp.setPhysicalOperator(notifyBrokerPOp);
-        extensionOp.getInputs().add(new 
MutableObject<ILogicalOperator>(groupbyOp));
+        DelegateOperator extensionOp = createBrokerOp(brokerEndpointVar, 
sendListVar, channelExecutionVar,
+                channelDataverse, channelName, false, null);
 
-        //Set the input for the brokerNotify as the replicate operator
-        distinctOp.getInputs().add(new MutableObject<ILogicalOperator>(eOp));
+        //Set the input for the distinct as the old top
+        extensionOp.getInputs().add(new MutableObject<>(groupbyOp));
+        distinctOp.getInputs().add(new MutableObject<>(eOp));
 
         //compute environment bottom up
-
         context.computeAndSetTypeEnvironmentForOperator(distinctOp);
         context.computeAndSetTypeEnvironmentForOperator(groupbyOp);
         context.computeAndSetTypeEnvironmentForOperator(nestedTupleSourceOp);
@@ -211,7 +277,6 @@ public class InsertBrokerNotifierForChannelRule implements 
IAlgebraicRewriteRule
 
     }
 
-    @SuppressWarnings("unchecked")
     private AssignOperator createbrokerEndPointAssignOperator(LogicalVariable 
brokerEndpointVar,
             AbstractLogicalOperator opAboveBrokersScan) {
         Mutable<ILogicalExpression> fieldRef = new 
MutableObject<ILogicalExpression>(
@@ -244,9 +309,10 @@ public class InsertBrokerNotifierForChannelRule implements 
IAlgebraicRewriteRule
         return assignOp;
     }
 
-    /*This function searches for the needed op
-     * If lookingForBrokers, find the op above the brokers scan
-     * Else find the suscbriptionsScan
+    /*This function is used to find specific operators within the plan, either
+     * A. The brokers dataset scan
+     * B. The subscriptions scan
+     * C. The highest project of the plan
      */
     private AbstractLogicalOperator findOp(AbstractLogicalOperator op, String 
lookingForString) {
         if (!op.hasInputs()) {
@@ -311,7 +377,7 @@ public class InsertBrokerNotifierForChannelRule implements 
IAlgebraicRewriteRule
                 DatasetDataSource dds = (DatasetDataSource) 
((DataSourceScanOperator) op).getDataSource();
                 if 
(dds.getDataset().getItemTypeDataverseName().equals("Metadata")
                         && 
dds.getDataset().getItemTypeName().equals("ChannelSubscriptionsType")) {
-                    if 
(dds.getDataset().getDatasetName().equals(subscriptionsName)) {
+                    if (subscriptionsName.equals("") || 
dds.getDataset().getDatasetName().equals(subscriptionsName)) {
                         return true;
                     }
                 }

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/345b0f57/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerOperator.java
----------------------------------------------------------------------
diff --git 
a/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerOperator.java
 
b/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerOperator.java
index d281b49..df0f0f4 100644
--- 
a/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerOperator.java
+++ 
b/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerOperator.java
@@ -20,6 +20,7 @@ package org.apache.asterix.bad.runtime;
 
 import java.util.Collection;
 
+import org.apache.asterix.om.types.IAType;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractDelegatedLogicalOperator;
@@ -27,22 +28,26 @@ import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorDel
 import 
org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionReferenceTransform;
 
 /**
- * A repetitive channel operator, which uses a Java timer to run a given query 
periodically
+ * An operator for sending broker notifications
  */
 public class NotifyBrokerOperator extends AbstractDelegatedLogicalOperator {
-    private final LogicalVariable subscriptionIdVar;
     private final LogicalVariable brokerEndpointVar;
     private final LogicalVariable channelExecutionVar;
+    private final LogicalVariable pushListVar;
+    private final boolean push;
+    private final IAType recordType;
 
-    public NotifyBrokerOperator(LogicalVariable brokerEndpointVar, 
LogicalVariable subscriptionIdVar,
-            LogicalVariable resultSetVar) {
+    public NotifyBrokerOperator(LogicalVariable brokerEndpointVar, 
LogicalVariable pushListVar,
+            LogicalVariable resultSetVar, boolean push, IAType recordType) {
         this.brokerEndpointVar = brokerEndpointVar;
-        this.subscriptionIdVar = subscriptionIdVar;
         this.channelExecutionVar = resultSetVar;
+        this.pushListVar = pushListVar;
+        this.push = push;
+        this.recordType = recordType;
     }
 
-    public LogicalVariable getSubscriptionVariable() {
-        return subscriptionIdVar;
+    public LogicalVariable getPushListVar() {
+        return pushListVar;
     }
 
     public LogicalVariable getBrokerEndpointVariable() {
@@ -53,9 +58,18 @@ public class NotifyBrokerOperator extends 
AbstractDelegatedLogicalOperator {
         return channelExecutionVar;
     }
 
+    public IAType getRecordType() {
+        return recordType;
+    }
+
+    public boolean getPush() {
+        return push;
+    }
+
     @Override
     public String toString() {
-        return "notify-brokers";
+        return "notify-brokers (" + brokerEndpointVar.toString() + "," + 
channelExecutionVar.toString() + ","
+                + pushListVar.toString() + ")";
     }
 
     @Override
@@ -65,7 +79,7 @@ public class NotifyBrokerOperator extends 
AbstractDelegatedLogicalOperator {
 
     @Override
     public IOperatorDelegate newInstance() {
-        return new NotifyBrokerOperator(brokerEndpointVar, subscriptionIdVar, 
channelExecutionVar);
+        return new NotifyBrokerOperator(brokerEndpointVar, pushListVar, 
channelExecutionVar, push, recordType);
     }
 
     @Override
@@ -76,7 +90,7 @@ public class NotifyBrokerOperator extends 
AbstractDelegatedLogicalOperator {
 
     @Override
     public void getUsedVariables(Collection<LogicalVariable> usedVars) {
-        usedVars.add(subscriptionIdVar);
+        usedVars.add(pushListVar);
         usedVars.add(brokerEndpointVar);
         usedVars.add(channelExecutionVar);
     }

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/345b0f57/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerPOperator.java
----------------------------------------------------------------------
diff --git 
a/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerPOperator.java
 
b/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerPOperator.java
index 12d5ae2..b9cfbfd 100644
--- 
a/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerPOperator.java
+++ 
b/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerPOperator.java
@@ -20,6 +20,7 @@
 package org.apache.asterix.bad.runtime;
 
 import org.apache.asterix.active.EntityId;
+import org.apache.asterix.om.types.IAType;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
@@ -74,20 +75,22 @@ public class NotifyBrokerPOperator extends 
AbstractPhysicalOperator {
             IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, 
IOperatorSchema outerPlanSchema)
                     throws AlgebricksException {
         DelegateOperator notify = (DelegateOperator) op;
-        LogicalVariable subVar = ((NotifyBrokerOperator) 
notify.getDelegate()).getSubscriptionVariable();
+        LogicalVariable pushListVar = ((NotifyBrokerOperator) 
notify.getDelegate()).getPushListVar();
         LogicalVariable brokerVar = ((NotifyBrokerOperator) 
notify.getDelegate()).getBrokerEndpointVariable();
         LogicalVariable executionVar = ((NotifyBrokerOperator) 
notify.getDelegate()).getChannelExecutionVariable();
+        IAType recordType = ((NotifyBrokerOperator) 
notify.getDelegate()).getRecordType();
+        boolean push = ((NotifyBrokerOperator) notify.getDelegate()).getPush();
 
         int brokerColumn = inputSchemas[0].findVariable(brokerVar);
-        int subColumn = inputSchemas[0].findVariable(subVar);
+        int pushColumn = inputSchemas[0].findVariable(pushListVar);
         int executionColumn = inputSchemas[0].findVariable(executionVar);
 
         IScalarEvaluatorFactory brokerEvalFactory = new 
ColumnAccessEvalFactory(brokerColumn);
-        IScalarEvaluatorFactory subEvalFactory = new 
ColumnAccessEvalFactory(subColumn);
+        IScalarEvaluatorFactory pushListEvalFactory = new 
ColumnAccessEvalFactory(pushColumn);
         IScalarEvaluatorFactory channelExecutionEvalFactory = new 
ColumnAccessEvalFactory(executionColumn);
 
-        NotifyBrokerRuntimeFactory runtime = new 
NotifyBrokerRuntimeFactory(brokerEvalFactory, subEvalFactory,
-                channelExecutionEvalFactory, entityId);
+        NotifyBrokerRuntimeFactory runtime = new 
NotifyBrokerRuntimeFactory(brokerEvalFactory, pushListEvalFactory,
+                channelExecutionEvalFactory, entityId, push, recordType);
 
         RecordDescriptor recDesc = 
JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), 
propagatedSchema,
                 context);

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/345b0f57/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerRuntime.java
----------------------------------------------------------------------
diff --git 
a/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerRuntime.java
 
b/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerRuntime.java
index 5d51926..6ffb244 100644
--- 
a/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerRuntime.java
+++ 
b/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerRuntime.java
@@ -20,21 +20,33 @@
 package org.apache.asterix.bad.runtime;
 
 import java.io.DataInputStream;
+import java.io.DataOutputStream;
 import java.io.IOException;
+import java.net.HttpURLConnection;
+import java.net.URL;
 import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
 
 import org.apache.asterix.active.ActiveManager;
 import org.apache.asterix.active.EntityId;
-import org.apache.asterix.bad.ChannelJobService;
+import org.apache.asterix.bad.BADConstants;
 import org.apache.asterix.common.api.INcApplicationContext;
 import 
org.apache.asterix.dataflow.data.nontagged.serde.ADateTimeSerializerDeserializer;
 import 
org.apache.asterix.dataflow.data.nontagged.serde.AOrderedListSerializerDeserializer;
+import 
org.apache.asterix.dataflow.data.nontagged.serde.ARecordSerializerDeserializer;
 import 
org.apache.asterix.dataflow.data.nontagged.serde.AStringSerializerDeserializer;
 import org.apache.asterix.om.base.ADateTime;
 import org.apache.asterix.om.base.AOrderedList;
-import org.apache.asterix.om.base.AString;
+import org.apache.asterix.om.base.ARecord;
+import org.apache.asterix.om.base.AUUID;
 import org.apache.asterix.om.types.AOrderedListType;
+import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.om.types.BuiltinType;
+import org.apache.asterix.om.types.IAType;
 import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
 import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
 import 
org.apache.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime;
@@ -48,11 +60,13 @@ import 
org.apache.hyracks.dataflow.common.comm.util.ByteBufferInputStream;
 import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference;
 
 public class NotifyBrokerRuntime extends 
AbstractOneInputOneOutputOneFramePushRuntime {
+    private static final Logger LOGGER = 
Logger.getLogger(NotifyBrokerRuntime.class.getName());
 
     private final ByteBufferInputStream bbis = new ByteBufferInputStream();
     private final DataInputStream di = new DataInputStream(bbis);
     private final AOrderedListSerializerDeserializer subSerDes =
             new AOrderedListSerializerDeserializer(new 
AOrderedListType(BuiltinType.AUUID, null));
+    private final ARecordSerializerDeserializer recordSerDes;
 
     private IPointable inputArg0 = new VoidPointable();
     private IPointable inputArg1 = new VoidPointable();
@@ -62,17 +76,29 @@ public class NotifyBrokerRuntime extends 
AbstractOneInputOneOutputOneFramePushRu
     private IScalarEvaluator eval2;
     private final ActiveManager activeManager;
     private final EntityId entityId;
+    private final boolean push;
+    private AOrderedList pushList;
+    private ARecord pushRecord;
+    private final IAType recordType;
+    private final Map<String, HashSet<String>> sendData = new HashMap<>();
+    private String executionTimeString;
 
     public NotifyBrokerRuntime(IHyracksTaskContext ctx, 
IScalarEvaluatorFactory brokerEvalFactory,
-            IScalarEvaluatorFactory subEvalFactory, IScalarEvaluatorFactory 
channelExecutionEvalFactory,
-            EntityId activeJobId) throws HyracksDataException {
+            IScalarEvaluatorFactory pushListEvalFactory, 
IScalarEvaluatorFactory channelExecutionEvalFactory,
+            EntityId activeJobId, boolean push, IAType recordType) throws 
HyracksDataException {
         this.tRef = new FrameTupleReference();
         eval0 = brokerEvalFactory.createScalarEvaluator(ctx);
-        eval1 = subEvalFactory.createScalarEvaluator(ctx);
+        eval1 = pushListEvalFactory.createScalarEvaluator(ctx);
         eval2 = channelExecutionEvalFactory.createScalarEvaluator(ctx);
         this.activeManager = (ActiveManager) ((INcApplicationContext) 
ctx.getJobletContext().getServiceContext()
                 .getApplicationContext()).getActiveManager();
         this.entityId = activeJobId;
+        this.push = push;
+        this.pushList = null;
+        this.pushRecord = null;
+        this.recordType = recordType;
+        recordSerDes = new ARecordSerializerDeserializer((ARecordType) 
recordType);
+        executionTimeString = null;
     }
 
     @Override
@@ -80,6 +106,61 @@ public class NotifyBrokerRuntime extends 
AbstractOneInputOneOutputOneFramePushRu
         return;
     }
 
+    private void addSubscriptions(String endpoint, AOrderedList 
subscriptionIds) {
+        for (int i = 0; i < subscriptionIds.size(); i++) {
+            AUUID subId = (AUUID) subscriptionIds.getItem(i);
+            String subscriptionString = subId.toString();
+            //Broker code currently cannot handle the "uuid {}" part of the 
string, so we parse just the value
+            subscriptionString = subscriptionString.substring(8, 
subscriptionString.length() - 2);
+            subscriptionString = "\"" + subscriptionString + "\"";
+            sendData.get(endpoint).add(subscriptionString);
+        }
+    }
+
+    public String createData(String endpoint) {
+        String JSON = "{ \"dataverseName\":\"" + entityId.getDataverse() + 
"\", \"channelName\":\""
+                + entityId.getEntityName() + "\", \"" + 
BADConstants.ChannelExecutionTime + "\":\""
+                + executionTimeString + "\", \"subscriptionIds\":[";
+        for (String value : sendData.get(endpoint)) {
+            JSON += value;
+            JSON += ",";
+        }
+        JSON = JSON.substring(0, JSON.length() - 1);
+        JSON += "]}";
+        return JSON;
+
+    }
+
+    private void sendGroupOfResults(String endpoint) {
+        String urlParameters = createData(endpoint);
+        try {
+            //Create connection
+            URL url = new URL(endpoint);
+            HttpURLConnection connection = (HttpURLConnection) 
url.openConnection();
+            connection.setRequestMethod("POST");
+            connection.setRequestProperty("Content-Type", 
"application/x-www-form-urlencoded");
+
+            connection.setRequestProperty("Content-Length", 
Integer.toString(urlParameters.getBytes().length));
+            connection.setRequestProperty("Content-Language", "en-US");
+
+            connection.setUseCaches(false);
+            connection.setDoOutput(true);
+            connection.setConnectTimeout(500);
+            DataOutputStream wr = new 
DataOutputStream(connection.getOutputStream());
+            wr.writeBytes(urlParameters);
+            if (LOGGER.isLoggable(Level.INFO)) {
+                int responseCode = connection.getResponseCode();
+                LOGGER.info("\nSending 'POST' request to URL : " + url);
+                LOGGER.info("Post parameters : " + urlParameters);
+                LOGGER.info("Response Code : " + responseCode);
+            }
+            wr.close();
+            connection.disconnect();
+        } catch (Exception e) {
+            LOGGER.log(Level.WARNING, "Channel Failed to connect to Broker.");
+        }
+    }
+
     @Override
     public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
         tAccess.reset(buffer);
@@ -91,33 +172,47 @@ public class NotifyBrokerRuntime extends 
AbstractOneInputOneOutputOneFramePushRu
             eval1.evaluate(tRef, inputArg1);
             eval2.evaluate(tRef, inputArg2);
 
+            if (executionTimeString == null) {
+                int resultSetOffset = inputArg2.getStartOffset();
+                bbis.setByteBuffer(tRef.getFrameTupleAccessor().getBuffer(), 
resultSetOffset + 1);
+                ADateTime executionTime = 
ADateTimeSerializerDeserializer.INSTANCE.deserialize(di);
+                try {
+                    executionTimeString = executionTime.toSimpleString();
+                } catch (IOException e) {
+                    throw HyracksDataException.create(e);
+                }
+            }
+
             int serBrokerOffset = inputArg0.getStartOffset();
             bbis.setByteBuffer(tRef.getFrameTupleAccessor().getBuffer(), 
serBrokerOffset + 1);
-            AString endpoint = 
AStringSerializerDeserializer.INSTANCE.deserialize(di);
-
-            int serSubOffset = inputArg1.getStartOffset();
-            bbis.setByteBuffer(tRef.getFrameTupleAccessor().getBuffer(), 
serSubOffset + 1);
-            AOrderedList subs = subSerDes.deserialize(di);
-
-            int resultSetOffset = inputArg2.getStartOffset();
-            bbis.setByteBuffer(tRef.getFrameTupleAccessor().getBuffer(), 
resultSetOffset + 1);
-            ADateTime executionTime = 
ADateTimeSerializerDeserializer.INSTANCE.deserialize(di);
-            String executionTimeString;
-            try {
-                executionTimeString = executionTime.toSimpleString();
-            } catch (IOException e) {
-                throw HyracksDataException.create(e);
+            String endpoint = 
AStringSerializerDeserializer.INSTANCE.deserialize(di).getStringValue();
+            sendData.putIfAbsent(endpoint, new HashSet<>());
+
+            if (push) {
+                int pushOffset = inputArg1.getStartOffset();
+                bbis.setByteBuffer(tRef.getFrameTupleAccessor().getBuffer(), 
pushOffset + 1);
+                //TODO: Right now this creates an object per channel result. 
Need to find a better way to deserialize
+                pushRecord = recordSerDes.deserialize(di);
+                sendData.get(endpoint).add(pushRecord.toString());
+
+            } else {
+                int serSubOffset = inputArg1.getStartOffset();
+                bbis.setByteBuffer(tRef.getFrameTupleAccessor().getBuffer(), 
serSubOffset + 1);
+                pushList = subSerDes.deserialize(di);
+                addSubscriptions(endpoint, pushList);
             }
-
-            ChannelJobService.sendBrokerNotificationsForChannel(entityId, 
endpoint.getStringValue(), subs,
-                    executionTimeString);
-
         }
 
     }
 
     @Override
     public void close() throws HyracksDataException {
+        for (String endpoint : sendData.keySet()) {
+            if (sendData.get(endpoint).size() > 0) {
+                sendGroupOfResults(endpoint);
+                sendData.get(endpoint).clear();
+            }
+        }
         return;
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/345b0f57/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerRuntimeFactory.java
----------------------------------------------------------------------
diff --git 
a/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerRuntimeFactory.java
 
b/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerRuntimeFactory.java
index 0e2be8b..a7f12ba 100644
--- 
a/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerRuntimeFactory.java
+++ 
b/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerRuntimeFactory.java
@@ -20,6 +20,7 @@
 package org.apache.asterix.bad.runtime;
 
 import org.apache.asterix.active.EntityId;
+import org.apache.asterix.om.types.IAType;
 import org.apache.hyracks.algebricks.runtime.base.IPushRuntime;
 import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
 import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
@@ -31,16 +32,21 @@ public class NotifyBrokerRuntimeFactory implements 
IPushRuntimeFactory {
     private static final long serialVersionUID = 1L;
 
     private final IScalarEvaluatorFactory brokerEvalFactory;
-    private final IScalarEvaluatorFactory subEvalFactory;
+    private final IScalarEvaluatorFactory pushListEvalFactory;
     private final IScalarEvaluatorFactory channelExecutionEvalFactory;
     private final EntityId entityId;
+    private final boolean push;
+    private final IAType recordType;
 
-    public NotifyBrokerRuntimeFactory(IScalarEvaluatorFactory 
brokerEvalFactory, IScalarEvaluatorFactory subEvalFactory,
-            IScalarEvaluatorFactory channelExecutionEvalFactory, EntityId 
entityId) {
+    public NotifyBrokerRuntimeFactory(IScalarEvaluatorFactory 
brokerEvalFactory,
+            IScalarEvaluatorFactory pushListEvalFactory, 
IScalarEvaluatorFactory channelExecutionEvalFactory,
+            EntityId entityId, boolean push, IAType recordType) {
         this.brokerEvalFactory = brokerEvalFactory;
-        this.subEvalFactory = subEvalFactory;
+        this.pushListEvalFactory = pushListEvalFactory;
         this.channelExecutionEvalFactory = channelExecutionEvalFactory;
         this.entityId = entityId;
+        this.push = push;
+        this.recordType = recordType;
     }
 
     @Override
@@ -50,7 +56,7 @@ public class NotifyBrokerRuntimeFactory implements 
IPushRuntimeFactory {
 
     @Override
     public IPushRuntime[] createPushRuntime(IHyracksTaskContext ctx) throws 
HyracksDataException {
-        return new IPushRuntime[] { new NotifyBrokerRuntime(ctx, 
brokerEvalFactory, subEvalFactory,
-                channelExecutionEvalFactory, entityId) };
+        return new IPushRuntime[] { new NotifyBrokerRuntime(ctx, 
brokerEvalFactory, pushListEvalFactory,
+                channelExecutionEvalFactory, entityId, push, recordType) };
     }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/345b0f57/asterix-bad/src/main/resources/lang-extension/lang.txt
----------------------------------------------------------------------
diff --git a/asterix-bad/src/main/resources/lang-extension/lang.txt 
b/asterix-bad/src/main/resources/lang-extension/lang.txt
index 02aba78..2d7ba75 100644
--- a/asterix-bad/src/main/resources/lang-extension/lang.txt
+++ b/asterix-bad/src/main/resources/lang-extension/lang.txt
@@ -101,15 +101,18 @@ CreateChannelStatement ChannelSpecification() throws 
ParseException:
   CreateChannelStatement ccs = null;
   String fqFunctionName = null;
   Expression period = null;
+  boolean push = false;
 }
 {
   (
-    "repetitive" "channel"  nameComponents = QualifiedName()
+    "repetitive"
+    ( "push" { push = true; } )?
+     "channel"  nameComponents = QualifiedName()
     <USING> appliedFunction = FunctionSignature()
     "period" period = FunctionCallExpr()
     {
       ccs = new CreateChannelStatement(nameComponents.first,
-                                   nameComponents.second, appliedFunction, 
period);
+                                   nameComponents.second, appliedFunction, 
period, push);
     }
   )
     {

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/345b0f57/asterix-bad/src/test/resources/optimizerts/queries/channel/channel-push.sqlpp
----------------------------------------------------------------------
diff --git 
a/asterix-bad/src/test/resources/optimizerts/queries/channel/channel-push.sqlpp 
b/asterix-bad/src/test/resources/optimizerts/queries/channel/channel-push.sqlpp
new file mode 100644
index 0000000..e20638b
--- /dev/null
+++ 
b/asterix-bad/src/test/resources/optimizerts/queries/channel/channel-push.sqlpp
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description  : Check a push-based channel plan
+ * Expected Res : Success
+ * Date         : Mar 2018
+ */
+
+drop dataverse channels7 if exists;
+create dataverse channels7;
+use channels7;
+
+create type UserLocation as {
+    location: circle,
+    userName: string,
+    timeStamp: datetime
+};
+
+
+create type UserLocationFeedType as {
+    location: circle,
+    userName: string
+};
+
+create type EmergencyReport as {
+    reportId: uuid,
+    Etype: string,
+    location: circle,
+    timeStamp: datetime
+};
+
+create type EmergencyReportFeedType as {
+    Etype: string,
+    location: circle
+};
+
+
+create type EmergencyShelter as {
+    shelterName: string,
+    location: point
+};
+
+create dataset UserLocations(UserLocation)
+primary key userName;
+create dataset Shelters(EmergencyShelter)
+primary key shelterName;
+create dataset Reports(EmergencyReport)
+primary key reportId autogenerated;
+
+create index location_time on UserLocations(timeStamp);
+create index u_location on UserLocations(location) type RTREE;
+create index s_location on Shelters(location) type RTREE;
+create index report_time on Reports(timeStamp);
+
+create function RecentEmergenciesNearUser(userName) {
+  (
+    select report, shelters from
+     ( select value r from Reports r where r.timeStamp >
+     current_datetime() - day_time_duration("PT10S"))report,
+    UserLocations u
+    let shelters = (select s.location from Shelters s where 
spatial_intersect(s.location,u.location))
+    where u.userName = userName
+    and spatial_intersect(report.location,u.location)
+  )
+};
+
+write output to nc1:"rttest/channel-push.sqlpp";
+
+create repetitive push channel EmergencyChannel using 
RecentEmergenciesNearUser@1 period duration("PT10S");
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/345b0f57/asterix-bad/src/test/resources/optimizerts/results/channel/channel-push.plan
----------------------------------------------------------------------
diff --git 
a/asterix-bad/src/test/resources/optimizerts/results/channel/channel-push.plan 
b/asterix-bad/src/test/resources/optimizerts/results/channel/channel-push.plan
new file mode 100644
index 0000000..770617f
--- /dev/null
+++ 
b/asterix-bad/src/test/resources/optimizerts/results/channel/channel-push.plan
@@ -0,0 +1,64 @@
+-- NOTIFY_BROKERS  |PARTITIONED|
+  -- STREAM_PROJECT  |PARTITIONED|
+    -- STREAM_PROJECT  |PARTITIONED|
+      -- ASSIGN  |PARTITIONED|
+        -- STREAM_PROJECT  |PARTITIONED|
+          -- STREAM_SELECT  |PARTITIONED|
+            -- STREAM_PROJECT  |PARTITIONED|
+              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                -- HYBRID_HASH_JOIN [$$128][$$135]  |PARTITIONED|
+                  -- HASH_PARTITION_EXCHANGE [$$128]  |PARTITIONED|
+                    -- STREAM_PROJECT  |PARTITIONED|
+                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                        -- HYBRID_HASH_JOIN [$$126, $$124][$$117, $$118]  
|PARTITIONED|
+                          -- HASH_PARTITION_EXCHANGE [$$126, $$124]  
|PARTITIONED|
+                            -- STREAM_PROJECT  |PARTITIONED|
+                              -- ASSIGN  |PARTITIONED|
+                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                  -- DATASOURCE_SCAN  |PARTITIONED|
+                                    -- BROADCAST_EXCHANGE  |PARTITIONED|
+                                      -- ASSIGN  |UNPARTITIONED|
+                                        -- EMPTY_TUPLE_SOURCE  |UNPARTITIONED|
+                          -- HASH_PARTITION_EXCHANGE [$$117, $$118]  
|PARTITIONED|
+                            -- STREAM_PROJECT  |PARTITIONED|
+                              -- ASSIGN  |PARTITIONED|
+                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                  -- DATASOURCE_SCAN  |PARTITIONED|
+                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                      -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                  -- HASH_PARTITION_EXCHANGE [$$135]  |PARTITIONED|
+                    -- NESTED_LOOP  |PARTITIONED|
+                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                        -- STREAM_SELECT  |PARTITIONED|
+                          -- ASSIGN  |PARTITIONED|
+                            -- STREAM_PROJECT  |PARTITIONED|
+                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                -- DATASOURCE_SCAN  |PARTITIONED|
+                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                    -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                      -- BROADCAST_EXCHANGE  |PARTITIONED|
+                        -- PRE_CLUSTERED_GROUP_BY[$$120]  |PARTITIONED|
+                                {
+                                  -- AGGREGATE  |LOCAL|
+                                    -- STREAM_SELECT  |LOCAL|
+                                      -- NESTED_TUPLE_SOURCE  |LOCAL|
+                                }
+                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                            -- STABLE_SORT [$$120(ASC)]  |PARTITIONED|
+                              -- HASH_PARTITION_EXCHANGE [$$120]  |PARTITIONED|
+                                -- NESTED_LOOP  |PARTITIONED|
+                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                    -- STREAM_PROJECT  |PARTITIONED|
+                                      -- ASSIGN  |PARTITIONED|
+                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                          -- DATASOURCE_SCAN  |PARTITIONED|
+                                            -- ONE_TO_ONE_EXCHANGE  
|PARTITIONED|
+                                              -- EMPTY_TUPLE_SOURCE  
|PARTITIONED|
+                                  -- BROADCAST_EXCHANGE  |PARTITIONED|
+                                    -- STREAM_PROJECT  |PARTITIONED|
+                                      -- ASSIGN  |PARTITIONED|
+                                        -- STREAM_PROJECT  |PARTITIONED|
+                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                            -- DATASOURCE_SCAN  |PARTITIONED|
+                                              -- ONE_TO_ONE_EXCHANGE  
|PARTITIONED|
+                                                -- EMPTY_TUPLE_SOURCE  
|PARTITIONED|
\ No newline at end of file

Reply via email to