Steven Jacobs has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/2620

Change subject: Allow channels to update their jobs to use new indexes
......................................................................

Allow channels to update their jobs to use new indexes

- storage format changes: new field for Channel body

This change uses the Asterix upsertDeployedJobSpec to
recompile and update the channel job when new indexes are
created.

Added test case
Moved methods from Asterix DeployedJobService to BADJobService

Change-Id: If0a4d37a5b91063fcb1673dbfd008c140ed54ae6
---
M asterix-bad/src/main/java/org/apache/asterix/bad/BADConstants.java
A asterix-bad/src/main/java/org/apache/asterix/bad/BADJobService.java
D asterix-bad/src/main/java/org/apache/asterix/bad/ChannelJobService.java
M 
asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADStatementExecutor.java
M 
asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
M 
asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ExecuteProcedureStatement.java
M 
asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataRecordTypes.java
M asterix-bad/src/main/java/org/apache/asterix/bad/metadata/Channel.java
M 
asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ChannelTupleTranslator.java
M 
asterix-bad/src/main/java/org/apache/asterix/bad/metadata/DeployedJobSpecEventListener.java
A 
asterix-bad/src/test/resources/runtimets/queries/channel/add_index/add_index.1.ddl.sqlpp
A 
asterix-bad/src/test/resources/runtimets/queries/channel/add_index/add_index.2.update.sqlpp
A 
asterix-bad/src/test/resources/runtimets/queries/channel/add_index/add_index.3.sleep.sqlpp
A 
asterix-bad/src/test/resources/runtimets/queries/channel/add_index/add_index.4.update.sqlpp
A 
asterix-bad/src/test/resources/runtimets/queries/channel/add_index/add_index.5.sleep.sqlpp
A 
asterix-bad/src/test/resources/runtimets/queries/channel/add_index/add_index.6.query.sqlpp
A 
asterix-bad/src/test/resources/runtimets/queries/channel/add_index/add_index.7.ddl.sqlpp
A 
asterix-bad/src/test/resources/runtimets/queries/channel/drop_index/drop_index.1.ddl.sqlpp
A 
asterix-bad/src/test/resources/runtimets/results/channel/add_index/add_index.1.adm
M 
asterix-bad/src/test/resources/runtimets/results/channel/create_channel_check_metadata/create_channel_check_metadata.1.adm
M 
asterix-bad/src/test/resources/runtimets/results/channel/drop_channel_check_metadata/drop_channel_check_metadata.1.adm
M asterix-bad/src/test/resources/runtimets/testsuite.xml
22 files changed, 1,187 insertions(+), 221 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb-bad 
refs/changes/20/2620/1

diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/BADConstants.java 
b/asterix-bad/src/main/java/org/apache/asterix/bad/BADConstants.java
index d2d0fa3..d422663 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/BADConstants.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/BADConstants.java
@@ -50,6 +50,7 @@
     String FIELD_NAME_RETURN_TYPE = "ReturnType";
     String FIELD_NAME_DEFINITION = "Definition";
     String FIELD_NAME_LANGUAGE = "Language";
+    String FIELD_NAME_BODY = "Body";
     //To enable new Asterix TxnId for separate deployed job spec invocations
     byte[] TRANSACTION_ID_PARAMETER_NAME = "TxnIdParameter".getBytes();
     int EXECUTOR_TIMEOUT = 20;
diff --git 
a/asterix-bad/src/main/java/org/apache/asterix/bad/BADJobService.java 
b/asterix-bad/src/main/java/org/apache/asterix/bad/BADJobService.java
new file mode 100644
index 0000000..54a6df7
--- /dev/null
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/BADJobService.java
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.bad;
+
+import java.io.StringReader;
+import java.time.Instant;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.active.EntityId;
+import org.apache.asterix.api.http.server.ResultUtil;
+import org.apache.asterix.app.active.ActiveNotificationHandler;
+import org.apache.asterix.app.result.ResultReader;
+import org.apache.asterix.app.translator.QueryTranslator;
+import org.apache.asterix.bad.lang.BADParserFactory;
+import org.apache.asterix.bad.lang.BADStatementExecutor;
+import org.apache.asterix.bad.metadata.Channel;
+import org.apache.asterix.bad.metadata.DeployedJobSpecEventListener;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.common.transactions.ITxnIdFactory;
+import org.apache.asterix.lang.common.base.Statement;
+import org.apache.asterix.lang.common.statement.Query;
+import org.apache.asterix.lang.common.statement.SetStatement;
+import org.apache.asterix.metadata.MetadataManager;
+import org.apache.asterix.metadata.MetadataTransactionContext;
+import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.asterix.translator.IStatementExecutor;
+import org.apache.hyracks.api.client.IHyracksClientConnection;
+import org.apache.hyracks.api.job.DeployedJobSpecId;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.job.JobSpecification;
+
+/**
+ * Provides functionality for channel jobs
+ */
+public class BADJobService {
+
+    private static final Logger LOGGER = 
Logger.getLogger(BADJobService.class.getName());
+
+    //pool size one (only running one thread at a time)
+    private static final int POOL_SIZE = 1;
+
+    private static final long millisecondTimeout = 
BADConstants.EXECUTOR_TIMEOUT * 1000;
+
+    //Starts running a deployed job specification periodically with an 
interval of "duration" seconds
+    public static ScheduledExecutorService 
startRepetitiveDeployedJobSpec(DeployedJobSpecId distributedId,
+            IHyracksClientConnection hcc, long duration, Map<byte[], byte[]> 
jobParameters, EntityId entityId,
+            ITxnIdFactory txnIdFactory, DeployedJobSpecEventListener listener) 
{
+        ScheduledExecutorService scheduledExecutorService = 
Executors.newScheduledThreadPool(POOL_SIZE);
+        scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    if (!runRepetitiveDeployedJobSpec(distributedId, hcc, 
jobParameters, duration, entityId,
+                            txnIdFactory, listener)) {
+                        scheduledExecutorService.shutdown();
+                    }
+                } catch (Exception e) {
+                    LOGGER.log(Level.SEVERE, "Job Failed to run for " + 
entityId.getExtensionName() + " "
+                            + entityId.getDataverse() + "." + 
entityId.getEntityName() + ".", e);
+                }
+            }
+        }, duration, duration, TimeUnit.MILLISECONDS);
+        return scheduledExecutorService;
+    }
+
+    public static boolean runRepetitiveDeployedJobSpec(DeployedJobSpecId 
distributedId, IHyracksClientConnection hcc,
+            Map<byte[], byte[]> jobParameters, long duration, EntityId 
entityId, ITxnIdFactory txnIdFactory,
+            DeployedJobSpecEventListener listener) throws Exception {
+        long executionMilliseconds =
+                runDeployedJobSpec(distributedId, hcc, jobParameters, 
entityId, txnIdFactory, null, listener, null);
+        if (executionMilliseconds > duration) {
+            LOGGER.log(Level.SEVERE,
+                    "Periodic job for " + entityId.getExtensionName() + " " + 
entityId.getDataverse() + "."
+                            + entityId.getEntityName() + " was unable to meet 
the required period of " + duration
+                            + " milliseconds. Actually took " + 
executionMilliseconds + " execution will shutdown"
+                            + new Date());
+            return false;
+        }
+        return true;
+    }
+
+    public static long runDeployedJobSpec(DeployedJobSpecId distributedId, 
IHyracksClientConnection hcc,
+            Map<byte[], byte[]> jobParameters, EntityId entityId, 
ITxnIdFactory txnIdFactory,
+            ICcApplicationContext appCtx, DeployedJobSpecEventListener 
listener, QueryTranslator statementExecutor)
+            throws Exception {
+
+        long end = System.currentTimeMillis() + millisecondTimeout;
+        boolean success = false;
+        while (System.currentTimeMillis() < end) {
+            success = 
listener.setInstanceCount(DeployedJobSpecEventListener.InstanceChange.INCREASE);
+            if (success) {
+                break;
+            }
+            Thread.sleep(100);
+        }
+        if (!success) {
+            throw new RuntimeException("Deployed Job for " + 
entityId.getExtensionName() + " " + entityId.getDataverse()
+                    + "." + entityId.getEntityName() + " failed to run because 
the deployed job was not available");
+        }
+
+        //Add the Asterix Transaction Id to the map
+        jobParameters.put(BADConstants.TRANSACTION_ID_PARAMETER_NAME,
+                String.valueOf(txnIdFactory.create().getId()).getBytes());
+
+        long startTime = Instant.now().toEpochMilli();
+        JobId jobId = hcc.startJob(distributedId, jobParameters);
+
+        hcc.waitForCompletion(jobId);
+        long executionMilliseconds = Instant.now().toEpochMilli() - startTime;
+
+        if (listener.getType() == 
DeployedJobSpecEventListener.PrecompiledType.QUERY) {
+            ResultReader resultReader = new 
ResultReader(listener.getResultDataset(), jobId, listener.getResultId());
+
+            ResultUtil.printResults(appCtx, resultReader, 
statementExecutor.getSessionOutput(),
+                    new IStatementExecutor.Stats(), null);
+        }
+
+        
listener.setInstanceCount(DeployedJobSpecEventListener.InstanceChange.DECREASE);
+
+        LOGGER.log(Level.SEVERE,
+                "Deployed Job execution completed for " + 
entityId.getExtensionName() + " " + entityId.getDataverse()
+                        + "." + entityId.getEntityName() + ". Took " + 
executionMilliseconds + " milliseconds ");
+
+        return executionMilliseconds;
+
+    }
+
+
+    public static long findPeriod(String duration) {
+        //TODO: Allow Repetitive Channels to use YMD durations
+        String hoursMinutesSeconds = "";
+        if (duration.indexOf('T') != -1) {
+            hoursMinutesSeconds = duration.substring(duration.indexOf('T') + 
1);
+        }
+        double seconds = 0;
+        if (hoursMinutesSeconds != "") {
+            int pos = 0;
+            if (hoursMinutesSeconds.indexOf('H') != -1) {
+                Double hours = 
Double.parseDouble(hoursMinutesSeconds.substring(pos, 
hoursMinutesSeconds.indexOf('H')));
+                seconds += (hours * 60 * 60);
+                pos = hoursMinutesSeconds.indexOf('H') + 1;
+            }
+            if (hoursMinutesSeconds.indexOf('M') != -1) {
+                Double minutes =
+                        Double.parseDouble(hoursMinutesSeconds.substring(pos, 
hoursMinutesSeconds.indexOf('M')));
+                seconds += (minutes * 60);
+                pos = hoursMinutesSeconds.indexOf('M') + 1;
+            }
+            if (hoursMinutesSeconds.indexOf('S') != -1) {
+                Double s = 
Double.parseDouble(hoursMinutesSeconds.substring(pos, 
hoursMinutesSeconds.indexOf('S')));
+                seconds += (s);
+            }
+        }
+        return (long) (seconds * 1000);
+    }
+
+    public static JobSpecification compilePushChannel(IStatementExecutor 
statementExecutor,
+            MetadataProvider metadataProvider, IHyracksClientConnection hcc, 
Query q) throws Exception {
+        MetadataTransactionContext mdTxnCtx = 
MetadataManager.INSTANCE.beginTransaction();
+        boolean bActiveTxn = true;
+        metadataProvider.setMetadataTxnContext(mdTxnCtx);
+        JobSpecification jobSpec = null;
+        try {
+            jobSpec = ((QueryTranslator) 
statementExecutor).rewriteCompileQuery(hcc, metadataProvider, q, null);
+            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+            bActiveTxn = false;
+        } catch (Exception e) {
+            LOGGER.log(Level.INFO, e.getMessage(), e);
+            if (bActiveTxn) {
+                ((QueryTranslator) statementExecutor).abort(e, e, mdTxnCtx);
+            }
+            throw e;
+        } finally {
+            metadataProvider.getLocks().unlock();
+        }
+        return jobSpec;
+    }
+
+    public static void getLock(EntityId entityId, DeployedJobSpecEventListener 
listener) throws Exception {
+
+        long end = System.currentTimeMillis() + millisecondTimeout;
+        boolean success = false;
+        while (System.currentTimeMillis() < end) {
+            success = 
listener.setInstanceCount(DeployedJobSpecEventListener.InstanceChange.LOCK);
+            if (success) {
+                break;
+            }
+            Thread.sleep(100);
+        }
+        if (!success) {
+            throw new RuntimeException("Cannot upsert new Job Spec for 
Deployed Job for " + entityId.getExtensionName()
+                    + " " + entityId.getDataverse() + "." + 
entityId.getEntityName()
+                    + " because the job spec is in use");
+        }
+
+    }
+
+    public static void redeployChannel(Channel channel, MetadataProvider 
metadataProvider,
+            BADStatementExecutor badStatementExecutor, 
IHyracksClientConnection hcc) throws Exception {
+
+        ICcApplicationContext appCtx = 
metadataProvider.getApplicationContext();
+        ActiveNotificationHandler activeEventHandler =
+                (ActiveNotificationHandler) 
appCtx.getActiveNotificationHandler();
+        DeployedJobSpecEventListener listener =
+                (DeployedJobSpecEventListener) 
activeEventHandler.getListener(channel.getChannelId());
+        if (listener == null) {
+            LOGGER.severe("Tried to redeploy the job for " + 
channel.getChannelId() + " but no listener exists.");
+            return;
+        }
+
+        getLock(channel.getChannelId(), listener);
+
+        BADParserFactory factory = new BADParserFactory();
+        List<Statement> fStatements = factory.createParser(new 
StringReader(channel.getBody())).parse();
+        SetStatement ss = (SetStatement) fStatements.get(0);
+        metadataProvider.getConfig().put(ss.getPropName(), ss.getPropValue());
+        JobSpecification jobSpec;
+        if 
(listener.getType().equals(DeployedJobSpecEventListener.PrecompiledType.PUSH_CHANNEL))
 {
+            jobSpec = compilePushChannel(badStatementExecutor, 
metadataProvider, hcc, (Query) fStatements.get(1));
+        } else {
+            jobSpec = 
badStatementExecutor.handleInsertUpsertStatement(metadataProvider, 
fStatements.get(1), hcc, null,
+                    null, null, null, true, null);
+        }
+        hcc.upsertDeployedJobSpec(listener.getDeployedJobSpecId(), jobSpec);
+
+        
listener.setInstanceCount(DeployedJobSpecEventListener.InstanceChange.UNLOCK);
+
+    }
+
+    @Override
+    public String toString() {
+        return "BADJobService";
+    }
+
+}
diff --git 
a/asterix-bad/src/main/java/org/apache/asterix/bad/ChannelJobService.java 
b/asterix-bad/src/main/java/org/apache/asterix/bad/ChannelJobService.java
deleted file mode 100644
index 3df9a76..0000000
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/ChannelJobService.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.bad;
-
-import java.util.logging.Logger;
-
-
-/**
- * Provides functionality for channel jobs
- */
-public class ChannelJobService {
-
-    private static final Logger LOGGER = 
Logger.getLogger(ChannelJobService.class.getName());
-
-
-    public static long findPeriod(String duration) {
-        //TODO: Allow Repetitive Channels to use YMD durations
-        String hoursMinutesSeconds = "";
-        if (duration.indexOf('T') != -1) {
-            hoursMinutesSeconds = duration.substring(duration.indexOf('T') + 
1);
-        }
-        double seconds = 0;
-        if (hoursMinutesSeconds != "") {
-            int pos = 0;
-            if (hoursMinutesSeconds.indexOf('H') != -1) {
-                Double hours = 
Double.parseDouble(hoursMinutesSeconds.substring(pos, 
hoursMinutesSeconds.indexOf('H')));
-                seconds += (hours * 60 * 60);
-                pos = hoursMinutesSeconds.indexOf('H') + 1;
-            }
-            if (hoursMinutesSeconds.indexOf('M') != -1) {
-                Double minutes =
-                        Double.parseDouble(hoursMinutesSeconds.substring(pos, 
hoursMinutesSeconds.indexOf('M')));
-                seconds += (minutes * 60);
-                pos = hoursMinutesSeconds.indexOf('M') + 1;
-            }
-            if (hoursMinutesSeconds.indexOf('S') != -1) {
-                Double s = 
Double.parseDouble(hoursMinutesSeconds.substring(pos, 
hoursMinutesSeconds.indexOf('S')));
-                seconds += (s);
-            }
-        }
-        return (long) (seconds * 1000);
-    }
-
-
-    @Override
-    public String toString() {
-        return "ChannelJobService";
-    }
-
-}
diff --git 
a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADStatementExecutor.java
 
b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADStatementExecutor.java
index 8c7143f..95edec8 100644
--- 
a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADStatementExecutor.java
+++ 
b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADStatementExecutor.java
@@ -18,11 +18,13 @@
  */
 package org.apache.asterix.bad.lang;
 
+import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
 
 import org.apache.asterix.app.translator.QueryTranslator;
 import org.apache.asterix.app.translator.RequestParameters;
+import org.apache.asterix.bad.BADJobService;
 import org.apache.asterix.bad.lang.statement.BrokerDropStatement;
 import org.apache.asterix.bad.lang.statement.ChannelDropStatement;
 import org.apache.asterix.bad.lang.statement.ProcedureDropStatement;
@@ -34,6 +36,7 @@
 import org.apache.asterix.common.functions.FunctionSignature;
 import org.apache.asterix.compiler.provider.ILangCompilationProvider;
 import org.apache.asterix.lang.common.base.Statement;
+import org.apache.asterix.lang.common.statement.CreateIndexStatement;
 import org.apache.asterix.lang.common.statement.DataverseDropStatement;
 import org.apache.asterix.lang.common.statement.DropDatasetStatement;
 import org.apache.asterix.lang.common.statement.FunctionDropStatement;
@@ -42,9 +45,12 @@
 import org.apache.asterix.metadata.MetadataManager;
 import org.apache.asterix.metadata.MetadataTransactionContext;
 import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.asterix.metadata.entities.Dataverse;
+import org.apache.asterix.metadata.entities.Function;
 import org.apache.asterix.translator.IRequestParameters;
 import org.apache.asterix.translator.SessionOutput;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.common.utils.Pair;
 import org.apache.hyracks.api.client.IHyracksClientConnection;
 
 public class BADStatementExecutor extends QueryTranslator {
@@ -56,16 +62,21 @@
 
     //TODO: Most of this file could go away if we had metadata dependencies
 
-    private void checkIfDatasetIsInUse(MetadataTransactionContext mdTxnCtx, 
String dataverse, String dataset)
-            throws CompilationException, AlgebricksException {
+    private Pair<List<Channel>, List<Procedure>> 
checkIfDatasetIsInUse(MetadataTransactionContext mdTxnCtx,
+            String dataverse, String dataset, boolean checkAll) throws 
AlgebricksException {
+        List<Channel> channelsUsingDataset = new ArrayList<>();
+        List<Procedure> proceduresUsingDataset = new ArrayList<>();
         List<Channel> channels = BADLangExtension.getAllChannels(mdTxnCtx);
         for (Channel channel : channels) {
             List<List<List<String>>> dependencies = channel.getDependencies();
             List<List<String>> datasetDependencies = dependencies.get(0);
             for (List<String> dependency : datasetDependencies) {
                 if (dependency.get(0).equals(dataverse) && 
dependency.get(1).equals(dataset)) {
-                    throw new CompilationException("Cannot alter dataset " + 
dataverse + "." + dataset + ". "
-                            + channel.getChannelId() + " depends on it!");
+                    channelsUsingDataset.add(channel);
+                    if (!checkAll) {
+                        return new Pair<>(channelsUsingDataset, 
proceduresUsingDataset);
+                    }
+
                 }
             }
 
@@ -76,11 +87,81 @@
             List<List<String>> datasetDependencies = dependencies.get(0);
             for (List<String> dependency : datasetDependencies) {
                 if (dependency.get(0).equals(dataverse) && 
dependency.get(1).equals(dataset)) {
-                    throw new CompilationException("Cannot alter dataset " + 
dataverse + "." + dataset + ". "
-                            + procedure.getEntityId() + " depends on it!");
+                    proceduresUsingDataset.add(procedure);
+                    if (!checkAll) {
+                        return new Pair<>(channelsUsingDataset, 
proceduresUsingDataset);
+                    }
                 }
             }
 
+        }
+        return new Pair<>(channelsUsingDataset, proceduresUsingDataset);
+    }
+
+    private Pair<List<Channel>, List<Procedure>> 
checkIfFunctionIsInUse(MetadataTransactionContext mdTxnCtx,
+            String dvId, String function, String arity, boolean checkAll)
+            throws CompilationException, AlgebricksException {
+        List<Channel> channelsUsingFunction = new ArrayList<>();
+        List<Procedure> proceduresUsingFunction = new ArrayList<>();
+
+        List<Channel> channels = BADLangExtension.getAllChannels(mdTxnCtx);
+        for (Channel channel : channels) {
+            List<List<List<String>>> dependencies = channel.getDependencies();
+            List<List<String>> datasetDependencies = dependencies.get(1);
+            for (List<String> dependency : datasetDependencies) {
+                if (dependency.get(0).equals(dvId) && 
dependency.get(1).equals(function)
+                        && dependency.get(2).equals(arity)) {
+                    channelsUsingFunction.add(channel);
+                    if (!checkAll) {
+                        return new Pair<>(channelsUsingFunction, 
proceduresUsingFunction);
+                    }
+                }
+            }
+
+        }
+        List<Procedure> procedures = 
BADLangExtension.getAllProcedures(mdTxnCtx);
+        for (Procedure procedure : procedures) {
+            List<List<List<String>>> dependencies = 
procedure.getDependencies();
+            List<List<String>> datasetDependencies = dependencies.get(1);
+            for (List<String> dependency : datasetDependencies) {
+                if (dependency.get(0).equals(dvId) && 
dependency.get(1).equals(function)
+                        && dependency.get(2).equals(arity)) {
+                    proceduresUsingFunction.add(procedure);
+                    if (!checkAll) {
+                        return new Pair<>(channelsUsingFunction, 
proceduresUsingFunction);
+                    }
+                }
+            }
+
+        }
+        return new Pair<>(channelsUsingFunction, proceduresUsingFunction);
+    }
+
+    private void throwErrorIfDatasetUsed(MetadataTransactionContext mdTxnCtx, 
String dataverse, String dataset)
+            throws CompilationException, AlgebricksException {
+        Pair<List<Channel>, List<Procedure>> dependents = 
checkIfDatasetIsInUse(mdTxnCtx, dataverse, dataset, false);
+        if (dependents.first.size() > 0) {
+            throw new CompilationException("Cannot alter dataset " + dataverse 
+ "." + dataset + ". "
+                    + dependents.first.get(0).getChannelId() + " depends on 
it!");
+        }
+        if (dependents.second.size() > 0) {
+            throw new CompilationException("Cannot alter dataset " + dataverse 
+ "." + dataset + ". "
+                    + dependents.second.get(0).getEntityId() + " depends on 
it!");
+        }
+    }
+
+    private void throwErrorIfFunctionUsed(MetadataTransactionContext mdTxnCtx, 
String dataverse, String function,
+            String arity, FunctionSignature sig) throws CompilationException, 
AlgebricksException {
+        Pair<List<Channel>, List<Procedure>> dependents =
+                checkIfFunctionIsInUse(mdTxnCtx, dataverse, function, arity, 
false);
+        String errorStart = sig != null ? "Cannot drop function " + sig + "." 
: "Cannot drop index.";
+        if (dependents.first.size() > 0) {
+            throw new CompilationException(
+                    errorStart + " " + dependents.first.get(0).getChannelId() 
+ " depends on it!");
+        }
+        if (dependents.second.size() > 0) {
+            throw new CompilationException(
+                    errorStart + " " + dependents.second.get(0).getEntityId() 
+ " depends on it!");
         }
     }
 
@@ -92,10 +173,63 @@
         String dvId = getActiveDataverse(((DropDatasetStatement) 
stmt).getDataverseName());
         Identifier dsId = ((DropDatasetStatement) stmt).getDatasetName();
 
-        checkIfDatasetIsInUse(mdTxnCtx, dvId, dsId.getValue());
+        throwErrorIfDatasetUsed(mdTxnCtx, dvId, dsId.getValue());
 
         MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
         super.handleDatasetDropStatement(metadataProvider, stmt, hcc, 
requestParameters);
+    }
+
+    @Override
+    public void handleCreateIndexStatement(MetadataProvider metadataProvider, 
Statement stmt,
+            IHyracksClientConnection hcc, IRequestParameters 
requestParameters) throws Exception {
+
+        //TODO: Check whether a delete or insert procedure using the index. If 
so, we will need to
+        // disallow the procedure until after the newly distributed version is 
ready
+        super.handleCreateIndexStatement(metadataProvider, stmt, hcc, 
requestParameters);
+
+        MetadataTransactionContext mdTxnCtx = 
MetadataManager.INSTANCE.beginTransaction();
+        metadataProvider.setMetadataTxnContext(mdTxnCtx);
+        //Allow channels to use the new index
+        String dvId = getActiveDataverse(((CreateIndexStatement) 
stmt).getDataverseName());
+        String dsId = ((CreateIndexStatement) 
stmt).getDatasetName().getValue();
+
+        Pair<List<Channel>, List<Procedure>> usages = 
checkIfDatasetIsInUse(mdTxnCtx, dvId, dsId, true);
+
+        List<Dataverse> dataverseList = 
MetadataManager.INSTANCE.getDataverses(mdTxnCtx);
+        for (Dataverse dv : dataverseList) {
+            List<Function> functions = 
MetadataManager.INSTANCE.getFunctions(mdTxnCtx, dv.getDataverseName());
+            for (Function function : functions) {
+                for (List<String> datasetDependency : 
function.getDependencies().get(0)) {
+                    if (datasetDependency.get(0).equals(dvId) && 
datasetDependency.get(1).equals(dsId)) {
+                        Pair<List<Channel>, List<Procedure>> functionUsages =
+                                checkIfFunctionIsInUse(mdTxnCtx, 
function.getDataverseName(), function.getName(),
+                                        Integer.toString(function.getArity()), 
true);
+                        for (Channel channel : functionUsages.first) {
+                            if (!usages.first.contains(channel)) {
+                                usages.first.add(channel);
+                            }
+                        }
+                        for (Procedure procedure : functionUsages.second) {
+                            if (!usages.second.contains(procedure)) {
+                                usages.second.add(procedure);
+                            }
+                        }
+                    }
+                }
+            }
+        }
+        MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+        metadataProvider.getLocks().unlock();
+
+        for (Channel channel : usages.first) {
+            metadataProvider = new MetadataProvider(appCtx, activeDataverse);
+            BADJobService.redeployChannel(channel, metadataProvider, this, 
hcc);
+        }
+        for (Procedure procedure : usages.second) {
+            //TODO: redeployProcedure
+        }
+
+
     }
 
     @Override
@@ -106,7 +240,20 @@
         String dvId = getActiveDataverse(((IndexDropStatement) 
stmt).getDataverseName());
         Identifier dsId = ((IndexDropStatement) stmt).getDatasetName();
 
-        checkIfDatasetIsInUse(mdTxnCtx, dvId, dsId.getValue());
+        throwErrorIfDatasetUsed(mdTxnCtx, dvId, dsId.getValue());
+
+        List<Dataverse> dataverseList = 
MetadataManager.INSTANCE.getDataverses(mdTxnCtx);
+        for (Dataverse dv : dataverseList) {
+            List<Function> functions = 
MetadataManager.INSTANCE.getFunctions(mdTxnCtx, dv.getDataverseName());
+            for (Function function : functions) {
+                for (List<String> datasetDependency : 
function.getDependencies().get(0)) {
+                    if (datasetDependency.get(0).equals(dvId) && 
datasetDependency.get(1).equals(dsId.getValue())) {
+                        throwErrorIfFunctionUsed(mdTxnCtx, 
function.getDataverseName(), function.getName(),
+                                Integer.toString(function.getArity()), null);
+                    }
+                }
+            }
+        }
 
         MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
         super.handleIndexDropStatement(metadataProvider, stmt, hcc, 
requestParameters);
@@ -122,32 +269,7 @@
         String function = sig.getName();
         String arity = Integer.toString(sig.getArity());
 
-        List<Channel> channels = BADLangExtension.getAllChannels(mdTxnCtx);
-        for (Channel channel : channels) {
-            List<List<List<String>>> dependencies = channel.getDependencies();
-            List<List<String>> datasetDependencies = dependencies.get(1);
-            for (List<String> dependency : datasetDependencies) {
-                if (dependency.get(0).equals(dvId) && 
dependency.get(1).equals(function)
-                        && dependency.get(2).equals(arity)) {
-                    throw new CompilationException(
-                            "Cannot drop function " + sig + ". " + 
channel.getChannelId() + " depends on it!");
-                }
-            }
-
-        }
-        List<Procedure> procedures = 
BADLangExtension.getAllProcedures(mdTxnCtx);
-        for (Procedure procedure : procedures) {
-            List<List<List<String>>> dependencies = 
procedure.getDependencies();
-            List<List<String>> datasetDependencies = dependencies.get(1);
-            for (List<String> dependency : datasetDependencies) {
-                if (dependency.get(0).equals(dvId) && 
dependency.get(1).equals(function)
-                        && dependency.get(2).equals(arity)) {
-                    throw new CompilationException(
-                            "Cannot drop function " + sig + ". " + 
procedure.getEntityId() + " depends on it!");
-                }
-            }
-
-        }
+        throwErrorIfFunctionUsed(mdTxnCtx, dvId, function, arity, sig);
 
         MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
         super.handleFunctionDropStatement(metadataProvider, stmt);
diff --git 
a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
 
b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
index 87ac320..175b6a6 100644
--- 
a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
+++ 
b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
@@ -28,13 +28,12 @@
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
-import org.apache.asterix.active.DeployedJobService;
 import org.apache.asterix.active.EntityId;
 import org.apache.asterix.algebra.extension.ExtensionStatement;
 import org.apache.asterix.app.active.ActiveNotificationHandler;
 import org.apache.asterix.app.translator.QueryTranslator;
 import org.apache.asterix.bad.BADConstants;
-import org.apache.asterix.bad.ChannelJobService;
+import org.apache.asterix.bad.BADJobService;
 import org.apache.asterix.bad.lang.BADLangExtension;
 import org.apache.asterix.bad.lang.BADParserFactory;
 import org.apache.asterix.bad.metadata.Channel;
@@ -57,7 +56,6 @@
 import org.apache.asterix.lang.common.statement.CreateIndexStatement;
 import org.apache.asterix.lang.common.statement.DatasetDecl;
 import org.apache.asterix.lang.common.statement.IDatasetDetailsDecl;
-import org.apache.asterix.lang.common.statement.InsertStatement;
 import org.apache.asterix.lang.common.statement.InternalDetailsDecl;
 import org.apache.asterix.lang.common.statement.Query;
 import org.apache.asterix.lang.common.statement.SetStatement;
@@ -89,7 +87,7 @@
     private final CallExpr period;
     private Identifier dataverseName;
     private String duration;
-    private InsertStatement channelResultsInsertQuery;
+    private String body;
     private String subscriptionsTableName;
     private String resultsTableName;
     private String dataverse;
@@ -131,10 +129,6 @@
 
     public Expression getPeriod() {
         return period;
-    }
-
-    public InsertStatement getChannelResultsInsertQuery() {
-        return channelResultsInsertQuery;
     }
 
     @Override
@@ -221,28 +215,6 @@
 
     }
 
-    private JobSpecification compilePushChannel(IStatementExecutor 
statementExecutor, MetadataProvider metadataProvider,
-            IHyracksClientConnection hcc, Query q) throws Exception {
-        MetadataTransactionContext mdTxnCtx = 
MetadataManager.INSTANCE.beginTransaction();
-        boolean bActiveTxn = true;
-        metadataProvider.setMetadataTxnContext(mdTxnCtx);
-        JobSpecification jobSpec = null;
-        try {
-            jobSpec = ((QueryTranslator) 
statementExecutor).rewriteCompileQuery(hcc, metadataProvider, q, null);
-            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-            bActiveTxn = false;
-        } catch (Exception e) {
-            LOGGER.log(Level.INFO, e.getMessage(), e);
-            if (bActiveTxn) {
-                ((QueryTranslator) statementExecutor).abort(e, e, mdTxnCtx);
-            }
-            throw e;
-        } finally {
-            metadataProvider.getLocks().unlock();
-        }
-        return jobSpec;
-    }
-
     private JobSpecification createChannelJob(IStatementExecutor 
statementExecutor, MetadataProvider metadataProvider,
             IHyracksClientConnection hcc, IHyracksDataset hdc, Stats stats) 
throws Exception {
         StringBuilder builder = new StringBuilder();
@@ -271,13 +243,15 @@
             builder.append(" returning a");
         }
         builder.append(";");
+        body = builder.toString();
         BADParserFactory factory = new BADParserFactory();
         List<Statement> fStatements = factory.createParser(new 
StringReader(builder.toString())).parse();
 
         SetStatement ss = (SetStatement) fStatements.get(0);
         metadataProvider.getConfig().put(ss.getPropName(), ss.getPropValue());
         if (push) {
-            return compilePushChannel(statementExecutor, metadataProvider, 
hcc, (Query) fStatements.get(1));
+            return BADJobService.compilePushChannel(statementExecutor, 
metadataProvider, hcc,
+                    (Query) fStatements.get(1));
         }
         return ((QueryTranslator) 
statementExecutor).handleInsertUpsertStatement(metadataProvider, 
fStatements.get(1),
                 hcc, hdc, ResultDelivery.ASYNC, null, stats, true, null);
@@ -287,8 +261,8 @@
             DeployedJobSpecEventListener listener, ITxnIdFactory txnIdFactory) 
throws Exception {
         if (channeljobSpec != null) {
             DeployedJobSpecId destributedId = 
hcc.deployJobSpec(channeljobSpec);
-            ScheduledExecutorService ses = 
DeployedJobService.startRepetitiveDeployedJobSpec(destributedId, hcc,
-                    ChannelJobService.findPeriod(duration), new HashMap<>(), 
entityId, txnIdFactory);
+            ScheduledExecutorService ses = 
BADJobService.startRepetitiveDeployedJobSpec(destributedId, hcc,
+                    BADJobService.findPeriod(duration), new HashMap<>(), 
entityId, txnIdFactory, listener);
             listener.storeDistributedInfo(destributedId, ses, null, null);
         }
 
@@ -354,14 +328,15 @@
 
             // Now we subscribe
             if (listener == null) {
-                listener = new DeployedJobSpecEventListener(appCtx, entityId, 
PrecompiledType.CHANNEL, null,
+                listener = new DeployedJobSpecEventListener(appCtx, entityId,
+                        push ? PrecompiledType.PUSH_CHANNEL : 
PrecompiledType.CHANNEL, null,
                         "BadListener");
                 activeEventHandler.registerListener(listener);
             }
 
             setupExecutorJob(entityId, channeljobSpec, hcc, listener, 
metadataProvider.getTxnIdFactory());
             channel = new Channel(dataverse, channelName.getValue(), 
subscriptionsTableName, resultsTableName, function,
-                    duration, null);
+                    duration, null, body);
 
             MetadataManager.INSTANCE.addEntity(mdTxnCtx, channel);
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
diff --git 
a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ExecuteProcedureStatement.java
 
b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ExecuteProcedureStatement.java
index 635f2ce..025b9e6 100644
--- 
a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ExecuteProcedureStatement.java
+++ 
b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ExecuteProcedureStatement.java
@@ -24,18 +24,14 @@
 import java.util.Map;
 import java.util.concurrent.ScheduledExecutorService;
 
-import org.apache.asterix.active.DeployedJobService;
 import org.apache.asterix.active.EntityId;
 import org.apache.asterix.algebra.extension.ExtensionStatement;
-import org.apache.asterix.api.http.server.ResultUtil;
 import org.apache.asterix.app.active.ActiveNotificationHandler;
-import org.apache.asterix.app.result.ResultReader;
 import org.apache.asterix.app.translator.QueryTranslator;
 import org.apache.asterix.bad.BADConstants;
-import org.apache.asterix.bad.ChannelJobService;
+import org.apache.asterix.bad.BADJobService;
 import org.apache.asterix.bad.lang.BADLangExtension;
 import org.apache.asterix.bad.metadata.DeployedJobSpecEventListener;
-import 
org.apache.asterix.bad.metadata.DeployedJobSpecEventListener.PrecompiledType;
 import org.apache.asterix.bad.metadata.Procedure;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.common.exceptions.AsterixException;
@@ -54,12 +50,10 @@
 import org.apache.asterix.translator.ConstantHelper;
 import org.apache.asterix.translator.IRequestParameters;
 import org.apache.asterix.translator.IStatementExecutor;
-import org.apache.asterix.translator.IStatementExecutor.Stats;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.api.client.IHyracksClientConnection;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.DeployedJobSpecId;
-import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
 
 public class ExecuteProcedureStatement extends ExtensionStatement {
@@ -111,10 +105,9 @@
         boolean txnActive = false;
         EntityId entityId = new EntityId(BADConstants.PROCEDURE_KEYWORD, 
dataverse, procedureName);
         DeployedJobSpecEventListener listener = (DeployedJobSpecEventListener) 
activeEventHandler.getListener(entityId);
-        Procedure procedure = null;
+        Procedure procedure;
 
         MetadataTransactionContext mdTxnCtx = null;
-        JobId jobId;
         try {
             mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
             txnActive = true;
@@ -125,31 +118,14 @@
             Map<byte[], byte[]> contextRuntimeVarMap = 
createParameterMap(procedure);
             DeployedJobSpecId deployedJobSpecId = 
listener.getDeployedJobSpecId();
             if (procedure.getDuration().equals("")) {
+                BADJobService.runDeployedJobSpec(deployedJobSpecId, hcc, 
contextRuntimeVarMap, entityId,
+                        metadataProvider.getTxnIdFactory(), appCtx, listener, 
(QueryTranslator) statementExecutor);
 
-                //Add the Asterix Transaction Id to the map
-                long newTxId = 
metadataProvider.getTxnIdFactory().create().getId();
-                
contextRuntimeVarMap.put(BADConstants.TRANSACTION_ID_PARAMETER_NAME,
-                        String.valueOf(newTxId).getBytes());
-                jobId = hcc.startJob(deployedJobSpecId, contextRuntimeVarMap);
-
-                boolean wait = 
Boolean.parseBoolean(metadataProvider.getConfig().get(
-                        ExecuteProcedureStatement.WAIT_FOR_COMPLETION));
-                if (wait || listener.getType() == PrecompiledType.QUERY) {
-                    hcc.waitForCompletion(jobId);
-                }
-
-                if (listener.getType() == PrecompiledType.QUERY) {
-                    ResultReader resultReader =
-                            new ResultReader(listener.getResultDataset(), 
jobId, listener.getResultId());
-
-                    ResultUtil.printResults(appCtx, resultReader,
-                            ((QueryTranslator) 
statementExecutor).getSessionOutput(), new Stats(), null);
-                }
 
             } else {
-                ScheduledExecutorService ses = 
DeployedJobService.startRepetitiveDeployedJobSpec(deployedJobSpecId, hcc,
-                        ChannelJobService.findPeriod(procedure.getDuration()), 
contextRuntimeVarMap, entityId,
-                        metadataProvider.getTxnIdFactory());
+                ScheduledExecutorService ses = 
BADJobService.startRepetitiveDeployedJobSpec(deployedJobSpecId, hcc,
+                        BADJobService.findPeriod(procedure.getDuration()), 
contextRuntimeVarMap, entityId,
+                        metadataProvider.getTxnIdFactory(), listener);
                 listener.storeDistributedInfo(deployedJobSpecId, ses, 
listener.getResultDataset(),
                         listener.getResultId());
             }
diff --git 
a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataRecordTypes.java
 
b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataRecordTypes.java
index 526e091..1e5e627 100644
--- 
a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataRecordTypes.java
+++ 
b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataRecordTypes.java
@@ -50,18 +50,20 @@
     public static final int CHANNEL_ARECORD_FUNCTION_FIELD_INDEX = 4;
     public static final int CHANNEL_ARECORD_DURATION_FIELD_INDEX = 5;
     public static final int CHANNEL_ARECORD_DEPENDENCIES_FIELD_INDEX = 6;
+    public static final int CHANNEL_ARECORD_BODY_FIELD_INDEX = 7;
     public static final ARecordType CHANNEL_RECORDTYPE = 
MetadataRecordTypes.createRecordType(
             // RecordTypeName
             BADConstants.RECORD_TYPENAME_CHANNEL,
             // FieldNames
             new String[] { BADConstants.DataverseName, 
BADConstants.ChannelName, BADConstants.SubscriptionsDatasetName,
                     BADConstants.ResultsDatasetName, BADConstants.Function, 
BADConstants.Duration,
-                    BADConstants.FIELD_NAME_DEPENDENCIES },
+                    BADConstants.FIELD_NAME_DEPENDENCIES, 
BADConstants.FIELD_NAME_BODY },
             // FieldTypes
             new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING, 
BuiltinType.ASTRING, BuiltinType.ASTRING,
                     new AOrderedListType(BuiltinType.ASTRING, null), 
BuiltinType.ASTRING,
                     new AOrderedListType(new AOrderedListType(new 
AOrderedListType(BuiltinType.ASTRING, null), null),
-                            null) },
+                            null),
+                    BuiltinType.ASTRING },
             //IsOpen?
             true);
     //------------------------------------------ Broker 
----------------------------------------//
diff --git 
a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/Channel.java 
b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/Channel.java
index 5f7dad0..f49de6a 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/Channel.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/Channel.java
@@ -37,6 +37,7 @@
     private final String subscriptionsDatasetName;
     private final String resultsDatasetName;
     private final String duration;
+    private final String body;
     private final FunctionSignature function;
     private final List<String> functionAsPath;
     /*
@@ -49,12 +50,13 @@
     private final List<List<List<String>>> dependencies;
 
     public Channel(String dataverseName, String channelName, String 
subscriptionsDataset, String resultsDataset,
-            FunctionSignature function, String duration, 
List<List<List<String>>> dependencies) {
+            FunctionSignature function, String duration, 
List<List<List<String>>> dependencies, String body) {
         this.channelId = new EntityId(BADConstants.CHANNEL_EXTENSION_NAME, 
dataverseName, channelName);
         this.function = function;
         this.duration = duration;
         this.resultsDatasetName = resultsDataset;
         this.subscriptionsDatasetName = subscriptionsDataset;
+        this.body = body;
         if (this.function.getNamespace() == null) {
             this.function.setNamespace(dataverseName);
         }
@@ -94,6 +96,10 @@
         return duration;
     }
 
+    public String getBody() {
+        return body;
+    }
+
     public List<String> getFunctionAsPath() {
         return functionAsPath;
     }
diff --git 
a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ChannelTupleTranslator.java
 
b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ChannelTupleTranslator.java
index 14db134..b362192 100644
--- 
a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ChannelTupleTranslator.java
+++ 
b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ChannelTupleTranslator.java
@@ -123,11 +123,15 @@
 
         }
 
+        String channelBody =
+                ((AString) 
channelRecord.getValueByPos(BADMetadataRecordTypes.CHANNEL_ARECORD_BODY_FIELD_INDEX))
+                        .getStringValue();
+
         FunctionSignature signature = new 
FunctionSignature(functionSignature.get(0), functionSignature.get(1),
                 Integer.parseInt(functionSignature.get(2)));
 
         channel = new Channel(dataverseName, channelName, subscriptionsName, 
resultsName, signature, duration,
-                dependencies);
+                dependencies, channelBody);
         return channel;
     }
 
@@ -217,6 +221,12 @@
         dependenciesListBuilder.write(fieldValue.getDataOutput(), true);
         
recordBuilder.addField(BADMetadataRecordTypes.CHANNEL_ARECORD_DEPENDENCIES_FIELD_INDEX,
 fieldValue);
 
+        // write field 7
+        fieldValue.reset();
+        aString.setValue(channel.getBody());
+        stringSerde.serialize(aString, fieldValue.getDataOutput());
+        
recordBuilder.addField(BADMetadataRecordTypes.CHANNEL_ARECORD_BODY_FIELD_INDEX, 
fieldValue);
+
         // write record
         recordBuilder.write(tupleBuilder.getDataOutput(), true);
 
diff --git 
a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/DeployedJobSpecEventListener.java
 
b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/DeployedJobSpecEventListener.java
index 070c148..a82abcf 100644
--- 
a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/DeployedJobSpecEventListener.java
+++ 
b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/DeployedJobSpecEventListener.java
@@ -18,6 +18,8 @@
  */
 package org.apache.asterix.bad.metadata;
 
+import java.util.concurrent.ScheduledExecutorService;
+
 import org.apache.asterix.active.ActiveEvent;
 import org.apache.asterix.active.ActiveEvent.Kind;
 import org.apache.asterix.active.ActivityState;
@@ -31,14 +33,7 @@
 import org.apache.hyracks.api.dataset.ResultSetId;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.DeployedJobSpecId;
-import org.apache.hyracks.api.job.JobId;
-import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.concurrent.ScheduledExecutorService;
 
 public class DeployedJobSpecEventListener implements 
IActiveEntityEventsListener {
 
@@ -47,15 +42,17 @@
 
     public enum PrecompiledType {
         CHANNEL,
+        PUSH_CHANNEL,
         QUERY,
         INSERT,
         DELETE
     }
 
-    enum RequestState {
-        INIT,
-        STARTED,
-        FINISHED
+    public enum InstanceChange {
+        LOCK,
+        INCREASE,
+        DECREASE,
+        UNLOCK
     }
 
     private DeployedJobSpecId deployedJobSpecId;
@@ -64,17 +61,15 @@
 
     private IHyracksDataset hdc;
     private ResultSetId resultSetId;
+    private int instanceCount;
 
     // members
     protected volatile ActivityState state;
-    protected JobId jobId;
-    protected final List<IActiveEntityEventSubscriber> subscribers = new 
ArrayList<>();
     protected final ICcApplicationContext appCtx;
     protected final EntityId entityId;
     protected final ActiveEvent statsUpdatedEvent;
     protected long statsTimestamp;
     protected String stats;
-    protected RequestState statsRequestState;
     protected final String runtimeName;
     protected final AlgebricksAbsolutePartitionConstraint locations;
     private int runningInstance;
@@ -85,7 +80,7 @@
         this.entityId = entityId;
         this.state = ActivityState.STOPPED;
         this.statsTimestamp = -1;
-        this.statsRequestState = RequestState.INIT;
+        this.instanceCount = 0;
         this.statsUpdatedEvent = new ActiveEvent(null, Kind.STATS_UPDATED, 
entityId, null);
         this.stats = "{\"Stats\":\"N/A\"}";
         this.runtimeName = runtimeName;
@@ -122,10 +117,6 @@
         return false;
     }
 
-    public JobId getJobId() {
-        return jobId;
-    }
-
     @Override
     public String getStats() {
         return stats;
@@ -136,38 +127,28 @@
         return statsTimestamp;
     }
 
-    public String formatStats(List<String> responses) {
-        StringBuilder strBuilder = new StringBuilder();
-        strBuilder.append("{\"Stats\": [").append(responses.get(0));
-        for (int i = 1; i < responses.size(); i++) {
-            strBuilder.append(", ").append(responses.get(i));
-        }
-        strBuilder.append("]}");
-        return strBuilder.toString();
-    }
-
-    protected synchronized void notifySubscribers(ActiveEvent event) {
-        notifyAll();
-        Iterator<IActiveEntityEventSubscriber> it = subscribers.iterator();
-        while (it.hasNext()) {
-            IActiveEntityEventSubscriber subscriber = it.next();
-            if (subscriber.isDone()) {
-                it.remove();
-            } else {
-                try {
-                    subscriber.notify(event);
-                } catch (HyracksDataException e) {
-                    LOGGER.log(Level.WARN, "Failed to notify subscriber", e);
+    public synchronized boolean setInstanceCount(InstanceChange change) {
+        switch (change) {
+            case INCREASE:
+                if (instanceCount < 0) {
+                    return false;
                 }
-                if (subscriber.isDone()) {
-                    it.remove();
+                instanceCount++;
+                break;
+            case DECREASE:
+                instanceCount--;
+                break;
+            case LOCK:
+                if (instanceCount != 0) {
+                    return false;
                 }
-            }
+                instanceCount = -1;
+                break;
+            case UNLOCK:
+                instanceCount = 0;
+                break;
         }
-    }
-
-    public AlgebricksAbsolutePartitionConstraint getLocations() {
-        return locations;
+        return true;
     }
 
     public PrecompiledType getType() {
diff --git 
a/asterix-bad/src/test/resources/runtimets/queries/channel/add_index/add_index.1.ddl.sqlpp
 
b/asterix-bad/src/test/resources/runtimets/queries/channel/add_index/add_index.1.ddl.sqlpp
new file mode 100644
index 0000000..819d052
--- /dev/null
+++ 
b/asterix-bad/src/test/resources/runtimets/queries/channel/add_index/add_index.1.ddl.sqlpp
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+* Description  : Check Whether a Channel works after adding a new Index
+* Expected Res : Success
+* Date         : Apr 2018
+* Author       : Steven Jacobs
+*/
+
+drop dataverse channels if exists;
+create dataverse channels;
+use channels;
+
+create type UserLocation as {
+  location: circle,
+  userName: string
+};
+
+
+create type EmergencyReport as {
+  reportId: uuid,
+  Etype: string,
+  location: circle
+};
+
+
+create type EmergencyShelter as {
+  shelterName: string,
+  location: point
+};
+
+create dataset UserLocations(UserLocation)
+primary key userName;
+create dataset Shelters(EmergencyShelter)
+primary key shelterName;
+create dataset Reports(EmergencyReport)
+primary key reportId autogenerated;
+
+create index u_location on UserLocations(location) type RTREE;
+
+
+create function RecentEmergenciesNearUser(userName) {
+  (
+  select report, shelters from
+   ( select value r from Reports r)report,
+  UserLocations u
+    let shelters = (select s.location from Shelters s where 
spatial_intersect(s.location,u.location))
+  where u.userName = userName
+  and spatial_intersect(report.location,u.location)
+  )
+};
+
+create repetitive channel EmergencyChannel using RecentEmergenciesNearUser@1 
period duration("PT10S");
+
+create broker brokerA at "http://www.notifyA.com";
\ No newline at end of file
diff --git 
a/asterix-bad/src/test/resources/runtimets/queries/channel/add_index/add_index.2.update.sqlpp
 
b/asterix-bad/src/test/resources/runtimets/queries/channel/add_index/add_index.2.update.sqlpp
new file mode 100644
index 0000000..0a38e41
--- /dev/null
+++ 
b/asterix-bad/src/test/resources/runtimets/queries/channel/add_index/add_index.2.update.sqlpp
@@ -0,0 +1,394 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+* Description  : Check Whether a Channel works after adding a new Index
+* Expected Res : Success
+* Date         : Apr 2018
+* Author       : Steven Jacobs
+*/
+
+use channels;
+
+insert into EmergencyChannelSubscriptions(
+[
+{"param0" : "w2294u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "t4321u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "t3398u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "w2488u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "t3666u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "t4489u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "p78u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "p544u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "p711u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "t2828u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "t4796u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "t4082u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "t4923u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "w2324u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "c1339u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "p520u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "c1092u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "t4979u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "c1487u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "t4330u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "t3682u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "p117u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "w1741u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "w2434u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "t3833u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "c1373u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "p89u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "t4003u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "c910u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "t4961u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "t4475u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "w1960u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "p438u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "c1362u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "p588u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "c902u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "t4684u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "c1609u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "c1510u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "t3851u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "c1418u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "t2559u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "w1815u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "t4924u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "t3320u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "p663u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "t4571u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "p781u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "c919u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "c1121u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "p814u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "t4006u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "t2822u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "t4953u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "t3486u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "t3107u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "t2836u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "w2003u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "t3256u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "t4762u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "t4900u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "p357u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "t3630u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "t3166u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "t4687u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "p817u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "t4433u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "t3426u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "p582u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "t3388u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "t4823u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "c1664u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "t4051u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "c857u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "c1412u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "t2521u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "t3114u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "p404u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "p111u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "t3006u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "t2903u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "t2823u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "t4153u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "t2589u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "c1459u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "p766u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "p593u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "p168u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "t4253u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "t4177u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "p387u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "t2571u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "c1513u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "p618u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "t2735u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "t4859u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "w1848u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "t3306u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "t2558u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
+{"param0" : "p180u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" }
+]
+);
+
+insert into Shelters (
+[
+{"shelterName" : "s5064" , "location" : point("2437.34,1330.59") },
+{"shelterName" : "s5180" , "location" : point("2479.66,939.05") },
+{"shelterName" : "s5025" , "location" : point("3343.08,1139.29") },
+{"shelterName" : "s5218" , "location" : point("1361.85,795.99") },
+{"shelterName" : "s5199" , "location" : point("3619.88,1245.75") },
+{"shelterName" : "s5167" , "location" : point("2836.86,237.96") },
+{"shelterName" : "s5068" , "location" : point("2580.27,881.71") },
+{"shelterName" : "s5026" , "location" : point("2040.75,816.3") },
+{"shelterName" : "s5080" , "location" : point("2785.14,1090.68") },
+{"shelterName" : "s5190" , "location" : point("1296.09,1256.87") },
+{"shelterName" : "s5126" , "location" : point("1121.37,1478.3") },
+{"shelterName" : "s5075" , "location" : point("2679.51,488.23") },
+{"shelterName" : "s5187" , "location" : point("2544.14,2464.15") },
+{"shelterName" : "s5061" , "location" : point("1674.35,1265.46") },
+{"shelterName" : "s5057" , "location" : point("1677.96,1151.83") },
+{"shelterName" : "s5149" , "location" : point("2236.78,1068.51") },
+{"shelterName" : "s5058" , "location" : point("2919.37,2055.98") },
+{"shelterName" : "s5144" , "location" : point("731.17,1504.74") },
+{"shelterName" : "s5099" , "location" : point("2594.22,1078.11") },
+{"shelterName" : "s5030" , "location" : point("3678.46,978.38") },
+{"shelterName" : "s5050" , "location" : point("2395.61,1332.9") },
+{"shelterName" : "s5209" , "location" : point("3093.56,1833.18") },
+{"shelterName" : "s5052" , "location" : point("3052.36,1076.89") },
+{"shelterName" : "s5208" , "location" : point("1513.18,796.41") },
+{"shelterName" : "s5027" , "location" : point("671.97,2004.72") },
+{"shelterName" : "s5093" , "location" : point("2297.68,1933.79") },
+{"shelterName" : "s5202" , "location" : point("2854.73,1153.14") },
+{"shelterName" : "s5106" , "location" : point("1971.88,737.51") },
+{"shelterName" : "s5192" , "location" : point("2638.68,1688.7") },
+{"shelterName" : "s5128" , "location" : point("1970.48,2569.09") },
+{"shelterName" : "s5098" , "location" : point("2596.02,1499.58") },
+{"shelterName" : "s5038" , "location" : point("2297.07,995.53") },
+{"shelterName" : "s5176" , "location" : point("1570.34,1701.0") },
+{"shelterName" : "s5066" , "location" : point("820.41,2092.07") },
+{"shelterName" : "s5108" , "location" : point("459.09,395.34") },
+{"shelterName" : "s5162" , "location" : point("2543.99,1822.57") },
+{"shelterName" : "s5195" , "location" : point("3190.14,979.74") },
+{"shelterName" : "s5212" , "location" : point("2478.36,921.15") },
+{"shelterName" : "s5217" , "location" : point("2411.71,807.77") },
+{"shelterName" : "s5049" , "location" : point("447.77,1527.37") },
+{"shelterName" : "s5163" , "location" : point("2356.91,1221.63") },
+{"shelterName" : "s5048" , "location" : point("1495.47,1597.15") },
+{"shelterName" : "s5154" , "location" : point("1503.31,884.66") },
+{"shelterName" : "s5116" , "location" : point("1359.98,314.68") },
+{"shelterName" : "s5143" , "location" : point("3000.77,1080.81") },
+{"shelterName" : "s5073" , "location" : point("2404.16,1516.85") },
+{"shelterName" : "s5032" , "location" : point("2370.75,1879.99") },
+{"shelterName" : "s5152" , "location" : point("1091.19,706.09") },
+{"shelterName" : "s5044" , "location" : point("2395.55,1378.92") },
+{"shelterName" : "s5081" , "location" : point("2057.88,838.33") },
+{"shelterName" : "s5130" , "location" : point("1511.27,1082.85") },
+{"shelterName" : "s5142" , "location" : point("2634.22,1631.85") },
+{"shelterName" : "s5071" , "location" : point("1799.13,2130.04") },
+{"shelterName" : "s5051" , "location" : point("2414.52,807.3") },
+{"shelterName" : "s5147" , "location" : point("1960.71,976.11") },
+{"shelterName" : "s5021" , "location" : point("2659.55,1957.04") },
+{"shelterName" : "s5139" , "location" : point("1863.26,1216.29") },
+{"shelterName" : "s5062" , "location" : point("937.48,1608.88") },
+{"shelterName" : "s5168" , "location" : point("2245.27,1012.57") },
+{"shelterName" : "s5196" , "location" : point("2341.23,1883.0") },
+{"shelterName" : "s5119" , "location" : point("1702.71,2557.47") },
+{"shelterName" : "s5150" , "location" : point("1613.07,1804.66") },
+{"shelterName" : "s5171" , "location" : point("2789.6,931.73") },
+{"shelterName" : "s5102" , "location" : point("3483.61,1156.18") },
+{"shelterName" : "s5091" , "location" : point("1081.51,1596.95") },
+{"shelterName" : "s5132" , "location" : point("2099.11,1086.28") },
+{"shelterName" : "s5104" , "location" : point("2896.78,461.73") },
+{"shelterName" : "s5022" , "location" : point("1920.77,1657.89") },
+{"shelterName" : "s5219" , "location" : point("2913.07,954.89") },
+{"shelterName" : "s5088" , "location" : point("1831.54,2241.22") },
+{"shelterName" : "s5166" , "location" : point("1087.15,2048.13") },
+{"shelterName" : "s5203" , "location" : point("2543.5,1815.38") },
+{"shelterName" : "s5136" , "location" : point("2503.29,1270.07") },
+{"shelterName" : "s5194" , "location" : point("1595.1,1634.0") },
+{"shelterName" : "s5060" , "location" : point("2230.74,818.1") },
+{"shelterName" : "s5127" , "location" : point("2567.07,1104.86") },
+{"shelterName" : "s5092" , "location" : point("1732.18,1170.23") },
+{"shelterName" : "s5124" , "location" : point("2456.58,983.76") },
+{"shelterName" : "s5201" , "location" : point("1875.2,1300.76") },
+{"shelterName" : "s5029" , "location" : point("2581.92,690.14") },
+{"shelterName" : "s5146" , "location" : point("2437.06,2491.18") },
+{"shelterName" : "s5074" , "location" : point("1761.92,2035.44") },
+{"shelterName" : "s5173" , "location" : point("1000.01,1488.16") },
+{"shelterName" : "s5039" , "location" : point("2604.75,630.95") },
+{"shelterName" : "s5020" , "location" : point("1920.62,670.07") },
+{"shelterName" : "s5120" , "location" : point("1562.27,1045.02") },
+{"shelterName" : "s5083" , "location" : point("964.22,1606.66") },
+{"shelterName" : "s5122" , "location" : point("2253.13,1556.55") },
+{"shelterName" : "s5103" , "location" : point("2023.99,2505.02") },
+{"shelterName" : "s5155" , "location" : point("2996.58,390.66") },
+{"shelterName" : "s5076" , "location" : point("1025.57,515.66") },
+{"shelterName" : "s5086" , "location" : point("2384.13,886.5") },
+{"shelterName" : "s5053" , "location" : point("1173.98,2173.29") },
+{"shelterName" : "s5216" , "location" : point("2865.64,1182.0") },
+{"shelterName" : "s5065" , "location" : point("2633.42,574.61") },
+{"shelterName" : "s5055" , "location" : point("1236.87,876.11") },
+{"shelterName" : "s5215" , "location" : point("2272.8,1115.83") },
+{"shelterName" : "s5210" , "location" : point("1314.22,729.82") },
+{"shelterName" : "s5200" , "location" : point("1776.8,1176.29") },
+{"shelterName" : "s5165" , "location" : point("3485.44,980.34") },
+{"shelterName" : "s5042" , "location" : point("1812.4,1252.84") }
+]
+);
+
+insert into UserLocations (
+[
+{"userName" : "w2294u1" , "location" : circle("2683.3,480.84 100.0")},
+{"userName" : "t4321u1" , "location" : circle("1990.77,754.24 100.0")},
+{"userName" : "t3398u1" , "location" : circle("2791.2,962.92 100.0")},
+{"userName" : "w2488u1" , "location" : circle("2040.19,767.35 100.0")},
+{"userName" : "t3666u1" , "location" : circle("3968.68,1308.04 100.0")},
+{"userName" : "t4489u1" , "location" : circle("1713.82,252.6 100.0")},
+{"userName" : "p78u1" , "location" : circle("2588.34,735.72 100.0")},
+{"userName" : "p544u1" , "location" : circle("1993.69,1465.29 100.0")},
+{"userName" : "p711u1" , "location" : circle("3221.64,1062.22 100.0")},
+{"userName" : "t2828u1" , "location" : circle("2534.49,978.95 100.0")},
+{"userName" : "t4796u1" , "location" : circle("1752.58,323.55 100.0")},
+{"userName" : "t4082u1" , "location" : circle("3571.17,1213.78 100.0")},
+{"userName" : "t4923u1" , "location" : circle("1878.19,587.26 100.0")},
+{"userName" : "w2324u1" , "location" : circle("2851.94,1429.02 100.0")},
+{"userName" : "c1339u1" , "location" : circle("2881.61,956.95 100.0")},
+{"userName" : "p520u1" , "location" : circle("2927.9,986.41 100.0")},
+{"userName" : "c1092u1" , "location" : circle("1670.02,1019.43 100.0")},
+{"userName" : "t4979u1" , "location" : circle("1856.46,543.47 100.0")},
+{"userName" : "c1487u1" , "location" : circle("1757.21,745.68 100.0")},
+{"userName" : "t4330u1" , "location" : circle("1596.34,59.37 100.0")},
+{"userName" : "t3682u1" , "location" : circle("3557.22,1204.63 100.0")},
+{"userName" : "p117u1" , "location" : circle("1717.46,1166.14 100.0")},
+{"userName" : "w1741u1" , "location" : circle("2517.33,980.01 100.0")},
+{"userName" : "w2434u1" , "location" : circle("527.29,1520.52 100.0")},
+{"userName" : "t3833u1" , "location" : circle("3521.17,1180.97 100.0")},
+{"userName" : "c1373u1" , "location" : circle("1733.98,742.36 100.0")},
+{"userName" : "p89u1" , "location" : circle("3768.73,1013.03 100.0")},
+{"userName" : "t4003u1" , "location" : circle("2945.36,952.79 100.0")},
+{"userName" : "c910u1" , "location" : circle("1124.86,1275.91 100.0")},
+{"userName" : "t4961u1" , "location" : circle("1575.25,0.0 100.0")},
+{"userName" : "t4475u1" , "location" : circle("1607.59,79.94 100.0")},
+{"userName" : "w1960u1" , "location" : circle("1768.23,2263.45 100.0")},
+{"userName" : "p438u1" , "location" : circle("898.13,238.92 100.0")},
+{"userName" : "c1362u1" , "location" : circle("1438.64,400.67 100.0")},
+{"userName" : "p588u1" , "location" : circle("3206.01,1052.3 100.0")},
+{"userName" : "c902u1" , "location" : circle("1615.16,1251.39 100.0")},
+{"userName" : "t4684u1" , "location" : circle("1876.6,584.06 100.0")},
+{"userName" : "c1609u1" , "location" : circle("2320.02,725.01 100.0")},
+{"userName" : "c1510u1" , "location" : circle("2256.6,816.78 100.0")},
+{"userName" : "t3851u1" , "location" : circle("3807.01,1304.1 100.0")},
+{"userName" : "c1418u1" , "location" : circle("2273.25,815.93 100.0")},
+{"userName" : "t2559u1" , "location" : circle("2297.49,1887.46 100.0")},
+{"userName" : "w1815u1" , "location" : circle("1539.89,343.08 100.0")},
+{"userName" : "t4924u1" , "location" : circle("1609.65,83.72 100.0")},
+{"userName" : "t3320u1" , "location" : circle("2322.45,1277.89 100.0")},
+{"userName" : "p663u1" , "location" : circle("2291.38,254.83 100.0")},
+{"userName" : "t4571u1" , "location" : circle("2253.78,1090.84 100.0")},
+{"userName" : "p781u1" , "location" : circle("1154.4,712.8 100.0")},
+{"userName" : "c919u1" , "location" : circle("1721.46,485.13 100.0")},
+{"userName" : "c1121u1" , "location" : circle("4171.58,1083.41 100.0")},
+{"userName" : "p814u1" , "location" : circle("2176.13,990.55 100.0")},
+{"userName" : "t4006u1" , "location" : circle("3977.9,1303.22 100.0")},
+{"userName" : "t2822u1" , "location" : circle("3087.51,1849.01 100.0")},
+{"userName" : "t4953u1" , "location" : circle("1923.91,676.56 100.0")},
+{"userName" : "t3486u1" , "location" : circle("3832.72,1320.87 100.0")},
+{"userName" : "t3107u1" , "location" : circle("2994.64,2023.92 100.0")},
+{"userName" : "t2836u1" , "location" : circle("2307.27,2096.07 100.0")},
+{"userName" : "w2003u1" , "location" : circle("1976.08,2279.1 100.0")},
+{"userName" : "t3256u1" , "location" : circle("2161.32,1700.73 100.0")},
+{"userName" : "t4762u1" , "location" : circle("1916.88,662.71 100.0")},
+{"userName" : "t4900u1" , "location" : circle("2253.78,1090.84 100.0")},
+{"userName" : "p357u1" , "location" : circle("1699.66,1976.29 100.0")},
+{"userName" : "t3630u1" , "location" : circle("3778.3,1285.37 100.0")},
+{"userName" : "t3166u1" , "location" : circle("2743.85,965.95 100.0")},
+{"userName" : "t4687u1" , "location" : circle("1798.2,426.06 100.0")},
+{"userName" : "p817u1" , "location" : circle("1446.92,909.48 100.0")},
+{"userName" : "t4433u1" , "location" : circle("1805.69,441.15 100.0")},
+{"userName" : "t3426u1" , "location" : circle("3055.08,945.53 100.0")},
+{"userName" : "p582u1" , "location" : circle("1523.47,739.18 100.0")},
+{"userName" : "t3388u1" , "location" : circle("2919.63,954.46 100.0")},
+{"userName" : "t4823u1" , "location" : circle("2174.09,987.9 100.0")},
+{"userName" : "c1664u1" , "location" : circle("1283.46,1099.95 100.0")},
+{"userName" : "t4051u1" , "location" : circle("4047.21,1215.22 100.0")},
+{"userName" : "c857u1" , "location" : circle("1270.33,664.46 100.0")},
+{"userName" : "c1412u1" , "location" : circle("1279.63,1731.15 100.0")},
+{"userName" : "t2521u1" , "location" : circle("2850.02,1040.64 100.0")},
+{"userName" : "t3114u1" , "location" : circle("2294.53,995.82 100.0")},
+{"userName" : "p404u1" , "location" : circle("3789.35,1022.15 100.0")},
+{"userName" : "p111u1" , "location" : circle("2054.96,904.61 100.0")},
+{"userName" : "t3006u1" , "location" : circle("3095.66,1781.54 100.0")},
+{"userName" : "t2903u1" , "location" : circle("2449.83,984.17 100.0")},
+{"userName" : "t2823u1" , "location" : circle("2116.91,1638.54 100.0")},
+{"userName" : "t4153u1" , "location" : circle("3977.9,1303.22 100.0")},
+{"userName" : "t2589u1" , "location" : circle("2075.58,1455.78 100.0")},
+{"userName" : "c1459u1" , "location" : circle("2149.15,1104.3 100.0")},
+{"userName" : "p766u1" , "location" : circle("3144.94,1042.37 100.0")},
+{"userName" : "p593u1" , "location" : circle("3460.96,1141.22 100.0")},
+{"userName" : "p168u1" , "location" : circle("1719.0,2688.66 100.0")},
+{"userName" : "t4253u1" , "location" : circle("1575.25,0.0 100.0")},
+{"userName" : "t4177u1" , "location" : circle("2185.18,1002.25 100.0")},
+{"userName" : "p387u1" , "location" : circle("2351.74,811.8 100.0")},
+{"userName" : "t2571u1" , "location" : circle("2307.49,1915.48 100.0")},
+{"userName" : "c1513u1" , "location" : circle("3432.64,1067.27 100.0")},
+{"userName" : "p618u1" , "location" : circle("1682.87,1962.44 100.0")},
+{"userName" : "t2735u1" , "location" : circle("2999.73,1635.76 100.0")},
+{"userName" : "t4859u1" , "location" : circle("1763.96,348.87 100.0")},
+{"userName" : "w1848u1" , "location" : circle("2675.62,2150.98 100.0")},
+{"userName" : "t3306u1" , "location" : circle("2274.48,1312.75 100.0")},
+{"userName" : "t2558u1" , "location" : circle("2254.43,2174.64 100.0")},
+{"userName" : "p180u1" , "location" : circle("1253.66,1771.06 100.0")}
+]
+);
+
+insert into Reports
+(
+[
+{"Etype" : "flood" , "location" : circle("846.5,2589.56 1000.0")},
+{"Etype" : "crash" , "location" : circle("953.48,2504.12 100.0")},
+{"Etype" : "flood" , "location" : circle("2313.19,1641.15 1000.0")},
+{"Etype" : "fire" , "location" : circle("3014.66,2332.34 500.0")},
+{"Etype" : "flood" , "location" : circle("1188.75,2307.52 1000.0")},
+{"Etype" : "fire" , "location" : circle("3418.08,1090.2 500.0")},
+{"Etype" : "fire" , "location" : circle("1364.9,1434.81 500.0")},
+{"Etype" : "storm" , "location" : circle("1164.65,2088.5 2000.0")},
+{"Etype" : "flood" , "location" : circle("3582.04,974.89 1000.0")},
+{"Etype" : "flood" , "location" : circle("1016.15,846.6 1000.0")},
+{"Etype" : "flood" , "location" : circle("1416.4,1483.13 1000.0")},
+{"Etype" : "flood" , "location" : circle("2777.23,963.83 1000.0")},
+{"Etype" : "fire" , "location" : circle("3082.74,1746.62 500.0")},
+{"Etype" : "storm" , "location" : circle("1186.15,2283.96 2000.0")},
+{"Etype" : "crash" , "location" : circle("1218.17,1591.68 100.0")},
+{"Etype" : "fire" , "location" : circle("1141.24,1474.73 500.0")},
+{"Etype" : "flood" , "location" : circle("1105.73,1875.44 1000.0")},
+{"Etype" : "flood" , "location" : circle("1805.37,2346.03 1000.0")},
+{"Etype" : "flood" , "location" : circle("1535.83,1546.66 1000.0")},
+{"Etype" : "fire" , "location" : circle("4187.75,1064.17 500.0")}
+]
+);
+
+insert into Reports
+(
+[
+{"Etype" : "flood" , "location" : circle("846.5,2589.56 1000.0")},
+{"Etype" : "crash" , "location" : circle("953.48,2504.12 100.0")},
+{"Etype" : "flood" , "location" : circle("2313.19,1641.15 1000.0")},
+{"Etype" : "fire" , "location" : circle("3014.66,2332.34 500.0")},
+{"Etype" : "flood" , "location" : circle("1188.75,2307.52 1000.0")},
+{"Etype" : "fire" , "location" : circle("3418.08,1090.2 500.0")},
+{"Etype" : "fire" , "location" : circle("1364.9,1434.81 500.0")},
+{"Etype" : "storm" , "location" : circle("1164.65,2088.5 2000.0")},
+{"Etype" : "flood" , "location" : circle("3582.04,974.89 1000.0")},
+{"Etype" : "flood" , "location" : circle("1016.15,846.6 1000.0")},
+{"Etype" : "flood" , "location" : circle("1416.4,1483.13 1000.0")},
+{"Etype" : "flood" , "location" : circle("2777.23,963.83 1000.0")},
+{"Etype" : "fire" , "location" : circle("3082.74,1746.62 500.0")},
+{"Etype" : "storm" , "location" : circle("1186.15,2283.96 2000.0")},
+{"Etype" : "crash" , "location" : circle("1218.17,1591.68 100.0")},
+{"Etype" : "fire" , "location" : circle("1141.24,1474.73 500.0")},
+{"Etype" : "flood" , "location" : circle("1105.73,1875.44 1000.0")},
+{"Etype" : "flood" , "location" : circle("1805.37,2346.03 1000.0")},
+{"Etype" : "flood" , "location" : circle("1535.83,1546.66 1000.0")},
+{"Etype" : "fire" , "location" : circle("4187.75,1064.17 500.0")}
+]
+);
\ No newline at end of file
diff --git a/asterix-bad/src/test/resources/runtimets/queries/channel/add_index/add_index.3.sleep.sqlpp b/asterix-bad/src/test/resources/runtimets/queries/channel/add_index/add_index.3.sleep.sqlpp
new file mode 100644
index 0000000..5f764b3
--- /dev/null
+++ b/asterix-bad/src/test/resources/runtimets/queries/channel/add_index/add_index.3.sleep.sqlpp
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+* Description  : Check Whether a Channel works after adding a new Index
+* Expected Res : Success
+* Date         : Apr 2018
+* Author       : Steven Jacobs
+*/
+
+15000
\ No newline at end of file
diff --git a/asterix-bad/src/test/resources/runtimets/queries/channel/add_index/add_index.4.update.sqlpp b/asterix-bad/src/test/resources/runtimets/queries/channel/add_index/add_index.4.update.sqlpp
new file mode 100644
index 0000000..bdcd4fe
--- /dev/null
+++ b/asterix-bad/src/test/resources/runtimets/queries/channel/add_index/add_index.4.update.sqlpp
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+* Description  : Check Whether a Channel works after adding a new Index
+* Expected Res : Success
+* Date         : Apr 2018
+* Author       : Steven Jacobs
+*/
+
+use channels;
+
+create index s_location on Shelters(location) type RTREE;
\ No newline at end of file
diff --git a/asterix-bad/src/test/resources/runtimets/queries/channel/add_index/add_index.5.sleep.sqlpp b/asterix-bad/src/test/resources/runtimets/queries/channel/add_index/add_index.5.sleep.sqlpp
new file mode 100644
index 0000000..9aef8ec
--- /dev/null
+++ b/asterix-bad/src/test/resources/runtimets/queries/channel/add_index/add_index.5.sleep.sqlpp
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+* Description  : Check Whether a Channel works after adding a new Index
+* Expected Res : Success
+* Date         : Apr 2018
+* Author       : Steven Jacobs
+*/
+
+10000
\ No newline at end of file
diff --git a/asterix-bad/src/test/resources/runtimets/queries/channel/add_index/add_index.6.query.sqlpp b/asterix-bad/src/test/resources/runtimets/queries/channel/add_index/add_index.6.query.sqlpp
new file mode 100644
index 0000000..fc3dc86
--- /dev/null
+++ b/asterix-bad/src/test/resources/runtimets/queries/channel/add_index/add_index.6.query.sqlpp
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+* Description  : Check Whether a Channel works after adding a new Index
+* Expected Res : Success
+* Date         : Apr 2018
+* Author       : Steven Jacobs
+*/
+
+use channels;
+
+select value array_count((select * from EmergencyChannelResults ));
\ No newline at end of file
diff --git 
a/asterix-bad/src/test/resources/runtimets/queries/channel/add_index/add_index.7.ddl.sqlpp
 
b/asterix-bad/src/test/resources/runtimets/queries/channel/add_index/add_index.7.ddl.sqlpp
new file mode 100644
index 0000000..54075e5
--- /dev/null
+++ 
b/asterix-bad/src/test/resources/runtimets/queries/channel/add_index/add_index.7.ddl.sqlpp
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+* Description  : Check Whether a Channel works after adding a new Index
+* Expected Res : Success
+* Date         : Apr 2018
+* Author       : Steven Jacobs
+*/
+
+use channels;
+
+drop channel EmergencyChannel;
\ No newline at end of file
diff --git 
a/asterix-bad/src/test/resources/runtimets/queries/channel/drop_index/drop_index.1.ddl.sqlpp
 
b/asterix-bad/src/test/resources/runtimets/queries/channel/drop_index/drop_index.1.ddl.sqlpp
new file mode 100644
index 0000000..ca01dd7
--- /dev/null
+++ 
b/asterix-bad/src/test/resources/runtimets/queries/channel/drop_index/drop_index.1.ddl.sqlpp
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+* Description  : Drop an index on a dataset used by a channel's function
+* Expected Res : Error
+* Date         : Jan 2018
+* Author       : Steven Jacobs
+*/
+
+drop dataverse two if exists;
+drop dataverse channels if exists;
+create dataverse channels;
+use channels;
+
+create type UserLocation as {
+  location: circle,
+  userName: string,
+  timeStamp: datetime
+};
+
+
+create type UserLocationFeedType as {
+  location: circle,
+  userName: string
+};
+
+create type EmergencyReport as {
+  reportId: uuid,
+  Etype: string,
+  location: circle,
+  timeStamp: datetime
+};
+
+create type EmergencyReportFeedType as {
+  Etype: string,
+  location: circle
+};
+
+
+create type EmergencyShelter as {
+  shelterName: string,
+  location: point
+};
+
+create dataset UserLocations(UserLocation)
+primary key userName;
+create dataset Shelters(EmergencyShelter)
+primary key shelterName;
+create dataset Reports(EmergencyReport)
+primary key reportId autogenerated;
+
+create index location_time on UserLocations(timeStamp);
+create index u_location on UserLocations(location) type RTREE;
+create index s_location on Shelters(location) type RTREE;
+create index report_time on Reports(timeStamp);
+
+create function RecentEmergenciesNearUser(userName) {
+  (
+  select report, shelters from
+   ( select value r from Reports r where r.timeStamp >
+   current_datetime() - day_time_duration("PT10S"))report,
+  UserLocations u
+    let shelters = (select s.location from Shelters s where 
spatial_intersect(s.location,u.location))
+  where u.userName = userName
+  and spatial_intersect(report.location,u.location)
+  )
+};
+
+create repetitive channel EmergencyChannel using RecentEmergenciesNearUser@1 
period duration("PT10S");
+
+use channels;
+drop index Shelters.s_location;
\ No newline at end of file
diff --git 
a/asterix-bad/src/test/resources/runtimets/results/channel/add_index/add_index.1.adm
 
b/asterix-bad/src/test/resources/runtimets/results/channel/add_index/add_index.1.adm
new file mode 100644
index 0000000..2f6c289
--- /dev/null
+++ 
b/asterix-bad/src/test/resources/runtimets/results/channel/add_index/add_index.1.adm
@@ -0,0 +1 @@
+2148
\ No newline at end of file
diff --git 
a/asterix-bad/src/test/resources/runtimets/results/channel/create_channel_check_metadata/create_channel_check_metadata.1.adm
 
b/asterix-bad/src/test/resources/runtimets/results/channel/create_channel_check_metadata/create_channel_check_metadata.1.adm
index bee9157..225d83f 100644
--- 
a/asterix-bad/src/test/resources/runtimets/results/channel/create_channel_check_metadata/create_channel_check_metadata.1.adm
+++ 
b/asterix-bad/src/test/resources/runtimets/results/channel/create_channel_check_metadata/create_channel_check_metadata.1.adm
@@ -1 +1 @@
-{ "DataverseName": "channels", "ChannelName": "nearbyTweetChannel", 
"SubscriptionsDatasetName": "nearbyTweetChannelSubscriptions", 
"ResultsDatasetName": "nearbyTweetChannelResults", "Function": [ "channels", 
"NearbyTweetsContainingText", "2" ], "Duration": "PT10M", "Dependencies": [ [ [ 
"channels", "nearbyTweetChannelResults" ], [ "channels", 
"nearbyTweetChannelSubscriptions" ] ], [ [ "channels", 
"NearbyTweetsContainingText", "2" ] ] ] }
\ No newline at end of file
+{ "DataverseName": "channels", "ChannelName": "nearbyTweetChannel", 
"SubscriptionsDatasetName": "nearbyTweetChannelSubscriptions", 
"ResultsDatasetName": "nearbyTweetChannelResults", "Function": [ "channels", 
"NearbyTweetsContainingText", "2" ], "Duration": "PT10M", "Dependencies": [ [ [ 
"channels", "nearbyTweetChannelResults" ], [ "channels", 
"nearbyTweetChannelSubscriptions" ] ], [ [ "channels", 
"NearbyTweetsContainingText", "2" ] ] ], "Body": "SET inline_with 
\"false\";\ninsert into channels.nearbyTweetChannelResults as a (\nwith 
channelExecutionTime as current_datetime() \nselect result, 
channelExecutionTime, sub.subscriptionId as subscriptionId,current_datetime() 
as deliveryTime\nfrom channels.nearbyTweetChannelSubscriptions 
sub,\nMetadata.Broker b, 
\nchannels.NearbyTweetsContainingText(sub.param0,sub.param1) result \nwhere 
b.BrokerName = sub.BrokerName\nand b.DataverseName = sub.DataverseName\n) 
returning a;" }
\ No newline at end of file
diff --git 
a/asterix-bad/src/test/resources/runtimets/results/channel/drop_channel_check_metadata/drop_channel_check_metadata.1.adm
 
b/asterix-bad/src/test/resources/runtimets/results/channel/drop_channel_check_metadata/drop_channel_check_metadata.1.adm
index 1c492ac..8d8899d 100644
--- 
a/asterix-bad/src/test/resources/runtimets/results/channel/drop_channel_check_metadata/drop_channel_check_metadata.1.adm
+++ 
b/asterix-bad/src/test/resources/runtimets/results/channel/drop_channel_check_metadata/drop_channel_check_metadata.1.adm
@@ -1,2 +1,2 @@
-{ "DataverseName": "channels", "ChannelName": "nearbyTweetChannel1", 
"SubscriptionsDatasetName": "nearbyTweetChannel1Subscriptions", 
"ResultsDatasetName": "nearbyTweetChannel1Results", "Function": [ "channels", 
"NearbyTweetsContainingText", "2" ], "Duration": "PT10M", "Dependencies": [ [ [ 
"channels", "nearbyTweetChannel1Results" ], [ "channels", 
"nearbyTweetChannel1Subscriptions" ] ], [ [ "channels", 
"NearbyTweetsContainingText", "2" ] ] ] }
-{ "DataverseName": "channels", "ChannelName": "nearbyTweetChannel3", 
"SubscriptionsDatasetName": "nearbyTweetChannel3Subscriptions", 
"ResultsDatasetName": "nearbyTweetChannel3Results", "Function": [ "channels", 
"NearbyTweetsContainingText", "2" ], "Duration": "PT10M", "Dependencies": [ [ [ 
"channels", "nearbyTweetChannel3Results" ], [ "channels", 
"nearbyTweetChannel3Subscriptions" ] ], [ [ "channels", 
"NearbyTweetsContainingText", "2" ] ] ] }
\ No newline at end of file
+{ "DataverseName": "channels", "ChannelName": "nearbyTweetChannel1", 
"SubscriptionsDatasetName": "nearbyTweetChannel1Subscriptions", 
"ResultsDatasetName": "nearbyTweetChannel1Results", "Function": [ "channels", 
"NearbyTweetsContainingText", "2" ], "Duration": "PT10M", "Dependencies": [ [ [ 
"channels", "nearbyTweetChannel1Results" ], [ "channels", 
"nearbyTweetChannel1Subscriptions" ] ], [ [ "channels", 
"NearbyTweetsContainingText", "2" ] ] ], "Body": "SET inline_with 
\"false\";\ninsert into channels.nearbyTweetChannel1Results as a (\nwith 
channelExecutionTime as current_datetime() \nselect result, 
channelExecutionTime, sub.subscriptionId as subscriptionId,current_datetime() 
as deliveryTime\nfrom channels.nearbyTweetChannel1Subscriptions 
sub,\nMetadata.Broker b, 
\nchannels.NearbyTweetsContainingText(sub.param0,sub.param1) result \nwhere 
b.BrokerName = sub.BrokerName\nand b.DataverseName = sub.DataverseName\n) 
returning a;" }
+{ "DataverseName": "channels", "ChannelName": "nearbyTweetChannel3", 
"SubscriptionsDatasetName": "nearbyTweetChannel3Subscriptions", 
"ResultsDatasetName": "nearbyTweetChannel3Results", "Function": [ "channels", 
"NearbyTweetsContainingText", "2" ], "Duration": "PT10M", "Dependencies": [ [ [ 
"channels", "nearbyTweetChannel3Results" ], [ "channels", 
"nearbyTweetChannel3Subscriptions" ] ], [ [ "channels", 
"NearbyTweetsContainingText", "2" ] ] ], "Body": "SET inline_with 
\"false\";\ninsert into channels.nearbyTweetChannel3Results as a (\nwith 
channelExecutionTime as current_datetime() \nselect result, 
channelExecutionTime, sub.subscriptionId as subscriptionId,current_datetime() 
as deliveryTime\nfrom channels.nearbyTweetChannel3Subscriptions 
sub,\nMetadata.Broker b, 
\nchannels.NearbyTweetsContainingText(sub.param0,sub.param1) result \nwhere 
b.BrokerName = sub.BrokerName\nand b.DataverseName = sub.DataverseName\n) 
returning a;" }
\ No newline at end of file
diff --git a/asterix-bad/src/test/resources/runtimets/testsuite.xml 
b/asterix-bad/src/test/resources/runtimets/testsuite.xml
index 3c72a14..4640af1 100644
--- a/asterix-bad/src/test/resources/runtimets/testsuite.xml
+++ b/asterix-bad/src/test/resources/runtimets/testsuite.xml
@@ -135,6 +135,17 @@
       </compilation-unit>
     </test-case>
     <test-case FilePath="channel">
+      <compilation-unit name="drop_index">
+        <output-dir compare="Text">drop_index</output-dir>
+        <expected-error>Cannot drop index. channels.EmergencyChannel(Channel) 
depends on it!</expected-error>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="channel">
+      <compilation-unit name="add_index">
+        <output-dir compare="Text">add_index</output-dir>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="channel">
       <compilation-unit name="drop_results">
         <output-dir compare="Text">drop_results</output-dir>
         <expected-error>Cannot alter dataset two.nearbyTweetChannelResults. 
two.nearbyTweetChannel(Channel) depends on it!</expected-error>

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/2620
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: If0a4d37a5b91063fcb1673dbfd008c140ed54ae6
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb-bad
Gerrit-Branch: master
Gerrit-Owner: Steven Jacobs <[email protected]>

Reply via email to