[GitHub] flink pull request #5648: [FLINK-8887][flip-6] ClusterClient.getJobStatus ca...

2018-03-07 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5648#discussion_r172837682
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java ---
@@ -404,16 +404,25 @@ public void start() throws Exception {
final JobManagerRunner jobManagerRunner = jobManagerRunners.get(jobId);

if (jobManagerRunner != null) {
-   return jobManagerRunner.getJobManagerGateway().requestJobStatus(timeout);
-   } else {
-   final JobDetails jobDetails = archivedExecutionGraphStore.getAvailableJobDetails(jobId);
-
-   if (jobDetails != null) {
-   return CompletableFuture.completedFuture(jobDetails.getStatus());
-   } else {
-   return FutureUtils.completedExceptionally(new FlinkJobNotFoundException(jobId));
+   try {
+   return jobManagerRunner.getJobManagerGateway().requestJobStatus(timeout);
--- End diff --

I think methods such as `requestJob` and `requestOperatorBackPressureStats` 
are equally affected by this bug. I would also like to hear @tillrohrmann's 
opinion first before proceeding.
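
If `requestJob` and `requestOperatorBackPressureStats` turn out to need the same treatment, the fallback could live in one generic helper rather than being repeated in each method. This is a minimal sketch, assuming plain `CompletableFuture`s as stand-ins for the gateway call and the archive lookup; `FallbackRequests.withFallback` is a hypothetical name, not an existing Flink API. Only the named exception type triggers the fallback, and any other failure is propagated.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Supplier;

/** Hypothetical helper (not a Flink API): fall back only for one specific failure type. */
final class FallbackRequests {

    private FallbackRequests() {}

    static <T> CompletableFuture<T> withFallback(
            Supplier<CompletableFuture<T>> primary,
            Class<? extends Throwable> recoverable,
            Supplier<CompletableFuture<T>> fallback) {
        return primary.get()
                .<CompletableFuture<T>>handle((value, throwable) -> {
                    if (throwable == null) {
                        // Primary request succeeded: keep its result.
                        return CompletableFuture.completedFuture(value);
                    }
                    Throwable cause = unwrap(throwable);
                    if (recoverable.isInstance(cause)) {
                        // The one failure we want to work around: ask the fallback source.
                        return fallback.get();
                    }
                    // Any other failure is propagated instead of being hidden.
                    CompletableFuture<T> failed = new CompletableFuture<>();
                    failed.completeExceptionally(cause);
                    return failed;
                })
                // Flatten CompletableFuture<CompletableFuture<T>> into CompletableFuture<T>.
                .thenCompose(future -> future);
    }

    /** Dependent stages wrap failures in CompletionException; look at the real cause. */
    private static Throwable unwrap(Throwable throwable) {
        return throwable instanceof CompletionException && throwable.getCause() != null
                ? throwable.getCause()
                : throwable;
    }
}
```

In the Dispatcher this would roughly mean passing the JobMaster gateway request as `primary`, `FencingTokenException.class` as `recoverable`, and the archived-store lookup as `fallback`; that wiring is an assumption, not code from this PR.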


---


2018-03-07 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5648#discussion_r172835665
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java ---
@@ -404,7 +406,29 @@ public void start() throws Exception {
final JobManagerRunner jobManagerRunner = jobManagerRunners.get(jobId);

if (jobManagerRunner != null) {
-   return jobManagerRunner.getJobManagerGateway().requestJobStatus(timeout);
+   CompletableFuture<JobStatus> statusFuture = jobManagerRunner.getJobManagerGateway().requestJobStatus(timeout);
+   statusFuture.handle((JobStatus status, Throwable throwable) -> {
+   if (throwable != null) {
+   Throwable error = ExceptionUtils.stripCompletionException(throwable);
+
+   if (error instanceof FencingTokenException) {
--- End diff --

The missing `else` branch causes all other exceptions to be swallowed.
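
To illustrate the point: in a `handle` callback, an `if` with no `else` lets every non-matching failure fall through to the normal return path, so the returned future completes successfully (here with `null`). This is a hypothetical sketch, since the rest of the diff is cut off; `IllegalStateException` in the usage stands in for `FencingTokenException`, and `fallbackValue` replaces whatever the real fallback would be.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

/** Hypothetical illustration of the missing-else problem in a handle() callback. */
final class HandleElseBranch {

    private HandleElseBranch() {}

    /** Handles the targeted failure but has no else branch: other failures are swallowed. */
    static <T> CompletableFuture<T> withoutElse(
            CompletableFuture<T> request, Class<? extends Throwable> targeted, T fallbackValue) {
        return request.handle((status, throwable) -> {
            if (throwable != null) {
                Throwable error = unwrap(throwable);
                if (targeted.isInstance(error)) {
                    return fallbackValue;
                }
                // No else: execution falls through and "succeeds" with a null status.
            }
            return status;
        });
    }

    /** Same callback with an else branch that keeps the returned future failed. */
    static <T> CompletableFuture<T> withElse(
            CompletableFuture<T> request, Class<? extends Throwable> targeted, T fallbackValue) {
        return request.handle((status, throwable) -> {
            if (throwable != null) {
                Throwable error = unwrap(throwable);
                if (targeted.isInstance(error)) {
                    return fallbackValue;
                } else {
                    // Rethrowing inside handle() completes the returned future exceptionally.
                    throw new CompletionException(error);
                }
            }
            return status;
        });
    }

    private static Throwable unwrap(Throwable throwable) {
        return throwable instanceof CompletionException && throwable.getCause() != null
                ? throwable.getCause()
                : throwable;
    }
}
```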


---


2018-03-07 Thread yanghua
Github user yanghua commented on a diff in the pull request:

https://github.com/apache/flink/pull/5648#discussion_r172782747
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java ---
@@ -404,16 +404,25 @@ public void start() throws Exception {
final JobManagerRunner jobManagerRunner = jobManagerRunners.get(jobId);

if (jobManagerRunner != null) {
-   return jobManagerRunner.getJobManagerGateway().requestJobStatus(timeout);
-   } else {
-   final JobDetails jobDetails = archivedExecutionGraphStore.getAvailableJobDetails(jobId);
-
-   if (jobDetails != null) {
-   return CompletableFuture.completedFuture(jobDetails.getStatus());
-   } else {
-   return FutureUtils.completedExceptionally(new FlinkJobNotFoundException(jobId));
+   try {
+   return jobManagerRunner.getJobManagerGateway().requestJobStatus(timeout);
--- End diff --

OK, I will try this way, thanks!


---


2018-03-07 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5648#discussion_r172775779
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java ---
@@ -404,16 +404,25 @@ public void start() throws Exception {
final JobManagerRunner jobManagerRunner = jobManagerRunners.get(jobId);

if (jobManagerRunner != null) {
-   return jobManagerRunner.getJobManagerGateway().requestJobStatus(timeout);
-   } else {
-   final JobDetails jobDetails = archivedExecutionGraphStore.getAvailableJobDetails(jobId);
-
-   if (jobDetails != null) {
-   return CompletableFuture.completedFuture(jobDetails.getStatus());
-   } else {
-   return FutureUtils.completedExceptionally(new FlinkJobNotFoundException(jobId));
+   try {
+   return jobManagerRunner.getJobManagerGateway().requestJobStatus(timeout);
--- End diff --

This is an asynchronous call; it doesn't throw the exception but completes the
returned future exceptionally. You have to add a handler to the returned
`CompletableFuture`. It also only properly resolves one of the exceptions, and
IMO it shouldn't catch `Exception` but only the specific exceptions we want the
workaround to apply to, so as not to hide other issues.
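
The difference can be shown in isolation: a method that returns a failed future never throws, so a surrounding `try`/`catch` cannot observe the failure, while a callback attached to the future can. This is a minimal sketch; `requestStatus` is a hypothetical stand-in for the gateway call, and `IllegalStateException` stands in for the specific exception (for example `FencingTokenException`) that the workaround targets.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

/** Hypothetical illustration: try/catch vs. a handler on the returned future. */
final class AsyncTryCatch {

    private AsyncTryCatch() {}

    /** Stand-in for an RPC gateway call: it never throws, it returns a failed future. */
    static CompletableFuture<String> requestStatus() {
        CompletableFuture<String> future = new CompletableFuture<>();
        future.completeExceptionally(new IllegalStateException("fencing token mismatch"));
        return future;
    }

    /** Synchronous try/catch: the catch block is never reached, the failure passes through. */
    static CompletableFuture<String> withTryCatch() {
        try {
            return requestStatus();
        } catch (IllegalStateException e) {
            return CompletableFuture.completedFuture("FALLBACK");
        }
    }

    /** Handler on the future: sees the failure and recovers only from the targeted type. */
    static CompletableFuture<String> withHandler() {
        return requestStatus().exceptionally(throwable -> {
            Throwable cause = throwable instanceof CompletionException && throwable.getCause() != null
                    ? throwable.getCause()
                    : throwable;
            if (cause instanceof IllegalStateException) {
                return "FALLBACK";
            }
            // Anything else stays a failure.
            throw new CompletionException(cause);
        });
    }
}
```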

In any case, I'm not sure if adding workarounds to the Dispatcher is the 
right way to go. These issues revealed that some scenarios are not properly 
handled, and I would prefer waiting for @tillrohrmann to really fix this in the 
Dispatcher and related components.

We can temporarily handle both exceptions in the `MiniClusterClient` by 
adding a *single* retry (with a short sleep) if a **specific** exception occurs.
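
A single retry on one specific exception, as suggested for `MiniClusterClient`, might look roughly like the following. This is a sketch under assumptions: `SingleRetry.retryOnce` is a hypothetical helper, the short delay is scheduled on a caller-supplied `ScheduledExecutorService` instead of blocking with `Thread.sleep`, and the outcome of the second attempt is returned as-is.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

/** Hypothetical helper: retry an async request exactly once, only for one exception type. */
final class SingleRetry {

    private SingleRetry() {}

    static <T> CompletableFuture<T> retryOnce(
            Supplier<CompletableFuture<T>> request,
            Class<? extends Throwable> retryable,
            long delayMillis,
            ScheduledExecutorService scheduler) {
        return request.get()
                .<CompletableFuture<T>>handle((value, throwable) -> {
                    if (throwable == null) {
                        return CompletableFuture.completedFuture(value);
                    }
                    Throwable cause = unwrap(throwable);
                    CompletableFuture<T> next = new CompletableFuture<>();
                    if (!retryable.isInstance(cause)) {
                        // Not the targeted exception: fail immediately, no retry.
                        next.completeExceptionally(cause);
                        return next;
                    }
                    // Targeted exception: wait briefly, then try exactly one more time.
                    scheduler.schedule(() -> {
                        try {
                            request.get().whenComplete((retryValue, retryError) -> {
                                if (retryError != null) {
                                    next.completeExceptionally(unwrap(retryError));
                                } else {
                                    next.complete(retryValue);
                                }
                            });
                        } catch (RuntimeException e) {
                            next.completeExceptionally(e);
                        }
                    }, delayMillis, TimeUnit.MILLISECONDS);
                    return next;
                })
                .thenCompose(future -> future);
    }

    private static Throwable unwrap(Throwable throwable) {
        return throwable instanceof CompletionException && throwable.getCause() != null
                ? throwable.getCause()
                : throwable;
    }
}
```

Keeping it to one attempt with a fixed delay matches the "temporary workaround" intent: a real fix in the Dispatcher would make the retry unnecessary, and a bounded retry avoids masking a persistent failure.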


---


2018-03-06 Thread yanghua
GitHub user yanghua opened a pull request:

https://github.com/apache/flink/pull/5648

[FLINK-8887][flip-6] ClusterClient.getJobStatus can throw 
FencingTokenException

## What is the purpose of the change

*This pull request fixes the issue where ClusterClient.getJobStatus can throw a
FencingTokenException.*


## Brief change log

  - *Wrap the job status request to the JobMaster gateway in a try-catch; if an
exception is caught, fall back to requesting the job status from the archived execution graph store.*

## Verifying this change

This change is already covered by existing tests, such as 
*DispatcherTest.testCacheJobExecutionResult*.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (not applicable / docs / 
JavaDocs / **not documented**)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/yanghua/flink FLINK-8887

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5648.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5648


commit 03c5bf0aa1536842359740b53f43e7b46f48b006
Author: vinoyang 
Date:   2018-03-07T03:43:10Z

[FLINK-8887][flip-6] ClusterClient.getJobStatus can throw 
FencingTokenException




---