Till Westmann has submitted this change and it was merged.
Change subject: [ASTERIXDB-2314][HYR] Dataset in class names in Hyracks
......................................................................
[ASTERIXDB-2314][HYR] Dataset in class names in Hyracks
Change-Id: I333fa410df5efe7da9d4f0e9b7143f9f6928b88b
---
M asterix-bad/src/main/java/org/apache/asterix/bad/BADJobService.java
M asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java
M asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
M asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java
M asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ExecuteProcedureStatement.java
M asterix-bad/src/main/java/org/apache/asterix/bad/recovery/BADGlobalRecoveryManager.java
6 files changed, 25 insertions(+), 24 deletions(-)
Approvals:
Jenkins: Verified
Murtadha Hubail: Looks good to me, approved
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/BADJobService.java b/asterix-bad/src/main/java/org/apache/asterix/bad/BADJobService.java
index c48ec54..5ec4852 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/BADJobService.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/BADJobService.java
@@ -51,11 +51,11 @@
import org.apache.asterix.translator.IRequestParameters;
import org.apache.asterix.translator.IStatementExecutor;
import org.apache.hyracks.api.client.IHyracksClientConnection;
-import org.apache.hyracks.api.dataset.IHyracksDataset;
-import org.apache.hyracks.api.dataset.ResultSetId;
import org.apache.hyracks.api.job.DeployedJobSpecId;
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.api.result.IResultSet;
+import org.apache.hyracks.api.result.ResultSetId;
/**
* Provides functionality for channel jobs
@@ -122,7 +122,7 @@
}
public static long runDeployedJobSpec(DeployedJobSpecId distributedId,
IHyracksClientConnection hcc,
- IHyracksDataset hdc, Map<byte[], byte[]> jobParameters, EntityId
entityId, ITxnIdFactory txnIdFactory,
+ IResultSet resultSet, Map<byte[], byte[]> jobParameters, EntityId
entityId, ITxnIdFactory txnIdFactory,
ICcApplicationContext appCtx, DeployedJobSpecEventListener
listener, QueryTranslator statementExecutor)
throws Exception {
listener.waitWhileAtState(ActivityState.SUSPENDED);
@@ -138,7 +138,7 @@
long executionMilliseconds = Instant.now().toEpochMilli() - startTime;
if (listener.getType() ==
DeployedJobSpecEventListener.PrecompiledType.QUERY) {
- ResultReader resultReader = new ResultReader(hdc, jobId, new
ResultSetId(0));
+ ResultReader resultReader = new ResultReader(resultSet, jobId, new
ResultSetId(0));
ResultUtil.printResults(appCtx, resultReader,
statementExecutor.getSessionOutput(),
new IStatementExecutor.Stats(), null);
@@ -235,7 +235,7 @@
//Procedures
metadataProvider.setResultSetId(new ResultSetId(0));
IStatementExecutor.ResultDelivery resultDelivery =
requestParameters.getResultProperties().getDelivery();
- IHyracksDataset hdc = requestParameters.getHyracksDataset();
+ IResultSet hdc = requestParameters.getResultSet();
IStatementExecutor.Stats stats = requestParameters.getStats();
boolean resultsAsync = resultDelivery ==
IStatementExecutor.ResultDelivery.ASYNC
|| resultDelivery ==
IStatementExecutor.ResultDelivery.DEFERRED;
@@ -272,12 +272,12 @@
}
private static JobSpecification compileProcedureJob(IStatementExecutor
statementExecutor,
- MetadataProvider metadataProvider, IHyracksClientConnection hcc,
IHyracksDataset hdc,
+ MetadataProvider metadataProvider, IHyracksClientConnection hcc,
IResultSet resultSet,
IStatementExecutor.Stats stats, Statement procedureStatement)
throws Exception {
if (procedureStatement.getKind() == Statement.Kind.INSERT) {
return ((QueryTranslator)
statementExecutor).handleInsertUpsertStatement(metadataProvider,
- procedureStatement, hcc, hdc,
IStatementExecutor.ResultDelivery.ASYNC, null, stats, true, null,
- null, null);
+ procedureStatement, hcc, resultSet,
IStatementExecutor.ResultDelivery.ASYNC, null, stats, true,
+ null, null, null);
} else if (procedureStatement.getKind() == Statement.Kind.QUERY) {
return compileQueryJob(statementExecutor, metadataProvider, hcc,
(Query) procedureStatement);
} else {
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java
index cff1eaa..f6f17f3 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java
@@ -56,9 +56,9 @@
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
import org.apache.hyracks.api.client.IHyracksClientConnection;
-import org.apache.hyracks.api.dataset.IHyracksDataset;
-import org.apache.hyracks.api.dataset.ResultSetId;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.result.IResultSet;
+import org.apache.hyracks.api.result.ResultSetId;
public class ChannelSubscribeStatement extends ExtensionStatement {
@@ -184,7 +184,7 @@
tempMdProvider.getConfig().putAll(metadataProvider.getConfig());
final ResultDelivery resultDelivery =
requestParameters.getResultProperties().getDelivery();
- final IHyracksDataset hdc = requestParameters.getHyracksDataset();
+ final IResultSet resultSet = requestParameters.getResultSet();
final Stats stats = requestParameters.getStats();
if (subscriptionId == null) {
//To create a new subscription
@@ -207,14 +207,14 @@
InsertStatement insert = new InsertStatement(new
Identifier(dataverse),
new Identifier(subscriptionsDatasetName),
subscriptionTuple, varCounter, resultVar, accessor);
- ((QueryTranslator)
statementExecutor).handleInsertUpsertStatement(tempMdProvider, insert, hcc, hdc,
- resultDelivery, null, stats, false, null, null, null);
+ ((QueryTranslator)
statementExecutor).handleInsertUpsertStatement(tempMdProvider, insert, hcc,
+ resultSet, resultDelivery, null, stats, false, null,
null, null);
} else {
//To update an existing subscription
UpsertStatement upsert = new UpsertStatement(new
Identifier(dataverse),
new Identifier(subscriptionsDatasetName),
subscriptionTuple, varCounter, null, null);
- ((QueryTranslator)
statementExecutor).handleInsertUpsertStatement(tempMdProvider, upsert, hcc, hdc,
- resultDelivery, null, stats, false, null, null, null);
+ ((QueryTranslator)
statementExecutor).handleInsertUpsertStatement(tempMdProvider, upsert, hcc,
+ resultSet, resultDelivery, null, stats, false, null,
null, null);
}
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
index edadeb6..0ddb1c3 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
@@ -72,9 +72,9 @@
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.common.utils.Pair;
import org.apache.hyracks.api.client.IHyracksClientConnection;
-import org.apache.hyracks.api.dataset.IHyracksDataset;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.api.result.IResultSet;
import org.apache.hyracks.dataflow.common.data.parsers.IValueParser;
public class CreateChannelStatement extends ExtensionStatement {
@@ -215,7 +215,7 @@
}
private JobSpecification createChannelJob(IStatementExecutor
statementExecutor, MetadataProvider metadataProvider,
- IHyracksClientConnection hcc, IHyracksDataset hdc, Stats stats)
throws Exception {
+ IHyracksClientConnection hcc, IResultSet resultSet, Stats stats)
throws Exception {
StringBuilder builder = new StringBuilder();
builder.append("SET inline_with \"false\";\n");
if (!push) {
@@ -253,7 +253,7 @@
(Query) fStatements.get(1));
}
return ((QueryTranslator)
statementExecutor).handleInsertUpsertStatement(metadataProvider,
fStatements.get(1),
- hcc, hdc, ResultDelivery.ASYNC, null, stats, true, null, null,
null);
+ hcc, resultSet, ResultDelivery.ASYNC, null, stats, true, null,
null, null);
}
@Override
@@ -306,13 +306,14 @@
MetadataProvider tempMdProvider = new
MetadataProvider(metadataProvider.getApplicationContext(),
metadataProvider.getDefaultDataverse());
tempMdProvider.getConfig().putAll(metadataProvider.getConfig());
- final IHyracksDataset hdc = requestContext.getHyracksDataset();
+ final IResultSet resultSet = requestContext.getResultSet();
final Stats stats = requestContext.getStats();
//Create Channel Datasets
createDatasets(statementExecutor, tempMdProvider, hcc);
tempMdProvider.getLocks().reset();
//Create Channel Internal Job
- JobSpecification channeljobSpec =
createChannelJob(statementExecutor, tempMdProvider, hcc, hdc, stats);
+ JobSpecification channeljobSpec =
+ createChannelJob(statementExecutor, tempMdProvider, hcc,
resultSet, stats);
// Now we subscribe
if (listener == null) {
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java
index 6459b4c..ce8f1d2 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java
@@ -73,10 +73,10 @@
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.common.utils.Pair;
import org.apache.hyracks.api.client.IHyracksClientConnection;
-import org.apache.hyracks.api.dataset.ResultSetId;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.job.DeployedJobSpecId;
import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.api.result.ResultSetId;
import org.apache.hyracks.dataflow.common.data.parsers.IValueParser;
public class CreateProcedureStatement extends ExtensionStatement {
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ExecuteProcedureStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ExecuteProcedureStatement.java
index 69f413e..2c9d361 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ExecuteProcedureStatement.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ExecuteProcedureStatement.java
@@ -119,7 +119,7 @@
Map<byte[], byte[]> contextRuntimeVarMap =
createParameterMap(procedure);
DeployedJobSpecId deployedJobSpecId =
listener.getDeployedJobSpecId();
if (procedure.getDuration().equals("")) {
- BADJobService.runDeployedJobSpec(deployedJobSpecId, hcc,
requestParameters.getHyracksDataset(),
+ BADJobService.runDeployedJobSpec(deployedJobSpecId, hcc,
requestParameters.getResultSet(),
contextRuntimeVarMap, entityId,
metadataProvider.getTxnIdFactory(), appCtx, listener,
(QueryTranslator) statementExecutor);
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/recovery/BADGlobalRecoveryManager.java b/asterix-bad/src/main/java/org/apache/asterix/bad/recovery/BADGlobalRecoveryManager.java
index f358986..d2a6613 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/recovery/BADGlobalRecoveryManager.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/recovery/BADGlobalRecoveryManager.java
@@ -56,7 +56,7 @@
import org.apache.hyracks.api.application.ICCServiceContext;
import org.apache.hyracks.api.client.IHyracksClientConnection;
import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.client.dataset.HyracksDataset;
+import org.apache.hyracks.client.result.ResultSet;
import org.apache.hyracks.control.common.utils.HyracksThreadFactory;
public class BADGlobalRecoveryManager extends GlobalRecoveryManager {
@@ -146,7 +146,7 @@
activeEventHandler.registerListener(listener);
BADJobService.redeployJobSpec(entityId, procedure.getBody(),
metadataProvider, badStatementExecutor, hcc,
new RequestParameters(
- new HyracksDataset(hcc,
appCtx.getCompilerProperties().getFrameSize(),
+ new ResultSet(hcc,
appCtx.getCompilerProperties().getFrameSize(),
ResultReader.NUM_READERS),
new
ResultProperties(IStatementExecutor.ResultDelivery.IMMEDIATE),
new IStatementExecutor.Stats(), null, null, null,
null, true),
--
To view, visit https://asterix-gerrit.ics.uci.edu/2586
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: merged
Gerrit-Change-Id: I333fa410df5efe7da9d4f0e9b7143f9f6928b88b
Gerrit-PatchSet: 7
Gerrit-Project: asterixdb-bad
Gerrit-Branch: master
Gerrit-Owner: Till Westmann <[email protected]>
Gerrit-Reviewer: Jenkins <[email protected]>
Gerrit-Reviewer: Murtadha Hubail <[email protected]>
Gerrit-Reviewer: Till Westmann <[email protected]>