>From Ali Alsuliman <[email protected]>:

Ali Alsuliman has submitted this change. ( 
https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/21271?usp=email )

Change subject: [ASTERIXDB-3649][API] Untrack non-ASYNC requests
......................................................................

[ASTERIXDB-3649][API] Untrack non-ASYNC requests

- user model changes: no
- storage format changes: no
- interface changes: yes

Details:
For a non-ASYNC request that has a statement not producing
a result, remove the tracking entry when the statement
execution is done.
For all other statements that failed, sweep the
result directory (remove job record entry + tracking entry)

- when a request is archived, nullify the thread executing
the request.
- on joblet clean-up in NCs, remove partial results if the
job has failed.

Ext-ref: MB-71997
Change-Id: I48f88ef268447f1bced0006087b21dc8bb1ff4c1
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/21271
Reviewed-by: Ali Alsuliman <[email protected]>
Tested-by: Jenkins <[email protected]>
Integration-Tests: Jenkins <[email protected]>
Reviewed-by: Ian Maxon <[email protected]>
---
M 
asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientRequest.java
M 
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
M 
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClientRequest.java
M 
asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RequestTracker.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/ResultDirectoryService.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Joblet.java
6 files changed, 87 insertions(+), 18 deletions(-)

Approvals:
  Ali Alsuliman: Looks good to me, but someone else must approve
  Ian Maxon: Looks good to me, approved
  Jenkins: Verified; Verified
  Anon. E. Moose #1000171:




diff --git 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientRequest.java
 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientRequest.java
index 72c2ebc..6a533b3 100644
--- 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientRequest.java
+++ 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientRequest.java
@@ -44,7 +44,7 @@
             StorageUtil.getIntSizeInBytes(64, 
StorageUtil.StorageUnit.KILOBYTE);
     protected final long creationTime = System.nanoTime();
     protected final long creationSystemTime = System.currentTimeMillis();
-    protected final Thread executor;
+    protected Thread executor;
     protected final String statement;
     protected final String clientContextId;
     protected final JobState jobState;
@@ -66,6 +66,11 @@
         return clientContextId;
     }

+    @Override
+    public void archived() {
+        executor = null;
+    }
+
     public void setPlan(String plan) {
         if (plan != null) {
             this.plan = plan.length() > MAX_STATEMENT_LENGTH ? 
plan.substring(0, MAX_STATEMENT_LENGTH) : plan;
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index 8aa308f..b7e195b 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -372,12 +372,13 @@
     @Override
     public void compileAndExecute(IHyracksClientConnection hcc, 
IRequestParameters requestParameters) throws Exception {
         validateStatements(requestParameters);
-        trackRequest(requestParameters);
+        boolean trackInAsyncDeferredRequests = 
shouldTrackAsSeparateRequest(requestParameters);
+        trackRequest(requestParameters, trackInAsyncDeferredRequests);
         Counter resultSetIdCounter = new Counter(0);
         FileSplit outputFile = null;
         String threadName = Thread.currentThread().getName();
-        Thread.currentThread().setName(
-                QueryTranslator.class.getSimpleName() + ":" + 
requestParameters.getRequestReference().getUuid());
+        String reqId = requestParameters.getRequestReference().getUuid();
+        Thread.currentThread().setName(QueryTranslator.class.getSimpleName() + 
":" + reqId);
         Map<String, String> config = new HashMap<>();
         final IResultSet resultSet = requestParameters.getResultSet();
         final ResultDelivery resultDelivery = 
requestParameters.getResultProperties().getDelivery();
@@ -387,6 +388,8 @@
         final ResultMetadata outMetadata = requestParameters.getOutMetadata();
         final Map<String, IAObject> stmtParams = 
requestParameters.getStatementParameters();
         warningCollector.setMaxWarnings(sessionConfig.getMaxWarnings());
+        boolean hasResultSet = false;
+        Exception exception = null;
         try {
             for (Statement stmt : statements) {
                 if (sessionConfig.is(SessionConfig.FORMAT_HTML)) {
@@ -598,19 +601,55 @@
                         throw new 
CompilationException(ErrorCode.COMPILATION_ILLEGAL_STATE, 
stmt.getSourceLocation(),
                                 "Unexpected statement: " + kind);
                 }
+                hasResultSet |= metadataProvider.getResultSetId() != null;
             }
         } catch (Exception ex) {
+            // need to handle job and request clean-ups in case of failures
             this.appCtx.getRequestTracker().incrementFailedRequests();
+            exception = ex;
             throw ex;
         } finally {
             // async queries are completed after their job completes
             if (statements.isEmpty() || ResultDelivery.ASYNC != 
resultDelivery) {
-                
appCtx.getRequestTracker().complete(requestParameters.getRequestReference().getUuid());
+                // this assumes 1-1 mapping between a request and a statement, 
needs to be adapted for multi-statement
+                appCtx.getRequestTracker().complete(reqId);
+                if (ResultDelivery.ASYNC != resultDelivery) {
+                    completeDeferred(reqId, hasResultSet, exception);
+                }
             }
             Thread.currentThread().setName(threadName);
         }
     }

+    private void completeDeferred(String reqId, boolean hasResultSet, 
Exception exception) {
+        try {
+            Optional<IClientRequest> optClientReq = 
appCtx.getRequestTracker().getAsyncOrDeferredRequest(reqId);
+            if (optClientReq.isPresent()) {
+                ClientRequest clientRequest = (ClientRequest) 
optClientReq.get();
+                JobId jobId = clientRequest.getJobId();
+                if (jobId == null) {
+                    // jobId = null either means:
+                    // 1. compile error, compile-only, ... resulting in no job 
created whether query, DML or DDL
+                    // 2. statement currently not setting jobId in the client 
request, e.g. DDLs
+                    // for 2. it needs to be handled so that the job record is 
removed also if not producing a result
+                    
appCtx.getRequestTracker().removeAsyncOrDeferredRequest(reqId);
+                } else if (!hasResultSet) {
+                    // don't sweep ones that completed successfully and 
produced a result
+                    // sweeps statements not producing a result, e.g. DMLs 
without a return clause
+                    // sweeps statements that threw an exception whether 
query, DML or DDL
+                    ClusterControllerService ccSvs =
+                            (ClusterControllerService) 
appCtx.getServiceContext().getControllerService();
+                    ccSvs.getResultDirectoryService().sweep(jobId);
+                }
+            }
+        } catch (Throwable th) {
+            if (exception != null) {
+                exception.addSuppressed(th);
+            }
+            LOGGER.log(Level.WARN, "Failed to clean up deferred request", th);
+        }
+    }
+
     private long getMaxResultReads(ResultDelivery mode, long maxResultReads) {
         // !sessionConfig.isIncludeHost() typically means the request is via 
the new request API
         return mode == ResultDelivery.ASYNC && !sessionConfig.isIncludeHost() 
? UNLIMITED_READS : maxResultReads;
@@ -6007,10 +6046,11 @@
         }
     }

-    protected void trackRequest(IRequestParameters requestParameters) throws 
HyracksDataException {
+    protected void trackRequest(IRequestParameters requestParameters, boolean 
trackInAsyncDeferredRequests)
+            throws HyracksDataException {
         final IClientRequest clientRequest = 
appCtx.getReceptionist().requestReceived(requestParameters);
         this.appCtx.getRequestTracker().track(clientRequest);
-        if (shouldTrackAsSeparateRequest(requestParameters)) {
+        if (trackInAsyncDeferredRequests) {
             
appCtx.getRequestTracker().trackAsyncOrDeferredRequest(clientRequest);
         }
     }
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClientRequest.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClientRequest.java
index 2d239c1..496b2a1 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClientRequest.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClientRequest.java
@@ -101,6 +101,12 @@
     boolean cancel(ICcApplicationContext appCtx) throws HyracksDataException;

     /**
+     * Called when the request is archived.
+     * The request is archived when it is completed or cancelled and removed 
from the list of running requests.
+     */
+    void archived();
+
+    /**
      * @return A json string representation of this request
      */
     String toJson();
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RequestTracker.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RequestTracker.java
index 62716be..3b1f1bd 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RequestTracker.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RequestTracker.java
@@ -154,6 +154,7 @@
                 clientIdRequests.remove(completedRequest.getClientContextId());
             }
             archive(completedRequest);
+            completedRequest.archived();
         }
     }

diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/ResultDirectoryService.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/ResultDirectoryService.java
index dfd10a0..9217d82 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/ResultDirectoryService.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/ResultDirectoryService.java
@@ -232,17 +232,23 @@

     @Override
     public void sweep(JobId jobId) {
-        JobResultInfo removedJob;
-        synchronized (this) {
-            removedJob = jobResultLocations.remove(jobId);
-        }
+        JobResultInfo removedJob = sweepJob(jobId);
         if (removedJob != null) {
-            ResultJobRecord rec = removedJob.getRecord();
-            try {
-                jobResultCallback.notifyResultSweep(jobId, rec);
-            } catch (Throwable t) {
-                LOGGER.warn("failed to notify result sweep for job {}, req 
{}", jobId, rec.getRequestId(), t);
-            }
+            notifyResultSweep(jobId, removedJob);
+        }
+    }
+
+    private synchronized JobResultInfo sweepJob(JobId jobId) {
+        return jobResultLocations.remove(jobId);
+    }
+
+    private void notifyResultSweep(JobId jobId, JobResultInfo removedJob) {
+        // ResultJobRecord should never be null
+        ResultJobRecord rec = removedJob.getRecord();
+        try {
+            jobResultCallback.notifyResultSweep(jobId, rec);
+        } catch (Throwable t) {
+            LOGGER.warn("failed to notify result sweep for job {}, req {}", 
jobId, rec.getRequestId(), t);
         }
     }

diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Joblet.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Joblet.java
index 3b4e54b..2e8ad27 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Joblet.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Joblet.java
@@ -54,6 +54,7 @@
 import org.apache.hyracks.api.partitions.PartitionId;
 import org.apache.hyracks.api.resources.IDeallocatable;
 import org.apache.hyracks.api.resources.memory.IFrameProfiler;
+import org.apache.hyracks.api.util.InvokeUtil;
 import org.apache.hyracks.control.common.deployment.DeploymentUtils;
 import org.apache.hyracks.control.common.job.PartitionRequest;
 import org.apache.hyracks.control.common.job.PartitionState;
@@ -269,7 +270,17 @@
             LOGGER.trace(() -> "Freeing leaked " + stillAllocated + " bytes");
             serviceCtx.getMemoryManager().deallocate(stillAllocated);
         }
-        nodeController.getExecutor().execute(() -> 
deallocatableRegistry.close());
+        nodeController.getExecutor().execute(() -> {
+            try {
+                InvokeUtil.tryWithCleanups(deallocatableRegistry::close, () -> 
{
+                    if (cleanupStatus != JobStatus.TERMINATED) {
+                        
nodeController.getResultPartitionManager().sweep(jobId);
+                    }
+                });
+            } catch (Exception e) {
+                LOGGER.warn("Failure during joblet clean-up", e);
+            }
+        });
     }

     @Override

--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/21271?usp=email
To unsubscribe, or for help writing mail filters, visit 
https://asterix-gerrit.ics.uci.edu/settings?usp=email

Gerrit-MessageType: merged
Gerrit-Project: asterixdb
Gerrit-Branch: lumina
Gerrit-Change-Id: I48f88ef268447f1bced0006087b21dc8bb1ff4c1
Gerrit-Change-Number: 21271
Gerrit-PatchSet: 5
Gerrit-Owner: Ali Alsuliman <[email protected]>
Gerrit-Reviewer: Ali Alsuliman <[email protected]>
Gerrit-Reviewer: Anon. E. Moose #1000171
Gerrit-Reviewer: Hussain Towaileb <[email protected]>
Gerrit-Reviewer: Ian Maxon <[email protected]>
Gerrit-Reviewer: Jenkins <[email protected]>
Gerrit-Reviewer: Michael Blow <[email protected]>
Gerrit-Reviewer: Murtadha Hubail
Gerrit-Reviewer: Murtadha Hubail <[email protected]>

Reply via email to