Steven Jacobs has submitted this change and it was merged.
Change subject: Remove use of static ctx
......................................................................
Remove use of static ctx
Change-Id: I758f50772823d7b1935e4d61a6cb2805ba0808ea
---
M
asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADQueryTranslatorFactory.java
M asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADRuleSetFactory.java
M
asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADStatementExecutor.java
M
asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelDropStatement.java
M
asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java
M
asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelUnsubscribeStatement.java
M
asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
M
asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java
M
asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ExecuteProcedureStatement.java
M
asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ProcedureDropStatement.java
M
asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerRuntime.java
11 files changed, 92 insertions(+), 73 deletions(-)
Approvals:
Steven Jacobs: Looks good to me, approved
Jenkins: Verified
diff --git
a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADQueryTranslatorFactory.java
b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADQueryTranslatorFactory.java
index 2a11e13..0360ee3 100644
---
a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADQueryTranslatorFactory.java
+++
b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADQueryTranslatorFactory.java
@@ -23,6 +23,7 @@
import org.apache.asterix.app.translator.DefaultStatementExecutorFactory;
import org.apache.asterix.app.translator.QueryTranslator;
import org.apache.asterix.common.context.IStorageComponentProvider;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.compiler.provider.ILangCompilationProvider;
import org.apache.asterix.lang.common.base.Statement;
import org.apache.asterix.translator.SessionConfig;
@@ -30,9 +31,9 @@
public class BADQueryTranslatorFactory extends DefaultStatementExecutorFactory
{
@Override
- public QueryTranslator create(List<Statement> statements, SessionConfig
conf,
+ public QueryTranslator create(ICcApplicationContext appCtx,
List<Statement> statements, SessionConfig conf,
ILangCompilationProvider compilationProvider,
IStorageComponentProvider storageComponentProvider) {
- return new BADStatementExecutor(statements, conf, compilationProvider,
storageComponentProvider,
+ return new BADStatementExecutor(appCtx, statements, conf,
compilationProvider, storageComponentProvider,
executorService);
}
}
diff --git
a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADRuleSetFactory.java
b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADRuleSetFactory.java
index 7e45fd6..8d4b1e5 100644
---
a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADRuleSetFactory.java
+++
b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADRuleSetFactory.java
@@ -22,6 +22,7 @@
import java.util.List;
import org.apache.asterix.bad.rules.InsertBrokerNotifierForChannelRule;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.compiler.provider.DefaultRuleSetFactory;
import org.apache.asterix.compiler.provider.IRuleSetFactory;
import org.apache.asterix.optimizer.base.RuleCollections;
@@ -35,11 +36,12 @@
public class BADRuleSetFactory implements IRuleSetFactory {
@Override
- public List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>>
getLogicalRewrites()
- throws AlgebricksException {
- List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>>
logicalRuleSet = DefaultRuleSetFactory.buildLogical();
+ public List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>>
getLogicalRewrites(
+ ICcApplicationContext appCtx) throws AlgebricksException {
+ List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>>
logicalRuleSet =
+ DefaultRuleSetFactory.buildLogical(appCtx);
- List<IAlgebraicRewriteRule> normalizationCollection =
RuleCollections.buildNormalizationRuleCollection();
+ List<IAlgebraicRewriteRule> normalizationCollection =
RuleCollections.buildNormalizationRuleCollection(appCtx);
List<IAlgebraicRewriteRule> alteredNormalizationCollection = new
ArrayList<>();
alteredNormalizationCollection.addAll(normalizationCollection);
@@ -54,7 +56,7 @@
//Find instances of the normalization collection and replace them with
the new one
SequentialOnceRuleController seqOnceCtrl = new
SequentialOnceRuleController(true);
- for (int i =0; i < logicalRuleSet.size(); i++){
+ for (int i = 0; i < logicalRuleSet.size(); i++) {
List<IAlgebraicRewriteRule> collection =
logicalRuleSet.get(i).second;
if (collection.size() == normalizationCollection.size()) {
boolean isNormalizationCollection = true;
@@ -75,7 +77,8 @@
}
@Override
- public List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>>
getPhysicalRewrites() {
- return DefaultRuleSetFactory.buildPhysical();
+ public List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>>
getPhysicalRewrites(
+ ICcApplicationContext appCtx) {
+ return DefaultRuleSetFactory.buildPhysical(appCtx);
}
}
diff --git
a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADStatementExecutor.java
b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADStatementExecutor.java
index f62a7e0..0f2e212 100644
---
a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADStatementExecutor.java
+++
b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADStatementExecutor.java
@@ -29,6 +29,7 @@
import org.apache.asterix.bad.metadata.Channel;
import org.apache.asterix.bad.metadata.Procedure;
import org.apache.asterix.common.context.IStorageComponentProvider;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.common.functions.FunctionSignature;
import org.apache.asterix.compiler.provider.ILangCompilationProvider;
import org.apache.asterix.lang.common.base.Statement;
@@ -42,12 +43,11 @@
public class BADStatementExecutor extends QueryTranslator {
- public BADStatementExecutor(List<Statement> statements, SessionConfig conf,
+ public BADStatementExecutor(ICcApplicationContext appCtx, List<Statement>
statements, SessionConfig conf,
ILangCompilationProvider compliationProvider,
IStorageComponentProvider storageComponentProvider,
ExecutorService executorService) {
- super(statements, conf, compliationProvider, storageComponentProvider,
executorService);
+ super(appCtx, statements, conf, compliationProvider,
storageComponentProvider, executorService);
}
-
@Override
protected void handleDataverseDropStatement(MetadataProvider
metadataProvider, Statement stmt,
@@ -59,7 +59,7 @@
metadataProvider.setMetadataTxnContext(mdTxnCtx);
Identifier dvId = ((DataverseDropStatement) stmt).getDataverseName();
List<Broker> brokers = BADLangExtension.getBrokers(mdTxnCtx,
dvId.getValue());
- MetadataProvider tempMdProvider = new
MetadataProvider(metadataProvider.getDefaultDataverse(),
+ MetadataProvider tempMdProvider = new MetadataProvider(appCtx,
metadataProvider.getDefaultDataverse(),
metadataProvider.getStorageComponentProvider());
tempMdProvider.setConfig(metadataProvider.getConfig());
for (Broker broker : brokers) {
@@ -70,8 +70,8 @@
List<Channel> channels = BADLangExtension.getChannels(mdTxnCtx,
dvId.getValue());
for (Channel channel : channels) {
tempMdProvider.getLocks().reset();
- ChannelDropStatement drop = new ChannelDropStatement(dvId,
- new Identifier(channel.getChannelId().getEntityName()),
false);
+ ChannelDropStatement drop =
+ new ChannelDropStatement(dvId, new
Identifier(channel.getChannelId().getEntityName()), false);
drop.handle(this, tempMdProvider, hcc, null, null, null, 0);
}
List<Procedure> procedures = BADLangExtension.getProcedures(mdTxnCtx,
dvId.getValue());
diff --git
a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelDropStatement.java
b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelDropStatement.java
index 854ae07..1b655da 100644
---
a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelDropStatement.java
+++
b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelDropStatement.java
@@ -19,6 +19,7 @@
package org.apache.asterix.bad.lang.statement;
import org.apache.asterix.active.ActiveJobNotificationHandler;
+import org.apache.asterix.active.ActiveLifecycleListener;
import org.apache.asterix.active.EntityId;
import org.apache.asterix.algebra.extension.IExtensionStatement;
import org.apache.asterix.app.translator.QueryTranslator;
@@ -26,6 +27,7 @@
import org.apache.asterix.bad.lang.BADLangExtension;
import org.apache.asterix.bad.metadata.Channel;
import org.apache.asterix.bad.metadata.PrecompiledJobEventListener;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.common.exceptions.CompilationException;
import org.apache.asterix.lang.common.statement.DropDatasetStatement;
import org.apache.asterix.lang.common.struct.Identifier;
@@ -85,12 +87,14 @@
public void handle(IStatementExecutor statementExecutor, MetadataProvider
metadataProvider,
IHyracksClientConnection hcc, IHyracksDataset hdc, ResultDelivery
resultDelivery, Stats stats,
int resultSetIdCounter) throws HyracksDataException,
AlgebricksException {
-
String dataverse = ((QueryTranslator)
statementExecutor).getActiveDataverse(dataverseName);
boolean txnActive = false;
EntityId entityId = new EntityId(BADConstants.CHANNEL_EXTENSION_NAME,
dataverse, channelName.getValue());
- PrecompiledJobEventListener listener = (PrecompiledJobEventListener)
ActiveJobNotificationHandler.INSTANCE
- .getActiveEntityListener(entityId);
+ ICcApplicationContext appCtx =
metadataProvider.getApplicationContext();
+ ActiveLifecycleListener activeListener = (ActiveLifecycleListener)
appCtx.getActiveLifecycleListener();
+ ActiveJobNotificationHandler activeEventHandler =
activeListener.getNotificationHandler();
+ PrecompiledJobEventListener listener =
+ (PrecompiledJobEventListener)
activeEventHandler.getActiveEntityListener(entityId);
Channel channel = null;
MetadataTransactionContext mdTxnCtx = null;
@@ -111,13 +115,13 @@
listener.getExecutorService().shutdownNow();
JobId hyracksJobId = listener.getJobId();
listener.deActivate();
- ActiveJobNotificationHandler.INSTANCE.removeListener(listener);
+ activeEventHandler.removeListener(listener);
if (hyracksJobId != null) {
hcc.destroyJob(hyracksJobId);
}
//Create a metadata provider to use in nested jobs.
- MetadataProvider tempMdProvider = new
MetadataProvider(metadataProvider.getDefaultDataverse(),
+ MetadataProvider tempMdProvider = new MetadataProvider(appCtx,
metadataProvider.getDefaultDataverse(),
metadataProvider.getStorageComponentProvider());
tempMdProvider.setConfig(metadataProvider.getConfig());
//Drop the Channel Datasets
@@ -130,7 +134,6 @@
dropStmt = new DropDatasetStatement(new Identifier(dataverse),
new Identifier(channel.getSubscriptionsDataset()), true);
((QueryTranslator)
statementExecutor).handleDatasetDropStatement(tempMdProvider, dropStmt, hcc);
-
//Remove the Channel Metadata
MetadataManager.INSTANCE.deleteEntity(mdTxnCtx, channel);
diff --git
a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java
b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java
index 305ca20..60f871e 100644
---
a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java
+++
b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java
@@ -128,8 +128,7 @@
int resultSetIdCounter) throws HyracksDataException,
AlgebricksException {
String dataverse = ((QueryTranslator)
statementExecutor).getActiveDataverse(dataverseName);
- String brokerDataverse = ((QueryTranslator) statementExecutor)
-.getActiveDataverse(brokerDataverseName);
+ String brokerDataverse = ((QueryTranslator)
statementExecutor).getActiveDataverse(brokerDataverseName);
MetadataTransactionContext mdTxnCtx = null;
try {
@@ -153,7 +152,7 @@
Query subscriptionTuple = new Query(false);
- List<FieldBinding> fb = new ArrayList<FieldBinding>();
+ List<FieldBinding> fb = new ArrayList<>();
LiteralExpr leftExpr = new LiteralExpr(new
StringLiteral(BADConstants.DataverseName));
Expression rightExpr = new LiteralExpr(new
StringLiteral(brokerDataverse));
fb.add(new FieldBinding(leftExpr, rightExpr));
@@ -165,11 +164,11 @@
if (subscriptionId != null) {
leftExpr = new LiteralExpr(new
StringLiteral(BADConstants.SubscriptionId));
- List<Expression> UUIDList = new ArrayList<Expression>();
+ List<Expression> UUIDList = new ArrayList<>();
UUIDList.add(new LiteralExpr(new
StringLiteral(subscriptionId)));
FunctionIdentifier function =
BuiltinFunctions.UUID_CONSTRUCTOR;
- FunctionSignature UUIDfunc = new
FunctionSignature(function.getNamespace(), function.getName(),
- function.getArity());
+ FunctionSignature UUIDfunc =
+ new FunctionSignature(function.getNamespace(),
function.getName(), function.getArity());
CallExpr UUIDCall = new CallExpr(UUIDfunc, UUIDList);
rightExpr = UUIDCall;
@@ -186,8 +185,8 @@
subscriptionTuple.setVarCounter(varCounter);
- MetadataProvider tempMdProvider = new
MetadataProvider(metadataProvider.getDefaultDataverse(),
- metadataProvider.getStorageComponentProvider());
+ MetadataProvider tempMdProvider = new
MetadataProvider(metadataProvider.getApplicationContext(),
+ metadataProvider.getDefaultDataverse(),
metadataProvider.getStorageComponentProvider());
tempMdProvider.setConfig(metadataProvider.getConfig());
if (subscriptionId == null) {
diff --git
a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelUnsubscribeStatement.java
b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelUnsubscribeStatement.java
index 0cc96ad..60de69e 100644
---
a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelUnsubscribeStatement.java
+++
b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelUnsubscribeStatement.java
@@ -129,12 +129,12 @@
condition.setCurrentop(true);
condition.addOperator("=");
- List<Expression> UUIDList = new ArrayList<Expression>();
+ List<Expression> UUIDList = new ArrayList<>();
UUIDList.add(new LiteralExpr(new StringLiteral(subscriptionId)));
FunctionIdentifier function = BuiltinFunctions.UUID_CONSTRUCTOR;
- FunctionSignature UUIDfunc = new
FunctionSignature(function.getNamespace(), function.getName(),
- function.getArity());
+ FunctionSignature UUIDfunc =
+ new FunctionSignature(function.getNamespace(),
function.getName(), function.getArity());
CallExpr UUIDCall = new CallExpr(UUIDfunc, UUIDList);
condition.addOperand(UUIDCall);
@@ -143,8 +143,8 @@
new Identifier(subscriptionsDatasetName), condition,
varCounter);
SqlppDeleteRewriteVisitor visitor = new
SqlppDeleteRewriteVisitor();
delete.accept(visitor, null);
- MetadataProvider tempMdProvider = new
MetadataProvider(metadataProvider.getDefaultDataverse(),
- metadataProvider.getStorageComponentProvider());
+ MetadataProvider tempMdProvider = new
MetadataProvider(metadataProvider.getApplicationContext(),
+ metadataProvider.getDefaultDataverse(),
metadataProvider.getStorageComponentProvider());
tempMdProvider.setConfig(metadataProvider.getConfig());
((QueryTranslator)
statementExecutor).handleDeleteStatement(tempMdProvider, delete, hcc, false);
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
diff --git
a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
index 19ea29d..571a2d7 100644
---
a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
+++
b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
@@ -30,6 +30,7 @@
import java.util.logging.Logger;
import org.apache.asterix.active.ActiveJobNotificationHandler;
+import org.apache.asterix.active.ActiveLifecycleListener;
import org.apache.asterix.active.EntityId;
import org.apache.asterix.algebra.extension.IExtensionStatement;
import org.apache.asterix.app.translator.QueryTranslator;
@@ -41,6 +42,7 @@
import org.apache.asterix.bad.metadata.PrecompiledJobEventListener;
import
org.apache.asterix.bad.metadata.PrecompiledJobEventListener.PrecompiledType;
import org.apache.asterix.common.config.DatasetConfig.DatasetType;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.common.exceptions.CompilationException;
import org.apache.asterix.common.functions.FunctionSignature;
@@ -174,10 +176,10 @@
Identifier subscriptionsTypeName = new
Identifier(BADConstants.ChannelSubscriptionsType);
Identifier resultsTypeName = new
Identifier(BADConstants.ChannelResultsType);
//Setup the subscriptions dataset
- List<List<String>> partitionFields = new ArrayList<List<String>>();
- List<Integer> keyIndicators = new ArrayList<Integer>();
+ List<List<String>> partitionFields = new ArrayList<>();
+ List<Integer> keyIndicators = new ArrayList<>();
keyIndicators.add(0);
- List<String> fieldNames = new ArrayList<String>();
+ List<String> fieldNames = new ArrayList<>();
fieldNames.add(BADConstants.SubscriptionId);
partitionFields.add(fieldNames);
IDatasetDetailsDecl idd = new InternalDetailsDecl(partitionFields,
keyIndicators, true, null, false);
@@ -186,8 +188,8 @@
new HashMap<String, String>(), new HashMap<String, String>(),
DatasetType.INTERNAL, idd, true);
//Setup the results dataset
- partitionFields = new ArrayList<List<String>>();
- fieldNames = new ArrayList<String>();
+ partitionFields = new ArrayList<>();
+ fieldNames = new ArrayList<>();
fieldNames.add(BADConstants.ResultId);
partitionFields.add(fieldNames);
idd = new InternalDetailsDecl(partitionFields, keyIndicators, true,
null, false);
@@ -238,8 +240,7 @@
}
private void setupExecutorJob(EntityId entityId, JobSpecification
channeljobSpec, IHyracksClientConnection hcc,
- PrecompiledJobEventListener listener, boolean predistributed)
- throws Exception {
+ PrecompiledJobEventListener listener, boolean predistributed)
throws Exception {
if (channeljobSpec != null) {
//TODO: Find a way to fix optimizer tests so we don't need this
check
JobId jobId = null;
@@ -272,8 +273,11 @@
Identifier subscriptionsName = new Identifier(channelName +
BADConstants.subscriptionEnding);
Identifier resultsName = new Identifier(channelName +
BADConstants.resultsEnding);
EntityId entityId = new EntityId(BADConstants.CHANNEL_EXTENSION_NAME,
dataverse, channelName.getValue());
- PrecompiledJobEventListener listener = (PrecompiledJobEventListener)
ActiveJobNotificationHandler.INSTANCE
- .getActiveEntityListener(entityId);
+ ICcApplicationContext appCtx =
metadataProvider.getApplicationContext();
+ ActiveLifecycleListener activeListener = (ActiveLifecycleListener)
appCtx.getActiveLifecycleListener();
+ ActiveJobNotificationHandler activeEventHandler =
activeListener.getNotificationHandler();
+ PrecompiledJobEventListener listener =
+ (PrecompiledJobEventListener)
activeEventHandler.getActiveEntityListener(entityId);
boolean alreadyActive = false;
Channel channel = null;
@@ -302,8 +306,8 @@
if (MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverse,
resultsName.getValue()) != null) {
throw new AsterixException("The channel name:" + channelName +
" is not available.");
}
- MetadataProvider tempMdProvider = new
MetadataProvider(metadataProvider.getDefaultDataverse(),
- metadataProvider.getStorageComponentProvider());
+ MetadataProvider tempMdProvider = new
MetadataProvider(metadataProvider.getApplicationContext(),
+ metadataProvider.getDefaultDataverse(),
metadataProvider.getStorageComponentProvider());
tempMdProvider.setConfig(metadataProvider.getConfig());
//Create Channel Datasets
createDatasets(statementExecutor, subscriptionsName, resultsName,
tempMdProvider, hcc, hdc, stats,
@@ -320,7 +324,7 @@
datasets.add(MetadataManager.INSTANCE.getDataset(mdTxnCtx,
dataverse, resultsName.getValue()));
//TODO: Add datasets used by channel function
listener = new PrecompiledJobEventListener(entityId,
PrecompiledType.CHANNEL, datasets);
-
ActiveJobNotificationHandler.INSTANCE.registerListener(listener);
+ activeEventHandler.registerListener(listener);
}
if (distributed) {
diff --git
a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java
b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java
index d378890..7373337 100644
---
a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java
+++
b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java
@@ -27,6 +27,7 @@
import java.util.logging.Logger;
import org.apache.asterix.active.ActiveJobNotificationHandler;
+import org.apache.asterix.active.ActiveLifecycleListener;
import org.apache.asterix.active.EntityId;
import org.apache.asterix.algebra.extension.IExtensionStatement;
import org.apache.asterix.app.result.ResultReader;
@@ -37,6 +38,7 @@
import org.apache.asterix.bad.metadata.PrecompiledJobEventListener;
import
org.apache.asterix.bad.metadata.PrecompiledJobEventListener.PrecompiledType;
import org.apache.asterix.bad.metadata.Procedure;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.common.exceptions.CompilationException;
import org.apache.asterix.common.functions.FunctionSignature;
@@ -87,7 +89,7 @@
Expression period) {
this.signature = signature;
this.functionBody = functionBody;
- this.paramList = new ArrayList<String>();
+ this.paramList = new ArrayList<>();
for (VarIdentifier varId : parameterList) {
this.paramList.add(varId.getValue());
}
@@ -168,7 +170,8 @@
throw new CompilationException("Procedure can only execute a
single statement");
}
if (fStatements.get(0).getKind() == Statement.Kind.INSERT) {
- return new Pair<>(((QueryTranslator)
statementExecutor).handleInsertUpsertStatement(metadataProvider,
+ return new Pair<>(
+ ((QueryTranslator)
statementExecutor).handleInsertUpsertStatement(metadataProvider,
fStatements.get(0), hcc, hdc,
ResultDelivery.ASYNC, stats, true, null, null),
PrecompiledType.INSERT);
} else if (fStatements.get(0).getKind() == Statement.Kind.QUERY) {
@@ -182,14 +185,14 @@
fStatements.get(0).accept(visitor, null);
return new Pair<>(((QueryTranslator)
statementExecutor).handleDeleteStatement(metadataProvider,
fStatements.get(0), hcc, true), PrecompiledType.DELETE);
- }else{
+ } else {
throw new CompilationException("Procedure can only execute a
single delete, insert, or query");
}
}
private void setupDistributedJob(EntityId entityId, JobSpecification
jobSpec, IHyracksClientConnection hcc,
PrecompiledJobEventListener listener, ResultSetId resultSetId,
IHyracksDataset hdc, Stats stats)
- throws Exception {
+ throws Exception {
JobId jobId = hcc.distributeJob(jobSpec);
listener.storeDistributedInfo(jobId, null, new ResultReader(hdc,
jobId, resultSetId));
}
@@ -198,15 +201,15 @@
public void handle(IStatementExecutor statementExecutor, MetadataProvider
metadataProvider,
IHyracksClientConnection hcc, IHyracksDataset hdc, ResultDelivery
resultDelivery, Stats stats,
int resultSetIdCounter) throws HyracksDataException,
AlgebricksException {
-
+ ICcApplicationContext appCtx =
metadataProvider.getApplicationContext();
+ ActiveLifecycleListener activeListener = (ActiveLifecycleListener)
appCtx.getActiveLifecycleListener();
+ ActiveJobNotificationHandler activeEventHandler =
activeListener.getNotificationHandler();
initialize();
-
String dataverse =
((QueryTranslator) statementExecutor).getActiveDataverse(new
Identifier(signature.getNamespace()));
-
EntityId entityId = new EntityId(BADConstants.PROCEDURE_KEYWORD,
dataverse, signature.getName());
PrecompiledJobEventListener listener =
- (PrecompiledJobEventListener)
ActiveJobNotificationHandler.INSTANCE.getActiveEntityListener(entityId);
+ (PrecompiledJobEventListener)
activeEventHandler.getActiveEntityListener(entityId);
boolean alreadyActive = false;
Procedure procedure = null;
@@ -229,8 +232,8 @@
procedure = new Procedure(dataverse, signature.getName(),
signature.getArity(), getParamList(),
Function.RETURNTYPE_VOID, getFunctionBody(),
Function.LANGUAGE_AQL, duration);
- MetadataProvider tempMdProvider = new
MetadataProvider(metadataProvider.getDefaultDataverse(),
- metadataProvider.getStorageComponentProvider());
+ MetadataProvider tempMdProvider = new
MetadataProvider(metadataProvider.getApplicationContext(),
+ metadataProvider.getDefaultDataverse(),
metadataProvider.getStorageComponentProvider());
tempMdProvider.setConfig(metadataProvider.getConfig());
metadataProvider.setResultSetId(new
ResultSetId(resultSetIdCounter++));
@@ -250,7 +253,7 @@
if (listener == null) {
//TODO: Add datasets used by channel function
listener = new PrecompiledJobEventListener(entityId,
procedureJobSpec.second, new ArrayList<>());
-
ActiveJobNotificationHandler.INSTANCE.registerListener(listener);
+ activeEventHandler.registerListener(listener);
}
setupDistributedJob(entityId, procedureJobSpec.first, hcc,
listener, tempMdProvider.getResultSetId(), hdc,
stats);
diff --git
a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ExecuteProcedureStatement.java
b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ExecuteProcedureStatement.java
index 1a319a1..a8ec9aa 100644
---
a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ExecuteProcedureStatement.java
+++
b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ExecuteProcedureStatement.java
@@ -22,6 +22,7 @@
import java.util.concurrent.ScheduledExecutorService;
import org.apache.asterix.active.ActiveJobNotificationHandler;
+import org.apache.asterix.active.ActiveLifecycleListener;
import org.apache.asterix.active.EntityId;
import org.apache.asterix.algebra.extension.IExtensionStatement;
import org.apache.asterix.api.http.server.ResultUtil;
@@ -33,6 +34,7 @@
import org.apache.asterix.bad.metadata.PrecompiledJobEventListener;
import
org.apache.asterix.bad.metadata.PrecompiledJobEventListener.PrecompiledType;
import org.apache.asterix.bad.metadata.Procedure;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.common.exceptions.CompilationException;
import org.apache.asterix.lang.common.struct.Identifier;
import org.apache.asterix.lang.common.visitor.base.ILangVisitor;
@@ -92,21 +94,21 @@
public void handle(IStatementExecutor statementExecutor, MetadataProvider
metadataProvider,
IHyracksClientConnection hcc, IHyracksDataset hdc, ResultDelivery
resultDelivery, Stats stats,
int resultSetIdCounter) throws HyracksDataException,
AlgebricksException {
-
-
+ ICcApplicationContext appCtx =
metadataProvider.getApplicationContext();
+ ActiveLifecycleListener activeListener = (ActiveLifecycleListener)
appCtx.getActiveLifecycleListener();
+ ActiveJobNotificationHandler activeEventHandler =
activeListener.getNotificationHandler();
String dataverse = ((QueryTranslator)
statementExecutor).getActiveDataverse(new Identifier(dataverseName));
boolean txnActive = false;
EntityId entityId = new EntityId(BADConstants.PROCEDURE_KEYWORD,
dataverse, procedureName);
- PrecompiledJobEventListener listener = (PrecompiledJobEventListener)
ActiveJobNotificationHandler.INSTANCE
- .getActiveEntityListener(entityId);
+ PrecompiledJobEventListener listener =
+ (PrecompiledJobEventListener)
activeEventHandler.getActiveEntityListener(entityId);
Procedure procedure = null;
MetadataTransactionContext mdTxnCtx = null;
try {
mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
txnActive = true;
- procedure = BADLangExtension.getProcedure(mdTxnCtx, dataverse,
procedureName,
- Integer.toString(getArity()));
+ procedure = BADLangExtension.getProcedure(mdTxnCtx, dataverse,
procedureName, Integer.toString(getArity()));
if (procedure == null) {
throw new AlgebricksException("There is no procedure with this
name " + procedureName + ".");
}
@@ -118,8 +120,8 @@
if (listener.getType() == PrecompiledType.QUERY) {
hcc.waitForCompletion(hyracksJobId);
ResultReader resultReader = listener.getResultReader();
- ResultUtil.printResults(resultReader, ((QueryTranslator)
statementExecutor).getSessionConfig(),
- new Stats(), null);
+ ResultUtil.printResults(appCtx, resultReader,
+ ((QueryTranslator)
statementExecutor).getSessionConfig(), new Stats(), null);
}
} else {
diff --git
a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ProcedureDropStatement.java
b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ProcedureDropStatement.java
index abdf90a..f7c3a74 100644
---
a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ProcedureDropStatement.java
+++
b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ProcedureDropStatement.java
@@ -19,6 +19,7 @@
package org.apache.asterix.bad.lang.statement;
import org.apache.asterix.active.ActiveJobNotificationHandler;
+import org.apache.asterix.active.ActiveLifecycleListener;
import org.apache.asterix.active.EntityId;
import org.apache.asterix.algebra.extension.IExtensionStatement;
import org.apache.asterix.app.translator.QueryTranslator;
@@ -26,6 +27,7 @@
import org.apache.asterix.bad.lang.BADLangExtension;
import org.apache.asterix.bad.metadata.PrecompiledJobEventListener;
import org.apache.asterix.bad.metadata.Procedure;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.common.exceptions.CompilationException;
import org.apache.asterix.common.functions.FunctionSignature;
import org.apache.asterix.lang.common.struct.Identifier;
@@ -79,15 +81,17 @@
public void handle(IStatementExecutor statementExecutor, MetadataProvider
metadataProvider,
IHyracksClientConnection hcc, IHyracksDataset hdc, ResultDelivery
resultDelivery, Stats stats,
int resultSetIdCounter) throws HyracksDataException,
AlgebricksException {
+ ICcApplicationContext appCtx =
metadataProvider.getApplicationContext();
+ ActiveLifecycleListener activeListener = (ActiveLifecycleListener)
appCtx.getActiveLifecycleListener();
+ ActiveJobNotificationHandler activeEventHandler =
activeListener.getNotificationHandler();
FunctionSignature signature = getFunctionSignature();
String dataverse =
((QueryTranslator) statementExecutor).getActiveDataverse(new
Identifier(signature.getNamespace()));
signature.setNamespace(dataverse);
-
boolean txnActive = false;
EntityId entityId = new EntityId(BADConstants.PROCEDURE_KEYWORD,
dataverse, signature.getName());
- PrecompiledJobEventListener listener = (PrecompiledJobEventListener)
ActiveJobNotificationHandler.INSTANCE
- .getActiveEntityListener(entityId);
+ PrecompiledJobEventListener listener =
+ (PrecompiledJobEventListener)
activeEventHandler.getActiveEntityListener(entityId);
Procedure procedure = null;
MetadataTransactionContext mdTxnCtx = null;
@@ -111,7 +115,7 @@
}
JobId hyracksJobId = listener.getJobId();
listener.deActivate();
- ActiveJobNotificationHandler.INSTANCE.removeListener(listener);
+ activeEventHandler.removeListener(listener);
if (hyracksJobId != null) {
hcc.destroyJob(hyracksJobId);
}
diff --git
a/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerRuntime.java
b/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerRuntime.java
index c5b7ef2..a246193 100644
---
a/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerRuntime.java
+++
b/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerRuntime.java
@@ -26,7 +26,7 @@
import org.apache.asterix.active.ActiveManager;
import org.apache.asterix.active.EntityId;
import org.apache.asterix.bad.ChannelJobService;
-import org.apache.asterix.common.api.IAppRuntimeContext;
+import org.apache.asterix.common.api.INcApplicationContext;
import
org.apache.asterix.dataflow.data.nontagged.serde.ADateTimeSerializerDeserializer;
import
org.apache.asterix.dataflow.data.nontagged.serde.AOrderedListSerializerDeserializer;
import
org.apache.asterix.dataflow.data.nontagged.serde.AStringSerializerDeserializer;
@@ -51,8 +51,8 @@
private final ByteBufferInputStream bbis = new ByteBufferInputStream();
private final DataInputStream di = new DataInputStream(bbis);
- private final AOrderedListSerializerDeserializer subSerDes = new
AOrderedListSerializerDeserializer(
- new AOrderedListType(BuiltinType.AUUID, null));
+ private final AOrderedListSerializerDeserializer subSerDes =
+ new AOrderedListSerializerDeserializer(new
AOrderedListType(BuiltinType.AUUID, null));
private IPointable inputArg0 = new VoidPointable();
private IPointable inputArg1 = new VoidPointable();
@@ -70,7 +70,7 @@
eval0 = brokerEvalFactory.createScalarEvaluator(ctx);
eval1 = subEvalFactory.createScalarEvaluator(ctx);
eval2 = channelExecutionEvalFactory.createScalarEvaluator(ctx);
- this.activeManager = (ActiveManager) ((IAppRuntimeContext)
ctx.getJobletContext().getServiceContext()
+ this.activeManager = (ActiveManager) ((INcApplicationContext)
ctx.getJobletContext().getServiceContext()
.getApplicationContext()).getActiveManager();
this.entityId = activeJobId;
}
--
To view, visit https://asterix-gerrit.ics.uci.edu/1688
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: merged
Gerrit-Change-Id: I758f50772823d7b1935e4d61a6cb2805ba0808ea
Gerrit-PatchSet: 3
Gerrit-Project: asterixdb-bad
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <[email protected]>
Gerrit-Reviewer: Ian Maxon <[email protected]>
Gerrit-Reviewer: Jenkins <[email protected]>
Gerrit-Reviewer: Michael Blow <[email protected]>
Gerrit-Reviewer: Steven Jacobs <[email protected]>