Till Westmann has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/3295

Change subject: [NO ISSUE][FUN] active_requests() return more requests
......................................................................

[NO ISSUE][FUN] active_requests() return more requests

- user model changes: yes
- storage format changes: no
- interface changes: no

The active_requests() function returns a few recently completed requests in
addition to the currently running requests.

Change-Id: I945ba790853bbcbff29dbb4daa4e0df96f7cae2d
---
M 
asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/BaseClientRequest.java
M 
asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientRequest.java
M 
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClientRequest.java
M 
asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RequestTracker.java
4 files changed, 41 insertions(+), 4 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb 
refs/changes/95/3295/1

diff --git 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/BaseClientRequest.java
 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/BaseClientRequest.java
index 50e6cc2..cce1213 100644
--- 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/BaseClientRequest.java
+++ 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/BaseClientRequest.java
@@ -44,6 +44,7 @@
         if (complete) {
             return;
         }
+        state = "completed";
         complete = true;
     }
 
@@ -53,6 +54,7 @@
             return;
         }
         complete();
+        state = "cancelled";
         if (cancellable) {
             doCancel(appCtx);
         }
diff --git 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientRequest.java
 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientRequest.java
index c19bb02..c051026 100644
--- 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientRequest.java
+++ 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientRequest.java
@@ -51,6 +51,11 @@
         setRunning();
     }
 
+    @Override
+    public JobId getJobId() {
+        return jobId;
+    }
+
     public Thread getExecutor() {
         return executor;
     }
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClientRequest.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClientRequest.java
index 430cd2a..0cc0ba0 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClientRequest.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClientRequest.java
@@ -20,6 +20,7 @@
 
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.JobId;
 
 public interface IClientRequest {
 
@@ -38,6 +39,13 @@
     String getClientContextId();
 
     /**
+     * The JobId assigned by the system for job execution
+     *
+     * @return the system provided JobId
+     */
+    JobId getJobId();
+
+    /**
      * Mark the request as complete, non-cancellable anymore
      */
     void complete();
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RequestTracker.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RequestTracker.java
index a0ab559..61403e0 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RequestTracker.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RequestTracker.java
@@ -20,18 +20,25 @@
 
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
 
 import org.apache.asterix.common.api.IClientRequest;
 import org.apache.asterix.common.api.IRequestTracker;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.control.cc.ClusterControllerService;
+import org.apache.hyracks.control.cc.job.JobRun;
 
 public class RequestTracker implements IRequestTracker {
 
     private final Map<String, IClientRequest> runningRequests = new 
ConcurrentHashMap<>();
+    private final Map<String, IClientRequest> archivedRequests = new 
ConcurrentHashMap<>();
     private final Map<String, IClientRequest> clientIdRequests = new 
ConcurrentHashMap<>();
     private final ICcApplicationContext ccAppCtx;
 
@@ -81,7 +88,9 @@
 
     @Override
     public synchronized Collection<IClientRequest> getRunningRequests() {
-        return Collections.unmodifiableCollection(runningRequests.values());
+        Collection<IClientRequest> allRequests = new 
HashSet<>(runningRequests.values());
+        allRequests.addAll(archivedRequests.values());
+        return Collections.unmodifiableCollection(allRequests);
     }
 
     private void cancel(IClientRequest request) throws HyracksDataException {
@@ -91,9 +100,22 @@
 
     private void untrack(IClientRequest request) {
         runningRequests.remove(request.getId());
-        final String clientContextId = request.getClientContextId();
-        if (clientContextId != null) {
-            clientIdRequests.remove(request.getClientContextId());
+        archivedRequests.put(request.getId(), request);
+        collectArchivedRequests();
+    }
+
+    private void collectArchivedRequests() {
+        ClusterControllerService ccs = (ClusterControllerService) 
ccAppCtx.getServiceContext().getControllerService();
+        Set<JobId> archivedJobIds =
+                
ccs.getJobManager().getArchivedJobs().stream().map(JobRun::getJobId).collect(Collectors.toSet());
+        for (IClientRequest archivedRequest : archivedRequests.values()) {
+            if (!archivedJobIds.contains(archivedRequest.getJobId())) {
+                archivedRequests.remove(archivedRequest.getId());
+                final String clientContextId = 
archivedRequest.getClientContextId();
+                if (clientContextId != null) {
+                    
clientIdRequests.remove(archivedRequest.getClientContextId());
+                }
+            }
         }
     }
 }

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/3295
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I945ba790853bbcbff29dbb4daa4e0df96f7cae2d
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Till Westmann <ti...@apache.org>

Reply via email to