Till Westmann has uploaded a new change for review. https://asterix-gerrit.ics.uci.edu/3295
Change subject: [NO ISSUE][FUN] active_requests() return more requests ...................................................................... [NO ISSUE][FUN] active_requests() return more requests - user model changes: yes - storage format changes: no - interface changes: no The active_requests() function returns a few recently completed requests in addition to the currently running requests. Change-Id: I945ba790853bbcbff29dbb4daa4e0df96f7cae2d --- M asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/BaseClientRequest.java M asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientRequest.java M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClientRequest.java M asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RequestTracker.java 4 files changed, 41 insertions(+), 4 deletions(-) git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/95/3295/1 diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/BaseClientRequest.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/BaseClientRequest.java index 50e6cc2..cce1213 100644 --- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/BaseClientRequest.java +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/BaseClientRequest.java @@ -44,6 +44,7 @@ if (complete) { return; } + state = "completed"; complete = true; } @@ -53,6 +54,7 @@ return; } complete(); + state = "cancelled"; if (cancellable) { doCancel(appCtx); } diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientRequest.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientRequest.java index c19bb02..c051026 100644 --- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientRequest.java +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientRequest.java @@ -51,6 +51,11 @@ setRunning(); } + @Override + public JobId getJobId() { + return jobId; + } + public Thread getExecutor() { return executor; } diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClientRequest.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClientRequest.java index 430cd2a..0cc0ba0 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClientRequest.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClientRequest.java @@ -20,6 +20,7 @@ import org.apache.asterix.common.dataflow.ICcApplicationContext; import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.job.JobId; public interface IClientRequest { @@ -38,6 +39,13 @@ String getClientContextId(); /** + * The JobId assigned by the system for job execution + * + * @return the system provided JobId + */ + JobId getJobId(); + + /** * Mark the request as complete, non-cancellable anymore */ void complete(); diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RequestTracker.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RequestTracker.java index a0ab559..61403e0 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RequestTracker.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RequestTracker.java @@ -20,18 +20,25 @@ import java.util.Collection; import java.util.Collections; +import java.util.HashSet; import java.util.Map; import java.util.Objects; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; import org.apache.asterix.common.api.IClientRequest; import org.apache.asterix.common.api.IRequestTracker; import org.apache.asterix.common.dataflow.ICcApplicationContext; import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.job.JobId; +import org.apache.hyracks.control.cc.ClusterControllerService; +import org.apache.hyracks.control.cc.job.JobRun; public class RequestTracker implements IRequestTracker { private final Map<String, IClientRequest> runningRequests = new ConcurrentHashMap<>(); + private final Map<String, IClientRequest> archivedRequests = new ConcurrentHashMap<>(); private final Map<String, IClientRequest> clientIdRequests = new ConcurrentHashMap<>(); private final ICcApplicationContext ccAppCtx; @@ -81,7 +88,9 @@ @Override public synchronized Collection<IClientRequest> getRunningRequests() { - return Collections.unmodifiableCollection(runningRequests.values()); + Collection<IClientRequest> allRequests = new HashSet<>(runningRequests.values()); + allRequests.addAll(archivedRequests.values()); + return Collections.unmodifiableCollection(allRequests); } private void cancel(IClientRequest request) throws HyracksDataException { @@ -91,9 +100,22 @@ private void untrack(IClientRequest request) { runningRequests.remove(request.getId()); - final String clientContextId = request.getClientContextId(); - if (clientContextId != null) { - clientIdRequests.remove(request.getClientContextId()); + archivedRequests.put(request.getId(), request); + collectArchivedRequests(); + } + + private void collectArchivedRequests() { + ClusterControllerService ccs = (ClusterControllerService) ccAppCtx.getServiceContext().getControllerService(); + Set<JobId> archivedJobIds = + ccs.getJobManager().getArchivedJobs().stream().map(JobRun::getJobId).collect(Collectors.toSet()); + for (IClientRequest archivedRequest : archivedRequests.values()) { + if (!archivedJobIds.contains(archivedRequest.getJobId())) { + archivedRequests.remove(archivedRequest.getId()); + final String clientContextId = archivedRequest.getClientContextId(); + if (clientContextId != null) { + clientIdRequests.remove(archivedRequest.getClientContextId()); + } + } } } } -- To view, visit https://asterix-gerrit.ics.uci.edu/3295 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: newchange Gerrit-Change-Id: I945ba790853bbcbff29dbb4daa4e0df96f7cae2d Gerrit-PatchSet: 1 Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Owner: Till Westmann <ti...@apache.org>