>From Ali Alsuliman <[email protected]>:

Ali Alsuliman has submitted this change. ( 
https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/21269?usp=email )

Change subject: [ASTERIXDB-3649][API] Set max result reads + untrack request
......................................................................

[ASTERIXDB-3649][API] Set max result reads + untrack request

- user model changes: no
- storage format changes: no
- interface changes: yes

Details:
For non-ASYNC requests:
- set max result reads as specified in the request instead of UNLIMITED.
- remove requests whose result is exhausted from the asyncDeferred
  tracking map.

This applies only to statements that have produced a result.
Statements not producing a result need to be handled differently.

Ext-ref: MB-71997
Change-Id: I8b2182582fce9ac588d2259f5dfd282183404c75
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/21269
Reviewed-by: Murtadha Hubail <[email protected]>
Tested-by: Jenkins <[email protected]>
Reviewed-by: Ali Alsuliman <[email protected]>
---
M 
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/JobResultCallback.java
M 
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
A 
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred-improved/deferred/deferred.3.get.http
A 
asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred-improved/deferred/deferred.3.ignore
M 
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IRequestTracker.java
M 
asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RequestTracker.java
M 
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/IJobResultCallback.java
M 
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/ResultJobRecord.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/BaseCCApplication.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/ResultDirectoryService.java
10 files changed, 93 insertions(+), 11 deletions(-)

Approvals:
  Ali Alsuliman: Looks good to me, but someone else must approve
  Jenkins: Verified
  Murtadha Hubail: Looks good to me, approved




diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/JobResultCallback.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/JobResultCallback.java
index 5c89b2e..e5a4034 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/JobResultCallback.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/JobResultCallback.java
@@ -59,6 +59,14 @@
         }
     }

+    @Override
+    public void notifyResultSweep(JobId jobId, ResultJobRecord 
resultJobRecord) {
+        String requestId = resultJobRecord.getRequestId();
+        if (requestId != null) {
+            appCtx.getRequestTracker().notifyResultSweep(jobId, requestId);
+        }
+    }
+
     private void updateResultMetadata(JobId jobId, ResultJobRecord 
resultJobRecord) {
         final ResultSetMetaData resultSetMetaData = 
resultJobRecord.getResultSetMetaData();
         if (resultSetMetaData == null) {
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index a9320f2..86b71d4 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -562,8 +562,7 @@
                         metadataProvider.setResultSetId(new 
ResultSetId(resultSetIdCounter.getAndInc()));
                         metadataProvider.setResultAsyncMode(
                                 resultDelivery == ResultDelivery.ASYNC || 
resultDelivery == ResultDelivery.DEFERRED);
-                        metadataProvider
-                                
.setMaxResultReads(!sessionConfig.isIncludeHost() ? UNLIMITED_READS : 
maxResultReads);
+                        
metadataProvider.setMaxResultReads(getMaxResultReads(resultDelivery, 
maxResultReads));
                         if (stats.getProfileType() == Stats.ProfileType.FULL) {
                             this.jobFlags.add(JobFlag.PROFILE_RUNTIME);
                         }
@@ -611,6 +610,11 @@
         }
     }

+    private long getMaxResultReads(ResultDelivery mode, long maxResultReads) {
+        // !sessionConfig.isIncludeHost() typically means the request is via 
the new request API
+        return mode == ResultDelivery.ASYNC && !sessionConfig.isIncludeHost() 
? UNLIMITED_READS : maxResultReads;
+    }
+
     protected void configureMetadataProvider(MetadataProvider 
metadataProvider, Map<String, String> config,
             Counter resultSetIdCounter, FileSplit outputFile, 
IRequestParameters requestParameters,
             Statement statement) {
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred-improved/deferred/deferred.3.get.http
 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred-improved/deferred/deferred.3.get.http
new file mode 100644
index 0000000..3d681d2
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred-improved/deferred/deferred.3.get.http
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+// statuscode 404
+$result
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred-improved/deferred/deferred.3.ignore
 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred-improved/deferred/deferred.3.ignore
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred-improved/deferred/deferred.3.ignore
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IRequestTracker.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IRequestTracker.java
index 8f3ff6d..b5592fe 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IRequestTracker.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IRequestTracker.java
@@ -23,6 +23,7 @@

 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.IJobLifecycleListener;
+import org.apache.hyracks.api.job.JobId;

 public interface IRequestTracker extends IJobLifecycleListener {

@@ -116,4 +117,12 @@
      */
     Optional<IClientRequest> getAsyncOrDeferredRequest(String requestId);

+    /**
+     * Notifies this tracker that the result of job {@code jobId} has been 
consumed.
+     *
+     * @param jobId the job id whose result has been consumed
+     * @param requestId the request id associated with the job
+     */
+    void notifyResultSweep(JobId jobId, String requestId);
+
 }
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RequestTracker.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RequestTracker.java
index 1dbb6b2..f596e07 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RequestTracker.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RequestTracker.java
@@ -101,6 +101,11 @@
     }

     @Override
+    public void notifyResultSweep(JobId jobId, String requestId) {
+        removeAsyncOrDeferredRequest(requestId);
+    }
+
+    @Override
     public boolean cancel(String requestId) throws HyracksDataException {
         final IClientRequest request = runningRequests.get(requestId);
         if (request == null) {
diff --git 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/IJobResultCallback.java
 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/IJobResultCallback.java
index fe7eb36..546522c 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/IJobResultCallback.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/IJobResultCallback.java
@@ -22,6 +22,18 @@

 public interface IJobResultCallback {

+    IJobResultCallback INSTANCE_NO_OP = new IJobResultCallback() {
+        @Override
+        public void completed(JobId jobId, ResultJobRecord resultJobRecord) {
+
+        }
+
+        @Override
+        public void notifyResultSweep(JobId jobId, ResultJobRecord 
resultJobRecord) {
+
+        }
+    };
+
     /**
      * Notifies this callback that writing the result of job {@code jobId} has 
been completed successfully.
      *
@@ -29,4 +41,12 @@
      * @param resultJobRecord
      */
     void completed(JobId jobId, ResultJobRecord resultJobRecord);
+
+    /**
+     * Notifies this callback that the result of job {@code jobId} has been 
swept.
+     *
+     * @param jobId the job id whose result has been swept
+     * @param resultJobRecord the result record of the job
+     */
+    void notifyResultSweep(JobId jobId, ResultJobRecord resultJobRecord);
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/ResultJobRecord.java
 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/ResultJobRecord.java
index 4308d64..0c57931 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/ResultJobRecord.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/ResultJobRecord.java
@@ -83,15 +83,17 @@
     private static final Logger LOGGER = LogManager.getLogger();
     private final long timestamp;
     private final long resultTtlInNanos; // per-request TTL in nanoseconds, -1 
for system default
+    private final String requestId;
     private long jobStartTime;
     private long jobEndTime;
-    private Status status;
+    private final Status status;
     private ResultSetId rsId;
     private ResultSetMetaData resultSetMetaData;
     private long resultCount;
-    private boolean resultSetOrdered;
+    private final boolean resultSetOrdered;

-    public ResultJobRecord(boolean resultSetOrdered, long resultTtlInNanos) {
+    public ResultJobRecord(boolean resultSetOrdered, long resultTtlInNanos, 
String requestId) {
+        this.requestId = requestId;
         this.timestamp = System.nanoTime();
         this.resultTtlInNanos = resultTtlInNanos;
         this.status = new Status();
@@ -161,6 +163,10 @@
         return resultTtlInNanos;
     }

+    public String getRequestId() {
+        return requestId;
+    }
+
     public Status getStatus() {
         return status;
     }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/BaseCCApplication.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/BaseCCApplication.java
index 22e4bba..b5e90eb 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/BaseCCApplication.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/BaseCCApplication.java
@@ -123,9 +123,7 @@

     @Override
     public IJobResultCallback getJobResultCallback() {
-        return (jobId, resultJobRecord) -> {
-            // no op
-        };
+        return IJobResultCallback.INSTANCE_NO_OP;
     }

     @Override
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/ResultDirectoryService.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/ResultDirectoryService.java
index 93ee281..dfd10a0 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/ResultDirectoryService.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/ResultDirectoryService.java
@@ -92,7 +92,8 @@
         if (resultTtl == null) {
             resultTtl = -1L;
         }
-        jobResultLocations.put(jobId, new JobResultInfo(new 
ResultJobRecord(partitionsOrdered, resultTtl), null));
+        jobResultLocations.put(jobId,
+                new JobResultInfo(new ResultJobRecord(partitionsOrdered, 
resultTtl, spec.getRequestId()), null));
     }

     @Override
@@ -230,8 +231,19 @@
     }

     @Override
-    public synchronized void sweep(JobId jobId) {
-        jobResultLocations.remove(jobId);
+    public void sweep(JobId jobId) {
+        JobResultInfo removedJob;
+        synchronized (this) {
+            removedJob = jobResultLocations.remove(jobId);
+        }
+        if (removedJob != null) {
+            ResultJobRecord rec = removedJob.getRecord();
+            try {
+                jobResultCallback.notifyResultSweep(jobId, rec);
+            } catch (Throwable t) {
+                LOGGER.warn("failed to notify result sweep for job {}, req 
{}", jobId, rec.getRequestId(), t);
+            }
+        }
     }

     @Override

--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/21269?usp=email
To unsubscribe, or for help writing mail filters, visit 
https://asterix-gerrit.ics.uci.edu/settings?usp=email

Gerrit-MessageType: merged
Gerrit-Project: asterixdb
Gerrit-Branch: lumina
Gerrit-Change-Id: I8b2182582fce9ac588d2259f5dfd282183404c75
Gerrit-Change-Number: 21269
Gerrit-PatchSet: 3
Gerrit-Owner: Ali Alsuliman <[email protected]>
Gerrit-Reviewer: Ali Alsuliman <[email protected]>
Gerrit-Reviewer: Anon. E. Moose #1000171
Gerrit-Reviewer: Hussain Towaileb <[email protected]>
Gerrit-Reviewer: Jenkins <[email protected]>
Gerrit-Reviewer: Michael Blow <[email protected]>
Gerrit-Reviewer: Murtadha Hubail
Gerrit-Reviewer: Murtadha Hubail <[email protected]>

Reply via email to