abdullah alamoudi has submitted this change and it was merged.
Change subject: Pass executor context and client id to extention statements
......................................................................
Pass executor context and client id to extention statements
Change-Id: I1395817df00917bd9b4d5676d4f2cea3888b7221
---
M
asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADStatementExecutor.java
M
asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/BrokerDropStatement.java
M
asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelDropStatement.java
M
asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java
M
asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelUnsubscribeStatement.java
M
asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateBrokerStatement.java
M
asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
M
asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java
M
asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ExecuteProcedureStatement.java
M
asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ProcedureDropStatement.java
10 files changed, 42 insertions(+), 43 deletions(-)
Approvals:
abdullah alamoudi: Looks good to me, approved
Jenkins: Verified
diff --git
a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADStatementExecutor.java
b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADStatementExecutor.java
index b95ca11..1a5fc34 100644
---
a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADStatementExecutor.java
+++
b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADStatementExecutor.java
@@ -250,7 +250,6 @@
metadataProvider.getLocks().unlock();
}
-
}
@Override
@@ -343,7 +342,7 @@
tempMdProvider.getLocks().reset();
ChannelDropStatement drop =
new ChannelDropStatement(dvId, new
Identifier(channel.getChannelId().getEntityName()), false);
- drop.handle(hcc, this, requestParameters, tempMdProvider, 0);
+ drop.handle(hcc, this, requestParameters, tempMdProvider, 0, null);
}
for (Procedure procedure : procedures) {
if
(!procedure.getEntityId().getDataverse().equals(dvId.getValue())) {
@@ -352,13 +351,13 @@
tempMdProvider.getLocks().reset();
ProcedureDropStatement drop = new ProcedureDropStatement(new
FunctionSignature(dvId.getValue(),
procedure.getEntityId().getEntityName(),
procedure.getArity()), false);
- drop.handle(hcc, this, requestParameters, tempMdProvider, 0);
+ drop.handle(hcc, this, requestParameters, tempMdProvider, 0, null);
}
List<Broker> brokers = BADLangExtension.getBrokers(mdTxnCtx,
dvId.getValue());
for (Broker broker : brokers) {
tempMdProvider.getLocks().reset();
BrokerDropStatement drop = new BrokerDropStatement(dvId, new
Identifier(broker.getBrokerName()), false);
- drop.handle(hcc, this, requestParameters, tempMdProvider, 0);
+ drop.handle(hcc, this, requestParameters, tempMdProvider, 0, null);
}
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
super.handleDataverseDropStatement(metadataProvider, stmt, hcc);
diff --git
a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/BrokerDropStatement.java
b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/BrokerDropStatement.java
index 4f68f40..b2b38ab 100644
---
a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/BrokerDropStatement.java
+++
b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/BrokerDropStatement.java
@@ -30,6 +30,7 @@
import org.apache.asterix.metadata.declared.MetadataProvider;
import org.apache.asterix.translator.IRequestParameters;
import org.apache.asterix.translator.IStatementExecutor;
+import org.apache.asterix.translator.IStatementExecutorContext;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.api.client.IHyracksClientConnection;
import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -70,8 +71,8 @@
@Override
public void handle(IHyracksClientConnection hcc, IStatementExecutor
statementExecutor,
- IRequestParameters requestParameters, MetadataProvider
metadataProvider, int resultSetId)
- throws HyracksDataException, AlgebricksException {
+ IRequestParameters requestParameters, MetadataProvider
metadataProvider, int resultSetId,
+ IStatementExecutorContext executorCtx) throws
HyracksDataException, AlgebricksException {
//TODO: dont drop a broker that's being used
String dataverse = ((QueryTranslator)
statementExecutor).getActiveDataverse(dataverseName);
MetadataTransactionContext mdTxnCtx = null;
diff --git
a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelDropStatement.java
b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelDropStatement.java
index 8403fcc..d778215 100644
---
a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelDropStatement.java
+++
b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelDropStatement.java
@@ -40,6 +40,7 @@
import org.apache.asterix.metadata.declared.MetadataProvider;
import org.apache.asterix.translator.IRequestParameters;
import org.apache.asterix.translator.IStatementExecutor;
+import org.apache.asterix.translator.IStatementExecutorContext;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.api.client.IHyracksClientConnection;
import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -82,8 +83,8 @@
@Override
public void handle(IHyracksClientConnection hcc, IStatementExecutor
statementExecutor,
- IRequestParameters requestParameters, MetadataProvider
metadataProvider, int resultSetId)
- throws HyracksDataException, AlgebricksException {
+ IRequestParameters requestParameters, MetadataProvider
metadataProvider, int resultSetId,
+ IStatementExecutorContext executorCtx) throws
HyracksDataException, AlgebricksException {
String dataverse = ((QueryTranslator)
statementExecutor).getActiveDataverse(dataverseName);
boolean txnActive = false;
EntityId entityId = new EntityId(BADConstants.CHANNEL_EXTENSION_NAME,
dataverse, channelName.getValue());
@@ -111,9 +112,8 @@
if (listener == null) {
//TODO: Channels need to better handle cluster failures
LOGGER.log(Level.SEVERE,
- "Tried to drop a Deployed Job whose listener no
longer exists: "
- + entityId.getExtensionName() + " " +
entityId.getDataverse() + "."
- + entityId.getEntityName() + ".");
+ "Tried to drop a Deployed Job whose listener no
longer exists: " + entityId.getExtensionName()
+ + " " + entityId.getDataverse() + "." +
entityId.getEntityName() + ".");
} else {
listener.getExecutorService().shutdown();
diff --git
a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java
b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java
index 7583f0b..cff1eaa 100644
---
a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java
+++
b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java
@@ -52,6 +52,7 @@
import org.apache.asterix.translator.IStatementExecutor;
import org.apache.asterix.translator.IStatementExecutor.ResultDelivery;
import org.apache.asterix.translator.IStatementExecutor.Stats;
+import org.apache.asterix.translator.IStatementExecutorContext;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
import org.apache.hyracks.api.client.IHyracksClientConnection;
@@ -120,8 +121,8 @@
@Override
public void handle(IHyracksClientConnection hcc, IStatementExecutor
statementExecutor,
- IRequestParameters requestParameters, MetadataProvider
metadataProvider, int resultSetId)
- throws HyracksDataException, AlgebricksException {
+ IRequestParameters requestParameters, MetadataProvider
metadataProvider, int resultSetId,
+ IStatementExecutorContext executorCtx) throws
HyracksDataException, AlgebricksException {
String dataverse = ((QueryTranslator)
statementExecutor).getActiveDataverse(dataverseName);
String brokerDataverse = ((QueryTranslator)
statementExecutor).getActiveDataverse(brokerDataverseName);
diff --git
a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelUnsubscribeStatement.java
b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelUnsubscribeStatement.java
index b23bf3b..774ca86 100644
---
a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelUnsubscribeStatement.java
+++
b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelUnsubscribeStatement.java
@@ -46,6 +46,7 @@
import org.apache.asterix.om.functions.BuiltinFunctions;
import org.apache.asterix.translator.IRequestParameters;
import org.apache.asterix.translator.IStatementExecutor;
+import org.apache.asterix.translator.IStatementExecutorContext;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
import org.apache.hyracks.api.client.IHyracksClientConnection;
@@ -100,8 +101,8 @@
@Override
public void handle(IHyracksClientConnection hcc, IStatementExecutor
statementExecutor,
- IRequestParameters requestParameters, MetadataProvider
metadataProvider, int resultSetId)
- throws HyracksDataException, AlgebricksException {
+ IRequestParameters requestParameters, MetadataProvider
metadataProvider, int resultSetId,
+ IStatementExecutorContext executorCtx) throws
HyracksDataException, AlgebricksException {
String dataverse = ((QueryTranslator)
statementExecutor).getActiveDataverse(dataverseName);
MetadataTransactionContext mdTxnCtx = null;
@@ -139,8 +140,7 @@
MetadataProvider tempMdProvider = new
MetadataProvider(metadataProvider.getApplicationContext(),
metadataProvider.getDefaultDataverse());
tempMdProvider.getConfig().putAll(metadataProvider.getConfig());
- ((QueryTranslator)
statementExecutor).handleDeleteStatement(tempMdProvider, delete, hcc, false,
null,
- null);
+ ((QueryTranslator)
statementExecutor).handleDeleteStatement(tempMdProvider, delete, hcc, false,
null, null);
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
} catch (Exception e) {
QueryTranslator.abort(e, e, mdTxnCtx);
diff --git
a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateBrokerStatement.java
b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateBrokerStatement.java
index 367599b..0f01a66 100644
---
a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateBrokerStatement.java
+++
b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateBrokerStatement.java
@@ -33,6 +33,7 @@
import org.apache.asterix.metadata.declared.MetadataProvider;
import org.apache.asterix.translator.IRequestParameters;
import org.apache.asterix.translator.IStatementExecutor;
+import org.apache.asterix.translator.IStatementExecutorContext;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.api.client.IHyracksClientConnection;
import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -74,8 +75,8 @@
@Override
public void handle(IHyracksClientConnection hcc, IStatementExecutor
statementExecutor,
- IRequestParameters requestParameters, MetadataProvider
metadataProvider, int resultSetId)
- throws HyracksDataException, AlgebricksException {
+ IRequestParameters requestParameters, MetadataProvider
metadataProvider, int resultSetId,
+ IStatementExecutorContext executorCtx) throws
HyracksDataException, AlgebricksException {
String dataverse = ((QueryTranslator)
statementExecutor).getActiveDataverse(dataverseName);
MetadataTransactionContext mdTxnCtx = null;
try {
diff --git
a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
index a28666a..edadeb6 100644
---
a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
+++
b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
@@ -68,6 +68,7 @@
import org.apache.asterix.translator.IStatementExecutor;
import org.apache.asterix.translator.IStatementExecutor.ResultDelivery;
import org.apache.asterix.translator.IStatementExecutor.Stats;
+import org.apache.asterix.translator.IStatementExecutorContext;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.common.utils.Pair;
import org.apache.hyracks.api.client.IHyracksClientConnection;
@@ -138,8 +139,7 @@
return null;
}
- public void initialize(MetadataTransactionContext mdTxnCtx)
- throws AlgebricksException, HyracksDataException {
+ public void initialize(MetadataTransactionContext mdTxnCtx) throws
AlgebricksException, HyracksDataException {
Function lookup = MetadataManager.INSTANCE.getFunction(mdTxnCtx,
function);
if (lookup == null) {
throw new MetadataException(" Unknown function " +
function.getName());
@@ -258,8 +258,8 @@
@Override
public void handle(IHyracksClientConnection hcc, IStatementExecutor
statementExecutor,
- IRequestParameters requestContext, MetadataProvider
metadataProvider, int resultSetId)
- throws HyracksDataException, AlgebricksException {
+ IRequestParameters requestContext, MetadataProvider
metadataProvider, int resultSetId,
+ IStatementExecutorContext executorCtx) throws
HyracksDataException, AlgebricksException {
//This function performs three tasks:
//1. Create datasets for the Channel
//2. Create and run the Channel Job
diff --git
a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java
b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java
index aaf2bfa..6459b4c 100644
---
a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java
+++
b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java
@@ -69,6 +69,7 @@
import org.apache.asterix.translator.IStatementExecutor;
import org.apache.asterix.translator.IStatementExecutor.ResultDelivery;
import org.apache.asterix.translator.IStatementExecutor.Stats;
+import org.apache.asterix.translator.IStatementExecutorContext;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.common.utils.Pair;
import org.apache.hyracks.api.client.IHyracksClientConnection;
@@ -175,8 +176,7 @@
}
private Pair<JobSpecification, PrecompiledType>
createProcedureJob(IStatementExecutor statementExecutor,
- MetadataProvider metadataProvider, IHyracksClientConnection hcc,
Stats stats)
- throws Exception {
+ MetadataProvider metadataProvider, IHyracksClientConnection hcc,
Stats stats) throws Exception {
if (getProcedureBodyStatement().getKind() == Statement.Kind.INSERT) {
if (!varList.isEmpty()) {
throw new CompilationException("Insert procedures cannot have
parameters");
@@ -193,8 +193,7 @@
dependencies.get(1).addAll(FunctionUtil.getFunctionDependencies(fact.createQueryRewriter(),
((Query) getProcedureBodyStatement()).getBody(),
metadataProvider).get(1));
Pair<JobSpecification, PrecompiledType> pair = new
Pair<>(BADJobService.compileQueryJob(statementExecutor,
- metadataProvider, hcc, (Query)
getProcedureBodyStatement()),
- PrecompiledType.QUERY);
+ metadataProvider, hcc, (Query)
getProcedureBodyStatement()), PrecompiledType.QUERY);
dependencies.get(0).addAll(FunctionUtil.getFunctionDependencies(fact.createQueryRewriter(),
((Query) getProcedureBodyStatement()).getBody(),
metadataProvider).get(0));
return pair;
@@ -216,8 +215,7 @@
}
private void setupDeployedJobSpec(EntityId entityId, JobSpecification
jobSpec, IHyracksClientConnection hcc,
- DeployedJobSpecEventListener listener, ResultSetId resultSetId,
Stats stats)
- throws Exception {
+ DeployedJobSpecEventListener listener, ResultSetId resultSetId,
Stats stats) throws Exception {
jobSpec.setProperty(ActiveNotificationHandler.ACTIVE_ENTITY_PROPERTY_NAME,
entityId);
DeployedJobSpecId deployedJobSpecId = hcc.deployJobSpec(jobSpec);
listener.setDeployedJobSpecId(deployedJobSpecId);
@@ -225,8 +223,8 @@
@Override
public void handle(IHyracksClientConnection hcc, IStatementExecutor
statementExecutor,
- IRequestParameters requestParameters, MetadataProvider
metadataProvider, int resultSetId)
- throws HyracksDataException, AlgebricksException {
+ IRequestParameters requestParameters, MetadataProvider
metadataProvider, int resultSetId,
+ IStatementExecutorContext executorCtx) throws
HyracksDataException, AlgebricksException {
ICcApplicationContext appCtx =
metadataProvider.getApplicationContext();
ActiveNotificationHandler activeEventHandler =
(ActiveNotificationHandler)
appCtx.getActiveNotificationHandler();
diff --git
a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ExecuteProcedureStatement.java
b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ExecuteProcedureStatement.java
index b794538..69f413e 100644
---
a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ExecuteProcedureStatement.java
+++
b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ExecuteProcedureStatement.java
@@ -50,6 +50,7 @@
import org.apache.asterix.translator.ConstantHelper;
import org.apache.asterix.translator.IRequestParameters;
import org.apache.asterix.translator.IStatementExecutor;
+import org.apache.asterix.translator.IStatementExecutorContext;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.api.client.IHyracksClientConnection;
import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -96,8 +97,8 @@
@Override
public void handle(IHyracksClientConnection hcc, IStatementExecutor
statementExecutor,
- IRequestParameters requestParameters, MetadataProvider
metadataProvider, int resultSetId)
- throws HyracksDataException, AlgebricksException {
+ IRequestParameters requestParameters, MetadataProvider
metadataProvider, int resultSetId,
+ IStatementExecutorContext executorCtx) throws
HyracksDataException, AlgebricksException {
ICcApplicationContext appCtx =
metadataProvider.getApplicationContext();
ActiveNotificationHandler activeEventHandler =
(ActiveNotificationHandler)
appCtx.getActiveNotificationHandler();
@@ -119,9 +120,8 @@
DeployedJobSpecId deployedJobSpecId =
listener.getDeployedJobSpecId();
if (procedure.getDuration().equals("")) {
BADJobService.runDeployedJobSpec(deployedJobSpecId, hcc,
requestParameters.getHyracksDataset(),
- contextRuntimeVarMap, entityId,
- metadataProvider.getTxnIdFactory(), appCtx, listener,
(QueryTranslator) statementExecutor);
-
+ contextRuntimeVarMap, entityId,
metadataProvider.getTxnIdFactory(), appCtx, listener,
+ (QueryTranslator) statementExecutor);
} else {
ScheduledExecutorService ses =
BADJobService.startRepetitiveDeployedJobSpec(deployedJobSpecId, hcc,
@@ -142,8 +142,7 @@
}
}
- private Map<byte[], byte[]> createParameterMap(Procedure procedure)
- throws AsterixException, HyracksDataException {
+ private Map<byte[], byte[]> createParameterMap(Procedure procedure) throws
AsterixException, HyracksDataException {
Map<byte[], byte[]> map = new HashMap<>();
if (procedure.getParams().size() != argList.size()) {
throw
AsterixException.create(ErrorCode.COMPILATION_INVALID_PARAMETER_NUMBER,
diff --git
a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ProcedureDropStatement.java
b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ProcedureDropStatement.java
index 95531bd..7a1cfd0 100644
---
a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ProcedureDropStatement.java
+++
b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ProcedureDropStatement.java
@@ -40,6 +40,7 @@
import org.apache.asterix.metadata.declared.MetadataProvider;
import org.apache.asterix.translator.IRequestParameters;
import org.apache.asterix.translator.IStatementExecutor;
+import org.apache.asterix.translator.IStatementExecutorContext;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.api.client.IHyracksClientConnection;
import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -76,8 +77,8 @@
@Override
public void handle(IHyracksClientConnection hcc, IStatementExecutor
statementExecutor,
- IRequestParameters requestParameters, MetadataProvider
metadataProvider, int resultSetId)
- throws HyracksDataException, AlgebricksException {
+ IRequestParameters requestParameters, MetadataProvider
metadataProvider, int resultSetId,
+ IStatementExecutorContext executorCtx) throws
HyracksDataException, AlgebricksException {
ICcApplicationContext appCtx =
metadataProvider.getApplicationContext();
ActiveNotificationHandler activeEventHandler =
(ActiveNotificationHandler)
appCtx.getActiveNotificationHandler();
@@ -115,9 +116,8 @@
if (listener == null) {
//TODO: Channels need to better handle cluster failures
LOGGER.log(Level.SEVERE,
- "Tried to drop a Deployed Job whose listener no
longer exists: "
- + entityId.getExtensionName() + " " +
entityId.getDataverse() + "."
- + entityId.getEntityName() + ".");
+ "Tried to drop a Deployed Job whose listener no
longer exists: " + entityId.getExtensionName()
+ + " " + entityId.getDataverse() + "." +
entityId.getEntityName() + ".");
} else {
if (listener.getExecutorService() != null) {
listener.getExecutorService().shutdown();
--
To view, visit https://asterix-gerrit.ics.uci.edu/2775
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: merged
Gerrit-Change-Id: I1395817df00917bd9b4d5676d4f2cea3888b7221
Gerrit-PatchSet: 3
Gerrit-Project: asterixdb-bad
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <[email protected]>
Gerrit-Reviewer: Jenkins <[email protected]>
Gerrit-Reviewer: Murtadha Hubail <[email protected]>
Gerrit-Reviewer: Till Westmann <[email protected]>
Gerrit-Reviewer: abdullah alamoudi <[email protected]>