Murtadha Hubail has uploaded a new change for review.
https://asterix-gerrit.ics.uci.edu/1868
Change subject: [ASTERIXDB-1973][BAD] Coodinated changes for IExtensionStatement
......................................................................
[ASTERIXDB-1973][BAD] Coodinated changes for IExtensionStatement
Change-Id: I392d7cda45e2bd39a85c037959ae5483eb48c9ee
---
M
asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/BrokerDropStatement.java
M
asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelDropStatement.java
M
asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java
M
asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelUnsubscribeStatement.java
M
asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateBrokerStatement.java
M
asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
M
asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java
M
asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ExecuteProcedureStatement.java
M
asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ProcedureDropStatement.java
M
asterix-bad/src/main/java/org/apache/asterix/bad/metadata/PrecompiledJobEventListener.java
10 files changed, 92 insertions(+), 98 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb-bad
refs/changes/68/1868/1
diff --git
a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/BrokerDropStatement.java
b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/BrokerDropStatement.java
index 5b81903..2050b17 100644
---
a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/BrokerDropStatement.java
+++
b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/BrokerDropStatement.java
@@ -28,12 +28,9 @@
import org.apache.asterix.metadata.MetadataManager;
import org.apache.asterix.metadata.MetadataTransactionContext;
import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.asterix.translator.IRequestContext;
import org.apache.asterix.translator.IStatementExecutor;
-import org.apache.asterix.translator.IStatementExecutor.ResultDelivery;
-import org.apache.asterix.translator.IStatementExecutor.Stats;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.api.client.IHyracksClientConnection;
-import org.apache.hyracks.api.dataset.IHyracksDataset;
import org.apache.hyracks.api.exceptions.HyracksDataException;
public class BrokerDropStatement implements IExtensionStatement {
@@ -76,9 +73,8 @@
}
@Override
- public void handle(IStatementExecutor statementExecutor, MetadataProvider
metadataProvider,
- IHyracksClientConnection hcc, IHyracksDataset hdc, ResultDelivery
resultDelivery, Stats stats,
- int resultSetIdCounter) throws HyracksDataException,
AlgebricksException {
+ public void handle(IRequestContext requestContext, IStatementExecutor
statementExecutor,
+ MetadataProvider metadataProvider, int resultSetId) throws
HyracksDataException, AlgebricksException {
//TODO: dont drop a broker that's being used
String dataverse = ((QueryTranslator)
statementExecutor).getActiveDataverse(dataverseName);
MetadataTransactionContext mdTxnCtx = null;
diff --git
a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelDropStatement.java
b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelDropStatement.java
index 1b655da..dca4e89 100644
---
a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelDropStatement.java
+++
b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelDropStatement.java
@@ -35,12 +35,10 @@
import org.apache.asterix.metadata.MetadataManager;
import org.apache.asterix.metadata.MetadataTransactionContext;
import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.asterix.translator.IRequestContext;
import org.apache.asterix.translator.IStatementExecutor;
-import org.apache.asterix.translator.IStatementExecutor.ResultDelivery;
-import org.apache.asterix.translator.IStatementExecutor.Stats;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.api.client.IHyracksClientConnection;
-import org.apache.hyracks.api.dataset.IHyracksDataset;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.job.JobId;
@@ -84,9 +82,8 @@
}
@Override
- public void handle(IStatementExecutor statementExecutor, MetadataProvider
metadataProvider,
- IHyracksClientConnection hcc, IHyracksDataset hdc, ResultDelivery
resultDelivery, Stats stats,
- int resultSetIdCounter) throws HyracksDataException,
AlgebricksException {
+ public void handle(IRequestContext requestContext, IStatementExecutor
statementExecutor,
+ MetadataProvider metadataProvider, int resultSetId) throws
HyracksDataException, AlgebricksException {
String dataverse = ((QueryTranslator)
statementExecutor).getActiveDataverse(dataverseName);
boolean txnActive = false;
EntityId entityId = new EntityId(BADConstants.CHANNEL_EXTENSION_NAME,
dataverse, channelName.getValue());
@@ -116,6 +113,7 @@
JobId hyracksJobId = listener.getJobId();
listener.deActivate();
activeEventHandler.removeListener(listener);
+ final IHyracksClientConnection hcc =
requestContext.getHyracksClientConnection();
if (hyracksJobId != null) {
hcc.destroyJob(hyracksJobId);
}
diff --git
a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java
b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java
index 5bf0690..f3f18cc 100644
---
a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java
+++
b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java
@@ -48,6 +48,7 @@
import org.apache.asterix.metadata.MetadataTransactionContext;
import org.apache.asterix.metadata.declared.MetadataProvider;
import org.apache.asterix.om.functions.BuiltinFunctions;
+import org.apache.asterix.translator.IRequestContext;
import org.apache.asterix.translator.IStatementExecutor;
import org.apache.asterix.translator.IStatementExecutor.ResultDelivery;
import org.apache.asterix.translator.IStatementExecutor.Stats;
@@ -123,10 +124,8 @@
}
@Override
- public void handle(IStatementExecutor statementExecutor, MetadataProvider
metadataProvider,
- IHyracksClientConnection hcc, IHyracksDataset hdc, ResultDelivery
resultDelivery, Stats stats,
- int resultSetIdCounter) throws HyracksDataException,
AlgebricksException {
-
+ public void handle(IRequestContext requestContext, IStatementExecutor
statementExecutor,
+ MetadataProvider metadataProvider, int resultSetId) throws
HyracksDataException, AlgebricksException {
String dataverse = ((QueryTranslator)
statementExecutor).getActiveDataverse(dataverseName);
String brokerDataverse = ((QueryTranslator)
statementExecutor).getActiveDataverse(brokerDataverseName);
@@ -189,6 +188,10 @@
metadataProvider.getDefaultDataverse(),
metadataProvider.getStorageComponentProvider());
tempMdProvider.setConfig(metadataProvider.getConfig());
+ final ResultDelivery resultDelivery =
requestContext.getResultDelivery();
+ final IHyracksClientConnection hcc =
requestContext.getHyracksClientConnection();
+ final IHyracksDataset hdc = requestContext.getHyracksDataset();
+ final Stats stats = requestContext.getStats();
if (subscriptionId == null) {
//To create a new subscription
VariableExpr resultVar = new VariableExpr(new
VarIdentifier("$result", 0));
@@ -196,7 +199,7 @@
useResultVar.setIsNewVar(false);
FieldAccessor accessor = new FieldAccessor(useResultVar, new
Identifier(BADConstants.SubscriptionId));
- metadataProvider.setResultSetId(new
ResultSetId(resultSetIdCounter));
+ metadataProvider.setResultSetId(new ResultSetId(resultSetId));
boolean resultsAsync =
resultDelivery == ResultDelivery.ASYNC ||
resultDelivery == ResultDelivery.DEFERRED;
metadataProvider.setResultAsyncMode(resultsAsync);
diff --git
a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelUnsubscribeStatement.java
b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelUnsubscribeStatement.java
index 60de69e..2e5ab4b 100644
---
a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelUnsubscribeStatement.java
+++
b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelUnsubscribeStatement.java
@@ -44,13 +44,10 @@
import org.apache.asterix.metadata.MetadataTransactionContext;
import org.apache.asterix.metadata.declared.MetadataProvider;
import org.apache.asterix.om.functions.BuiltinFunctions;
+import org.apache.asterix.translator.IRequestContext;
import org.apache.asterix.translator.IStatementExecutor;
-import org.apache.asterix.translator.IStatementExecutor.ResultDelivery;
-import org.apache.asterix.translator.IStatementExecutor.Stats;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
-import org.apache.hyracks.api.client.IHyracksClientConnection;
-import org.apache.hyracks.api.dataset.IHyracksDataset;
import org.apache.hyracks.api.exceptions.HyracksDataException;
public class ChannelUnsubscribeStatement implements IExtensionStatement {
@@ -106,9 +103,8 @@
}
@Override
- public void handle(IStatementExecutor statementExecutor, MetadataProvider
metadataProvider,
- IHyracksClientConnection hcc, IHyracksDataset hdc, ResultDelivery
resultDelivery, Stats stats,
- int resultSetIdCounter) throws HyracksDataException,
AlgebricksException {
+ public void handle(IRequestContext requestContext, IStatementExecutor
statementExecutor,
+ MetadataProvider metadataProvider, int resultSetId) throws
HyracksDataException, AlgebricksException {
String dataverse = ((QueryTranslator)
statementExecutor).getActiveDataverse(dataverseName);
MetadataTransactionContext mdTxnCtx = null;
@@ -146,7 +142,8 @@
MetadataProvider tempMdProvider = new
MetadataProvider(metadataProvider.getApplicationContext(),
metadataProvider.getDefaultDataverse(),
metadataProvider.getStorageComponentProvider());
tempMdProvider.setConfig(metadataProvider.getConfig());
- ((QueryTranslator)
statementExecutor).handleDeleteStatement(tempMdProvider, delete, hcc, false);
+ ((QueryTranslator) statementExecutor)
+ .handleDeleteStatement(tempMdProvider, delete,
requestContext.getHyracksClientConnection(), false);
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
} catch (Exception e) {
QueryTranslator.abort(e, e, mdTxnCtx);
diff --git
a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateBrokerStatement.java
b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateBrokerStatement.java
index b4f3eae..f4b5402 100644
---
a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateBrokerStatement.java
+++
b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateBrokerStatement.java
@@ -31,12 +31,9 @@
import org.apache.asterix.metadata.MetadataManager;
import org.apache.asterix.metadata.MetadataTransactionContext;
import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.asterix.translator.IRequestContext;
import org.apache.asterix.translator.IStatementExecutor;
-import org.apache.asterix.translator.IStatementExecutor.ResultDelivery;
-import org.apache.asterix.translator.IStatementExecutor.Stats;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.api.client.IHyracksClientConnection;
-import org.apache.hyracks.api.dataset.IHyracksDataset;
import org.apache.hyracks.api.exceptions.HyracksDataException;
public class CreateBrokerStatement implements IExtensionStatement {
@@ -80,9 +77,8 @@
}
@Override
- public void handle(IStatementExecutor statementExecutor, MetadataProvider
metadataProvider,
- IHyracksClientConnection hcc, IHyracksDataset hdc, ResultDelivery
resultDelivery, Stats stats,
- int resultSetIdCounter) throws HyracksDataException,
AlgebricksException {
+ public void handle(IRequestContext requestContext, IStatementExecutor
statementExecutor,
+ MetadataProvider metadataProvider, int resultSetId) throws
HyracksDataException, AlgebricksException {
String dataverse = ((QueryTranslator)
statementExecutor).getActiveDataverse(dataverseName);
MetadataTransactionContext mdTxnCtx = null;
try {
diff --git
a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
index 09cc3e5..90bf45e 100644
---
a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
+++
b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
@@ -21,10 +21,7 @@
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.StringReader;
-import java.util.ArrayList;
-import java.util.EnumSet;
-import java.util.HashMap;
-import java.util.List;
+import java.util.*;
import java.util.concurrent.ScheduledExecutorService;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -65,11 +62,13 @@
import org.apache.asterix.metadata.declared.MetadataProvider;
import org.apache.asterix.metadata.entities.Function;
import org.apache.asterix.om.base.temporal.ADurationParserFactory;
+import org.apache.asterix.translator.IRequestContext;
import org.apache.asterix.translator.IStatementExecutor;
import org.apache.asterix.translator.IStatementExecutor.ResultDelivery;
import org.apache.asterix.translator.IStatementExecutor.Stats;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.api.client.IHyracksClientConnection;
+import org.apache.hyracks.api.constraints.Constraint;
import org.apache.hyracks.api.dataset.IHyracksDataset;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.job.JobFlag;
@@ -133,13 +132,11 @@
return channelResultsInsertQuery;
}
- @Override
- public byte getCategory() {
+ @Override public byte getCategory() {
return Category.DDL;
}
- @Override
- public <R, T> R accept(ILangVisitor<R, T> visitor, T arg) throws
CompilationException {
+ @Override public <R, T> R accept(ILangVisitor<R, T> visitor, T arg) throws
CompilationException {
return null;
}
@@ -164,8 +161,7 @@
}
- @Override
- public byte getKind() {
+ @Override public byte getKind() {
return Kind.EXTENSION;
}
@@ -193,13 +189,14 @@
fieldNames.add(BADConstants.ResultId);
partitionFields.add(fieldNames);
idd = new InternalDetailsDecl(partitionFields, keyIndicators, true,
null, false);
- DatasetDecl createResultsDataset = new DatasetDecl(new
Identifier(dataverse), resultsName,
- new Identifier(BADConstants.BAD_DATAVERSE_NAME),
resultsTypeName, null, null, null, null,
- new HashMap<String, String>(), new HashMap<String, String>(),
DatasetType.INTERNAL, idd, true);
+ DatasetDecl createResultsDataset =
+ new DatasetDecl(new Identifier(dataverse), resultsName, new
Identifier(BADConstants.BAD_DATAVERSE_NAME),
+ resultsTypeName, null, null, null, null, new
HashMap<String, String>(),
+ new HashMap<String, String>(), DatasetType.INTERNAL,
idd, true);
//Run both statements to create datasets
- ((QueryTranslator)
statementExecutor).handleCreateDatasetStatement(metadataProvider,
createSubscriptionsDataset,
- hcc);
+ ((QueryTranslator) statementExecutor)
+ .handleCreateDatasetStatement(metadataProvider,
createSubscriptionsDataset, hcc);
metadataProvider.getLocks().reset();
((QueryTranslator)
statementExecutor).handleCreateDatasetStatement(metadataProvider,
createResultsDataset, hcc);
@@ -235,8 +232,9 @@
SetStatement ss = (SetStatement) fStatements.get(0);
metadataProvider.getConfig().put(ss.getPropName(), ss.getPropValue());
- return ((QueryTranslator)
statementExecutor).handleInsertUpsertStatement(metadataProvider,
fStatements.get(1),
- hcc, hdc, ResultDelivery.ASYNC, null, stats, true, null, null);
+ return ((QueryTranslator) statementExecutor)
+ .handleInsertUpsertStatement(metadataProvider,
fStatements.get(1), hcc, hdc, ResultDelivery.ASYNC, null,
+ stats, true, null, null);
}
private void setupExecutorJob(EntityId entityId, JobSpecification
channeljobSpec, IHyracksClientConnection hcc,
@@ -247,18 +245,17 @@
if (predistributed) {
jobId = hcc.distributeJob(channeljobSpec);
}
- ScheduledExecutorService ses =
ChannelJobService.startJob(channeljobSpec, EnumSet.noneOf(JobFlag.class),
- jobId, hcc, ChannelJobService.findPeriod(duration));
+ ScheduledExecutorService ses = ChannelJobService
+ .startJob(channeljobSpec, EnumSet.noneOf(JobFlag.class),
jobId, hcc,
+ ChannelJobService.findPeriod(duration));
listener.storeDistributedInfo(jobId, ses, null);
}
}
@Override
- public void handle(IStatementExecutor statementExecutor, MetadataProvider
metadataProvider,
- IHyracksClientConnection hcc, IHyracksDataset hdc, ResultDelivery
resultDelivery, Stats stats,
- int resultSetIdCounter) throws HyracksDataException,
AlgebricksException {
-
+ public void handle(IRequestContext requestContext, IStatementExecutor
statementExecutor,
+ MetadataProvider metadataProvider, int resultSetId) throws
HyracksDataException, AlgebricksException {
//This function performs three tasks:
//1. Create datasets for the Channel
//2. Create and run the Channel Job
@@ -309,13 +306,16 @@
MetadataProvider tempMdProvider = new
MetadataProvider(metadataProvider.getApplicationContext(),
metadataProvider.getDefaultDataverse(),
metadataProvider.getStorageComponentProvider());
tempMdProvider.setConfig(metadataProvider.getConfig());
+ final IHyracksClientConnection hcc =
requestContext.getHyracksClientConnection();
+ final IHyracksDataset hdc = requestContext.getHyracksDataset();
+ final Stats stats = requestContext.getStats();
//Create Channel Datasets
- createDatasets(statementExecutor, subscriptionsName, resultsName,
tempMdProvider, hcc, hdc,
- dataverse);
+ createDatasets(statementExecutor, subscriptionsName, resultsName,
tempMdProvider, hcc, hdc, dataverse);
tempMdProvider.getLocks().reset();
//Create Channel Internal Job
- JobSpecification channeljobSpec =
createChannelJob(statementExecutor, subscriptionsName, resultsName,
- tempMdProvider, hcc, hdc, stats, dataverse);
+ JobSpecification channeljobSpec =
+ createChannelJob(statementExecutor, subscriptionsName,
resultsName, tempMdProvider, hcc, hdc, stats,
+ dataverse);
// Now we subscribe
if (listener == null) {
@@ -323,7 +323,8 @@
datasets.add(MetadataManager.INSTANCE.getDataset(mdTxnCtx,
dataverse, subscriptionsName.getValue()));
datasets.add(MetadataManager.INSTANCE.getDataset(mdTxnCtx,
dataverse, resultsName.getValue()));
//TODO: Add datasets used by channel function
- listener = new PrecompiledJobEventListener(entityId,
PrecompiledType.CHANNEL, datasets);
+ listener = new PrecompiledJobEventListener(appCtx, entityId,
PrecompiledType.CHANNEL, datasets, null,
+ "BadListener");
activeEventHandler.registerListener(listener);
}
diff --git
a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java
b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java
index dfc3ed3..75cd7d7 100644
---
a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java
+++
b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java
@@ -58,6 +58,7 @@
import org.apache.asterix.metadata.declared.MetadataProvider;
import org.apache.asterix.metadata.entities.Function;
import org.apache.asterix.om.base.temporal.ADurationParserFactory;
+import org.apache.asterix.translator.IRequestContext;
import org.apache.asterix.translator.IStatementExecutor;
import org.apache.asterix.translator.IStatementExecutor.ResultDelivery;
import org.apache.asterix.translator.IStatementExecutor.Stats;
@@ -96,8 +97,7 @@
this.period = (CallExpr) period;
}
- @Override
- public byte getKind() {
+ @Override public byte getKind() {
return Kind.EXTENSION;
}
@@ -109,8 +109,7 @@
return signature;
}
- @Override
- public byte getCategory() {
+ @Override public byte getCategory() {
return Category.DDL;
}
@@ -118,8 +117,7 @@
return period;
}
- @Override
- public <R, T> R accept(ILangVisitor<R, T> visitor, T arg) throws
CompilationException {
+ @Override public <R, T> R accept(ILangVisitor<R, T> visitor, T arg) throws
CompilationException {
return null;
}
@@ -170,10 +168,9 @@
throw new CompilationException("Procedure can only execute a
single statement");
}
if (fStatements.get(0).getKind() == Statement.Kind.INSERT) {
- return new Pair<>(
- ((QueryTranslator)
statementExecutor).handleInsertUpsertStatement(metadataProvider,
- fStatements.get(0), hcc, hdc,
ResultDelivery.ASYNC, null, stats, true, null, null),
- PrecompiledType.INSERT);
+ return new Pair<>(((QueryTranslator) statementExecutor)
+ .handleInsertUpsertStatement(metadataProvider,
fStatements.get(0), hcc, hdc, ResultDelivery.ASYNC,
+ null, stats, true, null, null),
PrecompiledType.INSERT);
} else if (fStatements.get(0).getKind() == Statement.Kind.QUERY) {
Pair<JobSpecification, PrecompiledType> pair =
new Pair<>(compileQueryJob(statementExecutor,
metadataProvider, hcc, (Query) fStatements.get(0)),
@@ -183,8 +180,8 @@
} else if (fStatements.get(0).getKind() == Statement.Kind.DELETE) {
SqlppDeleteRewriteVisitor visitor = new
SqlppDeleteRewriteVisitor();
fStatements.get(0).accept(visitor, null);
- return new Pair<>(((QueryTranslator)
statementExecutor).handleDeleteStatement(metadataProvider,
- fStatements.get(0), hcc, true), PrecompiledType.DELETE);
+ return new Pair<>(((QueryTranslator) statementExecutor)
+ .handleDeleteStatement(metadataProvider,
fStatements.get(0), hcc, true), PrecompiledType.DELETE);
} else {
throw new CompilationException("Procedure can only execute a
single delete, insert, or query");
}
@@ -198,9 +195,8 @@
}
@Override
- public void handle(IStatementExecutor statementExecutor, MetadataProvider
metadataProvider,
- IHyracksClientConnection hcc, IHyracksDataset hdc, ResultDelivery
resultDelivery, Stats stats,
- int resultSetIdCounter) throws HyracksDataException,
AlgebricksException {
+ public void handle(IRequestContext requestContext, IStatementExecutor
statementExecutor,
+ MetadataProvider metadataProvider, int resultSetId) throws
HyracksDataException, AlgebricksException {
ICcApplicationContext appCtx =
metadataProvider.getApplicationContext();
ActiveLifecycleListener activeListener = (ActiveLifecycleListener)
appCtx.getActiveLifecycleListener();
ActiveJobNotificationHandler activeEventHandler =
activeListener.getNotificationHandler();
@@ -217,8 +213,8 @@
try {
mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
metadataProvider.setMetadataTxnContext(mdTxnCtx);
- procedure = BADLangExtension.getProcedure(mdTxnCtx, dataverse,
signature.getName(),
- Integer.toString(signature.getArity()));
+ procedure = BADLangExtension
+ .getProcedure(mdTxnCtx, dataverse, signature.getName(),
Integer.toString(signature.getArity()));
if (procedure != null) {
throw new AlgebricksException("A procedure with this name " +
signature.getName() + " already exists.");
}
@@ -236,7 +232,11 @@
metadataProvider.getDefaultDataverse(),
metadataProvider.getStorageComponentProvider());
tempMdProvider.setConfig(metadataProvider.getConfig());
- metadataProvider.setResultSetId(new
ResultSetId(resultSetIdCounter++));
+ metadataProvider.setResultSetId(new ResultSetId(resultSetId++));
+ final ResultDelivery resultDelivery =
requestContext.getResultDelivery();
+ final IHyracksClientConnection hcc =
requestContext.getHyracksClientConnection();
+ final IHyracksDataset hdc = requestContext.getHyracksDataset();
+ final Stats stats = requestContext.getStats();
boolean resultsAsync = resultDelivery == ResultDelivery.ASYNC ||
resultDelivery == ResultDelivery.DEFERRED;
metadataProvider.setResultAsyncMode(resultsAsync);
tempMdProvider.setResultSetId(metadataProvider.getResultSetId());
@@ -252,7 +252,8 @@
// Now we subscribe
if (listener == null) {
//TODO: Add datasets used by channel function
- listener = new PrecompiledJobEventListener(entityId,
procedureJobSpec.second, new ArrayList<>());
+ listener = new PrecompiledJobEventListener(appCtx, entityId,
procedureJobSpec.second, new ArrayList<>(),
+ null, "BadListener");
activeEventHandler.registerListener(listener);
}
setupDistributedJob(entityId, procedureJobSpec.first, hcc,
listener, tempMdProvider.getResultSetId(), hdc,
diff --git
a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ExecuteProcedureStatement.java
b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ExecuteProcedureStatement.java
index 9bf3718..6eaf531 100644
---
a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ExecuteProcedureStatement.java
+++
b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ExecuteProcedureStatement.java
@@ -41,12 +41,11 @@
import org.apache.asterix.metadata.MetadataManager;
import org.apache.asterix.metadata.MetadataTransactionContext;
import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.asterix.translator.IRequestContext;
import org.apache.asterix.translator.IStatementExecutor;
-import org.apache.asterix.translator.IStatementExecutor.ResultDelivery;
import org.apache.asterix.translator.IStatementExecutor.Stats;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.api.client.IHyracksClientConnection;
-import org.apache.hyracks.api.dataset.IHyracksDataset;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.job.JobFlag;
import org.apache.hyracks.api.job.JobId;
@@ -91,9 +90,8 @@
}
@Override
- public void handle(IStatementExecutor statementExecutor, MetadataProvider
metadataProvider,
- IHyracksClientConnection hcc, IHyracksDataset hdc, ResultDelivery
resultDelivery, Stats stats,
- int resultSetIdCounter) throws HyracksDataException,
AlgebricksException {
+ public void handle(IRequestContext requestContext, IStatementExecutor
statementExecutor,
+ MetadataProvider metadataProvider, int resultSetId) throws
HyracksDataException, AlgebricksException {
ICcApplicationContext appCtx =
metadataProvider.getApplicationContext();
ActiveLifecycleListener activeListener = (ActiveLifecycleListener)
appCtx.getActiveLifecycleListener();
ActiveJobNotificationHandler activeEventHandler =
activeListener.getNotificationHandler();
@@ -114,6 +112,7 @@
}
JobId hyracksJobId = listener.getJobId();
+ final IHyracksClientConnection hcc =
requestContext.getHyracksClientConnection();
if (procedure.getDuration().equals("")) {
hcc.startJob(hyracksJobId);
diff --git
a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ProcedureDropStatement.java
b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ProcedureDropStatement.java
index f7c3a74..e230e2f 100644
---
a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ProcedureDropStatement.java
+++
b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ProcedureDropStatement.java
@@ -35,12 +35,9 @@
import org.apache.asterix.metadata.MetadataManager;
import org.apache.asterix.metadata.MetadataTransactionContext;
import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.asterix.translator.IRequestContext;
import org.apache.asterix.translator.IStatementExecutor;
-import org.apache.asterix.translator.IStatementExecutor.ResultDelivery;
-import org.apache.asterix.translator.IStatementExecutor.Stats;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.api.client.IHyracksClientConnection;
-import org.apache.hyracks.api.dataset.IHyracksDataset;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.job.JobId;
@@ -78,9 +75,8 @@
}
@Override
- public void handle(IStatementExecutor statementExecutor, MetadataProvider
metadataProvider,
- IHyracksClientConnection hcc, IHyracksDataset hdc, ResultDelivery
resultDelivery, Stats stats,
- int resultSetIdCounter) throws HyracksDataException,
AlgebricksException {
+ public void handle(IRequestContext requestContext, IStatementExecutor
statementExecutor,
+ MetadataProvider metadataProvider, int resultSetId) throws
HyracksDataException, AlgebricksException {
ICcApplicationContext appCtx =
metadataProvider.getApplicationContext();
ActiveLifecycleListener activeListener = (ActiveLifecycleListener)
appCtx.getActiveLifecycleListener();
ActiveJobNotificationHandler activeEventHandler =
activeListener.getNotificationHandler();
@@ -117,7 +113,7 @@
listener.deActivate();
activeEventHandler.removeListener(listener);
if (hyracksJobId != null) {
- hcc.destroyJob(hyracksJobId);
+
requestContext.getHyracksClientConnection().destroyJob(hyracksJobId);
}
//Remove the Channel Metadata
diff --git
a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/PrecompiledJobEventListener.java
b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/PrecompiledJobEventListener.java
index 55547ea..5eb18d1 100644
---
a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/PrecompiledJobEventListener.java
+++
b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/PrecompiledJobEventListener.java
@@ -26,8 +26,10 @@
import org.apache.asterix.active.EntityId;
import org.apache.asterix.active.IActiveEventSubscriber;
import org.apache.asterix.app.result.ResultReader;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.common.metadata.IDataset;
import org.apache.asterix.external.feed.management.ActiveEntityEventsListener;
+import
org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.job.JobId;
import org.apache.log4j.Logger;
@@ -47,9 +49,9 @@
private final PrecompiledType type;
- public PrecompiledJobEventListener(EntityId entityId, PrecompiledType
type, List<IDataset> datasets) {
- this.entityId = entityId;
- this.datasets = datasets;
+ public PrecompiledJobEventListener(ICcApplicationContext appCtx, EntityId
entityId, PrecompiledType type,
+ List<IDataset> datasets, AlgebricksAbsolutePartitionConstraint
locations, String runtimeName) {
+ super(appCtx, entityId, datasets, locations, runtimeName);
state = ActivityState.STOPPED;
this.type = type;
}
@@ -99,6 +101,11 @@
}
}
+ @Override
+ public void refreshStats(long l) throws HyracksDataException {
+ // no op
+ }
+
private synchronized void handleJobStartEvent(ActiveEvent message) throws
Exception {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Channel Job started for " + entityId);
@@ -113,7 +120,7 @@
}
@Override
- public IActiveEventSubscriber subscribe(ActivityState state) throws
HyracksDataException {
- return null;
+ public synchronized void subscribe(IActiveEventSubscriber subscriber)
throws HyracksDataException {
+ // no op
}
}
--
To view, visit https://asterix-gerrit.ics.uci.edu/1868
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I392d7cda45e2bd39a85c037959ae5483eb48c9ee
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb-bad
Gerrit-Branch: master
Gerrit-Owner: Murtadha Hubail <[email protected]>