[GitHub] flink pull request #5648: [FLINK-8887][flip-6] ClusterClient.getJobStatus ca...
Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5648#discussion_r172837682

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java ---
    @@ -404,16 +404,25 @@ public void start() throws Exception {
         final JobManagerRunner jobManagerRunner = jobManagerRunners.get(jobId);

         if (jobManagerRunner != null) {
    -        return jobManagerRunner.getJobManagerGateway().requestJobStatus(timeout);
    -    } else {
    -        final JobDetails jobDetails = archivedExecutionGraphStore.getAvailableJobDetails(jobId);
    -
    -        if (jobDetails != null) {
    -            return CompletableFuture.completedFuture(jobDetails.getStatus());
    -        } else {
    -            return FutureUtils.completedExceptionally(new FlinkJobNotFoundException(jobId));
    +        try {
    +            return jobManagerRunner.getJobManagerGateway().requestJobStatus(timeout);
    --- End diff --

    I think methods such as `requestJob` and `requestOperatorBackPressureStats` are equally affected by this bug. I would also like to hear @tillrohrmann's opinion first before proceeding.

---
[GitHub] flink pull request #5648: [FLINK-8887][flip-6] ClusterClient.getJobStatus ca...
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5648#discussion_r172835665

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java ---
    @@ -404,7 +406,29 @@ public void start() throws Exception {
         final JobManagerRunner jobManagerRunner = jobManagerRunners.get(jobId);

         if (jobManagerRunner != null) {
    -        return jobManagerRunner.getJobManagerGateway().requestJobStatus(timeout);
    +        CompletableFuture statusFuture = jobManagerRunner.getJobManagerGateway().requestJobStatus(timeout);
    +        statusFuture.handle((JobStatus status, Throwable throwable) -> {
    +            if (throwable != null) {
    +                Throwable error = ExceptionUtils.stripCompletionException(throwable);
    +
    +                if (error instanceof FencingTokenException) {
    --- End diff --

    missing else block causes other exceptions to be swallowed...

---
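The swallowing behaviour described in this comment can be reproduced in a small standalone sketch. It is not the Dispatcher code: `StaleTokenException` is a hypothetical stand-in for `FencingTokenException`, and the `"FALLBACK"` value and all method names are assumptions made for illustration. The first variant has no else branch, so any other failure turns into a normally completed future holding `null`; the second rethrows whatever it does not explicitly handle.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Hypothetical stand-in for Flink's FencingTokenException (assumption for this sketch).
class StaleTokenException extends RuntimeException {
    StaleTokenException(String message) {
        super(message);
    }
}

class HandleElseSketch {

    // Unwraps the CompletionException that CompletableFuture may wrap around a failure.
    static Throwable strip(Throwable t) {
        return (t instanceof CompletionException && t.getCause() != null) ? t.getCause() : t;
    }

    // Builds an already failed future (works on Java 8, which lacks failedFuture).
    static <T> CompletableFuture<T> failed(Throwable t) {
        CompletableFuture<T> future = new CompletableFuture<>();
        future.completeExceptionally(t);
        return future;
    }

    // Problematic variant: with no else branch, unrelated failures become a null result.
    static CompletableFuture<String> swallowing(CompletableFuture<String> statusFuture) {
        return statusFuture.handle((status, throwable) -> {
            if (throwable != null) {
                if (strip(throwable) instanceof StaleTokenException) {
                    return "FALLBACK";
                }
                // no else: execution falls through and returns status, which is null here
            }
            return status;
        });
    }

    // Variant with an else branch: anything that is not handled is rethrown.
    static CompletableFuture<String> propagating(CompletableFuture<String> statusFuture) {
        return statusFuture.handle((status, throwable) -> {
            if (throwable == null) {
                return status;
            }
            Throwable error = strip(throwable);
            if (error instanceof StaleTokenException) {
                return "FALLBACK";
            } else {
                throw new CompletionException(error);
            }
        });
    }

    public static void main(String[] args) {
        CompletableFuture<String> unrelated = failed(new IllegalStateException("boom"));
        System.out.println("swallowing result: " + swallowing(unrelated).join());
        System.out.println("propagating failed: " + propagating(unrelated).isCompletedExceptionally());
    }
}
```

Note that in both variants the future returned by `handle` is the one that must be passed on to the caller; a handler attached to a future whose result is then discarded has no effect on what the caller sees.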
[GitHub] flink pull request #5648: [FLINK-8887][flip-6] ClusterClient.getJobStatus ca...
Github user yanghua commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5648#discussion_r172782747

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java ---
    @@ -404,16 +404,25 @@ public void start() throws Exception {
         final JobManagerRunner jobManagerRunner = jobManagerRunners.get(jobId);

         if (jobManagerRunner != null) {
    -        return jobManagerRunner.getJobManagerGateway().requestJobStatus(timeout);
    -    } else {
    -        final JobDetails jobDetails = archivedExecutionGraphStore.getAvailableJobDetails(jobId);
    -
    -        if (jobDetails != null) {
    -            return CompletableFuture.completedFuture(jobDetails.getStatus());
    -        } else {
    -            return FutureUtils.completedExceptionally(new FlinkJobNotFoundException(jobId));
    +        try {
    +            return jobManagerRunner.getJobManagerGateway().requestJobStatus(timeout);
    --- End diff --

    OK, I will try this way, thanks!

---
[GitHub] flink pull request #5648: [FLINK-8887][flip-6] ClusterClient.getJobStatus ca...
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5648#discussion_r172775779

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java ---
    @@ -404,16 +404,25 @@ public void start() throws Exception {
         final JobManagerRunner jobManagerRunner = jobManagerRunners.get(jobId);

         if (jobManagerRunner != null) {
    -        return jobManagerRunner.getJobManagerGateway().requestJobStatus(timeout);
    -    } else {
    -        final JobDetails jobDetails = archivedExecutionGraphStore.getAvailableJobDetails(jobId);
    -
    -        if (jobDetails != null) {
    -            return CompletableFuture.completedFuture(jobDetails.getStatus());
    -        } else {
    -            return FutureUtils.completedExceptionally(new FlinkJobNotFoundException(jobId));
    +        try {
    +            return jobManagerRunner.getJobManagerGateway().requestJobStatus(timeout);
    --- End diff --

    This is an asynchronous call, so the exception is not thrown at the call site and the try-catch never sees it. You have to add a handler to the returned `CompletableFuture`. The change also only properly resolves one of the exceptions, and IMO it shouldn't catch `Exception` but only the specific exceptions we want the workaround to cover, so as not to hide other issues.

    In any case, I'm not sure if adding workarounds to the Dispatcher is the right way to go. These issues revealed that some scenarios are not properly handled, and I would prefer waiting for @tillrohrmann to really fix this in the Dispatcher and related components.

    We can temporarily handle both exceptions in the `MiniClusterClient` by adding a *single* retry (with a short sleep) if a **specific** exception occurs.

---
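Two points from this comment can be sketched in plain Java: a try-catch around an asynchronous call does not observe a failure carried inside the returned future, and a client-side workaround can retry exactly once, after a short sleep, for one specific exception only. This is a sketch under stated assumptions, not `MiniClusterClient` code: `TransientLookupException` is a hypothetical stand-in for the specific exception, and `flaky`, `tryCatchSeesFailure`, and `requestWithSingleRetry` are names invented for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical stand-in for a specific, expected failure such as FencingTokenException.
class TransientLookupException extends RuntimeException {
    TransientLookupException(String message) {
        super(message);
    }
}

class SingleRetrySketch {

    // Illustration helper: a request that fails `failures` times with `error`, then succeeds.
    static Supplier<CompletableFuture<String>> flaky(int failures, RuntimeException error, String value) {
        AtomicInteger calls = new AtomicInteger();
        return () -> {
            CompletableFuture<String> future = new CompletableFuture<>();
            if (calls.getAndIncrement() < failures) {
                future.completeExceptionally(error);
            } else {
                future.complete(value);
            }
            return future;
        };
    }

    // A try-catch at the call site only sees exceptions thrown synchronously.
    // A failure carried inside the returned future passes straight through it.
    static boolean tryCatchSeesFailure(Supplier<CompletableFuture<String>> request) {
        try {
            request.get(); // returns a (possibly failed) future; nothing is thrown here
            return false;
        } catch (Exception e) {
            return true;
        }
    }

    // Retries exactly once, after a short sleep, and only for the specific exception.
    static String requestWithSingleRetry(Supplier<CompletableFuture<String>> request, long sleepMillis) {
        try {
            return request.get().join();
        } catch (CompletionException e) {
            if (!(e.getCause() instanceof TransientLookupException)) {
                throw e; // unrelated failures are not hidden
            }
            try {
                Thread.sleep(sleepMillis);
            } catch (InterruptedException interrupted) {
                Thread.currentThread().interrupt();
                throw new CompletionException(interrupted);
            }
            return request.get().join(); // a second failure propagates to the caller
        }
    }

    public static void main(String[] args) {
        RuntimeException transientError = new TransientLookupException("leader changed");
        System.out.println("try-catch saw failure: " + tryCatchSeesFailure(flaky(1, transientError, "RUNNING")));
        System.out.println("status after retry: " + requestWithSingleRetry(flaky(1, transientError, "RUNNING"), 50L));
    }
}
```

Blocking with `join()` is only reasonable on the client side, as in this sketch; inside an RPC endpoint such as the Dispatcher the same decision would have to be expressed through a handler on the future instead.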
[GitHub] flink pull request #5648: [FLINK-8887][flip-6] ClusterClient.getJobStatus ca...
GitHub user yanghua opened a pull request:

    https://github.com/apache/flink/pull/5648

    [FLINK-8887][flip-6] ClusterClient.getJobStatus can throw FencingTokenException

    ## What is the purpose of the change

    *This pull request fixes the issue that ClusterClient.getJobStatus can throw a FencingTokenException*

    ## Brief change log

      - *Wrap the job status request to the job master gateway in a try-catch; if an exception is caught, fall back to requesting the job status from the archived execution graph store*

    ## Verifying this change

    This change is already covered by existing tests, such as *DispatcherTest.testCacheJobExecutionResult*.

    ## Does this pull request potentially affect one of the following parts:

      - Dependencies (does it add or upgrade a dependency): (yes / **no**)
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**)
      - The serializers: (yes / **no** / don't know)
      - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know)
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
      - The S3 file system connector: (yes / **no** / don't know)

    ## Documentation

      - Does this pull request introduce a new feature? (yes / **no**)
      - If yes, how is the feature documented? (not applicable / docs / JavaDocs / **not documented**)

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/yanghua/flink FLINK-8887

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/5648.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #5648

----
commit 03c5bf0aa1536842359740b53f43e7b46f48b006
Author: vinoyang
Date:   2018-03-07T03:43:10Z

    [FLINK-8887][flip-6] ClusterClient.getJobStatus can throw FencingTokenException

---