Till Westmann has uploaded a new change for review.
https://asterix-gerrit.ics.uci.edu/3295
Change subject: [NO ISSUE][FUN] active_requests() returns more requests
......................................................................
[NO ISSUE][FUN] active_requests() returns more requests
- user model changes: yes
- storage format changes: no
- interface changes: no
The active_requests() function returns a few recently completed requests in
addition to the currently running requests.
Change-Id: I945ba790853bbcbff29dbb4daa4e0df96f7cae2d
---
M asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/BaseClientRequest.java
M asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientRequest.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClientRequest.java
M asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RequestTracker.java
4 files changed, 41 insertions(+), 4 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/95/3295/1
diff --git
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/BaseClientRequest.java
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/BaseClientRequest.java
index 50e6cc2..cce1213 100644
---
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/BaseClientRequest.java
+++
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/BaseClientRequest.java
@@ -44,6 +44,7 @@
if (complete) {
return;
}
+ state = "completed";
complete = true;
}
@@ -53,6 +54,7 @@
return;
}
complete();
+ state = "cancelled";
if (cancellable) {
doCancel(appCtx);
}
diff --git
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientRequest.java
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientRequest.java
index c19bb02..c051026 100644
---
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientRequest.java
+++
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientRequest.java
@@ -51,6 +51,11 @@
setRunning();
}
+ @Override
+ public JobId getJobId() {
+ return jobId;
+ }
+
public Thread getExecutor() {
return executor;
}
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClientRequest.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClientRequest.java
index 430cd2a..0cc0ba0 100644
---
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClientRequest.java
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClientRequest.java
@@ -20,6 +20,7 @@
import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.JobId;
public interface IClientRequest {
@@ -38,6 +39,13 @@
String getClientContextId();
/**
+ * The JobId assigned by the system for job execution
+ *
+ * @return the system provided JobId
+ */
+ JobId getJobId();
+
+ /**
* Mark the request as complete, non-cancellable anymore
*/
void complete();
diff --git
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RequestTracker.java
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RequestTracker.java
index a0ab559..61403e0 100644
---
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RequestTracker.java
+++
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RequestTracker.java
@@ -20,18 +20,25 @@
import java.util.Collection;
import java.util.Collections;
+import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
import org.apache.asterix.common.api.IClientRequest;
import org.apache.asterix.common.api.IRequestTracker;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.control.cc.ClusterControllerService;
+import org.apache.hyracks.control.cc.job.JobRun;
public class RequestTracker implements IRequestTracker {
private final Map<String, IClientRequest> runningRequests = new
ConcurrentHashMap<>();
+ private final Map<String, IClientRequest> archivedRequests = new
ConcurrentHashMap<>();
private final Map<String, IClientRequest> clientIdRequests = new
ConcurrentHashMap<>();
private final ICcApplicationContext ccAppCtx;
@@ -81,7 +88,9 @@
@Override
public synchronized Collection<IClientRequest> getRunningRequests() {
- return Collections.unmodifiableCollection(runningRequests.values());
+ Collection<IClientRequest> allRequests = new
HashSet<>(runningRequests.values());
+ allRequests.addAll(archivedRequests.values());
+ return Collections.unmodifiableCollection(allRequests);
}
private void cancel(IClientRequest request) throws HyracksDataException {
@@ -91,9 +100,22 @@
private void untrack(IClientRequest request) {
runningRequests.remove(request.getId());
- final String clientContextId = request.getClientContextId();
- if (clientContextId != null) {
- clientIdRequests.remove(request.getClientContextId());
+ archivedRequests.put(request.getId(), request);
+ collectArchivedRequests();
+ }
+
+ private void collectArchivedRequests() {
+ ClusterControllerService ccs = (ClusterControllerService)
ccAppCtx.getServiceContext().getControllerService();
+ Set<JobId> archivedJobIds =
+
ccs.getJobManager().getArchivedJobs().stream().map(JobRun::getJobId).collect(Collectors.toSet());
+ for (IClientRequest archivedRequest : archivedRequests.values()) {
+ if (!archivedJobIds.contains(archivedRequest.getJobId())) {
+ archivedRequests.remove(archivedRequest.getId());
+ final String clientContextId =
archivedRequest.getClientContextId();
+ if (clientContextId != null) {
+
clientIdRequests.remove(archivedRequest.getClientContextId());
+ }
+ }
}
}
}
--
To view, visit https://asterix-gerrit.ics.uci.edu/3295
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I945ba790853bbcbff29dbb4daa4e0df96f7cae2d
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Till Westmann <[email protected]>