From Ali Alsuliman <[email protected]>:

Ali Alsuliman has submitted this change. ( https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/21183?usp=email )

Change subject: [ASTERIXDB-3649][API] Async API enhancements
......................................................................

[ASTERIXDB-3649][API] Async API enhancements

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
Certain statements, such as COPY, can be tracked in async mode
but do not produce a result. This case needs to be handled.

- Update the ResultJobRecord of jobs with no ResultMetadata
  when they finish (otherwise the status remains RUNNING
  after the job has finished, and StatusApi reports it as RUNNING)
- Fixed GET StatusApi to check if ResultMetadata exists.
- Fixed GET ResultApi to check if ResultMetadata exists.
- Increased timeout of result fetch when discarding to 10 seconds.
- Fixed jobQueueTime to handle cases where a job
  has ended without ever starting.
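
The jobQueueTime rule in the last item can be sketched in isolation as
follows. This is only an illustration: the class QueueTimeSketch, the helper
queueTimeMillis, and the explicit "now" parameter are hypothetical names used
here so the three cases can be exercised without a real clock; the actual
change is the ternary in ClientRequest shown in the diff.

```java
import java.util.concurrent.TimeUnit;

public class QueueTimeSketch {

    // Hypothetical helper mirroring the patched ternary in ClientRequest.
    // The clock value is passed in (an assumption made for testability).
    static long queueTimeMillis(long createTime, long startTime, long endTime, long now) {
        if (createTime <= 0) {
            return 0; // job was never created, so nothing to report
        }
        // started: startTime; ended without starting: endTime; still queued: now
        long reference = startTime > 0 ? startTime : (endTime > 0 ? endTime : now);
        return reference - createTime;
    }

    public static void main(String[] args) {
        long now = 20_000;
        // Job started at 4s after being created at 1s: queued for 3s
        System.out.println(TimeUnit.MILLISECONDS.toSeconds(queueTimeMillis(1_000, 4_000, 9_000, now)));
        // Job ended at 9s without ever starting: queued for 8s
        System.out.println(TimeUnit.MILLISECONDS.toSeconds(queueTimeMillis(1_000, 0, 9_000, now)));
        // Job still in the queue: queued for 19s so far
        System.out.println(TimeUnit.MILLISECONDS.toSeconds(queueTimeMillis(1_000, 0, 0, now)));
    }
}
```

Before this change the second case fell through to the current time, so the
reported queue time of a job that failed or was cancelled while queued kept
growing after the job had ended.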

Ext-ref: MB-69765

Change-Id: Id85284a685d15fc10f315572bc0cd2abeabcb534
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/21183
Integration-Tests: Jenkins <[email protected]>
Reviewed-by: Murtadha Hubail <[email protected]>
Tested-by: Jenkins <[email protected]>
Reviewed-by: Ali Alsuliman <[email protected]>
---
M asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientRequest.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryResultApiServlet.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryResultApiServlet.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryStatusApiServlet.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/DiscardResultRequestMessage.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/AsyncRequestsAPIUtil.java
M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/ResultJobRecord.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/ResultDirectoryService.java
8 files changed, 54 insertions(+), 15 deletions(-)

Approvals:
  Jenkins: Verified; Verified
  Ali Alsuliman: Looks good to me, but someone else must approve
  Murtadha Hubail: Looks good to me, approved
  Anon. E. Moose #1000171:




diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientRequest.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientRequest.java
index 4e9a366..72c2ebc 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientRequest.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientRequest.java
@@ -183,7 +183,11 @@
         putTime(json, state.endTime, "jobEndTime", dateTime);
         long queueTime = 0;
         if (state.createTime > 0) {
-            queueTime = (state.startTime > 0 ? state.startTime : System.currentTimeMillis()) - state.createTime;
+            // startTime - createTime, if job has started
+            // endTime - createTime, if job has ended but not started (failed while in the queue, cancelled/timeout)
+            // currentTime - createTime, if job is still in the queue
+            queueTime = (state.startTime > 0 ? state.startTime
+                    : (state.endTime > 0 ? state.endTime : System.currentTimeMillis())) - state.createTime;
         }
         json.put("jobQueueTime", TimeUnit.MILLISECONDS.toSeconds(queueTime));
         json.put("jobStatus", String.valueOf(state.status));
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryResultApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryResultApiServlet.java
index 90978fb..1340893 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryResultApiServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryResultApiServlet.java
@@ -50,6 +50,7 @@
         DiscardResultRequestMessage request =
                 new DiscardResultRequestMessage(serviceCtx.getNodeId(), jobId, resultSetId, requestId);
         try {
+            //TODO: handle receive response
             messageBroker.sendMessageToPrimaryCC(request);
         } catch (Exception e) {
             throw HyracksDataException.create(e);
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryResultApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryResultApiServlet.java
index eb0881d..97a4b7f 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryResultApiServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryResultApiServlet.java
@@ -107,11 +107,16 @@
         try {
             ResultJobRecord.Status status = resultReader.getStatus();
             final HttpResponseStatus httpStatus = ResultUtil.getHttpStatusFromResultStatus(status);
-            response.setStatus(httpStatus);
             if (httpStatus != HttpResponseStatus.OK) {
+                response.setStatus(httpStatus);
                 return;
             }
             ResultMetadata metadata = (ResultMetadata) resultReader.getMetadata();
+            if (metadata == null) {
+                response.setStatus(HttpResponseStatus.NOT_FOUND);
+                return;
+            }
+            response.setStatus(httpStatus);
             SessionOutput sessionOutput = initResponse(request, response, metadata.getFormat());
             processResults(handle, resultReader, sessionOutput, metadata, request);
         } catch (HyracksDataException e) {
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryStatusApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryStatusApiServlet.java
index f94a98b..9467fa0 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryStatusApiServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryStatusApiServlet.java
@@ -90,15 +90,23 @@
         printer.begin();
         printer.addHeaderPrinter(new StatusPrinter(resultStatus));
         printer.printHeaders();
+        ResultMetadata metadata = null;
+        if (uriMode || resultStatus == ResultStatus.SUCCESS) {
+            metadata = (ResultMetadata) resultReader.getMetadata();
+        }
         switch (resultStatus) {
-            case SUCCESS -> handleSuccessfulResult(request, strHandle, uriMode, printer, resultReader);
+            case SUCCESS -> {
+                if (metadata != null) {
+                    handleSuccessfulResult(request, strHandle, uriMode, printer, resultReader, metadata);
+                }
+            }
             case TIMEOUT -> handleTimeout(handle, executionState, printer, response);
             case FATAL, FAILED -> handleFailure(handle, executionState, printer, response, resultReaderStatus);
             case QUEUED, RUNNING -> {}
         }
         printer.printResults();
         if (uriMode) {
-            printMetricsAndFooters(printer, resultReader, request, handle.getRequestId(), handle.getJobId(), resultStatus);
+            printMetricsAndFooters(printer, metadata, request, handle.getRequestId(), handle.getJobId(), resultStatus);
         }
         printer.end();
         if (response.writer().checkError()) {
@@ -107,7 +115,7 @@
     }

     private void handleSuccessfulResult(IServletRequest request, String strHandle, boolean uriMode,
-            ResponsePrinter printer, ResultReader resultReader) throws HyracksDataException {
+            ResponsePrinter printer, ResultReader resultReader, ResultMetadata metadata) throws HyracksDataException {
         String servletPath = servletPath(request).replace("status", "result");
         String resHandle;
         if (uriMode) {
@@ -117,9 +125,7 @@
         }
         printer.addResultPrinter(new ResultHandlePrinter(resHandle));
         if (uriMode) {
-            ResultMetadata metadata = (ResultMetadata) resultReader.getMetadata();
-            printer.addResultPrinter(new ResultCountPrinter(
-                    ((ResultMetadata) (resultReader.getResultSetReader().getResultMetadata())).getResultCount()));
+            printer.addResultPrinter(new ResultCountPrinter(metadata.getResultCount()));
             printer.addResultPrinter(new PartitionInfoPrinter(resultReader.getResultSetReader().getResultRecords(),
                     resHandle, metadata.isResultSetOrdered()));
         }
@@ -172,9 +178,8 @@
         }
     }

-    private void printMetricsAndFooters(ResponsePrinter printer, ResultReader resultReader, IServletRequest request,
+    private void printMetricsAndFooters(ResponsePrinter printer, ResultMetadata metadata, IServletRequest request,
             String requestId, JobId jobId, ResultStatus status) throws HyracksDataException {
-        ResultMetadata metadata = (ResultMetadata) resultReader.getMetadata();
         if (metadata != null && status != ResultStatus.QUEUED && status != ResultStatus.RUNNING) {
             printMetricsWithResultMetadata(printer, request, metadata);
         } else {
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/DiscardResultRequestMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/DiscardResultRequestMessage.java
index 7ca897a..79ddbca 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/DiscardResultRequestMessage.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/DiscardResultRequestMessage.java
@@ -24,6 +24,7 @@
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.result.ResultSetId;
+import org.apache.logging.log4j.Level;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;

@@ -45,6 +46,11 @@

     @Override
     public void handle(ICcApplicationContext appCtx) throws HyracksDataException {
-        AsyncRequestsAPIUtil.discardResultPartitions((ICcApplicationContext) appCtx, jobId, resultSetId, requestId);
+        try {
+            AsyncRequestsAPIUtil.discardResultPartitions((ICcApplicationContext) appCtx, jobId, resultSetId, requestId);
+        } catch (Throwable th) {
+            // catching Throwable to prevent any unexpected exception from crashing the CC
+            LOGGER.log(Level.WARN, "unexpected exception while processing discard result request message", th);
+        }
     }
 }
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/AsyncRequestsAPIUtil.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/AsyncRequestsAPIUtil.java
index 88184fa..b92ec57 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/AsyncRequestsAPIUtil.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/AsyncRequestsAPIUtil.java
@@ -39,6 +39,7 @@
 import org.apache.hyracks.api.application.INCServiceContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.result.IResultMetadata;
 import org.apache.hyracks.api.result.ResultDirectoryRecord;
 import org.apache.hyracks.api.result.ResultJobRecord;
 import org.apache.hyracks.api.result.ResultSetId;
@@ -55,7 +56,7 @@
 public class AsyncRequestsAPIUtil {

     private static final Logger LOGGER = LogManager.getLogger();
-    private static final long RESULT_PARTITIONS_FETCH_TIMEOUT_MILLIS = TimeUnit.SECONDS.toMillis(1);
+    private static final long RESULT_PARTITIONS_FETCH_TIMEOUT_MILLIS = TimeUnit.SECONDS.toMillis(10);
     public static final long NC_TIMEOUT_MILLIS = TimeUnit.SECONDS.toMillis(5);

     /**
@@ -71,7 +72,7 @@
             String requestId) throws HyracksDataException {
         IResultDirectoryService resultDirectoryService =
                 ((ClusterControllerService) appCtx.getServiceContext().getControllerService())
-                        .getResultDirectoryService();;
+                        .getResultDirectoryService();
         // Check if result is in a valid state for discarding
         ResultJobRecord.Status status = resultDirectoryService.getResultStatus(jobId, resultSetId);
         if (status.getState() != ResultJobRecord.State.SUCCESS) {
@@ -79,6 +80,16 @@
                     resultSetId, requestId, status);
             return;
         }
+        IResultMetadata resultMetadata = resultDirectoryService.getResultMetadata(jobId, resultSetId);
+        if (resultMetadata == null) {
+            LOGGER.debug(
+                    "Result metadata not found for job {}, result set {}, request id {}. Removing async req tracking info",
+                    jobId, resultSetId, requestId);
+            if (requestId != null) {
+                appCtx.getRequestTracker().removeAsyncOrDeferredRequest(requestId);
+            }
+            return;
+        }

         // Send discard result messages to all nodes containing result partitions
         Set<String> nodeIds = fetchResultNodeIds(resultDirectoryService, jobId, resultSetId);
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/ResultJobRecord.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/ResultJobRecord.java
index 73440b1..4308d64 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/ResultJobRecord.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/ResultJobRecord.java
@@ -25,6 +25,7 @@

 import org.apache.hyracks.api.exceptions.ErrorCode;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.JobStatus;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;

@@ -110,8 +111,14 @@
         updateState(State.RUNNING);
     }

-    public void finish() {
+    public void finish(JobStatus jobStatus) {
         jobEndTime = System.nanoTime();
+        if (jobStatus != null && (status.state == State.RUNNING || status.state == State.IDLE)) {
+            switch (jobStatus) {
+                case TERMINATED -> updateState(State.SUCCESS);
+                case FAILURE, FAILURE_BEFORE_EXECUTION -> updateState(State.FAILED);
+            }
+        }
     }

     public long getJobDuration() {
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/ResultDirectoryService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/ResultDirectoryService.java
index 049f1c3..93ee281 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/ResultDirectoryService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/ResultDirectoryService.java
@@ -108,7 +108,7 @@
             if (resultJobRecord == null) {
                 return;
             }
-            resultJobRecord.finish();
+            resultJobRecord.finish(jobStatus);
             jobResultCallback.completed(jobId, resultJobRecord);
         } else {
             reportJobFailure(jobId, exceptions);
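
The state transition that ResultJobRecord.finish(JobStatus) now applies can
be sketched on its own as below. This is a simplified illustration, not the
Hyracks code: the nested State and JobStatus enums are stand-ins that list
only the constants needed here, and nextState is a hypothetical pure function
that returns the new state instead of calling updateState.

```java
public class FinishStateSketch {

    // Stand-in enums (assumption: reduced to the constants used in this sketch)
    enum State { IDLE, RUNNING, SUCCESS, FAILED }

    enum JobStatus { PENDING, RUNNING, TERMINATED, FAILURE, FAILURE_BEFORE_EXECUTION }

    // Hypothetical pure version of the guard and switch added to finish(JobStatus)
    static State nextState(State current, JobStatus jobStatus) {
        // Only records that never reached a final state are updated
        if (jobStatus == null || (current != State.RUNNING && current != State.IDLE)) {
            return current;
        }
        switch (jobStatus) {
            case TERMINATED:
                return State.SUCCESS;
            case FAILURE:
            case FAILURE_BEFORE_EXECUTION:
                return State.FAILED;
            default:
                return current; // other job statuses leave the record untouched
        }
    }

    public static void main(String[] args) {
        // A job with no result (for example COPY) that terminated normally
        System.out.println(nextState(State.RUNNING, JobStatus.TERMINATED)); // SUCCESS
        // A job that failed before it was executed
        System.out.println(nextState(State.IDLE, JobStatus.FAILURE_BEFORE_EXECUTION)); // FAILED
        // A record already marked FAILED is not overwritten
        System.out.println(nextState(State.FAILED, JobStatus.TERMINATED)); // FAILED
    }
}
```

The guard on the current state is what keeps a record that already reached
SUCCESS or FAILED from being overwritten when the job completion is reported.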

--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/21183?usp=email

Gerrit-MessageType: merged
Gerrit-Project: asterixdb
Gerrit-Branch: lumina
Gerrit-Change-Id: Id85284a685d15fc10f315572bc0cd2abeabcb534
Gerrit-Change-Number: 21183
Gerrit-PatchSet: 3
Gerrit-Owner: Ali Alsuliman <[email protected]>
Gerrit-Reviewer: Ali Alsuliman <[email protected]>
Gerrit-Reviewer: Anon. E. Moose #1000171
Gerrit-Reviewer: Jenkins <[email protected]>
Gerrit-Reviewer: Murtadha Hubail <[email protected]>
