[GitHub] flink pull request #5746: [FLINK-8943] [ha] Fail Dispatcher if jobs cannot b...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5746 ---
[GitHub] flink pull request #5746: [FLINK-8943] [ha] Fail Dispatcher if jobs cannot b...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5746#discussion_r177757288 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/util/TestingFatalErrorHandler.java --- @@ -19,45 +19,81 @@ package org.apache.flink.runtime.util; import org.apache.flink.runtime.rpc.FatalErrorHandler; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.FlinkRuntimeException; +import org.apache.flink.util.Preconditions; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.concurrent.atomic.AtomicReference; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; /** * Testing fatal error handler which records the occurred exceptions during the execution of the * tests. Captured exceptions are thrown as a {@link TestingException}. */ public class TestingFatalErrorHandler implements FatalErrorHandler { private static final Logger LOG = LoggerFactory.getLogger(TestingFatalErrorHandler.class); - private final AtomicReference atomicThrowable; + private CompletableFuture errorFuture; public TestingFatalErrorHandler() { - atomicThrowable = new AtomicReference<>(null); + errorFuture = new CompletableFuture<>(); } - public void rethrowError() throws TestingException { - Throwable throwable = atomicThrowable.get(); + public synchronized void rethrowError() throws TestingException { + final Throwable throwable = getException(); if (throwable != null) { - throw new TestingException(throwable); +throw new TestingException(throwable); +} + } + + public synchronized boolean hasExceptionOccurred() { + return errorFuture.isDone(); + } + + @Nullable + public synchronized Throwable getException() { + if (errorFuture.isDone()) { + Throwable throwable = null; + + try { + throwable = errorFuture.get(); + } catch (InterruptedException ie) { + Thread.interrupted(); --- End diff -- Arg, very good comment. There is a reason why there are utils like `ExceptionUtils`... Will fix it right away. ---
[GitHub] flink pull request #5746: [FLINK-8943] [ha] Fail Dispatcher if jobs cannot b...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5746#discussion_r177739371 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/util/TestingFatalErrorHandler.java --- @@ -19,45 +19,81 @@ package org.apache.flink.runtime.util; import org.apache.flink.runtime.rpc.FatalErrorHandler; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.FlinkRuntimeException; +import org.apache.flink.util.Preconditions; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.concurrent.atomic.AtomicReference; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; /** * Testing fatal error handler which records the occurred exceptions during the execution of the * tests. Captured exceptions are thrown as a {@link TestingException}. */ public class TestingFatalErrorHandler implements FatalErrorHandler { private static final Logger LOG = LoggerFactory.getLogger(TestingFatalErrorHandler.class); - private final AtomicReference atomicThrowable; + private CompletableFuture errorFuture; public TestingFatalErrorHandler() { - atomicThrowable = new AtomicReference<>(null); + errorFuture = new CompletableFuture<>(); } - public void rethrowError() throws TestingException { - Throwable throwable = atomicThrowable.get(); + public synchronized void rethrowError() throws TestingException { + final Throwable throwable = getException(); if (throwable != null) { - throw new TestingException(throwable); +throw new TestingException(throwable); +} + } + + public synchronized boolean hasExceptionOccurred() { + return errorFuture.isDone(); + } + + @Nullable + public synchronized Throwable getException() { + if (errorFuture.isDone()) { + Throwable throwable = null; + + try { + throwable = errorFuture.get(); + } catch (InterruptedException ie) { + Thread.interrupted(); --- End diff -- `interrupted()` clears the flag. You should use `interrupt()`. ---
[GitHub] flink pull request #5746: [FLINK-8943] [ha] Fail Dispatcher if jobs cannot b...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5746#discussion_r177732527 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java --- @@ -344,6 +351,72 @@ public void testJobRecovery() throws Exception { assertThat(jobIds, contains(jobGraph.getJobID())); } + /** +* Tests that the {@link Dispatcher} terminates if it cannot recover jobs ids from +* the {@link SubmittedJobGraphStore}. See FLINK-8943. +*/ + @Test + public void testFatalErrorAfterJobIdRecoveryFailure() throws Exception { + final FlinkException testException = new FlinkException("Test exception"); + submittedJobGraphStore.setJobIdsFunction( + (Collection jobIds) -> { + throw testException; + }); + + UUID expectedLeaderSessionId = UUID.randomUUID(); + + assertNull(dispatcherLeaderElectionService.getConfirmationFuture()); + + dispatcherLeaderElectionService.isLeader(expectedLeaderSessionId); + + UUID actualLeaderSessionId = dispatcherLeaderElectionService.getConfirmationFuture() + .get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS); + + assertEquals(expectedLeaderSessionId, actualLeaderSessionId); + + // we expect that a fatal error occurred + final Throwable error = fatalErrorHandler.getErrorFuture().get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS); + + assertThat(ExceptionUtils.findThrowableWithMessage(error, testException.getMessage()).isPresent(), is(true)); + + fatalErrorHandler.clearError(); + } + + /** +* Tests that the {@link Dispatcher} terminates if it cannot recover jobs from +* the {@link SubmittedJobGraphStore}. See FLINK-8943. +*/ + @Test + public void testFatalErrorAfterJobRecoveryFailure() throws Exception { + final FlinkException testException = new FlinkException("Test exception"); + + final SubmittedJobGraph submittedJobGraph = new SubmittedJobGraph(jobGraph, null); + submittedJobGraphStore.putJobGraph(submittedJobGraph); + + submittedJobGraphStore.setRecoverJobGraphFunction( + (JobID jobId, MapsubmittedJobs) -> { + throw testException; + }); + + UUID expectedLeaderSessionId = UUID.randomUUID(); --- End diff -- I think I fixed it in b83b280ce2fb493eb647ffa589613c0b0362f39a which is part of #5774 ---
[GitHub] flink pull request #5746: [FLINK-8943] [ha] Fail Dispatcher if jobs cannot b...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5746#discussion_r177731653 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/util/TestingFatalErrorHandler.java --- @@ -19,45 +19,81 @@ package org.apache.flink.runtime.util; import org.apache.flink.runtime.rpc.FatalErrorHandler; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.FlinkRuntimeException; +import org.apache.flink.util.Preconditions; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.concurrent.atomic.AtomicReference; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; /** * Testing fatal error handler which records the occurred exceptions during the execution of the * tests. Captured exceptions are thrown as a {@link TestingException}. */ public class TestingFatalErrorHandler implements FatalErrorHandler { private static final Logger LOG = LoggerFactory.getLogger(TestingFatalErrorHandler.class); - private final AtomicReference atomicThrowable; + private CompletableFuture errorFuture; public TestingFatalErrorHandler() { - atomicThrowable = new AtomicReference<>(null); + errorFuture = new CompletableFuture<>(); } - public void rethrowError() throws TestingException { - Throwable throwable = atomicThrowable.get(); + public synchronized void rethrowError() throws TestingException { + final Throwable throwable = getException(); if (throwable != null) { - throw new TestingException(throwable); +throw new TestingException(throwable); +} + } + + public synchronized boolean hasExceptionOccurred() { + return errorFuture.isDone(); + } + + @Nullable + public synchronized Throwable getException() { + if (errorFuture.isDone()) { + Throwable throwable = null; --- End diff -- Will fix it. ---
[GitHub] flink pull request #5746: [FLINK-8943] [ha] Fail Dispatcher if jobs cannot b...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5746#discussion_r177731740 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java --- @@ -396,15 +398,22 @@ public void testFailingJobRecovery() throws Exception { jobManager = system.actorOf(jobManagerProps); + final TestProbe testProbe = new TestProbe(system); + + testProbe.watch(jobManager); + Future started = Patterns.ask(jobManager, new Identify(42), deadline.timeLeft().toMillis()); Await.ready(started, deadline.timeLeft()); // make the job manager the leader --> this triggers the recovery of all jobs myLeaderElectionService.isLeader(leaderSessionID); - // check that we have successfully recovered the second job - assertThat(recoveredJobs, containsInAnyOrder(jobId2)); + // check that we did not recover any jobs + assertThat(recoveredJobs, Matchers.is(empty())); --- End diff -- True, will make it consistent. ---
[GitHub] flink pull request #5746: [FLINK-8943] [ha] Fail Dispatcher if jobs cannot b...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5746#discussion_r177731188 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/util/TestingFatalErrorHandler.java --- @@ -19,45 +19,81 @@ package org.apache.flink.runtime.util; import org.apache.flink.runtime.rpc.FatalErrorHandler; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.FlinkRuntimeException; +import org.apache.flink.util.Preconditions; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.concurrent.atomic.AtomicReference; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; /** * Testing fatal error handler which records the occurred exceptions during the execution of the * tests. Captured exceptions are thrown as a {@link TestingException}. */ public class TestingFatalErrorHandler implements FatalErrorHandler { private static final Logger LOG = LoggerFactory.getLogger(TestingFatalErrorHandler.class); - private final AtomicReference atomicThrowable; + private CompletableFuture errorFuture; public TestingFatalErrorHandler() { - atomicThrowable = new AtomicReference<>(null); + errorFuture = new CompletableFuture<>(); } - public void rethrowError() throws TestingException { - Throwable throwable = atomicThrowable.get(); + public synchronized void rethrowError() throws TestingException { + final Throwable throwable = getException(); if (throwable != null) { - throw new TestingException(throwable); +throw new TestingException(throwable); +} + } + + public synchronized boolean hasExceptionOccurred() { + return errorFuture.isDone(); + } + + @Nullable + public synchronized Throwable getException() { + if (errorFuture.isDone()) { + Throwable throwable = null; + + try { + throwable = errorFuture.get(); + } catch (InterruptedException ie) { + Thread.interrupted(); --- End diff -- Given that we want to keep the interrupted flag set for the caller, I think we have to call `Thread.interrupted`. Otherwise, other components which are called later will miss the interruption. Actually it would be better to call `ExceptionUtils.checkInterrupted` which is a utility for this behaviour. ---
[GitHub] flink pull request #5746: [FLINK-8943] [ha] Fail Dispatcher if jobs cannot b...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5746#discussion_r177724627 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java --- @@ -177,7 +178,7 @@ public SubmittedJobGraph recoverJobGraph(JobID jobId) throws Exception { success = true; return null; } catch (Exception e) { - throw new Exception("Could not retrieve the submitted job graph state handle " + + throw new FlinkException("Could not retrieve the submitted job graph state handle " + "for " + path + "from the submitted job graph store.", e); --- End diff -- nit: missing space, i.e., `" from [...]"` ---
[GitHub] flink pull request #5746: [FLINK-8943] [ha] Fail Dispatcher if jobs cannot b...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5746#discussion_r177724449 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java --- @@ -344,6 +351,72 @@ public void testJobRecovery() throws Exception { assertThat(jobIds, contains(jobGraph.getJobID())); } + /** +* Tests that the {@link Dispatcher} terminates if it cannot recover jobs ids from +* the {@link SubmittedJobGraphStore}. See FLINK-8943. +*/ + @Test + public void testFatalErrorAfterJobIdRecoveryFailure() throws Exception { + final FlinkException testException = new FlinkException("Test exception"); + submittedJobGraphStore.setJobIdsFunction( + (Collection jobIds) -> { + throw testException; + }); + + UUID expectedLeaderSessionId = UUID.randomUUID(); + + assertNull(dispatcherLeaderElectionService.getConfirmationFuture()); + + dispatcherLeaderElectionService.isLeader(expectedLeaderSessionId); + + UUID actualLeaderSessionId = dispatcherLeaderElectionService.getConfirmationFuture() + .get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS); + + assertEquals(expectedLeaderSessionId, actualLeaderSessionId); + + // we expect that a fatal error occurred + final Throwable error = fatalErrorHandler.getErrorFuture().get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS); + + assertThat(ExceptionUtils.findThrowableWithMessage(error, testException.getMessage()).isPresent(), is(true)); + + fatalErrorHandler.clearError(); + } + + /** +* Tests that the {@link Dispatcher} terminates if it cannot recover jobs from +* the {@link SubmittedJobGraphStore}. See FLINK-8943. +*/ + @Test + public void testFatalErrorAfterJobRecoveryFailure() throws Exception { + final FlinkException testException = new FlinkException("Test exception"); + + final SubmittedJobGraph submittedJobGraph = new SubmittedJobGraph(jobGraph, null); + submittedJobGraphStore.putJobGraph(submittedJobGraph); + + submittedJobGraphStore.setRecoverJobGraphFunction( + (JobID jobId, MapsubmittedJobs) -> { + throw testException; + }); + + UUID expectedLeaderSessionId = UUID.randomUUID(); --- End diff -- Code looks duplicated from here on (`testFatalErrorAfterJobIdRecoveryFailure`) ---
[GitHub] flink pull request #5746: [FLINK-8943] [ha] Fail Dispatcher if jobs cannot b...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5746#discussion_r177722358 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/util/TestingFatalErrorHandler.java --- @@ -19,45 +19,81 @@ package org.apache.flink.runtime.util; import org.apache.flink.runtime.rpc.FatalErrorHandler; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.FlinkRuntimeException; +import org.apache.flink.util.Preconditions; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.concurrent.atomic.AtomicReference; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; /** * Testing fatal error handler which records the occurred exceptions during the execution of the * tests. Captured exceptions are thrown as a {@link TestingException}. */ public class TestingFatalErrorHandler implements FatalErrorHandler { private static final Logger LOG = LoggerFactory.getLogger(TestingFatalErrorHandler.class); - private final AtomicReference atomicThrowable; + private CompletableFuture errorFuture; public TestingFatalErrorHandler() { - atomicThrowable = new AtomicReference<>(null); + errorFuture = new CompletableFuture<>(); } - public void rethrowError() throws TestingException { - Throwable throwable = atomicThrowable.get(); + public synchronized void rethrowError() throws TestingException { + final Throwable throwable = getException(); if (throwable != null) { - throw new TestingException(throwable); +throw new TestingException(throwable); +} + } + + public synchronized boolean hasExceptionOccurred() { + return errorFuture.isDone(); + } + + @Nullable + public synchronized Throwable getException() { + if (errorFuture.isDone()) { + Throwable throwable = null; --- End diff -- nit: redundant initialization to `null` ---
[GitHub] flink pull request #5746: [FLINK-8943] [ha] Fail Dispatcher if jobs cannot b...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5746#discussion_r177721959 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/util/TestingFatalErrorHandler.java --- @@ -19,45 +19,81 @@ package org.apache.flink.runtime.util; import org.apache.flink.runtime.rpc.FatalErrorHandler; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.FlinkRuntimeException; +import org.apache.flink.util.Preconditions; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.concurrent.atomic.AtomicReference; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; /** * Testing fatal error handler which records the occurred exceptions during the execution of the * tests. Captured exceptions are thrown as a {@link TestingException}. */ public class TestingFatalErrorHandler implements FatalErrorHandler { private static final Logger LOG = LoggerFactory.getLogger(TestingFatalErrorHandler.class); - private final AtomicReference atomicThrowable; + private CompletableFuture errorFuture; public TestingFatalErrorHandler() { - atomicThrowable = new AtomicReference<>(null); + errorFuture = new CompletableFuture<>(); } - public void rethrowError() throws TestingException { - Throwable throwable = atomicThrowable.get(); + public synchronized void rethrowError() throws TestingException { + final Throwable throwable = getException(); if (throwable != null) { - throw new TestingException(throwable); +throw new TestingException(throwable); +} + } + + public synchronized boolean hasExceptionOccurred() { + return errorFuture.isDone(); + } + + @Nullable + public synchronized Throwable getException() { + if (errorFuture.isDone()) { + Throwable throwable = null; + + try { + throwable = errorFuture.get(); + } catch (InterruptedException ie) { + Thread.interrupted(); --- End diff -- It does not make sense to call `Thread.interrupted()` here. >Tests whether the current thread has been interrupted. The interrupted status of the thread is cleared by this method. However, the interrupted flag should be already cleared: > By convention, any method that exits by throwing an InterruptedException clears interrupt status when it does so. https://docs.oracle.com/javase/tutorial/essential/concurrency/interrupt.html ---
[GitHub] flink pull request #5746: [FLINK-8943] [ha] Fail Dispatcher if jobs cannot b...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5746#discussion_r177723226 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java --- @@ -396,15 +398,22 @@ public void testFailingJobRecovery() throws Exception { jobManager = system.actorOf(jobManagerProps); + final TestProbe testProbe = new TestProbe(system); + + testProbe.watch(jobManager); + Future started = Patterns.ask(jobManager, new Identify(42), deadline.timeLeft().toMillis()); Await.ready(started, deadline.timeLeft()); // make the job manager the leader --> this triggers the recovery of all jobs myLeaderElectionService.isLeader(leaderSessionID); - // check that we have successfully recovered the second job - assertThat(recoveredJobs, containsInAnyOrder(jobId2)); + // check that we did not recover any jobs + assertThat(recoveredJobs, Matchers.is(empty())); --- End diff -- nit: there is a static import for `empty` but not for `is` ---
[GitHub] flink pull request #5746: [FLINK-8943] [ha] Fail Dispatcher if jobs cannot b...
GitHub user tillrohrmann opened a pull request: https://github.com/apache/flink/pull/5746 [FLINK-8943] [ha] Fail Dispatcher if jobs cannot be recovered from HA store ## What is the purpose of the change In HA mode, the Dispatcher should fail if it cannot recover the persisted jobs. The idea is that another Dispatcher will be brought up and tries it again. This is better than simply dropping the not recovered jobs. cc @GJL ## Brief change log - Fail the `Dispatcher`/`JobManager` in case that we cannot recover a persisted job ## Verifying this change - Added `DispatcherTest#testFatalErrorAfterJobIdRecoveryFailure` and `DispatcherTest#testFatalErrorAfterJobRecoveryFailure` ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable) You can merge this pull request into a Git repository by running: $ git pull https://github.com/tillrohrmann/flink failIfJobNotRecoverable Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5746.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5746 commit d15e5a2897e5b17ee256cac1374bbcee24104fe2 Author: Till RohrmannDate: 2018-03-22T09:46:04Z [hotfix] Extend TestingFatalErrorHandler to return an error future commit 50004f3cfcba112d0e7f05b9875931d25d102110 Author: Till Rohrmann Date: 2018-03-22T09:46:28Z [hotfix] Add BiFunctionWithException commit f6a6d2da064ff80125600a3a90e773684dc24715 Author: Till Rohrmann Date: 2018-03-21T21:36:33Z [FLINK-8943] [ha] Fail Dispatcher if jobs cannot be recovered from HA store In HA mode, the Dispatcher should fail if it cannot recover the persisted jobs. The idea is that another Dispatcher will be brought up and tries it again. This is better than simply dropping the not recovered jobs. ---