[GitHub] [flink] zhuzhurk commented on a change in pull request #9904: [FLINK-14232][runtime] Support global failure handling in NG scheduling

2019-10-23 Thread GitBox
zhuzhurk commented on a change in pull request #9904: [FLINK-14232][runtime] 
Support global failure handling in NG scheduling
URL: https://github.com/apache/flink/pull/9904#discussion_r338065932
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/ExecutionFailureHandlerTest.java
 ##
 @@ -40,24 +42,38 @@
  */
 public class ExecutionFailureHandlerTest extends TestLogger {
 
+   private static final long restartDelayMs = 1234L;
+
+   private FailoverTopology failoverTopology;
+
+   private TestFailoverStrategy failoverStrategy;
+
+   private TestRestartBackoffTimeStrategy restartStrategy;
 
 Review comment:
   Fine. Done.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhuzhurk commented on a change in pull request #9904: [FLINK-14232][runtime] Support global failure handling in NG scheduling

2019-10-23 Thread GitBox
zhuzhurk commented on a change in pull request #9904: [FLINK-14232][runtime] 
Support global failure handling in NG scheduling
URL: https://github.com/apache/flink/pull/9904#discussion_r338065847
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/ExecutionFailureHandlerTest.java
 ##
 @@ -167,23 +164,49 @@ public void testUnrecoverableErrorCheck() {
new Exception(new SuppressRestartsException(new Exception()))));
}
 
+   @Test
+   public void testGlobalFailureHandling() {
+       final FailureHandlingResult result = executionFailureHandler.getGlobalFailureHandlingResult(
+           new Exception("test failure"));
+
+       assertTrue(result.canRestart());
+       assertEquals(restartDelayMs, result.getRestartDelayMS());
+       assertEquals(
+           StreamSupport.stream(failoverTopology.getFailoverVertices().spliterator(), false)
+               .map(FailoverVertex::getExecutionVertexID)
+               .collect(Collectors.toSet()),
+           result.getVerticesToRestart());
+       try {
+           result.getError();
+           fail("Cannot get error when the restarting is accepted");
+       } catch (IllegalStateException ex) {
+           // expected
+       }
+   }
+
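// ------------------------------------------------------------------------
//  utilities
// ------------------------------------------------------------------------

/**
-* A FailoverStrategy implementation for tests. It always suggest restarting the given task set on construction.
+* A FailoverStrategy implementation for tests. It always suggests restarting the given tasks to restart.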
 */
private class TestFailoverStrategy implements FailoverStrategy {
 
 Review comment:
   done.




[GitHub] [flink] zhuzhurk commented on a change in pull request #9904: [FLINK-14232][runtime] Support global failure handling in NG scheduling

2019-10-23 Thread GitBox
zhuzhurk commented on a change in pull request #9904: [FLINK-14232][runtime] 
Support global failure handling in NG scheduling
URL: https://github.com/apache/flink/pull/9904#discussion_r338065878
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/ExecutionFailureHandlerTest.java
 ##
 @@ -40,24 +42,38 @@
  */
 public class ExecutionFailureHandlerTest extends TestLogger {
 
+   private static final long restartDelayMs = 1234L;
 
 Review comment:
   Fixed.




[GitHub] [flink] zhuzhurk commented on a change in pull request #9904: [FLINK-14232][runtime] Support global failure handling in NG scheduling

2019-10-23 Thread GitBox
zhuzhurk commented on a change in pull request #9904: [FLINK-14232][runtime] 
Support global failure handling in NG scheduling
URL: https://github.com/apache/flink/pull/9904#discussion_r338065809
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/ExecutionFailureHandlerTest.java
 ##
 @@ -167,23 +164,49 @@ public void testUnrecoverableErrorCheck() {
new Exception(new SuppressRestartsException(new Exception()))));
}
 
+   @Test
+   public void testGlobalFailureHandling() {
+       final FailureHandlingResult result = executionFailureHandler.getGlobalFailureHandlingResult(
+           new Exception("test failure"));
+
+       assertTrue(result.canRestart());
+       assertEquals(restartDelayMs, result.getRestartDelayMS());
+       assertEquals(
+           StreamSupport.stream(failoverTopology.getFailoverVertices().spliterator(), false)
+               .map(FailoverVertex::getExecutionVertexID)
+               .collect(Collectors.toSet()),
+           result.getVerticesToRestart());
+       try {
 
 Review comment:
   Ok. Removed the unnecessary checks from this case.




[GitHub] [flink] zhuzhurk commented on a change in pull request #9904: [FLINK-14232][runtime] Support global failure handling in NG scheduling

2019-10-23 Thread GitBox
zhuzhurk commented on a change in pull request #9904: [FLINK-14232][runtime] 
Support global failure handling in NG scheduling
URL: https://github.com/apache/flink/pull/9904#discussion_r338062348
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/ExecutionFailureHandlerTest.java
 ##
 @@ -40,24 +42,38 @@
  */
 public class ExecutionFailureHandlerTest extends TestLogger {
 
+   private static final long restartDelayMs = 1234L;
+
+   private FailoverTopology failoverTopology;
+
+   private TestFailoverStrategy failoverStrategy;
+
+   private TestRestartBackoffTimeStrategy restartStrategy;
+
+   private ExecutionFailureHandler executionFailureHandler;
+
+   @Before
+   public void setUp() {
+       TestFailoverTopology.Builder topologyBuilder = new TestFailoverTopology.Builder();
+       topologyBuilder.newVertex();
+       failoverTopology = topologyBuilder.build();
+
+       failoverStrategy = new TestFailoverStrategy();
+       restartStrategy = new TestRestartBackoffTimeStrategy(true, restartDelayMs);
+       executionFailureHandler = new ExecutionFailureHandler(failoverTopology, failoverStrategy, restartStrategy);
+   }
+
/**
 * Tests the case that task restarting is accepted.
 */
@Test
public void testNormalFailureHandling() {
-       // failover strategy which always suggests restarting the given tasks
-       Set<ExecutionVertexID> tasksToRestart = new HashSet<>();
-       tasksToRestart.add(new ExecutionVertexID(new JobVertexID(), 0));
-       FailoverStrategy failoverStrategy = new TestFailoverStrategy(tasksToRestart);
-
-       // restart strategy which accepts restarting
-       boolean canRestart = true;
-       long restartDelayMs = 1234;
-       RestartBackoffTimeStrategy restartStrategy = new TestRestartBackoffTimeStrategy(canRestart, restartDelayMs);
-       ExecutionFailureHandler executionFailureHandler = new ExecutionFailureHandler(failoverStrategy, restartStrategy);
+       final Set<ExecutionVertexID> tasksToRestart = Collections.singleton(
+           new ExecutionVertexID(new JobVertexID(), 0));
+       failoverStrategy.setTasksToRestart(tasksToRestart);
 
// trigger a task failure
-       FailureHandlingResult result = executionFailureHandler.getFailureHandlingResult(
+       final FailureHandlingResult result = executionFailureHandler.getFailureHandlingResult(
new ExecutionVertexID(new JobVertexID(), 0),
 
 Review comment:
   The two IDs differ by design, so that we can be sure the result really comes from the tasks set via `failoverStrategy.setTasksToRestart`.
   This case verifies task failure handling and uses a `TestFailoverStrategy`, so it does not rely on the `FailoverTopology`.
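
The testing idea in this comment can be illustrated with a minimal, self-contained sketch. It does not use Flink's classes: `Strategy`, `StubStrategy`, `handleTaskFailure`, and the string task names are hypothetical stand-ins chosen for illustration. The stub returns whatever set the test configured, independent of which task failed, so a result equal to the configured set suggests the handler really delegated to the strategy.

```java
import java.util.Collections;
import java.util.Set;

public class StubStrategySketch {

    /** Hypothetical stand-in for a failover strategy: maps a failed task to the tasks to restart. */
    interface Strategy {
        Set<String> getTasksNeedingRestart(String failedTask);
    }

    /** Stub that ignores the failed task and returns the set configured by the test. */
    static class StubStrategy implements Strategy {
        private Set<String> tasksToRestart = Collections.emptySet();

        void setTasksToRestart(Set<String> tasksToRestart) {
            this.tasksToRestart = tasksToRestart;
        }

        @Override
        public Set<String> getTasksNeedingRestart(String failedTask) {
            return tasksToRestart;
        }
    }

    /** Hypothetical handler under test: it should delegate to the strategy, not echo its input. */
    static Set<String> handleTaskFailure(Strategy strategy, String failedTask) {
        return strategy.getTasksNeedingRestart(failedTask);
    }

    public static void main(String[] args) {
        StubStrategy strategy = new StubStrategy();
        strategy.setTasksToRestart(Collections.singleton("task-B"));

        // The failed task ("task-A") deliberately differs from the configured task ("task-B").
        Set<String> result = handleTaskFailure(strategy, "task-A");
        System.out.println(result);
    }
}
```

If the handler instead returned the failed task itself, the result would contain "task-A" and the mismatch would be visible, which is the point of keeping the two values different.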




[GitHub] [flink] zhuzhurk commented on a change in pull request #9904: [FLINK-14232][runtime] Support global failure handling in NG scheduling

2019-10-23 Thread GitBox
zhuzhurk commented on a change in pull request #9904: [FLINK-14232][runtime] 
Support global failure handling in NG scheduling
URL: https://github.com/apache/flink/pull/9904#discussion_r338058224
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/ExecutionFailureHandlerTest.java
 ##
 @@ -167,23 +164,49 @@ public void testUnrecoverableErrorCheck() {
new Exception(new SuppressRestartsException(new Exception()))));
}
 
+   @Test
+   public void testGlobalFailureHandling() {
+       final FailureHandlingResult result = executionFailureHandler.getGlobalFailureHandlingResult(
+           new Exception("test failure"));
+
+       assertTrue(result.canRestart());
+       assertEquals(restartDelayMs, result.getRestartDelayMS());
+       assertEquals(
+           StreamSupport.stream(failoverTopology.getFailoverVertices().spliterator(), false)
+               .map(FailoverVertex::getExecutionVertexID)
+               .collect(Collectors.toSet()),
+           result.getVerticesToRestart());
+       try {
+           result.getError();
+           fail("Cannot get error when the restarting is accepted");
+       } catch (IllegalStateException ex) {
+           // expected
+       }
+   }
+
// ------------------------------------------------------------------------
//  utilities
// ------------------------------------------------------------------------

/**
-* A FailoverStrategy implementation for tests. It always suggest restarting the given task set on construction.
+* A FailoverStrategy implementation for tests. It always suggests restarting the given tasks to restart.
 */
private class TestFailoverStrategy implements FailoverStrategy {

-   private final Set<ExecutionVertexID> tasksToRestart;
+   private Set<ExecutionVertexID> tasksToRestart;

-   public TestFailoverStrategy(Set<ExecutionVertexID> tasksToRestart) {
-       this.tasksToRestart = checkNotNull(tasksToRestart);
+   public TestFailoverStrategy() {
+   }
+
+   public void setTasksToRestart(final Set<ExecutionVertexID> tasksToRestart) {
+       this.tasksToRestart = tasksToRestart;
 
 Review comment:
   Ok. Will keep it as is since it is a private inner test class.




[GitHub] [flink] zhuzhurk commented on a change in pull request #9904: [FLINK-14232][runtime] Support global failure handling in NG scheduling

2019-10-22 Thread GitBox
zhuzhurk commented on a change in pull request #9904: [FLINK-14232][runtime] 
Support global failure handling in NG scheduling
URL: https://github.com/apache/flink/pull/9904#discussion_r337681258
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/ExecutionFailureHandler.java
 ##
 @@ -62,18 +71,41 @@ public ExecutionFailureHandler(
 * @return result of the failure handling
 */
public FailureHandlingResult getFailureHandlingResult(ExecutionVertexID failedTask, Throwable cause) {
+       return handleFailure(cause, () -> failoverStrategy.getTasksNeedingRestart(failedTask, cause));
+   }
+
+   /**
+    * Return result of failure handling on a global failure. Can be a set of task vertices to restart
+    * and a delay of the restarting. Or that the failure is not recoverable and the reason for it.
+    *
+    * @param cause of the task failure
+    * @return result of the failure handling
+    */
+   public FailureHandlingResult getGlobalFailureHandlingResult(final Throwable cause) {
+       return handleFailure(
+           cause,
+           () -> StreamSupport
+               .stream(failoverTopology.getFailoverVertices().spliterator(), false)
+               .map(FailoverVertex::getExecutionVertexID)
+               .collect(Collectors.toSet()));
+   }
+
+   private FailureHandlingResult handleFailure(
+           final Throwable cause,
+           final Supplier<Set<ExecutionVertexID>> vertexToRestartSupplier) {
 
 Review comment:
   You are right. Done.
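
For readers following the hunk above, here is a rough, self-contained sketch of the shape it shows: two public entry points that differ only in how they pick the vertices to restart, and a shared private helper that receives that choice as a `Supplier`. This is not Flink's code. The class name, the string vertex names, and the `canRestart` flag are hypothetical simplifications, and the decision to evaluate the supplier only when a restart is allowed is an assumption of this sketch rather than something the diff states.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.Set;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

public class SharedFailureHandlingSketch {

    // Hypothetical topology: just an Iterable of vertex names.
    private final Iterable<String> allVertices;

    // Hypothetical stand-in for the restart strategy's decision.
    private final boolean canRestart;

    SharedFailureHandlingSketch(Iterable<String> allVertices, boolean canRestart) {
        this.allVertices = allVertices;
        this.canRestart = canRestart;
    }

    // Task failure: in this sketch, only the failed vertex is restarted.
    Set<String> onTaskFailure(String failedVertex) {
        return handleFailure(() -> Collections.singleton(failedVertex));
    }

    // Global failure: every vertex of the topology is restarted.
    Set<String> onGlobalFailure() {
        return handleFailure(() -> StreamSupport
            .stream(allVertices.spliterator(), false)
            .collect(Collectors.toSet()));
    }

    // Shared path. Assumption of this sketch: the supplier runs only if a restart is allowed.
    private Set<String> handleFailure(Supplier<Set<String>> verticesToRestartSupplier) {
        if (!canRestart) {
            return Collections.emptySet();
        }
        return verticesToRestartSupplier.get();
    }

    public static void main(String[] args) {
        SharedFailureHandlingSketch handler =
            new SharedFailureHandlingSketch(Arrays.asList("v1", "v2", "v3"), true);
        System.out.println(handler.onTaskFailure("v2"));
        System.out.println(handler.onGlobalFailure().size());
    }
}
```

`StreamSupport.stream(iterable.spliterator(), false)` is the standard way to obtain a sequential stream from a plain `Iterable`, which is why it appears in both the production hunk and the test.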




[GitHub] [flink] zhuzhurk commented on a change in pull request #9904: [FLINK-14232][runtime] Support global failure handling in NG scheduling

2019-10-22 Thread GitBox
zhuzhurk commented on a change in pull request #9904: [FLINK-14232][runtime] 
Support global failure handling in NG scheduling
URL: https://github.com/apache/flink/pull/9904#discussion_r337681304
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
 ##
 @@ -376,6 +376,10 @@ public void cancel() {
return executionGraph.getTerminationFuture().thenApply(FunctionUtils.nullFn());
}
 
+   @Override
+   public void handleGlobalFailure(final Throwable cause) {
 
 Review comment:
   Ok. Done.

