>From Ali Alsuliman <[email protected]>:
Ali Alsuliman has uploaded this change for review. (
https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18286 )
Change subject: [ASTERIXDB-3343][API] Refactor and track other requests
......................................................................
[ASTERIXDB-3343][API] Refactor and track other requests
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
Refactor code and track other requests:
INSERT/UPSERT/DELETE/COPY TO/COPY FROM and queries.
Change-Id: Iddd26895f0eb6b8008c3512025180ec620a2ca98
---
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
1 file changed, 39 insertions(+), 20 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/86/18286/1
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index 2ef3657..340cd57 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -4007,11 +4007,10 @@
spec.setProperty(GlobalTxManager.GlOBAL_TX_PROPERTY_NAME,
new GlobalTxInfo(participatingDatasetIds,
numParticipatingNodes,
numParticipatingPartitions));
}
- jobId = JobUtils.runJob(hcc, spec, jobFlags, false);
+ String reqId =
requestParameters.getRequestReference().getUuid();
final IRequestTracker requestTracker =
appCtx.getRequestTracker();
- final ClientRequest clientRequest =
- (ClientRequest)
requestTracker.get(requestParameters.getRequestReference().getUuid());
- clientRequest.setJobId(jobId);
+ final ClientRequest clientRequest = (ClientRequest)
requestTracker.get(reqId);
+ jobId = runTrackJob(hcc, spec, jobFlags, reqId,
requestParameters.getClientContextId(), clientRequest);
clientRequest.markCancellable();
String nameBefore = Thread.currentThread().getName();
try {
@@ -4152,8 +4151,9 @@
throw e;
}
};
+ String reqId = reqParams.getRequestReference().getUuid();
IRequestTracker requestTracker = appCtx.getRequestTracker();
- ClientRequest clientRequest = (ClientRequest)
requestTracker.get(reqParams.getRequestReference().getUuid());
+ ClientRequest clientRequest = (ClientRequest)
requestTracker.get(reqId);
if (stmtInsertUpsert.getReturnExpression() != null) {
deliverResult(hcc, resultSet, compiler, metadataProvider, locker,
resultDelivery, outMetadata, stats,
reqParams, true, stmt, clientRequest);
@@ -4179,8 +4179,7 @@
jobSpec.setProperty(GlobalTxManager.GlOBAL_TX_PROPERTY_NAME, new GlobalTxInfo(
participatingDatasetIds, numParticipatingNodes,
numParticipatingPartitions));
}
- jobId = JobUtils.runJob(hcc, jobSpec, jobFlags, false);
- clientRequest.setJobId(jobId);
+ jobId = runTrackJob(hcc, jobSpec, jobFlags, reqId,
reqParams.getClientContextId(), clientRequest);
clientRequest.markCancellable();
String nameBefore = Thread.currentThread().getName();
try {
@@ -4248,11 +4247,11 @@
jobSpec.setProperty(GlobalTxManager.GlOBAL_TX_PROPERTY_NAME, new GlobalTxInfo(
participatingDatasetIds, numParticipatingNodes,
numParticipatingPartitions));
}
- jobId = JobUtils.runJob(hcc, jobSpec, jobFlags, false);
+ String reqId =
requestParameters.getRequestReference().getUuid();
final IRequestTracker requestTracker =
appCtx.getRequestTracker();
- final ClientRequest clientRequest =
- (ClientRequest)
requestTracker.get(requestParameters.getRequestReference().getUuid());
- clientRequest.setJobId(jobId);
+ final ClientRequest clientRequest = (ClientRequest)
requestTracker.get(reqId);
+ jobId = runTrackJob(hcc, jobSpec, jobFlags, reqId,
requestParameters.getClientContextId(),
+ clientRequest);
clientRequest.markCancellable();
String nameBefore = Thread.currentThread().getName();
try {
@@ -4280,6 +4279,15 @@
}
}
+ private static JobId runTrackJob(IHyracksClientConnection hcc,
JobSpecification jobSpec, EnumSet<JobFlag> jobFlags,
+ String reqId, String clientCtxId, ClientRequest clientRequest)
throws Exception {
+ jobSpec.setRequestId(reqId);
+ JobId jobId = JobUtils.runJob(hcc, jobSpec, jobFlags, false);
+ LOGGER.info("Created job {} for uuid:{}, clientContextID:{}", jobId,
reqId, clientCtxId);
+ clientRequest.setJobId(jobId);
+ return jobId;
+ }
+
@Override
public JobSpecification rewriteCompileQuery(IClusterInfoCollector
clusterInfoCollector,
MetadataProvider metadataProvider, Query query,
ICompiledDmlStatement stmt,
@@ -5393,9 +5401,9 @@
IStatementCompiler compiler, IMetadataLocker locker,
ResultDelivery resultDelivery, IResultPrinter printer,
IRequestParameters requestParameters, boolean cancellable,
ICcApplicationContext appCtx,
MetadataProvider metadataProvider, Statement atomicStatement)
throws Exception {
+ String reqId = requestParameters.getRequestReference().getUuid();
final IRequestTracker requestTracker = appCtx.getRequestTracker();
- final ClientRequest clientRequest =
- (ClientRequest)
requestTracker.get(requestParameters.getRequestReference().getUuid());
+ final ClientRequest clientRequest = (ClientRequest)
requestTracker.get(reqId);
if (cancellable) {
clientRequest.markCancellable();
}
@@ -5412,7 +5420,6 @@
appCtx.getReceptionist().ensureSchedulable(schedulableRequest);
// ensure request not cancelled before running job
ensureNotCancelled(clientRequest);
- jobSpec.setRequestId(clientRequest.getId());
if (atomicStatement != null) {
Dataset ds = metadataProvider.findDataset(((InsertStatement)
atomicStatement).getDatabaseName(),
((InsertStatement) atomicStatement).getDataverseName(),
@@ -5430,12 +5437,7 @@
participatingDatasetIds, numParticipatingNodes,
numParticipatingPartitions));
}
}
- jobId = JobUtils.runJob(hcc, jobSpec, jobFlags, false);
- if (LOGGER.isInfoEnabled()) {
- LOGGER.info("Created job {} for query uuid:{},
clientContextID:{}", jobId,
- requestParameters.getRequestReference().getUuid(),
requestParameters.getClientContextId());
- }
- clientRequest.setJobId(jobId);
+ jobId = runTrackJob(hcc, jobSpec, jobFlags, reqId,
requestParameters.getClientContextId(), clientRequest);
if (jId != null) {
jId.setValue(jobId);
}
--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18286
To unsubscribe, or for help writing mail filters, visit
https://asterix-gerrit.ics.uci.edu/settings
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Change-Id: Iddd26895f0eb6b8008c3512025180ec620a2ca98
Gerrit-Change-Number: 18286
Gerrit-PatchSet: 1
Gerrit-Owner: Ali Alsuliman <[email protected]>
Gerrit-MessageType: newchange