From Ali Alsuliman <[email protected]>: Ali Alsuliman has submitted this change. ( https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/21271?usp=email )
Change subject: [ASTERIXDB-3649][API] Untrack non-ASYNC requests ...................................................................... [ASTERIXDB-3649][API] Untrack non-ASYNC requests - user model changes: no - storage format changes: no - interface changes: yes Details: - For a non-ASYNC request whose statement does not produce a result, remove the tracking entry once the statement execution is done. For all other statements that failed, sweep the result directory (remove both the job record entry and the tracking entry). - When a request is archived, nullify the thread executing the request. - On joblet clean-up in NCs, remove partial results if the job has failed. Ext-ref: MB-71997 Change-Id: I48f88ef268447f1bced0006087b21dc8bb1ff4c1 Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/21271 Reviewed-by: Ali Alsuliman <[email protected]> Tested-by: Jenkins <[email protected]> Integration-Tests: Jenkins <[email protected]> Reviewed-by: Ian Maxon <[email protected]> --- M asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientRequest.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClientRequest.java M asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RequestTracker.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/ResultDirectoryService.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Joblet.java 6 files changed, 87 insertions(+), 18 deletions(-) Approvals: Ali Alsuliman: Looks good to me, but someone else must approve Ian Maxon: Looks good to me, approved Jenkins: Verified; Verified Anon. E. 
Moose #1000171: diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientRequest.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientRequest.java index 72c2ebc..6a533b3 100644 --- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientRequest.java +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientRequest.java @@ -44,7 +44,7 @@ StorageUtil.getIntSizeInBytes(64, StorageUtil.StorageUnit.KILOBYTE); protected final long creationTime = System.nanoTime(); protected final long creationSystemTime = System.currentTimeMillis(); - protected final Thread executor; + protected Thread executor; protected final String statement; protected final String clientContextId; protected final JobState jobState; @@ -66,6 +66,11 @@ return clientContextId; } + @Override + public void archived() { + executor = null; + } + public void setPlan(String plan) { if (plan != null) { this.plan = plan.length() > MAX_STATEMENT_LENGTH ? 
plan.substring(0, MAX_STATEMENT_LENGTH) : plan; diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java index 8aa308f..b7e195b 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java @@ -372,12 +372,13 @@ @Override public void compileAndExecute(IHyracksClientConnection hcc, IRequestParameters requestParameters) throws Exception { validateStatements(requestParameters); - trackRequest(requestParameters); + boolean trackInAsyncDeferredRequests = shouldTrackAsSeparateRequest(requestParameters); + trackRequest(requestParameters, trackInAsyncDeferredRequests); Counter resultSetIdCounter = new Counter(0); FileSplit outputFile = null; String threadName = Thread.currentThread().getName(); - Thread.currentThread().setName( - QueryTranslator.class.getSimpleName() + ":" + requestParameters.getRequestReference().getUuid()); + String reqId = requestParameters.getRequestReference().getUuid(); + Thread.currentThread().setName(QueryTranslator.class.getSimpleName() + ":" + reqId); Map<String, String> config = new HashMap<>(); final IResultSet resultSet = requestParameters.getResultSet(); final ResultDelivery resultDelivery = requestParameters.getResultProperties().getDelivery(); @@ -387,6 +388,8 @@ final ResultMetadata outMetadata = requestParameters.getOutMetadata(); final Map<String, IAObject> stmtParams = requestParameters.getStatementParameters(); warningCollector.setMaxWarnings(sessionConfig.getMaxWarnings()); + boolean hasResultSet = false; + Exception exception = null; try { for (Statement stmt : statements) { if (sessionConfig.is(SessionConfig.FORMAT_HTML)) { @@ -598,19 +601,55 @@ throw new CompilationException(ErrorCode.COMPILATION_ILLEGAL_STATE, stmt.getSourceLocation(), "Unexpected 
statement: " + kind); } + hasResultSet |= metadataProvider.getResultSetId() != null; } } catch (Exception ex) { + // need to handle job and request clean-ups in case of failures this.appCtx.getRequestTracker().incrementFailedRequests(); + exception = ex; throw ex; } finally { // async queries are completed after their job completes if (statements.isEmpty() || ResultDelivery.ASYNC != resultDelivery) { - appCtx.getRequestTracker().complete(requestParameters.getRequestReference().getUuid()); + // this assumes 1-1 mapping between a request and a statement, needs to be adapted for multi-statement + appCtx.getRequestTracker().complete(reqId); + if (ResultDelivery.ASYNC != resultDelivery) { + completeDeferred(reqId, hasResultSet, exception); + } } Thread.currentThread().setName(threadName); } } + private void completeDeferred(String reqId, boolean hasResultSet, Exception exception) { + try { + Optional<IClientRequest> optClientReq = appCtx.getRequestTracker().getAsyncOrDeferredRequest(reqId); + if (optClientReq.isPresent()) { + ClientRequest clientRequest = (ClientRequest) optClientReq.get(); + JobId jobId = clientRequest.getJobId(); + if (jobId == null) { + // jobId = null either means: + // 1. compile error, compile-only, ... resulting in no job created whether query, DML or DDL + // 2. statement currently not setting jobId in the client request, e.g. DDLs + // for 2. it needs to be handled so that the job record is removed also if not producing a result + appCtx.getRequestTracker().removeAsyncOrDeferredRequest(reqId); + } else if (!hasResultSet) { + // don't sweep ones that completed successfully and produced a result + // sweeps statements not producing a result, e.g. 
DMLs without a return clause + // sweeps statements that threw an exception whether query, DML or DDL + ClusterControllerService ccSvs = + (ClusterControllerService) appCtx.getServiceContext().getControllerService(); + ccSvs.getResultDirectoryService().sweep(jobId); + } + } + } catch (Throwable th) { + if (exception != null) { + exception.addSuppressed(th); + } + LOGGER.log(Level.WARN, "Failed to clean up deferred request", th); + } + } + private long getMaxResultReads(ResultDelivery mode, long maxResultReads) { // !sessionConfig.isIncludeHost() typically means the request is via the new request API return mode == ResultDelivery.ASYNC && !sessionConfig.isIncludeHost() ? UNLIMITED_READS : maxResultReads; @@ -6007,10 +6046,11 @@ } } - protected void trackRequest(IRequestParameters requestParameters) throws HyracksDataException { + protected void trackRequest(IRequestParameters requestParameters, boolean trackInAsyncDeferredRequests) + throws HyracksDataException { final IClientRequest clientRequest = appCtx.getReceptionist().requestReceived(requestParameters); this.appCtx.getRequestTracker().track(clientRequest); - if (shouldTrackAsSeparateRequest(requestParameters)) { + if (trackInAsyncDeferredRequests) { appCtx.getRequestTracker().trackAsyncOrDeferredRequest(clientRequest); } } diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClientRequest.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClientRequest.java index 2d239c1..496b2a1 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClientRequest.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClientRequest.java @@ -101,6 +101,12 @@ boolean cancel(ICcApplicationContext appCtx) throws HyracksDataException; /** + * Called when the request is archived. + * The request is archived when it is completed or cancelled and removed from the list of running requests. 
+ */ + void archived(); + + /** * @return A json string representation of this request */ String toJson(); diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RequestTracker.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RequestTracker.java index 62716be..3b1f1bd 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RequestTracker.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RequestTracker.java @@ -154,6 +154,7 @@ clientIdRequests.remove(completedRequest.getClientContextId()); } archive(completedRequest); + completedRequest.archived(); } } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/ResultDirectoryService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/ResultDirectoryService.java index dfd10a0..9217d82 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/ResultDirectoryService.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/ResultDirectoryService.java @@ -232,17 +232,23 @@ @Override public void sweep(JobId jobId) { - JobResultInfo removedJob; - synchronized (this) { - removedJob = jobResultLocations.remove(jobId); - } + JobResultInfo removedJob = sweepJob(jobId); if (removedJob != null) { - ResultJobRecord rec = removedJob.getRecord(); - try { - jobResultCallback.notifyResultSweep(jobId, rec); - } catch (Throwable t) { - LOGGER.warn("failed to notify result sweep for job {}, req {}", jobId, rec.getRequestId(), t); - } + notifyResultSweep(jobId, removedJob); + } + } + + private synchronized JobResultInfo sweepJob(JobId jobId) { + return jobResultLocations.remove(jobId); + } + + private void notifyResultSweep(JobId jobId, JobResultInfo 
removedJob) { + // ResultJobRecord should never be null + ResultJobRecord rec = removedJob.getRecord(); + try { + jobResultCallback.notifyResultSweep(jobId, rec); + } catch (Throwable t) { + LOGGER.warn("failed to notify result sweep for job {}, req {}", jobId, rec.getRequestId(), t); } } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Joblet.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Joblet.java index 3b4e54b..2e8ad27 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Joblet.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Joblet.java @@ -54,6 +54,7 @@ import org.apache.hyracks.api.partitions.PartitionId; import org.apache.hyracks.api.resources.IDeallocatable; import org.apache.hyracks.api.resources.memory.IFrameProfiler; +import org.apache.hyracks.api.util.InvokeUtil; import org.apache.hyracks.control.common.deployment.DeploymentUtils; import org.apache.hyracks.control.common.job.PartitionRequest; import org.apache.hyracks.control.common.job.PartitionState; @@ -269,7 +270,17 @@ LOGGER.trace(() -> "Freeing leaked " + stillAllocated + " bytes"); serviceCtx.getMemoryManager().deallocate(stillAllocated); } - nodeController.getExecutor().execute(() -> deallocatableRegistry.close()); + nodeController.getExecutor().execute(() -> { + try { + InvokeUtil.tryWithCleanups(deallocatableRegistry::close, () -> { + if (cleanupStatus != JobStatus.TERMINATED) { + nodeController.getResultPartitionManager().sweep(jobId); + } + }); + } catch (Exception e) { + LOGGER.warn("Failure during joblet clean-up", e); + } + }); } @Override -- To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/21271?usp=email To unsubscribe, or for help writing mail filters, visit 
https://asterix-gerrit.ics.uci.edu/settings?usp=email Gerrit-MessageType: merged Gerrit-Project: asterixdb Gerrit-Branch: lumina Gerrit-Change-Id: I48f88ef268447f1bced0006087b21dc8bb1ff4c1 Gerrit-Change-Number: 21271 Gerrit-PatchSet: 5 Gerrit-Owner: Ali Alsuliman <[email protected]> Gerrit-Reviewer: Ali Alsuliman <[email protected]> Gerrit-Reviewer: Anon. E. Moose #1000171 Gerrit-Reviewer: Hussain Towaileb <[email protected]> Gerrit-Reviewer: Ian Maxon <[email protected]> Gerrit-Reviewer: Jenkins <[email protected]> Gerrit-Reviewer: Michael Blow <[email protected]> Gerrit-Reviewer: Murtadha Hubail Gerrit-Reviewer: Murtadha Hubail <[email protected]>
