abdullah alamoudi has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/1688

Change subject: Remove use of static ctx
......................................................................

Remove use of static ctx

Change-Id: I758f50772823d7b1935e4d61a6cb2805ba0808ea
---
M asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADQueryTranslatorFactory.java
M asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADRuleSetFactory.java
M asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADStatementExecutor.java
M asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelDropStatement.java
M asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java
M asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelUnsubscribeStatement.java
M asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
M asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java
M asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ExecuteProcedureStatement.java
M asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ProcedureDropStatement.java
M asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerRuntime.java
11 files changed, 92 insertions(+), 73 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb-bad refs/changes/88/1688/1

diff --git 
a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADQueryTranslatorFactory.java
 
b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADQueryTranslatorFactory.java
index 2a11e13..0360ee3 100644
--- 
a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADQueryTranslatorFactory.java
+++ 
b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADQueryTranslatorFactory.java
@@ -23,6 +23,7 @@
 import org.apache.asterix.app.translator.DefaultStatementExecutorFactory;
 import org.apache.asterix.app.translator.QueryTranslator;
 import org.apache.asterix.common.context.IStorageComponentProvider;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.compiler.provider.ILangCompilationProvider;
 import org.apache.asterix.lang.common.base.Statement;
 import org.apache.asterix.translator.SessionConfig;
@@ -30,9 +31,9 @@
 public class BADQueryTranslatorFactory extends DefaultStatementExecutorFactory 
{
 
     @Override
-    public QueryTranslator create(List<Statement> statements, SessionConfig 
conf,
+    public QueryTranslator create(ICcApplicationContext appCtx, 
List<Statement> statements, SessionConfig conf,
             ILangCompilationProvider compilationProvider, 
IStorageComponentProvider storageComponentProvider) {
-        return new BADStatementExecutor(statements, conf, compilationProvider, 
storageComponentProvider,
+        return new BADStatementExecutor(appCtx, statements, conf, 
compilationProvider, storageComponentProvider,
                 executorService);
     }
 }
diff --git 
a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADRuleSetFactory.java 
b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADRuleSetFactory.java
index 7e45fd6..8d4b1e5 100644
--- 
a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADRuleSetFactory.java
+++ 
b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADRuleSetFactory.java
@@ -22,6 +22,7 @@
 import java.util.List;
 
 import org.apache.asterix.bad.rules.InsertBrokerNotifierForChannelRule;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.compiler.provider.DefaultRuleSetFactory;
 import org.apache.asterix.compiler.provider.IRuleSetFactory;
 import org.apache.asterix.optimizer.base.RuleCollections;
@@ -35,11 +36,12 @@
 public class BADRuleSetFactory implements IRuleSetFactory {
 
     @Override
-    public List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>> 
getLogicalRewrites()
-            throws AlgebricksException {
-        List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>> 
logicalRuleSet = DefaultRuleSetFactory.buildLogical();
+    public List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>> 
getLogicalRewrites(
+            ICcApplicationContext appCtx) throws AlgebricksException {
+        List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>> 
logicalRuleSet =
+                DefaultRuleSetFactory.buildLogical(appCtx);
 
-        List<IAlgebraicRewriteRule> normalizationCollection = 
RuleCollections.buildNormalizationRuleCollection();
+        List<IAlgebraicRewriteRule> normalizationCollection = 
RuleCollections.buildNormalizationRuleCollection(appCtx);
         List<IAlgebraicRewriteRule> alteredNormalizationCollection = new 
ArrayList<>();
         alteredNormalizationCollection.addAll(normalizationCollection);
 
@@ -54,7 +56,7 @@
 
         //Find instances of the normalization collection and replace them with 
the new one
         SequentialOnceRuleController seqOnceCtrl = new 
SequentialOnceRuleController(true);
-        for (int i =0; i < logicalRuleSet.size(); i++){
+        for (int i = 0; i < logicalRuleSet.size(); i++) {
             List<IAlgebraicRewriteRule> collection = 
logicalRuleSet.get(i).second;
             if (collection.size() == normalizationCollection.size()) {
                 boolean isNormalizationCollection = true;
@@ -75,7 +77,8 @@
     }
 
     @Override
-    public List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>> 
getPhysicalRewrites() {
-        return DefaultRuleSetFactory.buildPhysical();
+    public List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>> 
getPhysicalRewrites(
+            ICcApplicationContext appCtx) {
+        return DefaultRuleSetFactory.buildPhysical(appCtx);
     }
 }
diff --git 
a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADStatementExecutor.java
 
b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADStatementExecutor.java
index 5cc6444..a6fe5a8 100644
--- 
a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADStatementExecutor.java
+++ 
b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADStatementExecutor.java
@@ -29,6 +29,7 @@
 import org.apache.asterix.bad.metadata.Channel;
 import org.apache.asterix.bad.metadata.Procedure;
 import org.apache.asterix.common.context.IStorageComponentProvider;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.common.functions.FunctionSignature;
 import org.apache.asterix.compiler.provider.ILangCompilationProvider;
 import org.apache.asterix.lang.common.base.Statement;
@@ -42,12 +43,11 @@
 
 public class BADStatementExecutor extends QueryTranslator {
 
-    public BADStatementExecutor(List<Statement> aqlStatements, SessionConfig 
conf,
+    public BADStatementExecutor(ICcApplicationContext appCtx, List<Statement> 
aqlStatements, SessionConfig conf,
             ILangCompilationProvider compliationProvider, 
IStorageComponentProvider storageComponentProvider,
             ExecutorService executorService) {
-        super(aqlStatements, conf, compliationProvider, 
storageComponentProvider, executorService);
+        super(appCtx, aqlStatements, conf, compliationProvider, 
storageComponentProvider, executorService);
     }
-
 
     @Override
     protected void handleDataverseDropStatement(MetadataProvider 
metadataProvider, Statement stmt,
@@ -59,7 +59,7 @@
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
         Identifier dvId = ((DataverseDropStatement) stmt).getDataverseName();
         List<Broker> brokers = BADLangExtension.getBrokers(mdTxnCtx, 
dvId.getValue());
-        MetadataProvider tempMdProvider = new 
MetadataProvider(metadataProvider.getDefaultDataverse(),
+        MetadataProvider tempMdProvider = new MetadataProvider(appCtx, 
metadataProvider.getDefaultDataverse(),
                 metadataProvider.getStorageComponentProvider());
         tempMdProvider.setConfig(metadataProvider.getConfig());
         for (Broker broker : brokers) {
@@ -70,8 +70,8 @@
         List<Channel> channels = BADLangExtension.getChannels(mdTxnCtx, 
dvId.getValue());
         for (Channel channel : channels) {
             tempMdProvider.getLocks().reset();
-            ChannelDropStatement drop = new ChannelDropStatement(dvId,
-                    new Identifier(channel.getChannelId().getEntityName()), 
false);
+            ChannelDropStatement drop =
+                    new ChannelDropStatement(dvId, new 
Identifier(channel.getChannelId().getEntityName()), false);
             drop.handle(this, tempMdProvider, hcc, null, null, null, 0);
         }
         List<Procedure> procedures = BADLangExtension.getProcedures(mdTxnCtx, 
dvId.getValue());
diff --git 
a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelDropStatement.java
 
b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelDropStatement.java
index 854ae07..1b655da 100644
--- 
a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelDropStatement.java
+++ 
b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelDropStatement.java
@@ -19,6 +19,7 @@
 package org.apache.asterix.bad.lang.statement;
 
 import org.apache.asterix.active.ActiveJobNotificationHandler;
+import org.apache.asterix.active.ActiveLifecycleListener;
 import org.apache.asterix.active.EntityId;
 import org.apache.asterix.algebra.extension.IExtensionStatement;
 import org.apache.asterix.app.translator.QueryTranslator;
@@ -26,6 +27,7 @@
 import org.apache.asterix.bad.lang.BADLangExtension;
 import org.apache.asterix.bad.metadata.Channel;
 import org.apache.asterix.bad.metadata.PrecompiledJobEventListener;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.common.exceptions.CompilationException;
 import org.apache.asterix.lang.common.statement.DropDatasetStatement;
 import org.apache.asterix.lang.common.struct.Identifier;
@@ -85,12 +87,14 @@
     public void handle(IStatementExecutor statementExecutor, MetadataProvider 
metadataProvider,
             IHyracksClientConnection hcc, IHyracksDataset hdc, ResultDelivery 
resultDelivery, Stats stats,
             int resultSetIdCounter) throws HyracksDataException, 
AlgebricksException {
-
         String dataverse = ((QueryTranslator) 
statementExecutor).getActiveDataverse(dataverseName);
         boolean txnActive = false;
         EntityId entityId = new EntityId(BADConstants.CHANNEL_EXTENSION_NAME, 
dataverse, channelName.getValue());
-        PrecompiledJobEventListener listener = (PrecompiledJobEventListener) 
ActiveJobNotificationHandler.INSTANCE
-                .getActiveEntityListener(entityId);
+        ICcApplicationContext appCtx = 
metadataProvider.getApplicationContext();
+        ActiveLifecycleListener activeListener = (ActiveLifecycleListener) 
appCtx.getActiveLifecycleListener();
+        ActiveJobNotificationHandler activeEventHandler = 
activeListener.getNotificationHandler();
+        PrecompiledJobEventListener listener =
+                (PrecompiledJobEventListener) 
activeEventHandler.getActiveEntityListener(entityId);
         Channel channel = null;
 
         MetadataTransactionContext mdTxnCtx = null;
@@ -111,13 +115,13 @@
             listener.getExecutorService().shutdownNow();
             JobId hyracksJobId = listener.getJobId();
             listener.deActivate();
-            ActiveJobNotificationHandler.INSTANCE.removeListener(listener);
+            activeEventHandler.removeListener(listener);
             if (hyracksJobId != null) {
                 hcc.destroyJob(hyracksJobId);
             }
 
             //Create a metadata provider to use in nested jobs.
-            MetadataProvider tempMdProvider = new 
MetadataProvider(metadataProvider.getDefaultDataverse(),
+            MetadataProvider tempMdProvider = new MetadataProvider(appCtx, 
metadataProvider.getDefaultDataverse(),
                     metadataProvider.getStorageComponentProvider());
             tempMdProvider.setConfig(metadataProvider.getConfig());
             //Drop the Channel Datasets
@@ -130,7 +134,6 @@
             dropStmt = new DropDatasetStatement(new Identifier(dataverse),
                     new Identifier(channel.getSubscriptionsDataset()), true);
             ((QueryTranslator) 
statementExecutor).handleDatasetDropStatement(tempMdProvider, dropStmt, hcc);
-
 
             //Remove the Channel Metadata
             MetadataManager.INSTANCE.deleteEntity(mdTxnCtx, channel);
diff --git 
a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java
 
b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java
index 5c5cdf1..be5b794 100644
--- 
a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java
+++ 
b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java
@@ -131,8 +131,7 @@
             int resultSetIdCounter) throws HyracksDataException, 
AlgebricksException {
 
         String dataverse = ((QueryTranslator) 
statementExecutor).getActiveDataverse(dataverseName);
-        String brokerDataverse = ((QueryTranslator) statementExecutor)
-.getActiveDataverse(brokerDataverseName);
+        String brokerDataverse = ((QueryTranslator) 
statementExecutor).getActiveDataverse(brokerDataverseName);
 
         MetadataTransactionContext mdTxnCtx = null;
         try {
@@ -156,7 +155,7 @@
 
             Query subscriptionTuple = new Query(false);
 
-            List<FieldBinding> fb = new ArrayList<FieldBinding>();
+            List<FieldBinding> fb = new ArrayList<>();
             LiteralExpr leftExpr = new LiteralExpr(new 
StringLiteral(BADConstants.DataverseName));
             Expression rightExpr = new LiteralExpr(new 
StringLiteral(brokerDataverse));
             fb.add(new FieldBinding(leftExpr, rightExpr));
@@ -168,11 +167,11 @@
             if (subscriptionId != null) {
                 leftExpr = new LiteralExpr(new 
StringLiteral(BADConstants.SubscriptionId));
 
-                List<Expression> UUIDList = new ArrayList<Expression>();
+                List<Expression> UUIDList = new ArrayList<>();
                 UUIDList.add(new LiteralExpr(new 
StringLiteral(subscriptionId)));
                 FunctionIdentifier function = 
BuiltinFunctions.UUID_CONSTRUCTOR;
-                FunctionSignature UUIDfunc = new 
FunctionSignature(function.getNamespace(), function.getName(),
-                        function.getArity());
+                FunctionSignature UUIDfunc =
+                        new FunctionSignature(function.getNamespace(), 
function.getName(), function.getArity());
                 CallExpr UUIDCall = new CallExpr(UUIDfunc, UUIDList);
 
                 rightExpr = UUIDCall;
@@ -189,8 +188,8 @@
 
             subscriptionTuple.setVarCounter(varCounter);
 
-            MetadataProvider tempMdProvider = new 
MetadataProvider(metadataProvider.getDefaultDataverse(),
-                    metadataProvider.getStorageComponentProvider());
+            MetadataProvider tempMdProvider = new 
MetadataProvider(metadataProvider.getApplicationContext(),
+                    metadataProvider.getDefaultDataverse(), 
metadataProvider.getStorageComponentProvider());
             tempMdProvider.setConfig(metadataProvider.getConfig());
 
             if (subscriptionId == null) {
diff --git 
a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelUnsubscribeStatement.java
 
b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelUnsubscribeStatement.java
index 538a5ea..667711c 100644
--- 
a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelUnsubscribeStatement.java
+++ 
b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelUnsubscribeStatement.java
@@ -129,12 +129,12 @@
             condition.setCurrentop(true);
             condition.addOperator("=");
 
-            List<Expression> UUIDList = new ArrayList<Expression>();
+            List<Expression> UUIDList = new ArrayList<>();
             UUIDList.add(new LiteralExpr(new StringLiteral(subscriptionId)));
 
             FunctionIdentifier function = BuiltinFunctions.UUID_CONSTRUCTOR;
-            FunctionSignature UUIDfunc = new 
FunctionSignature(function.getNamespace(), function.getName(),
-                    function.getArity());
+            FunctionSignature UUIDfunc =
+                    new FunctionSignature(function.getNamespace(), 
function.getName(), function.getArity());
             CallExpr UUIDCall = new CallExpr(UUIDfunc, UUIDList);
 
             condition.addOperand(UUIDCall);
@@ -143,8 +143,8 @@
                     new Identifier(subscriptionsDatasetName), condition, 
varCounter);
             AqlDeleteRewriteVisitor visitor = new AqlDeleteRewriteVisitor();
             delete.accept(visitor, null);
-            MetadataProvider tempMdProvider = new 
MetadataProvider(metadataProvider.getDefaultDataverse(),
-                    metadataProvider.getStorageComponentProvider());
+            MetadataProvider tempMdProvider = new 
MetadataProvider(metadataProvider.getApplicationContext(),
+                    metadataProvider.getDefaultDataverse(), 
metadataProvider.getStorageComponentProvider());
             tempMdProvider.setConfig(metadataProvider.getConfig());
             ((QueryTranslator) 
statementExecutor).handleDeleteStatement(tempMdProvider, delete, hcc, false);
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
diff --git 
a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
 
b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
index f138d4f..f0bc7ad 100644
--- 
a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
+++ 
b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
@@ -30,6 +30,7 @@
 import java.util.logging.Logger;
 
 import org.apache.asterix.active.ActiveJobNotificationHandler;
+import org.apache.asterix.active.ActiveLifecycleListener;
 import org.apache.asterix.active.EntityId;
 import org.apache.asterix.algebra.extension.IExtensionStatement;
 import org.apache.asterix.app.translator.QueryTranslator;
@@ -40,6 +41,7 @@
 import org.apache.asterix.bad.metadata.PrecompiledJobEventListener;
 import 
org.apache.asterix.bad.metadata.PrecompiledJobEventListener.PrecompiledType;
 import org.apache.asterix.common.config.DatasetConfig.DatasetType;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.exceptions.CompilationException;
 import org.apache.asterix.common.functions.FunctionSignature;
@@ -173,10 +175,10 @@
         Identifier subscriptionsTypeName = new 
Identifier(BADConstants.ChannelSubscriptionsType);
         Identifier resultsTypeName = new 
Identifier(BADConstants.ChannelResultsType);
         //Setup the subscriptions dataset
-        List<List<String>> partitionFields = new ArrayList<List<String>>();
-        List<Integer> keyIndicators = new ArrayList<Integer>();
+        List<List<String>> partitionFields = new ArrayList<>();
+        List<Integer> keyIndicators = new ArrayList<>();
         keyIndicators.add(0);
-        List<String> fieldNames = new ArrayList<String>();
+        List<String> fieldNames = new ArrayList<>();
         fieldNames.add(BADConstants.SubscriptionId);
         partitionFields.add(fieldNames);
         IDatasetDetailsDecl idd = new InternalDetailsDecl(partitionFields, 
keyIndicators, true, null, false);
@@ -185,8 +187,8 @@
                 new HashMap<String, String>(), new HashMap<String, String>(), 
DatasetType.INTERNAL, idd, true);
 
         //Setup the results dataset
-        partitionFields = new ArrayList<List<String>>();
-        fieldNames = new ArrayList<String>();
+        partitionFields = new ArrayList<>();
+        fieldNames = new ArrayList<>();
         fieldNames.add(BADConstants.ResultId);
         partitionFields.add(fieldNames);
         idd = new InternalDetailsDecl(partitionFields, keyIndicators, true, 
null, false);
@@ -236,8 +238,7 @@
     }
 
     private void setupExecutorJob(EntityId entityId, JobSpecification 
channeljobSpec, IHyracksClientConnection hcc,
-            PrecompiledJobEventListener listener, boolean predistributed)
-            throws Exception {
+            PrecompiledJobEventListener listener, boolean predistributed) 
throws Exception {
         if (channeljobSpec != null) {
             //TODO: Find a way to fix optimizer tests so we don't need this 
check
             JobId jobId = null;
@@ -270,8 +271,11 @@
         Identifier subscriptionsName = new Identifier(channelName + 
BADConstants.subscriptionEnding);
         Identifier resultsName = new Identifier(channelName + 
BADConstants.resultsEnding);
         EntityId entityId = new EntityId(BADConstants.CHANNEL_EXTENSION_NAME, 
dataverse, channelName.getValue());
-        PrecompiledJobEventListener listener = (PrecompiledJobEventListener) 
ActiveJobNotificationHandler.INSTANCE
-                .getActiveEntityListener(entityId);
+        ICcApplicationContext appCtx = 
metadataProvider.getApplicationContext();
+        ActiveLifecycleListener activeListener = (ActiveLifecycleListener) 
appCtx.getActiveLifecycleListener();
+        ActiveJobNotificationHandler activeEventHandler = 
activeListener.getNotificationHandler();
+        PrecompiledJobEventListener listener =
+                (PrecompiledJobEventListener) 
activeEventHandler.getActiveEntityListener(entityId);
         boolean alreadyActive = false;
         Channel channel = null;
 
@@ -300,8 +304,8 @@
             if (MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverse, 
resultsName.getValue()) != null) {
                 throw new AsterixException("The channel name:" + channelName + 
" is not available.");
             }
-            MetadataProvider tempMdProvider = new 
MetadataProvider(metadataProvider.getDefaultDataverse(),
-                    metadataProvider.getStorageComponentProvider());
+            MetadataProvider tempMdProvider = new 
MetadataProvider(metadataProvider.getApplicationContext(),
+                    metadataProvider.getDefaultDataverse(), 
metadataProvider.getStorageComponentProvider());
             tempMdProvider.setConfig(metadataProvider.getConfig());
             //Create Channel Datasets
             createDatasets(statementExecutor, subscriptionsName, resultsName, 
tempMdProvider, hcc, hdc, stats,
@@ -318,7 +322,7 @@
                 datasets.add(MetadataManager.INSTANCE.getDataset(mdTxnCtx, 
dataverse, resultsName.getValue()));
                 //TODO: Add datasets used by channel function
                 listener = new PrecompiledJobEventListener(entityId, 
PrecompiledType.CHANNEL, datasets);
-                
ActiveJobNotificationHandler.INSTANCE.registerListener(listener);
+                activeEventHandler.registerListener(listener);
             }
 
             if (distributed) {
diff --git 
a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java
 
b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java
index ae6886a..3c1ca2b 100644
--- 
a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java
+++ 
b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java
@@ -27,6 +27,7 @@
 import java.util.logging.Logger;
 
 import org.apache.asterix.active.ActiveJobNotificationHandler;
+import org.apache.asterix.active.ActiveLifecycleListener;
 import org.apache.asterix.active.EntityId;
 import org.apache.asterix.algebra.extension.IExtensionStatement;
 import org.apache.asterix.app.result.ResultReader;
@@ -36,6 +37,7 @@
 import org.apache.asterix.bad.metadata.PrecompiledJobEventListener;
 import 
org.apache.asterix.bad.metadata.PrecompiledJobEventListener.PrecompiledType;
 import org.apache.asterix.bad.metadata.Procedure;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.exceptions.CompilationException;
 import org.apache.asterix.common.functions.FunctionSignature;
@@ -87,7 +89,7 @@
             Expression period) {
         this.signature = signature;
         this.functionBody = functionBody;
-        this.paramList = new ArrayList<String>();
+        this.paramList = new ArrayList<>();
         for (VarIdentifier varId : parameterList) {
             this.paramList.add(varId.getValue());
         }
@@ -168,7 +170,8 @@
             throw new CompilationException("Procedure can only execute a 
single statement");
         }
         if (fStatements.get(0).getKind() == Statement.Kind.INSERT) {
-            return new Pair<>(((QueryTranslator) 
statementExecutor).handleInsertUpsertStatement(metadataProvider,
+            return new Pair<>(
+                    ((QueryTranslator) 
statementExecutor).handleInsertUpsertStatement(metadataProvider,
                             fStatements.get(0), hcc, hdc, 
ResultDelivery.ASYNC, stats, true, null, null),
                     PrecompiledType.INSERT);
         } else if (fStatements.get(0).getKind() == Statement.Kind.QUERY) {
@@ -182,14 +185,14 @@
             fStatements.get(0).accept(visitor, null);
             return new Pair<>(((QueryTranslator) 
statementExecutor).handleDeleteStatement(metadataProvider,
                     fStatements.get(0), hcc, true), PrecompiledType.DELETE);
-        }else{
+        } else {
             throw new CompilationException("Procedure can only execute a 
single delete, insert, or query");
         }
     }
 
     private void setupDistributedJob(EntityId entityId, JobSpecification 
jobSpec, IHyracksClientConnection hcc,
             PrecompiledJobEventListener listener, ResultSetId resultSetId, 
IHyracksDataset hdc, Stats stats)
-                    throws Exception {
+            throws Exception {
         JobId jobId = hcc.distributeJob(jobSpec);
         listener.storeDistributedInfo(jobId, null, new ResultReader(hdc, 
jobId, resultSetId));
     }
@@ -198,15 +201,15 @@
     public void handle(IStatementExecutor statementExecutor, MetadataProvider 
metadataProvider,
             IHyracksClientConnection hcc, IHyracksDataset hdc, ResultDelivery 
resultDelivery, Stats stats,
             int resultSetIdCounter) throws HyracksDataException, 
AlgebricksException {
-
+        ICcApplicationContext appCtx = 
metadataProvider.getApplicationContext();
+        ActiveLifecycleListener activeListener = (ActiveLifecycleListener) 
appCtx.getActiveLifecycleListener();
+        ActiveJobNotificationHandler activeEventHandler = 
activeListener.getNotificationHandler();
         initialize();
-
         String dataverse =
                 ((QueryTranslator) statementExecutor).getActiveDataverse(new 
Identifier(signature.getNamespace()));
-
         EntityId entityId = new EntityId(BADConstants.PROCEDURE_KEYWORD, 
dataverse, signature.getName());
         PrecompiledJobEventListener listener =
-                (PrecompiledJobEventListener) 
ActiveJobNotificationHandler.INSTANCE.getActiveEntityListener(entityId);
+                (PrecompiledJobEventListener) 
activeEventHandler.getActiveEntityListener(entityId);
         boolean alreadyActive = false;
         Procedure procedure = null;
 
@@ -229,8 +232,8 @@
             procedure = new Procedure(dataverse, signature.getName(), 
signature.getArity(), getParamList(),
                     Function.RETURNTYPE_VOID, getFunctionBody(), 
Function.LANGUAGE_AQL, duration);
 
-            MetadataProvider tempMdProvider = new 
MetadataProvider(metadataProvider.getDefaultDataverse(),
-                    metadataProvider.getStorageComponentProvider());
+            MetadataProvider tempMdProvider = new 
MetadataProvider(metadataProvider.getApplicationContext(),
+                    metadataProvider.getDefaultDataverse(), 
metadataProvider.getStorageComponentProvider());
             tempMdProvider.setConfig(metadataProvider.getConfig());
 
             metadataProvider.setResultSetId(new 
ResultSetId(resultSetIdCounter++));
@@ -250,7 +253,7 @@
             if (listener == null) {
                 //TODO: Add datasets used by channel function
                 listener = new PrecompiledJobEventListener(entityId, 
procedureJobSpec.second, new ArrayList<>());
-                
ActiveJobNotificationHandler.INSTANCE.registerListener(listener);
+                activeEventHandler.registerListener(listener);
             }
             setupDistributedJob(entityId, procedureJobSpec.first, hcc, 
listener, tempMdProvider.getResultSetId(), hdc,
                     stats);
diff --git 
a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ExecuteProcedureStatement.java
 
b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ExecuteProcedureStatement.java
index 1a319a1..a8ec9aa 100644
--- 
a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ExecuteProcedureStatement.java
+++ 
b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ExecuteProcedureStatement.java
@@ -22,6 +22,7 @@
 import java.util.concurrent.ScheduledExecutorService;
 
 import org.apache.asterix.active.ActiveJobNotificationHandler;
+import org.apache.asterix.active.ActiveLifecycleListener;
 import org.apache.asterix.active.EntityId;
 import org.apache.asterix.algebra.extension.IExtensionStatement;
 import org.apache.asterix.api.http.server.ResultUtil;
@@ -33,6 +34,7 @@
 import org.apache.asterix.bad.metadata.PrecompiledJobEventListener;
 import 
org.apache.asterix.bad.metadata.PrecompiledJobEventListener.PrecompiledType;
 import org.apache.asterix.bad.metadata.Procedure;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.common.exceptions.CompilationException;
 import org.apache.asterix.lang.common.struct.Identifier;
 import org.apache.asterix.lang.common.visitor.base.ILangVisitor;
@@ -92,21 +94,21 @@
     public void handle(IStatementExecutor statementExecutor, MetadataProvider 
metadataProvider,
             IHyracksClientConnection hcc, IHyracksDataset hdc, ResultDelivery 
resultDelivery, Stats stats,
             int resultSetIdCounter) throws HyracksDataException, 
AlgebricksException {
-
-
+        ICcApplicationContext appCtx = 
metadataProvider.getApplicationContext();
+        ActiveLifecycleListener activeListener = (ActiveLifecycleListener) 
appCtx.getActiveLifecycleListener();
+        ActiveJobNotificationHandler activeEventHandler = 
activeListener.getNotificationHandler();
         String dataverse = ((QueryTranslator) 
statementExecutor).getActiveDataverse(new Identifier(dataverseName));
         boolean txnActive = false;
         EntityId entityId = new EntityId(BADConstants.PROCEDURE_KEYWORD, 
dataverse, procedureName);
-        PrecompiledJobEventListener listener = (PrecompiledJobEventListener) 
ActiveJobNotificationHandler.INSTANCE
-                .getActiveEntityListener(entityId);
+        PrecompiledJobEventListener listener =
+                (PrecompiledJobEventListener) 
activeEventHandler.getActiveEntityListener(entityId);
         Procedure procedure = null;
 
         MetadataTransactionContext mdTxnCtx = null;
         try {
             mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
             txnActive = true;
-            procedure = BADLangExtension.getProcedure(mdTxnCtx, dataverse, 
procedureName,
-                    Integer.toString(getArity()));
+            procedure = BADLangExtension.getProcedure(mdTxnCtx, dataverse, 
procedureName, Integer.toString(getArity()));
             if (procedure == null) {
                 throw new AlgebricksException("There is no procedure with this 
name " + procedureName + ".");
             }
@@ -118,8 +120,8 @@
                 if (listener.getType() == PrecompiledType.QUERY) {
                     hcc.waitForCompletion(hyracksJobId);
                     ResultReader resultReader = listener.getResultReader();
-                    ResultUtil.printResults(resultReader, ((QueryTranslator) 
statementExecutor).getSessionConfig(),
-                            new Stats(), null);
+                    ResultUtil.printResults(appCtx, resultReader,
+                            ((QueryTranslator) 
statementExecutor).getSessionConfig(), new Stats(), null);
                 }
 
             } else {
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ProcedureDropStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ProcedureDropStatement.java
index abdf90a..f7c3a74 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ProcedureDropStatement.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ProcedureDropStatement.java
@@ -19,6 +19,7 @@
 package org.apache.asterix.bad.lang.statement;
 
 import org.apache.asterix.active.ActiveJobNotificationHandler;
+import org.apache.asterix.active.ActiveLifecycleListener;
 import org.apache.asterix.active.EntityId;
 import org.apache.asterix.algebra.extension.IExtensionStatement;
 import org.apache.asterix.app.translator.QueryTranslator;
@@ -26,6 +27,7 @@
 import org.apache.asterix.bad.lang.BADLangExtension;
 import org.apache.asterix.bad.metadata.PrecompiledJobEventListener;
 import org.apache.asterix.bad.metadata.Procedure;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.common.exceptions.CompilationException;
 import org.apache.asterix.common.functions.FunctionSignature;
 import org.apache.asterix.lang.common.struct.Identifier;
@@ -79,15 +81,17 @@
     public void handle(IStatementExecutor statementExecutor, MetadataProvider 
metadataProvider,
             IHyracksClientConnection hcc, IHyracksDataset hdc, ResultDelivery 
resultDelivery, Stats stats,
             int resultSetIdCounter) throws HyracksDataException, 
AlgebricksException {
+        ICcApplicationContext appCtx = 
metadataProvider.getApplicationContext();
+        ActiveLifecycleListener activeListener = (ActiveLifecycleListener) 
appCtx.getActiveLifecycleListener();
+        ActiveJobNotificationHandler activeEventHandler = 
activeListener.getNotificationHandler();
         FunctionSignature signature = getFunctionSignature();
         String dataverse =
                 ((QueryTranslator) statementExecutor).getActiveDataverse(new 
Identifier(signature.getNamespace()));
         signature.setNamespace(dataverse);
-
         boolean txnActive = false;
         EntityId entityId = new EntityId(BADConstants.PROCEDURE_KEYWORD, 
dataverse, signature.getName());
-        PrecompiledJobEventListener listener = (PrecompiledJobEventListener) 
ActiveJobNotificationHandler.INSTANCE
-                .getActiveEntityListener(entityId);
+        PrecompiledJobEventListener listener =
+                (PrecompiledJobEventListener) 
activeEventHandler.getActiveEntityListener(entityId);
         Procedure procedure = null;
 
         MetadataTransactionContext mdTxnCtx = null;
@@ -111,7 +115,7 @@
             }
             JobId hyracksJobId = listener.getJobId();
             listener.deActivate();
-            ActiveJobNotificationHandler.INSTANCE.removeListener(listener);
+            activeEventHandler.removeListener(listener);
             if (hyracksJobId != null) {
                 hcc.destroyJob(hyracksJobId);
             }
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerRuntime.java b/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerRuntime.java
index c5b7ef2..a246193 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerRuntime.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerRuntime.java
@@ -26,7 +26,7 @@
 import org.apache.asterix.active.ActiveManager;
 import org.apache.asterix.active.EntityId;
 import org.apache.asterix.bad.ChannelJobService;
-import org.apache.asterix.common.api.IAppRuntimeContext;
+import org.apache.asterix.common.api.INcApplicationContext;
 import 
org.apache.asterix.dataflow.data.nontagged.serde.ADateTimeSerializerDeserializer;
 import 
org.apache.asterix.dataflow.data.nontagged.serde.AOrderedListSerializerDeserializer;
 import 
org.apache.asterix.dataflow.data.nontagged.serde.AStringSerializerDeserializer;
@@ -51,8 +51,8 @@
 
     private final ByteBufferInputStream bbis = new ByteBufferInputStream();
     private final DataInputStream di = new DataInputStream(bbis);
-    private final AOrderedListSerializerDeserializer subSerDes = new 
AOrderedListSerializerDeserializer(
-            new AOrderedListType(BuiltinType.AUUID, null));
+    private final AOrderedListSerializerDeserializer subSerDes =
+            new AOrderedListSerializerDeserializer(new 
AOrderedListType(BuiltinType.AUUID, null));
 
     private IPointable inputArg0 = new VoidPointable();
     private IPointable inputArg1 = new VoidPointable();
@@ -70,7 +70,7 @@
         eval0 = brokerEvalFactory.createScalarEvaluator(ctx);
         eval1 = subEvalFactory.createScalarEvaluator(ctx);
         eval2 = channelExecutionEvalFactory.createScalarEvaluator(ctx);
-        this.activeManager = (ActiveManager) ((IAppRuntimeContext) 
ctx.getJobletContext().getServiceContext()
+        this.activeManager = (ActiveManager) ((INcApplicationContext) 
ctx.getJobletContext().getServiceContext()
                 .getApplicationContext()).getActiveManager();
         this.entityId = activeJobId;
     }

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/1688
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I758f50772823d7b1935e4d61a6cb2805ba0808ea
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb-bad
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <[email protected]>

Reply via email to