[GitHub] [flink] zhuzhurk commented on a change in pull request #9904: [FLINK-14232][runtime] Support global failure handling in NG scheduling
zhuzhurk commented on a change in pull request #9904: [FLINK-14232][runtime] Support global failure handling in NG scheduling
URL: https://github.com/apache/flink/pull/9904#discussion_r338065932

## File path: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/ExecutionFailureHandlerTest.java ##
@@ -40,24 +42,38 @@
  */
 public class ExecutionFailureHandlerTest extends TestLogger {
+    private static final long restartDelayMs = 1234L;
+
+    private FailoverTopology failoverTopology;
+
+    private TestFailoverStrategy failoverStrategy;
+
+    private TestRestartBackoffTimeStrategy restartStrategy;

Review comment:
Fine. Done.

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org

With regards,
Apache Git Services
zhuzhurk commented on a change in pull request #9904: [FLINK-14232][runtime] Support global failure handling in NG scheduling
URL: https://github.com/apache/flink/pull/9904#discussion_r338065847

## File path: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/ExecutionFailureHandlerTest.java ##
@@ -167,23 +164,49 @@ public void testUnrecoverableErrorCheck() {
             new Exception(new SuppressRestartsException(new Exception();
     }
+    @Test
+    public void testGlobalFailureHandling() {
+        final FailureHandlingResult result = executionFailureHandler.getGlobalFailureHandlingResult(
+            new Exception("test failure"));
+
+        assertTrue(result.canRestart());
+        assertEquals(restartDelayMs, result.getRestartDelayMS());
+        assertEquals(
+            StreamSupport.stream(failoverTopology.getFailoverVertices().spliterator(), false)
+                .map(FailoverVertex::getExecutionVertexID)
+                .collect(Collectors.toSet()),
+            result.getVerticesToRestart());
+        try {
+            result.getError();
+            fail("Cannot get error when the restarting is accepted");
+        } catch (IllegalStateException ex) {
+            // expected
+        }
+    }
+
     //
     // utilities
     //
     /**
-     * A FailoverStrategy implementation for tests. It always suggest restarting the given task set on construction.
+     * A FailoverStrategy implementation for tests. It always suggests restarting the given tasks to restart.
      */
     private class TestFailoverStrategy implements FailoverStrategy {

Review comment:
Done.
zhuzhurk commented on a change in pull request #9904: [FLINK-14232][runtime] Support global failure handling in NG scheduling
URL: https://github.com/apache/flink/pull/9904#discussion_r338065878

## File path: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/ExecutionFailureHandlerTest.java ##
@@ -40,24 +42,38 @@
  */
 public class ExecutionFailureHandlerTest extends TestLogger {
+    private static final long restartDelayMs = 1234L;

Review comment:
Fixed.
zhuzhurk commented on a change in pull request #9904: [FLINK-14232][runtime] Support global failure handling in NG scheduling
URL: https://github.com/apache/flink/pull/9904#discussion_r338065809

## File path: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/ExecutionFailureHandlerTest.java ##
@@ -167,23 +164,49 @@ public void testUnrecoverableErrorCheck() {
             new Exception(new SuppressRestartsException(new Exception();
     }
+    @Test
+    public void testGlobalFailureHandling() {
+        final FailureHandlingResult result = executionFailureHandler.getGlobalFailureHandlingResult(
+            new Exception("test failure"));
+
+        assertTrue(result.canRestart());
+        assertEquals(restartDelayMs, result.getRestartDelayMS());
+        assertEquals(
+            StreamSupport.stream(failoverTopology.getFailoverVertices().spliterator(), false)
+                .map(FailoverVertex::getExecutionVertexID)
+                .collect(Collectors.toSet()),
+            result.getVerticesToRestart());
+        try {

Review comment:
Ok. Removed the unnecessary checks from this case.
zhuzhurk commented on a change in pull request #9904: [FLINK-14232][runtime] Support global failure handling in NG scheduling
URL: https://github.com/apache/flink/pull/9904#discussion_r338062348

## File path: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/ExecutionFailureHandlerTest.java ##
@@ -40,24 +42,38 @@
  */
 public class ExecutionFailureHandlerTest extends TestLogger {
+    private static final long restartDelayMs = 1234L;
+
+    private FailoverTopology failoverTopology;
+
+    private TestFailoverStrategy failoverStrategy;
+
+    private TestRestartBackoffTimeStrategy restartStrategy;
+
+    private ExecutionFailureHandler executionFailureHandler;
+
+    @Before
+    public void setUp() {
+        TestFailoverTopology.Builder topologyBuilder = new TestFailoverTopology.Builder();
+        topologyBuilder.newVertex();
+        failoverTopology = topologyBuilder.build();
+
+        failoverStrategy = new TestFailoverStrategy();
+        restartStrategy = new TestRestartBackoffTimeStrategy(true, restartDelayMs);
+        executionFailureHandler = new ExecutionFailureHandler(failoverTopology, failoverStrategy, restartStrategy);
+    }
+
     /**
      * Tests the case that task restarting is accepted.
      */
     @Test
     public void testNormalFailureHandling() {
-        // failover strategy which always suggests restarting the given tasks
-        Set<ExecutionVertexID> tasksToRestart = new HashSet<>();
-        tasksToRestart.add(new ExecutionVertexID(new JobVertexID(), 0));
-        FailoverStrategy failoverStrategy = new TestFailoverStrategy(tasksToRestart);
-
-        // restart strategy which accepts restarting
-        boolean canRestart = true;
-        long restartDelayMs = 1234;
-        RestartBackoffTimeStrategy restartStrategy = new TestRestartBackoffTimeStrategy(canRestart, restartDelayMs);
-        ExecutionFailureHandler executionFailureHandler = new ExecutionFailureHandler(failoverStrategy, restartStrategy);
+        final Set<ExecutionVertexID> tasksToRestart = Collections.singleton(
+            new ExecutionVertexID(new JobVertexID(), 0));
+        failoverStrategy.setTasksToRestart(tasksToRestart);
         // trigger a task failure
-        FailureHandlingResult result = executionFailureHandler.getFailureHandlingResult(
+        final FailureHandlingResult result = executionFailureHandler.getFailureHandlingResult(
             new ExecutionVertexID(new JobVertexID(), 0),

Review comment:
The failed task ID and the ID in `tasksToRestart` are different by design, so that we can be sure the handler really returns the tasks configured via `failoverStrategy.setTasksToRestart`. This case verifies task failure handling and uses a `TestFailoverStrategy`, so it does not rely on the `FailoverTopology`.
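The reasoning in this comment can be illustrated with a minimal, self-contained sketch. It is not the Flink test: `Strategy`, `StubStrategy` and `handle` are hypothetical stand-ins, and `UUID` replaces `ExecutionVertexID`. The point it tries to show is that when the failed ID and the configured ID differ, an assertion on the returned set can only pass if the set came from the stub strategy.

```java
import java.util.Collections;
import java.util.Set;
import java.util.UUID;

/** Hypothetical stand-ins; none of these types exist in Flink. */
final class DistinctIdSketch {

    /** Plays the role of a failover strategy: maps a failed task to the tasks to restart. */
    interface Strategy {
        Set<UUID> tasksNeedingRestart(UUID failedTask);
    }

    /** Test stub that ignores the failed task and returns whatever was configured. */
    static final class StubStrategy implements Strategy {
        private Set<UUID> tasksToRestart = Collections.emptySet();

        void setTasksToRestart(Set<UUID> tasksToRestart) {
            this.tasksToRestart = tasksToRestart;
        }

        @Override
        public Set<UUID> tasksNeedingRestart(UUID failedTask) {
            return tasksToRestart;
        }
    }

    /** Plays the role of the handler: it delegates the decision to the strategy. */
    static Set<UUID> handle(Strategy strategy, UUID failedTask) {
        return strategy.tasksNeedingRestart(failedTask);
    }
}
```

If the handler ignored the strategy and simply echoed the failed task, a check against the configured set would fail, which is what makes the differing IDs useful.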
zhuzhurk commented on a change in pull request #9904: [FLINK-14232][runtime] Support global failure handling in NG scheduling
URL: https://github.com/apache/flink/pull/9904#discussion_r338058224

## File path: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/ExecutionFailureHandlerTest.java ##
@@ -167,23 +164,49 @@ public void testUnrecoverableErrorCheck() {
             new Exception(new SuppressRestartsException(new Exception();
     }
+    @Test
+    public void testGlobalFailureHandling() {
+        final FailureHandlingResult result = executionFailureHandler.getGlobalFailureHandlingResult(
+            new Exception("test failure"));
+
+        assertTrue(result.canRestart());
+        assertEquals(restartDelayMs, result.getRestartDelayMS());
+        assertEquals(
+            StreamSupport.stream(failoverTopology.getFailoverVertices().spliterator(), false)
+                .map(FailoverVertex::getExecutionVertexID)
+                .collect(Collectors.toSet()),
+            result.getVerticesToRestart());
+        try {
+            result.getError();
+            fail("Cannot get error when the restarting is accepted");
+        } catch (IllegalStateException ex) {
+            // expected
+        }
+    }
+
     //
     // utilities
     //
     /**
-     * A FailoverStrategy implementation for tests. It always suggest restarting the given task set on construction.
+     * A FailoverStrategy implementation for tests. It always suggests restarting the given tasks to restart.
      */
     private class TestFailoverStrategy implements FailoverStrategy {
-        private final Set<ExecutionVertexID> tasksToRestart;
+        private Set<ExecutionVertexID> tasksToRestart;
-        public TestFailoverStrategy(Set<ExecutionVertexID> tasksToRestart) {
-            this.tasksToRestart = checkNotNull(tasksToRestart);
+        public TestFailoverStrategy() {
+        }
+
+        public void setTasksToRestart(final Set<ExecutionVertexID> tasksToRestart) {
+            this.tasksToRestart = tasksToRestart;

Review comment:
Ok. Will keep it as is since it is a private inner test class.
zhuzhurk commented on a change in pull request #9904: [FLINK-14232][runtime] Support global failure handling in NG scheduling
URL: https://github.com/apache/flink/pull/9904#discussion_r337681258

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/ExecutionFailureHandler.java ##
@@ -62,18 +71,41 @@ public ExecutionFailureHandler(
      * @return result of the failure handling
      */
     public FailureHandlingResult getFailureHandlingResult(ExecutionVertexID failedTask, Throwable cause) {
+        return handleFailure(cause, () -> failoverStrategy.getTasksNeedingRestart(failedTask, cause));
+    }
+
+    /**
+     * Return result of failure handling on a global failure. Can be a set of task vertices to restart
+     * and a delay of the restarting. Or that the failure is not recoverable and the reason for it.
+     *
+     * @param cause of the task failure
+     * @return result of the failure handling
+     */
+    public FailureHandlingResult getGlobalFailureHandlingResult(final Throwable cause) {
+        return handleFailure(
+            cause,
+            () -> StreamSupport
+                .stream(failoverTopology.getFailoverVertices().spliterator(), false)
+                .map(FailoverVertex::getExecutionVertexID)
+                .collect(Collectors.toSet()));
+    }
+
+    private FailureHandlingResult handleFailure(
+        final Throwable cause,
+        final Supplier<Set<ExecutionVertexID>> vertexToRestartSupplier) {

Review comment:
You are right. Done.
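For readers following the diff, the structure it introduces (one private `handleFailure` shared by the task-level and the global entry point, each passing a `Supplier` for the vertices to restart) can be sketched in isolation. This is a simplified illustration under stated assumptions, not the Flink implementation: `FailureHandlerSketch` and `Result` are hypothetical, vertex IDs are plain strings, a boolean flag stands in for the restart strategy, and the task-level path simply restarts the failed vertex.

```java
import java.util.Collections;
import java.util.Set;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

/** Hypothetical, simplified stand-in for the handler shown in the diff. */
final class FailureHandlerSketch {

    /** Either a set of vertices to restart, or the error that prevents restarting. */
    static final class Result {
        final Set<String> verticesToRestart;
        final Throwable error;

        Result(Set<String> verticesToRestart, Throwable error) {
            this.verticesToRestart = verticesToRestart;
            this.error = error;
        }

        boolean canRestart() {
            return error == null;
        }
    }

    private final Iterable<String> allVertices;
    // Assumption: a plain flag replaces the restart back-off strategy.
    private final boolean restartAllowed;

    FailureHandlerSketch(Iterable<String> allVertices, boolean restartAllowed) {
        this.allVertices = allVertices;
        this.restartAllowed = restartAllowed;
    }

    /** Task failure: in this sketch only the failed vertex is restarted. */
    Result handleTaskFailure(String failedVertex, Throwable cause) {
        return handleFailure(cause, () -> Collections.singleton(failedVertex));
    }

    /** Global failure: every vertex of the topology is restarted. */
    Result handleGlobalFailure(Throwable cause) {
        return handleFailure(
            cause,
            () -> StreamSupport.stream(allVertices.spliterator(), false)
                .collect(Collectors.toSet()));
    }

    /** Shared path; the supplier is only evaluated when a restart is allowed. */
    private Result handleFailure(Throwable cause, Supplier<Set<String>> verticesToRestartSupplier) {
        if (!restartAllowed) {
            return new Result(Collections.emptySet(), cause);
        }
        return new Result(verticesToRestartSupplier.get(), null);
    }
}
```

Passing a supplier rather than a precomputed set means the two public methods differ only in how they select vertices, and the selection is skipped entirely when restarting is not allowed.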
zhuzhurk commented on a change in pull request #9904: [FLINK-14232][runtime] Support global failure handling in NG scheduling
URL: https://github.com/apache/flink/pull/9904#discussion_r337681304

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java ##
@@ -376,6 +376,10 @@ public void cancel() {
         return executionGraph.getTerminationFuture().thenApply(FunctionUtils.nullFn());
     }
+    @Override
+    public void handleGlobalFailure(final Throwable cause) {

Review comment:
Ok. Done.