[FLINK-8700] Add ClusterClient.getJobStatus()
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9e253b5d Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9e253b5d Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9e253b5d Branch: refs/heads/release-1.5 Commit: 9e253b5d1ecaf0066d7aeae2828eee4f4671b2e8 Parents: dbb8d2f Author: Aljoscha Krettek <aljoscha.kret...@gmail.com> Authored: Mon Feb 26 11:53:47 2018 +0100 Committer: Aljoscha Krettek <aljoscha.kret...@gmail.com> Committed: Sun Mar 11 08:31:56 2018 -0700 ---------------------------------------------------------------------- .../flink/client/program/ClusterClient.java | 29 ++++++++++++++++++++ .../flink/client/program/MiniClusterClient.java | 1 + .../client/program/rest/RestClusterClient.java | 14 ++++++++++ 3 files changed, 44 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/9e253b5d/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java index a4880db..1a783fc 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java @@ -49,6 +49,7 @@ import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils; import org.apache.flink.runtime.instance.ActorGateway; import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalException; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; @@ -584,6 +585,34 @@ public abstract class ClusterClient<T> { } /** + * Requests the {@link JobStatus} of the job with the given {@link JobID}. + */ + public CompletableFuture<JobStatus> getJobStatus(JobID jobId) { + final ActorGateway jobManager; + try { + jobManager = getJobManagerGateway(); + } catch (FlinkException e) { + throw new RuntimeException("Could not retrieve JobManage gateway.", e); + } + + Future<Object> response = jobManager.ask(JobManagerMessages.getRequestJobStatus(jobId), timeout); + + CompletableFuture<Object> javaFuture = FutureUtils.toJava(response); + + return javaFuture.thenApply((responseMessage) -> { + if (responseMessage instanceof JobManagerMessages.CurrentJobStatus) { + return ((JobManagerMessages.CurrentJobStatus) responseMessage).status(); + } else if (responseMessage instanceof JobManagerMessages.JobNotFound) { + throw new CompletionException( + new IllegalStateException("Could not find job with JobId " + jobId)); + } else { + throw new CompletionException( + new IllegalStateException("Unknown JobManager response of type " + responseMessage.getClass())); + } + }); + } + + /** * Cancels a job identified by the job id. * @param jobId the job id * @throws Exception In case an error occurred. http://git-wip-us.apache.org/repos/asf/flink/blob/9e253b5d/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java index bbb5d49..7475071 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java @@ -137,6 +137,7 @@ public class MiniClusterClient extends ClusterClient<MiniClusterClient.MiniClust throw new UnsupportedOperationException("MiniClusterClient does not yet support this operation."); } + @Override public CompletableFuture<JobStatus> getJobStatus(JobID jobId) { return guardWithSingleRetry(() -> miniCluster.getJobStatus(jobId), scheduledExecutor); } http://git-wip-us.apache.org/repos/asf/flink/blob/9e253b5d/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java index 18ff099..8cf0d2c 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java @@ -35,6 +35,7 @@ import org.apache.flink.runtime.clusterframework.messages.GetClusterStatusRespon import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter; import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.jobmaster.JobResult; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalException; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; @@ -61,6 +62,8 @@ import org.apache.flink.runtime.rest.messages.RequestBody; import org.apache.flink.runtime.rest.messages.ResponseBody; import org.apache.flink.runtime.rest.messages.TerminationModeQueryParameter; import org.apache.flink.runtime.rest.messages.TriggerId; +import org.apache.flink.runtime.rest.messages.job.JobDetailsHeaders; +import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo; import org.apache.flink.runtime.rest.messages.job.JobExecutionResultHeaders; import org.apache.flink.runtime.rest.messages.job.JobSubmitHeaders; import org.apache.flink.runtime.rest.messages.job.JobSubmitRequestBody; @@ -254,6 +257,17 @@ public class RestClusterClient<T> extends ClusterClient<T> { } } + @Override + public CompletableFuture<JobStatus> getJobStatus(JobID jobId) { + JobDetailsHeaders detailsHeaders = JobDetailsHeaders.getInstance(); + final JobMessageParameters params = new JobMessageParameters(); + params.jobPathParameter.resolve(jobId); + + CompletableFuture<JobDetailsInfo> responseFuture = sendRequest(detailsHeaders, params); + + return responseFuture.thenApply(JobDetailsInfo::getJobStatus); + } + /** * Requests the {@link JobResult} for the given {@link JobID}. The method retries multiple * times to poll the {@link JobResult} before giving up.