[GitHub] flink pull request #5746: [FLINK-8943] [ha] Fail Dispatcher if jobs cannot b...

2018-03-28 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5746


---


2018-03-28 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5746#discussion_r177757288
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/util/TestingFatalErrorHandler.java ---
@@ -19,45 +19,81 @@
 package org.apache.flink.runtime.util;
 
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.concurrent.atomic.AtomicReference;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 
 /**
 * Testing fatal error handler which records the occurred exceptions during the execution of the
  * tests. Captured exceptions are thrown as a {@link TestingException}.
  */
 public class TestingFatalErrorHandler implements FatalErrorHandler {
private static final Logger LOG = LoggerFactory.getLogger(TestingFatalErrorHandler.class);
-   private final AtomicReference<Throwable> atomicThrowable;
+   private CompletableFuture<Throwable> errorFuture;
 
public TestingFatalErrorHandler() {
-   atomicThrowable = new AtomicReference<>(null);
+   errorFuture = new CompletableFuture<>();
}
 
-   public void rethrowError() throws TestingException {
-   Throwable throwable = atomicThrowable.get();
+   public synchronized void rethrowError() throws TestingException {
+   final Throwable throwable = getException();
 
if (throwable != null) {
-   throw new TestingException(throwable);
+throw new TestingException(throwable);
+}
+   }
+
+   public synchronized boolean hasExceptionOccurred() {
+   return errorFuture.isDone();
+   }
+
+   @Nullable
+   public synchronized Throwable getException() {
+   if (errorFuture.isDone()) {
+   Throwable throwable = null;
+
+   try {
+   throwable = errorFuture.get();
+   } catch (InterruptedException ie) {
+   Thread.interrupted();
--- End diff --

Arg, very good comment. There is a reason why there are utils like 
`ExceptionUtils`... Will fix it right away.


---


2018-03-28 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5746#discussion_r177739371
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/util/TestingFatalErrorHandler.java ---
@@ -19,45 +19,81 @@
 package org.apache.flink.runtime.util;
 
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.concurrent.atomic.AtomicReference;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 
 /**
 * Testing fatal error handler which records the occurred exceptions during the execution of the
  * tests. Captured exceptions are thrown as a {@link TestingException}.
  */
 public class TestingFatalErrorHandler implements FatalErrorHandler {
private static final Logger LOG = LoggerFactory.getLogger(TestingFatalErrorHandler.class);
-   private final AtomicReference<Throwable> atomicThrowable;
+   private CompletableFuture<Throwable> errorFuture;
 
public TestingFatalErrorHandler() {
-   atomicThrowable = new AtomicReference<>(null);
+   errorFuture = new CompletableFuture<>();
}
 
-   public void rethrowError() throws TestingException {
-   Throwable throwable = atomicThrowable.get();
+   public synchronized void rethrowError() throws TestingException {
+   final Throwable throwable = getException();
 
if (throwable != null) {
-   throw new TestingException(throwable);
+throw new TestingException(throwable);
+}
+   }
+
+   public synchronized boolean hasExceptionOccurred() {
+   return errorFuture.isDone();
+   }
+
+   @Nullable
+   public synchronized Throwable getException() {
+   if (errorFuture.isDone()) {
+   Throwable throwable = null;
+
+   try {
+   throwable = errorFuture.get();
+   } catch (InterruptedException ie) {
+   Thread.interrupted();
--- End diff --

`interrupted()` clears the flag. You should use `interrupt()`.
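A minimal sketch of the suggested fix, assuming a simplified holder class (`ErrorFutureHolder` is hypothetical, not the class in the diff): inside the `catch`, `get()` has already cleared the flag, so `Thread.currentThread().interrupt()` is the call that restores it. The sketch also returns directly instead of initializing a local to `null`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

// Hypothetical stand-in for the future-backed part of TestingFatalErrorHandler;
// only getException() matters for this sketch.
class ErrorFutureHolder {
    private final CompletableFuture<Throwable> errorFuture = new CompletableFuture<>();

    synchronized void onFatalError(Throwable exception) {
        // A CompletableFuture completes only once, so the first reported error wins.
        errorFuture.complete(exception);
    }

    synchronized Throwable getException() {
        if (!errorFuture.isDone()) {
            return null; // no error reported yet
        }
        try {
            // Does not block: the future is already complete.
            return errorFuture.get();
        } catch (InterruptedException ie) {
            // get() cleared the interrupt flag when it threw; restore it so that
            // code running later on this thread still sees the interruption.
            Thread.currentThread().interrupt();
            return null;
        } catch (ExecutionException ee) {
            // Only reachable if the future was completed exceptionally.
            return ee.getCause();
        }
    }
}
```

Since `get()` is only called once the future is done, `errorFuture.getNow(null)` would avoid the checked exceptions altogether; it throws an unchecked `CompletionException` if the future was completed exceptionally.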


---


2018-03-28 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5746#discussion_r177732527
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java ---
@@ -344,6 +351,72 @@ public void testJobRecovery() throws Exception {
assertThat(jobIds, contains(jobGraph.getJobID()));
}
 
+   /**
+* Tests that the {@link Dispatcher} terminates if it cannot recover jobs ids from
+* the {@link SubmittedJobGraphStore}. See FLINK-8943.
+*/
+   @Test
+   public void testFatalErrorAfterJobIdRecoveryFailure() throws Exception {
+   final FlinkException testException = new FlinkException("Test exception");
+   submittedJobGraphStore.setJobIdsFunction(
+   (Collection<JobID> jobIds) -> {
+   throw testException;
+   });
+
+   UUID expectedLeaderSessionId = UUID.randomUUID();
+
+   assertNull(dispatcherLeaderElectionService.getConfirmationFuture());
+
+   dispatcherLeaderElectionService.isLeader(expectedLeaderSessionId);
+
+   UUID actualLeaderSessionId = dispatcherLeaderElectionService.getConfirmationFuture()
+   .get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS);
+
+   assertEquals(expectedLeaderSessionId, actualLeaderSessionId);
+
+   // we expect that a fatal error occurred
+   final Throwable error = fatalErrorHandler.getErrorFuture().get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS);
+
+   assertThat(ExceptionUtils.findThrowableWithMessage(error, testException.getMessage()).isPresent(), is(true));
+
+   fatalErrorHandler.clearError();
+   }
+
+   /**
+* Tests that the {@link Dispatcher} terminates if it cannot recover jobs from
+* the {@link SubmittedJobGraphStore}. See FLINK-8943.
+*/
+   @Test
+   public void testFatalErrorAfterJobRecoveryFailure() throws Exception {
+   final FlinkException testException = new FlinkException("Test exception");
+
+   final SubmittedJobGraph submittedJobGraph = new SubmittedJobGraph(jobGraph, null);
+   submittedJobGraphStore.putJobGraph(submittedJobGraph);
+
+   submittedJobGraphStore.setRecoverJobGraphFunction(
+   (JobID jobId, Map<JobID, SubmittedJobGraph> submittedJobs) -> {
+   throw testException;
+   });
+
+   UUID expectedLeaderSessionId = UUID.randomUUID();
--- End diff --

I think I fixed it in b83b280ce2fb493eb647ffa589613c0b0362f39a which is 
part of #5774 


---


2018-03-28 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5746#discussion_r177731653
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/util/TestingFatalErrorHandler.java ---
@@ -19,45 +19,81 @@
 package org.apache.flink.runtime.util;
 
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.concurrent.atomic.AtomicReference;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 
 /**
 * Testing fatal error handler which records the occurred exceptions during the execution of the
  * tests. Captured exceptions are thrown as a {@link TestingException}.
  */
 public class TestingFatalErrorHandler implements FatalErrorHandler {
private static final Logger LOG = LoggerFactory.getLogger(TestingFatalErrorHandler.class);
-   private final AtomicReference<Throwable> atomicThrowable;
+   private CompletableFuture<Throwable> errorFuture;
 
public TestingFatalErrorHandler() {
-   atomicThrowable = new AtomicReference<>(null);
+   errorFuture = new CompletableFuture<>();
}
 
-   public void rethrowError() throws TestingException {
-   Throwable throwable = atomicThrowable.get();
+   public synchronized void rethrowError() throws TestingException {
+   final Throwable throwable = getException();
 
if (throwable != null) {
-   throw new TestingException(throwable);
+throw new TestingException(throwable);
+}
+   }
+
+   public synchronized boolean hasExceptionOccurred() {
+   return errorFuture.isDone();
+   }
+
+   @Nullable
+   public synchronized Throwable getException() {
+   if (errorFuture.isDone()) {
+   Throwable throwable = null;
--- End diff --

Will fix it.


---


2018-03-28 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5746#discussion_r177731740
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java ---
@@ -396,15 +398,22 @@ public void testFailingJobRecovery() throws Exception {
 
jobManager = system.actorOf(jobManagerProps);
 
+   final TestProbe testProbe = new TestProbe(system);
+
+   testProbe.watch(jobManager);
+
Future started = Patterns.ask(jobManager, new Identify(42), deadline.timeLeft().toMillis());
 
Await.ready(started, deadline.timeLeft());
 
// make the job manager the leader --> this triggers the recovery of all jobs
myLeaderElectionService.isLeader(leaderSessionID);
 
-   // check that we have successfully recovered the second job
-   assertThat(recoveredJobs, containsInAnyOrder(jobId2));
+   // check that we did not recover any jobs
+   assertThat(recoveredJobs, Matchers.is(empty()));
--- End diff --

True, will make it consistent.


---


2018-03-28 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5746#discussion_r177731188
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/util/TestingFatalErrorHandler.java ---
@@ -19,45 +19,81 @@
 package org.apache.flink.runtime.util;
 
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.concurrent.atomic.AtomicReference;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 
 /**
 * Testing fatal error handler which records the occurred exceptions during the execution of the
  * tests. Captured exceptions are thrown as a {@link TestingException}.
  */
 public class TestingFatalErrorHandler implements FatalErrorHandler {
private static final Logger LOG = LoggerFactory.getLogger(TestingFatalErrorHandler.class);
-   private final AtomicReference<Throwable> atomicThrowable;
+   private CompletableFuture<Throwable> errorFuture;
 
public TestingFatalErrorHandler() {
-   atomicThrowable = new AtomicReference<>(null);
+   errorFuture = new CompletableFuture<>();
}
 
-   public void rethrowError() throws TestingException {
-   Throwable throwable = atomicThrowable.get();
+   public synchronized void rethrowError() throws TestingException {
+   final Throwable throwable = getException();
 
if (throwable != null) {
-   throw new TestingException(throwable);
+throw new TestingException(throwable);
+}
+   }
+
+   public synchronized boolean hasExceptionOccurred() {
+   return errorFuture.isDone();
+   }
+
+   @Nullable
+   public synchronized Throwable getException() {
+   if (errorFuture.isDone()) {
+   Throwable throwable = null;
+
+   try {
+   throwable = errorFuture.get();
+   } catch (InterruptedException ie) {
+   Thread.interrupted();
--- End diff --

Given that we want to keep the interrupted flag set for the caller, I think 
we have to call `Thread.interrupted`. Otherwise, other components which are 
called later will miss the interruption. Actually it would be better to call 
`ExceptionUtils.checkInterrupted` which is a utility for this behaviour.
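Keeping the flag set for the caller means calling `Thread.currentThread().interrupt()`; `Thread.interrupted()` does the opposite and clears it. A sketch of a helper in the spirit of `ExceptionUtils.checkInterrupted`, assuming it only needs to re-set the flag when given an `InterruptedException` (`InterruptUtil` and `getOrNull` are hypothetical names, not Flink code):

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

// Hypothetical helper; it mirrors the behaviour described for
// ExceptionUtils.checkInterrupted but is not Flink's implementation.
final class InterruptUtil {
    private InterruptUtil() {}

    /** Re-sets the current thread's interrupt flag if e is an InterruptedException. */
    static void checkInterrupted(Throwable e) {
        if (e instanceof InterruptedException) {
            Thread.currentThread().interrupt();
        }
    }

    /** Example call site: one multi-catch, with interrupt handling delegated to the helper. */
    static <T> T getOrNull(Future<T> future) {
        try {
            return future.get();
        } catch (InterruptedException | ExecutionException e) {
            checkInterrupted(e);
            return null;
        }
    }
}
```

Centralizing the check means call sites cannot accidentally use the clearing `Thread.interrupted()` instead of `interrupt()`.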


---


2018-03-28 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5746#discussion_r177724627
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java ---
@@ -177,7 +178,7 @@ public SubmittedJobGraph recoverJobGraph(JobID jobId) throws Exception {
success = true;
return null;
} catch (Exception e) {
-   throw new Exception("Could not retrieve the submitted job graph state handle " +
+   throw new FlinkException("Could not retrieve the submitted job graph state handle " +
"for " + path + "from the submitted job graph store.", e);
--- End diff --

nit: missing space, i.e., `" from [...]"`


---


2018-03-28 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5746#discussion_r177724449
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java ---
@@ -344,6 +351,72 @@ public void testJobRecovery() throws Exception {
assertThat(jobIds, contains(jobGraph.getJobID()));
}
 
+   /**
+* Tests that the {@link Dispatcher} terminates if it cannot recover jobs ids from
+* the {@link SubmittedJobGraphStore}. See FLINK-8943.
+*/
+   @Test
+   public void testFatalErrorAfterJobIdRecoveryFailure() throws Exception {
+   final FlinkException testException = new FlinkException("Test exception");
+   submittedJobGraphStore.setJobIdsFunction(
+   (Collection<JobID> jobIds) -> {
+   throw testException;
+   });
+
+   UUID expectedLeaderSessionId = UUID.randomUUID();
+
+   assertNull(dispatcherLeaderElectionService.getConfirmationFuture());
+
+   dispatcherLeaderElectionService.isLeader(expectedLeaderSessionId);
+
+   UUID actualLeaderSessionId = dispatcherLeaderElectionService.getConfirmationFuture()
+   .get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS);
+
+   assertEquals(expectedLeaderSessionId, actualLeaderSessionId);
+
+   // we expect that a fatal error occurred
+   final Throwable error = fatalErrorHandler.getErrorFuture().get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS);
+
+   assertThat(ExceptionUtils.findThrowableWithMessage(error, testException.getMessage()).isPresent(), is(true));
+
+   fatalErrorHandler.clearError();
+   }
+
+   /**
+* Tests that the {@link Dispatcher} terminates if it cannot recover jobs from
+* the {@link SubmittedJobGraphStore}. See FLINK-8943.
+*/
+   @Test
+   public void testFatalErrorAfterJobRecoveryFailure() throws Exception {
+   final FlinkException testException = new FlinkException("Test exception");
+
+   final SubmittedJobGraph submittedJobGraph = new SubmittedJobGraph(jobGraph, null);
+   submittedJobGraphStore.putJobGraph(submittedJobGraph);
+
+   submittedJobGraphStore.setRecoverJobGraphFunction(
+   (JobID jobId, Map<JobID, SubmittedJobGraph> submittedJobs) -> {
+   throw testException;
+   });
+
+   UUID expectedLeaderSessionId = UUID.randomUUID();
--- End diff --

Code looks duplicated from here on 
(`testFatalErrorAfterJobIdRecoveryFailure`)
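One way to remove the duplication is to move the shared body into a helper so that each test only configures how recovery fails. The sketch below models this with a hypothetical, heavily simplified `FakeDispatcher` and a plain `CompletableFuture` in place of `TestingFatalErrorHandler`; it skips the leader session confirmation that the real tests also check, and none of these helpers are Flink APIs.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;

// Hypothetical stand-in for the Dispatcher: on leadership it runs a recovery
// step and reports any failure to a fatal error handler.
class FakeDispatcher {
    private final Runnable recovery;
    private final Consumer<Throwable> fatalErrorHandler;

    FakeDispatcher(Runnable recovery, Consumer<Throwable> fatalErrorHandler) {
        this.recovery = recovery;
        this.fatalErrorHandler = fatalErrorHandler;
    }

    void grantLeadership() {
        try {
            recovery.run();
        } catch (RuntimeException e) {
            fatalErrorHandler.accept(e);
        }
    }
}

final class RecoveryFailureTests {
    static final String TEST_MESSAGE = "Test exception";

    /** Shared body of both tests: only the way recovery fails differs. */
    static void assertFatalErrorOnLeadership(Runnable failingRecovery, String expectedMessage) {
        CompletableFuture<Throwable> errorFuture = new CompletableFuture<>();
        new FakeDispatcher(failingRecovery, errorFuture::complete).grantLeadership();

        Throwable error;
        try {
            error = errorFuture.get(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new AssertionError("Interrupted while waiting for a fatal error.", e);
        } catch (ExecutionException | TimeoutException e) {
            throw new AssertionError("No fatal error was reported.", e);
        }

        // Walk the cause chain, similar in spirit to ExceptionUtils.findThrowableWithMessage.
        for (Throwable t = error; t != null; t = t.getCause()) {
            if (expectedMessage.equals(t.getMessage())) {
                return;
            }
        }
        throw new AssertionError("Fatal error does not contain: " + expectedMessage, error);
    }

    static void testFatalErrorAfterJobIdRecoveryFailure() {
        assertFatalErrorOnLeadership(() -> {
            throw new IllegalStateException(TEST_MESSAGE);
        }, TEST_MESSAGE);
    }

    static void testFatalErrorAfterJobRecoveryFailure() {
        assertFatalErrorOnLeadership(() -> {
            throw new RuntimeException("Could not recover job.", new IllegalStateException(TEST_MESSAGE));
        }, TEST_MESSAGE);
    }
}
```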


---


2018-03-28 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5746#discussion_r177722358
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/util/TestingFatalErrorHandler.java ---
@@ -19,45 +19,81 @@
 package org.apache.flink.runtime.util;
 
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.concurrent.atomic.AtomicReference;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 
 /**
 * Testing fatal error handler which records the occurred exceptions during the execution of the
  * tests. Captured exceptions are thrown as a {@link TestingException}.
  */
 public class TestingFatalErrorHandler implements FatalErrorHandler {
private static final Logger LOG = LoggerFactory.getLogger(TestingFatalErrorHandler.class);
-   private final AtomicReference<Throwable> atomicThrowable;
+   private CompletableFuture<Throwable> errorFuture;
 
public TestingFatalErrorHandler() {
-   atomicThrowable = new AtomicReference<>(null);
+   errorFuture = new CompletableFuture<>();
}
 
-   public void rethrowError() throws TestingException {
-   Throwable throwable = atomicThrowable.get();
+   public synchronized void rethrowError() throws TestingException {
+   final Throwable throwable = getException();
 
if (throwable != null) {
-   throw new TestingException(throwable);
+throw new TestingException(throwable);
+}
+   }
+
+   public synchronized boolean hasExceptionOccurred() {
+   return errorFuture.isDone();
+   }
+
+   @Nullable
+   public synchronized Throwable getException() {
+   if (errorFuture.isDone()) {
+   Throwable throwable = null;
--- End diff --

nit: redundant initialization to `null`


---


2018-03-28 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5746#discussion_r177721959
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/util/TestingFatalErrorHandler.java ---
@@ -19,45 +19,81 @@
 package org.apache.flink.runtime.util;
 
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.concurrent.atomic.AtomicReference;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 
 /**
 * Testing fatal error handler which records the occurred exceptions during the execution of the
  * tests. Captured exceptions are thrown as a {@link TestingException}.
  */
 public class TestingFatalErrorHandler implements FatalErrorHandler {
private static final Logger LOG = LoggerFactory.getLogger(TestingFatalErrorHandler.class);
-   private final AtomicReference<Throwable> atomicThrowable;
+   private CompletableFuture<Throwable> errorFuture;
 
public TestingFatalErrorHandler() {
-   atomicThrowable = new AtomicReference<>(null);
+   errorFuture = new CompletableFuture<>();
}
 
-   public void rethrowError() throws TestingException {
-   Throwable throwable = atomicThrowable.get();
+   public synchronized void rethrowError() throws TestingException {
+   final Throwable throwable = getException();
 
if (throwable != null) {
-   throw new TestingException(throwable);
+throw new TestingException(throwable);
+}
+   }
+
+   public synchronized boolean hasExceptionOccurred() {
+   return errorFuture.isDone();
+   }
+
+   @Nullable
+   public synchronized Throwable getException() {
+   if (errorFuture.isDone()) {
+   Throwable throwable = null;
+
+   try {
+   throwable = errorFuture.get();
+   } catch (InterruptedException ie) {
+   Thread.interrupted();
--- End diff --

It does not make sense to call `Thread.interrupted()` here.
> Tests whether the current thread has been interrupted. The interrupted
> status of the thread is cleared by this method.

However, the interrupted flag should already be cleared:

> By convention, any method that exits by throwing an InterruptedException
> clears interrupt status when it does so.


https://docs.oracle.com/javase/tutorial/essential/concurrency/interrupt.html 
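A small demonstration of the two quoted rules, using only `java.lang.Thread` (`InterruptSemantics` is a hypothetical name): `Thread.interrupted()` clears the flag, and `Thread.sleep` clears it when it throws, so inside the `catch` there is nothing left for `Thread.interrupted()` to clear.

```java
// Demonstration of the java.lang.Thread interrupt semantics quoted above.
final class InterruptSemantics {
    private InterruptSemantics() {}

    /** Returns {isInterrupted(), first interrupted(), second interrupted()} after interrupt(). */
    static boolean[] interruptedClearsFlag() {
        Thread.currentThread().interrupt();
        boolean seen = Thread.currentThread().isInterrupted(); // true; does not clear the flag
        boolean first = Thread.interrupted();                  // true; clears the flag
        boolean second = Thread.interrupted();                 // false; already cleared
        return new boolean[] {seen, first, second};
    }

    /** Returns the interrupt flag as observed inside the catch block. */
    static boolean flagInsideCatch() {
        Thread.currentThread().interrupt();
        try {
            Thread.sleep(1_000); // throws at once because the flag is already set
            throw new IllegalStateException("sleep() was expected to throw");
        } catch (InterruptedException e) {
            return Thread.currentThread().isInterrupted(); // false: cleared when sleep() threw
        }
    }
}
```

Restoring the flag for later callers therefore requires `Thread.currentThread().interrupt()`.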


---


2018-03-28 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5746#discussion_r177723226
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java ---
@@ -396,15 +398,22 @@ public void testFailingJobRecovery() throws Exception {
 
jobManager = system.actorOf(jobManagerProps);
 
+   final TestProbe testProbe = new TestProbe(system);
+
+   testProbe.watch(jobManager);
+
Future started = Patterns.ask(jobManager, new Identify(42), deadline.timeLeft().toMillis());
 
Await.ready(started, deadline.timeLeft());
 
// make the job manager the leader --> this triggers the recovery of all jobs
myLeaderElectionService.isLeader(leaderSessionID);
 
-   // check that we have successfully recovered the second job
-   assertThat(recoveredJobs, containsInAnyOrder(jobId2));
+   // check that we did not recover any jobs
+   assertThat(recoveredJobs, Matchers.is(empty()));
--- End diff --

nit: there is a static import for `empty` but not for `is`


---


2018-03-22 Thread tillrohrmann
GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/5746

[FLINK-8943] [ha] Fail Dispatcher if jobs cannot be recovered from HA store

## What is the purpose of the change

In HA mode, the Dispatcher should fail if it cannot recover the persisted jobs.
The idea is that another Dispatcher will be brought up and try again. This is
better than simply dropping the jobs that could not be recovered.
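A minimal sketch of the all-or-nothing recovery described above, assuming a hypothetical `JobStore` interface with string job IDs and graphs (not Flink's `SubmittedJobGraphStore` API): the first failure goes to the fatal error handler instead of being skipped, so no job is silently dropped.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.function.Consumer;

// Hypothetical store interface for this sketch; not Flink's SubmittedJobGraphStore API.
interface JobStore {
    Collection<String> getJobIds() throws Exception;

    String recoverJob(String jobId) throws Exception;
}

final class RecoveryOnLeadership {
    private RecoveryOnLeadership() {}

    /**
     * Either every persisted job is recovered, or the fatal error handler is called
     * and nothing is returned. A job that fails to recover is never skipped.
     */
    static Optional<List<String>> recoverAllOrFail(JobStore store, Consumer<Throwable> fatalErrorHandler) {
        try {
            List<String> recovered = new ArrayList<>();
            for (String jobId : store.getJobIds()) {
                recovered.add(store.recoverJob(jobId));
            }
            return Optional.of(recovered);
        } catch (Exception e) {
            // Escalate instead of continuing with a partial set of jobs.
            fatalErrorHandler.accept(new Exception("Could not recover the persisted jobs.", e));
            return Optional.empty();
        }
    }
}
```

Here the handler is only a callback; per the description above, failing the Dispatcher is what lets another Dispatcher be brought up to try the recovery again.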

cc @GJL 

## Brief change log

- Fail the `Dispatcher`/`JobManager` if it cannot recover a persisted job

## Verifying this change

- Added `DispatcherTest#testFatalErrorAfterJobIdRecoveryFailure` and 
`DispatcherTest#testFatalErrorAfterJobRecoveryFailure`

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink failIfJobNotRecoverable

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5746.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5746


commit d15e5a2897e5b17ee256cac1374bbcee24104fe2
Author: Till Rohrmann 
Date:   2018-03-22T09:46:04Z

[hotfix] Extend TestingFatalErrorHandler to return an error future

commit 50004f3cfcba112d0e7f05b9875931d25d102110
Author: Till Rohrmann 
Date:   2018-03-22T09:46:28Z

[hotfix] Add BiFunctionWithException

commit f6a6d2da064ff80125600a3a90e773684dc24715
Author: Till Rohrmann 
Date:   2018-03-21T21:36:33Z

[FLINK-8943] [ha] Fail Dispatcher if jobs cannot be recovered from HA store

In HA mode, the Dispatcher should fail if it cannot recover the persisted jobs.
The idea is that another Dispatcher will be brought up and try again. This is
better than simply dropping the jobs that could not be recovered.




---