>From Ali Alsuliman <[email protected]>: Ali Alsuliman has submitted this change. ( https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/21269?usp=email )
Change subject: [ASTERIXDB-3649][API] Set max result reads + untrack request ...................................................................... [ASTERIXDB-3649][API] Set max result reads + untrack request - user model changes: no - storage format changes: no - interface changes: yes Details: For non-ASYNC requests: - set max result reads as specified in the request instead of UNLIMITED. - remove requests whose result is exhausted from the asyncDeferred tracking map. This applies only to statements that have produced a result. Statements not producing a result need to be handled differently. Ext-ref: MB-71997 Change-Id: I8b2182582fce9ac588d2259f5dfd282183404c75 Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/21269 Reviewed-by: Murtadha Hubail <[email protected]> Tested-by: Jenkins <[email protected]> Reviewed-by: Ali Alsuliman <[email protected]> --- M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/JobResultCallback.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java A asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred-improved/deferred/deferred.3.get.http A asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred-improved/deferred/deferred.3.ignore M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IRequestTracker.java M asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RequestTracker.java M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/IJobResultCallback.java M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/ResultJobRecord.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/BaseCCApplication.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/ResultDirectoryService.java 10 files changed, 93 insertions(+), 11 deletions(-) Approvals: Ali Alsuliman: Looks good to me, but someone else must approve Jenkins: Verified Murtadha Hubail: Looks good to me, approved diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/JobResultCallback.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/JobResultCallback.java index 5c89b2e..e5a4034 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/JobResultCallback.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/JobResultCallback.java @@ -59,6 +59,14 @@ } } + @Override + public void notifyResultSweep(JobId jobId, ResultJobRecord resultJobRecord) { + String requestId = resultJobRecord.getRequestId(); + if (requestId != null) { + appCtx.getRequestTracker().notifyResultSweep(jobId, requestId); + } + } + private void updateResultMetadata(JobId jobId, ResultJobRecord resultJobRecord) { final ResultSetMetaData resultSetMetaData = resultJobRecord.getResultSetMetaData(); if (resultSetMetaData == null) { diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java index a9320f2..86b71d4 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java @@ -562,8 +562,7 @@ metadataProvider.setResultSetId(new ResultSetId(resultSetIdCounter.getAndInc())); metadataProvider.setResultAsyncMode( resultDelivery == ResultDelivery.ASYNC || resultDelivery == ResultDelivery.DEFERRED); - metadataProvider - .setMaxResultReads(!sessionConfig.isIncludeHost() ? UNLIMITED_READS : maxResultReads); + metadataProvider.setMaxResultReads(getMaxResultReads(resultDelivery, maxResultReads)); if (stats.getProfileType() == Stats.ProfileType.FULL) { this.jobFlags.add(JobFlag.PROFILE_RUNTIME); } @@ -611,6 +610,11 @@ } } + private long getMaxResultReads(ResultDelivery mode, long maxResultReads) { + // !sessionConfig.isIncludeHost() typically means the request is via the new request API + return mode == ResultDelivery.ASYNC && !sessionConfig.isIncludeHost() ? UNLIMITED_READS : maxResultReads; + } + protected void configureMetadataProvider(MetadataProvider metadataProvider, Map<String, String> config, Counter resultSetIdCounter, FileSplit outputFile, IRequestParameters requestParameters, Statement statement) { diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred-improved/deferred/deferred.3.get.http b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred-improved/deferred/deferred.3.get.http new file mode 100644 index 0000000..3d681d2 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred-improved/deferred/deferred.3.get.http @@ -0,0 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +// statuscode 404 +$result diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred-improved/deferred/deferred.3.ignore b/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred-improved/deferred/deferred.3.ignore new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred-improved/deferred/deferred.3.ignore diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IRequestTracker.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IRequestTracker.java index 8f3ff6d..b5592fe 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IRequestTracker.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IRequestTracker.java @@ -23,6 +23,7 @@ import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.job.IJobLifecycleListener; +import org.apache.hyracks.api.job.JobId; public interface IRequestTracker extends IJobLifecycleListener { @@ -116,4 +117,12 @@ */ Optional<IClientRequest> getAsyncOrDeferredRequest(String requestId); + /** + * Notifies this tracker that the result of job {@code jobId} has been consumed. + * + * @param jobId the job id whose result has been consumed + * @param requestId the request id associated with the job + */ + void notifyResultSweep(JobId jobId, String requestId); + } diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RequestTracker.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RequestTracker.java index 1dbb6b2..f596e07 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RequestTracker.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RequestTracker.java @@ -101,6 +101,11 @@ } @Override + public void notifyResultSweep(JobId jobId, String requestId) { + removeAsyncOrDeferredRequest(requestId); + } + + @Override public boolean cancel(String requestId) throws HyracksDataException { final IClientRequest request = runningRequests.get(requestId); if (request == null) { diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/IJobResultCallback.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/IJobResultCallback.java index fe7eb36..546522c 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/IJobResultCallback.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/IJobResultCallback.java @@ -22,6 +22,18 @@ public interface IJobResultCallback { + IJobResultCallback INSTANCE_NO_OP = new IJobResultCallback() { + @Override + public void completed(JobId jobId, ResultJobRecord resultJobRecord) { + + } + + @Override + public void notifyResultSweep(JobId jobId, ResultJobRecord resultJobRecord) { + + } + }; + /** * Notifies this callback that writing the result of job {@code jobId} has been completed successfully. * @@ -29,4 +41,12 @@ * @param resultJobRecord */ void completed(JobId jobId, ResultJobRecord resultJobRecord); + + /** + * Notifies this callback that the result of job {@code jobId} has been swept. + * + * @param jobId the job id whose result has been swept + * @param resultJobRecord the result record of the job + */ + void notifyResultSweep(JobId jobId, ResultJobRecord resultJobRecord); } diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/ResultJobRecord.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/ResultJobRecord.java index 4308d64..0c57931 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/ResultJobRecord.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/ResultJobRecord.java @@ -83,15 +83,17 @@ private static final Logger LOGGER = LogManager.getLogger(); private final long timestamp; private final long resultTtlInNanos; // per-request TTL in nanoseconds, -1 for system default + private final String requestId; private long jobStartTime; private long jobEndTime; - private Status status; + private final Status status; private ResultSetId rsId; private ResultSetMetaData resultSetMetaData; private long resultCount; - private boolean resultSetOrdered; + private final boolean resultSetOrdered; - public ResultJobRecord(boolean resultSetOrdered, long resultTtlInNanos) { + public ResultJobRecord(boolean resultSetOrdered, long resultTtlInNanos, String requestId) { + this.requestId = requestId; this.timestamp = System.nanoTime(); this.resultTtlInNanos = resultTtlInNanos; this.status = new Status(); @@ -161,6 +163,10 @@ return resultTtlInNanos; } + public String getRequestId() { + return requestId; + } + public Status getStatus() { return status; } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/BaseCCApplication.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/BaseCCApplication.java index 22e4bba..b5e90eb 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/BaseCCApplication.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/BaseCCApplication.java @@ -123,9 +123,7 @@ @Override public IJobResultCallback getJobResultCallback() { - return (jobId, resultJobRecord) -> { - // no op - }; + return IJobResultCallback.INSTANCE_NO_OP; } @Override diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/ResultDirectoryService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/ResultDirectoryService.java index 93ee281..dfd10a0 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/ResultDirectoryService.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/ResultDirectoryService.java @@ -92,7 +92,8 @@ if (resultTtl == null) { resultTtl = -1L; } - jobResultLocations.put(jobId, new JobResultInfo(new ResultJobRecord(partitionsOrdered, resultTtl), null)); + jobResultLocations.put(jobId, + new JobResultInfo(new ResultJobRecord(partitionsOrdered, resultTtl, spec.getRequestId()), null)); } @Override @@ -230,8 +231,19 @@ } @Override - public synchronized void sweep(JobId jobId) { - jobResultLocations.remove(jobId); + public void sweep(JobId jobId) { + JobResultInfo removedJob; + synchronized (this) { + removedJob = jobResultLocations.remove(jobId); + } + if (removedJob != null) { + ResultJobRecord rec = removedJob.getRecord(); + try { + jobResultCallback.notifyResultSweep(jobId, rec); + } catch (Throwable t) { + LOGGER.warn("failed to notify result sweep for job {}, req {}", jobId, rec.getRequestId(), t); + } + } } @Override -- To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/21269?usp=email To unsubscribe, or for help writing mail filters, visit https://asterix-gerrit.ics.uci.edu/settings?usp=email Gerrit-MessageType: merged Gerrit-Project: asterixdb Gerrit-Branch: lumina Gerrit-Change-Id: I8b2182582fce9ac588d2259f5dfd282183404c75 Gerrit-Change-Number: 21269 Gerrit-PatchSet: 3 Gerrit-Owner: Ali Alsuliman <[email protected]> Gerrit-Reviewer: Ali Alsuliman <[email protected]> Gerrit-Reviewer: Anon. E. Moose #1000171 Gerrit-Reviewer: Hussain Towaileb <[email protected]> Gerrit-Reviewer: Jenkins <[email protected]> Gerrit-Reviewer: Michael Blow <[email protected]> Gerrit-Reviewer: Murtadha Hubail Gerrit-Reviewer: Murtadha Hubail <[email protected]>
