[FLINK-8700] Add ClusterClient.getJobStatus()

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9e253b5d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9e253b5d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9e253b5d

Branch: refs/heads/release-1.5
Commit: 9e253b5d1ecaf0066d7aeae2828eee4f4671b2e8
Parents: dbb8d2f
Author: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Authored: Mon Feb 26 11:53:47 2018 +0100
Committer: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Committed: Sun Mar 11 08:31:56 2018 -0700

----------------------------------------------------------------------
 .../flink/client/program/ClusterClient.java     | 29 ++++++++++++++++++++
 .../flink/client/program/MiniClusterClient.java |  1 +
 .../client/program/rest/RestClusterClient.java  | 14 ++++++++++
 3 files changed, 44 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9e253b5d/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
 
b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
index a4880db..1a783fc 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
@@ -49,6 +49,7 @@ import 
org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalException;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
@@ -584,6 +585,34 @@ public abstract class ClusterClient<T> {
        }
 
        /**
+        * Requests the {@link JobStatus} of the job with the given {@link 
JobID}.
+        */
+       public CompletableFuture<JobStatus> getJobStatus(JobID jobId) {
+               final ActorGateway jobManager;
+               try {
+                       jobManager = getJobManagerGateway();
+               } catch (FlinkException e) {
+                       throw new RuntimeException("Could not retrieve 
JobManage gateway.", e);
+               }
+
+               Future<Object> response = 
jobManager.ask(JobManagerMessages.getRequestJobStatus(jobId), timeout);
+
+               CompletableFuture<Object> javaFuture = 
FutureUtils.toJava(response);
+
+               return javaFuture.thenApply((responseMessage) -> {
+                       if (responseMessage instanceof 
JobManagerMessages.CurrentJobStatus) {
+                               return ((JobManagerMessages.CurrentJobStatus) 
responseMessage).status();
+                       } else if (responseMessage instanceof 
JobManagerMessages.JobNotFound) {
+                               throw new CompletionException(
+                                       new IllegalStateException("Could not 
find job with JobId " + jobId));
+                       } else {
+                               throw new CompletionException(
+                                       new IllegalStateException("Unknown 
JobManager response of type " + responseMessage.getClass()));
+                       }
+               });
+       }
+
+       /**
         * Cancels a job identified by the job id.
         * @param jobId the job id
         * @throws Exception In case an error occurred.

http://git-wip-us.apache.org/repos/asf/flink/blob/9e253b5d/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java
 
b/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java
index bbb5d49..7475071 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java
@@ -137,6 +137,7 @@ public class MiniClusterClient extends 
ClusterClient<MiniClusterClient.MiniClust
                throw new UnsupportedOperationException("MiniClusterClient does 
not yet support this operation.");
        }
 
+       @Override
        public CompletableFuture<JobStatus> getJobStatus(JobID jobId) {
                return guardWithSingleRetry(() -> 
miniCluster.getJobStatus(jobId), scheduledExecutor);
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/9e253b5d/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
 
b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
index 18ff099..8cf0d2c 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
@@ -35,6 +35,7 @@ import 
org.apache.flink.runtime.clusterframework.messages.GetClusterStatusRespon
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobmaster.JobResult;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalException;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
@@ -61,6 +62,8 @@ import org.apache.flink.runtime.rest.messages.RequestBody;
 import org.apache.flink.runtime.rest.messages.ResponseBody;
 import org.apache.flink.runtime.rest.messages.TerminationModeQueryParameter;
 import org.apache.flink.runtime.rest.messages.TriggerId;
+import org.apache.flink.runtime.rest.messages.job.JobDetailsHeaders;
+import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;
 import org.apache.flink.runtime.rest.messages.job.JobExecutionResultHeaders;
 import org.apache.flink.runtime.rest.messages.job.JobSubmitHeaders;
 import org.apache.flink.runtime.rest.messages.job.JobSubmitRequestBody;
@@ -254,6 +257,17 @@ public class RestClusterClient<T> extends ClusterClient<T> 
{
                }
        }
 
+       @Override
+       public CompletableFuture<JobStatus> getJobStatus(JobID jobId) {
+               JobDetailsHeaders detailsHeaders = 
JobDetailsHeaders.getInstance();
+               final JobMessageParameters  params = new JobMessageParameters();
+               params.jobPathParameter.resolve(jobId);
+
+               CompletableFuture<JobDetailsInfo> responseFuture = 
sendRequest(detailsHeaders, params);
+
+               return responseFuture.thenApply(JobDetailsInfo::getJobStatus);
+       }
+
        /**
         * Requests the {@link JobResult} for the given {@link JobID}. The 
method retries multiple
         * times to poll the {@link JobResult} before giving up.

Reply via email to