From Ali Alsuliman <[email protected]>: Ali Alsuliman has submitted this change. ( https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18286 )
Change subject: [ASTERIXDB-3343][API] Refactor and track other requests ...................................................................... [ASTERIXDB-3343][API] Refactor and track other requests - user model changes: no - storage format changes: no - interface changes: no Details: Refactor code and track other requests: INSERT/UPSERT/DELETE/COPY TO/COPY FROM and queries. Change-Id: Iddd26895f0eb6b8008c3512025180ec620a2ca98 Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18286 Reviewed-by: Ali Alsuliman <[email protected]> Reviewed-by: Murtadha Hubail <[email protected]> Integration-Tests: Jenkins <[email protected]> Tested-by: Jenkins <[email protected]> --- M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java 1 file changed, 44 insertions(+), 20 deletions(-) Approvals: Murtadha Hubail: Looks good to me, approved Ali Alsuliman: Looks good to me, but someone else must approve Jenkins: Verified; Verified Objections: Anon. E. Moose #1000171: Violations found diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java index 2ef3657..340cd57 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java @@ -4007,11 +4007,10 @@ spec.setProperty(GlobalTxManager.GlOBAL_TX_PROPERTY_NAME, new GlobalTxInfo(participatingDatasetIds, numParticipatingNodes, numParticipatingPartitions)); } - jobId = JobUtils.runJob(hcc, spec, jobFlags, false); + String reqId = requestParameters.getRequestReference().getUuid(); final IRequestTracker requestTracker = appCtx.getRequestTracker(); - final ClientRequest clientRequest = - (ClientRequest) requestTracker.get(requestParameters.getRequestReference().getUuid()); - clientRequest.setJobId(jobId); + final ClientRequest 
clientRequest = (ClientRequest) requestTracker.get(reqId); + jobId = runTrackJob(hcc, spec, jobFlags, reqId, requestParameters.getClientContextId(), clientRequest); clientRequest.markCancellable(); String nameBefore = Thread.currentThread().getName(); try { @@ -4152,8 +4151,9 @@ throw e; } }; + String reqId = reqParams.getRequestReference().getUuid(); IRequestTracker requestTracker = appCtx.getRequestTracker(); - ClientRequest clientRequest = (ClientRequest) requestTracker.get(reqParams.getRequestReference().getUuid()); + ClientRequest clientRequest = (ClientRequest) requestTracker.get(reqId); if (stmtInsertUpsert.getReturnExpression() != null) { deliverResult(hcc, resultSet, compiler, metadataProvider, locker, resultDelivery, outMetadata, stats, reqParams, true, stmt, clientRequest); @@ -4179,8 +4179,7 @@ jobSpec.setProperty(GlobalTxManager.GlOBAL_TX_PROPERTY_NAME, new GlobalTxInfo( participatingDatasetIds, numParticipatingNodes, numParticipatingPartitions)); } - jobId = JobUtils.runJob(hcc, jobSpec, jobFlags, false); - clientRequest.setJobId(jobId); + jobId = runTrackJob(hcc, jobSpec, jobFlags, reqId, reqParams.getClientContextId(), clientRequest); clientRequest.markCancellable(); String nameBefore = Thread.currentThread().getName(); try { @@ -4248,11 +4247,11 @@ jobSpec.setProperty(GlobalTxManager.GlOBAL_TX_PROPERTY_NAME, new GlobalTxInfo( participatingDatasetIds, numParticipatingNodes, numParticipatingPartitions)); } - jobId = JobUtils.runJob(hcc, jobSpec, jobFlags, false); + String reqId = requestParameters.getRequestReference().getUuid(); final IRequestTracker requestTracker = appCtx.getRequestTracker(); - final ClientRequest clientRequest = - (ClientRequest) requestTracker.get(requestParameters.getRequestReference().getUuid()); - clientRequest.setJobId(jobId); + final ClientRequest clientRequest = (ClientRequest) requestTracker.get(reqId); + jobId = runTrackJob(hcc, jobSpec, jobFlags, reqId, requestParameters.getClientContextId(), + clientRequest); 
clientRequest.markCancellable(); String nameBefore = Thread.currentThread().getName(); try { @@ -4280,6 +4279,15 @@ } } + private static JobId runTrackJob(IHyracksClientConnection hcc, JobSpecification jobSpec, EnumSet<JobFlag> jobFlags, + String reqId, String clientCtxId, ClientRequest clientRequest) throws Exception { + jobSpec.setRequestId(reqId); + JobId jobId = JobUtils.runJob(hcc, jobSpec, jobFlags, false); + LOGGER.info("Created job {} for uuid:{}, clientContextID:{}", jobId, reqId, clientCtxId); + clientRequest.setJobId(jobId); + return jobId; + } + @Override public JobSpecification rewriteCompileQuery(IClusterInfoCollector clusterInfoCollector, MetadataProvider metadataProvider, Query query, ICompiledDmlStatement stmt, @@ -5393,9 +5401,9 @@ IStatementCompiler compiler, IMetadataLocker locker, ResultDelivery resultDelivery, IResultPrinter printer, IRequestParameters requestParameters, boolean cancellable, ICcApplicationContext appCtx, MetadataProvider metadataProvider, Statement atomicStatement) throws Exception { + String reqId = requestParameters.getRequestReference().getUuid(); final IRequestTracker requestTracker = appCtx.getRequestTracker(); - final ClientRequest clientRequest = - (ClientRequest) requestTracker.get(requestParameters.getRequestReference().getUuid()); + final ClientRequest clientRequest = (ClientRequest) requestTracker.get(reqId); if (cancellable) { clientRequest.markCancellable(); } @@ -5412,7 +5420,6 @@ appCtx.getReceptionist().ensureSchedulable(schedulableRequest); // ensure request not cancelled before running job ensureNotCancelled(clientRequest); - jobSpec.setRequestId(clientRequest.getId()); if (atomicStatement != null) { Dataset ds = metadataProvider.findDataset(((InsertStatement) atomicStatement).getDatabaseName(), ((InsertStatement) atomicStatement).getDataverseName(), @@ -5430,12 +5437,7 @@ participatingDatasetIds, numParticipatingNodes, numParticipatingPartitions)); } } - jobId = JobUtils.runJob(hcc, jobSpec, jobFlags, 
false); - if (LOGGER.isInfoEnabled()) { - LOGGER.info("Created job {} for query uuid:{}, clientContextID:{}", jobId, - requestParameters.getRequestReference().getUuid(), requestParameters.getClientContextId()); - } - clientRequest.setJobId(jobId); + jobId = runTrackJob(hcc, jobSpec, jobFlags, reqId, requestParameters.getClientContextId(), clientRequest); if (jId != null) { jId.setValue(jobId); } -- To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18286 To unsubscribe, or for help writing mail filters, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Change-Id: Iddd26895f0eb6b8008c3512025180ec620a2ca98 Gerrit-Change-Number: 18286 Gerrit-PatchSet: 2 Gerrit-Owner: Ali Alsuliman <[email protected]> Gerrit-Reviewer: Ali Alsuliman <[email protected]> Gerrit-Reviewer: Anon. E. Moose #1000171 Gerrit-Reviewer: Jenkins <[email protected]> Gerrit-Reviewer: Michael Blow <[email protected]> Gerrit-Reviewer: Murtadha Hubail <[email protected]> Gerrit-MessageType: merged
