Re: [PR] [FLINK-33565][Scheduler] ConcurrentExceptions works with exception merging [flink]
1996fanrui merged PR #24003: URL: https://github.com/apache/flink/pull/24003 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
1996fanrui commented on PR #24003: URL: https://github.com/apache/flink/pull/24003#issuecomment-1905713145 Thanks @XComp for the patient review and a series of great suggestions again! Merging~ A good experience~
XComp commented on PR #24003: URL: https://github.com/apache/flink/pull/24003#issuecomment-1905623368 ok, fine with me :+1:
1996fanrui commented on PR #24003: URL: https://github.com/apache/flink/pull/24003#issuecomment-1905208562

> Fair enough. What about squashing the [FLINK-33565](https://issues.apache.org/jira/browse/FLINK-33565)-related commits into a single one?

Hi @XComp , it's fine with me. Let me explain why I split them into multiple commits:

1. First of all, the Flink code style guide [1] recommends separating refactoring, cleanup, and independent changes into separate commits. As I understand it, this has a series of advantages:
   - Easy to review
   - Easy to revert
   - Easy to cherry-pick
     - In my daily work, we often cherry-pick new features (that our users need) into our internal Flink version, because upgrading the Flink version of a large number of Flink jobs takes a lot of effort.
     - While cherry-picking, I found that keeping independent changes in separate commits is very useful.
     - For example, suppose we need to cherry-pick featureB, and featureA was merged before featureB.
     - FeatureB uses some common changes from featureA: featureA added some new fields to common classes. (It's not a refactor, but the change isn't huge.)
     - If featureA's author considers it a minor change and keeps everything in one commit, we must cherry-pick all of featureA when cherry-picking featureB.
     - If featureA is split into multiple commits, one per independent change, we can cherry-pick just the common class changes of featureA plus the whole featureB.
2. Currently, the first commit changes a minor behavior, and it's similar to a refactor. It's mentioned in the last comment: https://github.com/apache/flink/pull/24003#issuecomment-1903758592
3. The second commit is `[FLINK-33565][Exception] Restart strategy checks whether current failure is a new attempt`.
   - The concurrent exceptions depend on the result of `RestartBackoffTimeStrategy`.
   - Other features might depend on it as well in the future, so I prefer to keep it as an independent change.
4. The third commit is the core change of this feature.
5. The fourth commit fixes the `numberOfRestarts` metric; it may be an independent change as well.

That's why I split them into 4 commits. As I said before, squashing them into one commit is fine with me. Looking forward to your feedback, thanks~

[1] https://flink.apache.org/how-to-contribute/code-style-and-quality-pull-requests/#3-separate-refactoring-cleanup-and-independent-changes
XComp commented on PR #24003: URL: https://github.com/apache/flink/pull/24003#issuecomment-1903887636 Fair enough. What about squashing the FLINK-33565-related commits into a single one?
1996fanrui commented on PR #24003: URL: https://github.com/apache/flink/pull/24003#issuecomment-1903758592

> LGTM Thanks for your efforts, @1996fanrui, and for keeping up with my nitty comments. I enjoyed the discussions in this PR.

Thanks @XComp for the patient review, I definitely learned some skills from your comments. I enjoyed the discussions as well.

> About the commits: We can reorganize them a bit, I feel. You could keep [6fe8037](https://github.com/apache/flink/commit/6fe8037b40d8458a5524adfb352d96925ced63bd) and [f422804](https://github.com/apache/flink/commit/f422804e918a8ea3537deeb1c4f4f1c9676e1a8a) as separate hotfix commits (using the `[hotfix]` prefix rather than the Jira issue) because they improve the code base and we would want to keep them even if we decide to revert [FLINK-33565](https://issues.apache.org/jira/browse/FLINK-33565) (for whatever reason) in the future. WDYT?

The idea makes sense to me. I updated the commit message for the last commit, but I didn't update it for the first commit for a couple of reasons:

1. The first commit changes the behavior:
   - Before this PR, exceptions were archived when the task was restarted, not immediately.
   - That means, when a task fails, we only see the exception in the exception history after Flink restarts the task.
   - The first commit archives exceptions into the exception history immediately when they occur, instead of archiving them when restarting.
2. I hope the first commit is fine, but I'm afraid we might have missed something, so it's not a simple refactor or hotfix.

WDYT?
1996fanrui commented on code in PR #24003: URL: https://github.com/apache/flink/pull/24003#discussion_r1461348956

## flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FixedDelayRestartBackoffTimeStrategy.java:

```diff
@@ -61,8 +61,9 @@ public long getBackoffTime() {
     }
 
     @Override
-    public void notifyFailure(Throwable cause) {
+    public boolean notifyFailure(Throwable cause) {
         currentRestartAttempt++;
+        return true;
```

Review Comment:

> I guess, you meant [FLIP-364](https://cwiki.apache.org/confluence/display/FLINK/FLIP-364%3A+Improve+the+exponential-delay+restart-strategy) here?

Exactly, sorry for my typo.

> What about deprecating that class as part of this PR and coming up with a follow-up Jira issue that replaces the strategy?

They are internal classes; IIUC, we can refactor them directly without the `@Deprecated` annotation.

- Your suggestion is that we still keep the failure-rate and fixed-delay restart strategies, but we can reuse the `ExponentialDelayRestartBackoffTimeStrategy` class, right?
- I'm thinking, could we deprecate the failure-rate and fixed-delay restart strategies directly? Users can configure them directly. Keeping the failure-rate and fixed-delay restart strategies while reusing the `ExponentialDelayRestartBackoffTimeStrategy` class still has the semantic problem that I mentioned in the last comment [1].

[1] https://github.com/apache/flink/pull/24003#discussion_r1456807889
XComp commented on code in PR #24003: URL: https://github.com/apache/flink/pull/24003#discussion_r1458870055

## flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/ExecutionFailureHandlerTest.java:

```diff
@@ -183,6 +194,59 @@ void testNonRecoverableFailureHandlingResult() throws Exception {
         assertThat(executionFailureHandler.getNumberOfRestarts()).isZero();
     }
 
+    @Test
+    void testNewAttemptAndNumberOfRestarts() throws Exception {
+        final Set tasksToRestart =
+                Collections.singleton(new ExecutionVertexID(new JobVertexID(), 0));
+        failoverStrategy.setTasksToRestart(tasksToRestart);
+
+        Execution execution =
+                FailureHandlingResultTest.createExecution(EXECUTOR_RESOURCE.getExecutor());
+        final Throwable error = new Exception("expected test failure");
+
+        assertHandlerRootException(execution, error);
+
+        isNewAttempt.set(false);
+        assertHandlerConcurrentException(execution, error);
+        assertHandlerConcurrentException(execution, error);
+
+        isNewAttempt.set(true);
+        assertHandlerRootException(execution, error);
+        assertHandlerRootException(execution, error);
+
+        isNewAttempt.set(false);
+        assertHandlerConcurrentException(execution, error);
+        assertHandlerConcurrentException(execution, error);
+    }
+
+    private void assertHandlerRootException(Execution execution, Throwable error) {
+        final long originalNumberOfRestarts = executionFailureHandler.getNumberOfRestarts();
+        FailureHandlingResult result =
+                executionFailureHandler.getFailureHandlingResult(
+                        execution, error, System.currentTimeMillis());
+        assertThat(result.isRootCause())
+                .as(
+                        "The FailureHandlingResult shouldn be the root cause if exception is new attempt.")
```

Review Comment:

```suggestion
                        "The FailureHandlingResult should be the root cause if exception is new attempt.")
```

## flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/ExecutionFailureHandlerTest.java (same hunk, continued):

```diff
+                .isTrue();
+        assertThat(executionFailureHandler.getNumberOfRestarts())
+                .as("The numberOfRestarts should be increased when it's a root exception.")
+                .isEqualTo(originalNumberOfRestarts + 1);
+    }
+
+    private void assertHandlerConcurrentException(Execution execution, Throwable error) {
```

Review Comment:

```suggestion
    private void testHandlingConcurrentException(Execution execution, Throwable error) {
```
1996fanrui commented on PR #24003: URL: https://github.com/apache/flink/pull/24003#issuecomment-1899532108 @flinkbot run azure
1996fanrui commented on code in PR #24003: URL: https://github.com/apache/flink/pull/24003#discussion_r1456812210

## flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/ExecutionFailureHandlerTest.java:

```diff
@@ -183,6 +194,55 @@ void testNonRecoverableFailureHandlingResult() throws Exception {
         assertThat(executionFailureHandler.getNumberOfRestarts()).isZero();
     }
 
+    /** Test isNewAttempt of {@link FailureHandlingResult} is expected. */
+    @Test
+    void testNewAttemptAndNumberOfRestarts() throws Exception {
```

Review Comment:

Good suggestion! Also, I extracted the `assertHandlerRootException` and `assertHandlerConcurrentException` methods; after that, `testNewAttemptAndNumberOfRestarts` is quite simple.

## flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FixedDelayRestartBackoffTimeStrategy.java:

```diff
@@ -61,8 +61,9 @@ public long getBackoffTime() {
     }
 
     @Override
-    public void notifyFailure(Throwable cause) {
+    public boolean notifyFailure(Throwable cause) {
         currentRestartAttempt++;
+        return true;
```

Review Comment:

> But from a conceptual point of view: shouldn't we cover it also for the two other restart strategies. Or am I missing something here?

In the beginning, I wanted to improve all restart strategies, but during the discussion I received feedback suggesting the other restart strategies be removed. The core background is this thread: https://lists.apache.org/thread/l7wyc7pndpsvh2h7hj3fw2td9yphrlox

In brief, 3 reasons:

1. The semantics of the option:
   - The failure-rate strategy's restart upper limit option is named `restart-strategy.failure-rate.max-failures-per-interval`.
   - It's `max-failures-per-interval` instead of `max-attempts-per-interval`.
   - If we improve it directly, the name and behaviour won't match.
2. We recommend users use the exponential-delay restart strategy in the future; it's more powerful.
3. After the FLIP-360 discussion, I found the exponential-delay restart strategy can replace the other restart strategies directly if users set `restart-strategy.exponential-delay.backoff-multiplier` = 1.

Actually, I think the other restart strategies can be deprecated in the future.

## flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/exceptionhistory/RootExceptionHistoryEntryTest.java:

```diff
@@ -81,10 +81,14 @@ void testFromFailureHandlingResultSnapshot() throws ExecutionException, Interrup
         final CompletableFuture> rootFailureLabels =
                 CompletableFuture.completedFuture(Collections.singletonMap("key", "value"));
 
-        final Throwable concurrentException = new IllegalStateException("Expected other failure");
-        final ExecutionVertex concurrentlyFailedExecutionVertex = extractExecutionVertex(1);
-        final long concurrentExceptionTimestamp =
-                triggerFailure(concurrentlyFailedExecutionVertex, concurrentException);
+        final Throwable concurrentException1 = new IllegalStateException("Expected other failure1");
+        final ExecutionVertex concurrentlyFailedExecutionVertex1 = extractExecutionVertex(1);
+        Predicate exception1Predicate =
+                getExceptionHistoryEntryPredicate(
+                        concurrentException1, concurrentlyFailedExecutionVertex1);
+
+        final Throwable concurrentException2 = new IllegalStateException("Expected other failure2");
+        final ExecutionVertex concurrentlyFailedExecutionVertex2 = extractExecutionVertex(2);
```

Review Comment:

Thanks for the explanation!

> testAddConecurrentExceptions will use all code of this test

If we have 2 tests, and test1 only calls `testCommon`, while test2 calls `testCommon` and `testPart2`, then test2 covers test1. There are 2 solutions:

- Keep test1 and test2
- Only keep test2

In the current scenario, is it enough for us to only keep test2? Looking forward to your opinion; fine with me either way.
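The boolean return value of `notifyFailure` debated above (true = the failure starts a new restart attempt; false = it is merged into the previous, still-pending attempt) can be illustrated with a minimal sketch. This is not Flink's actual implementation; the class name and the time-based merge condition are simplifications for illustration only.

```java
/**
 * Minimal sketch of the boolean notifyFailure contract discussed in this
 * thread. A failure arriving while the restart for an earlier failure is
 * still pending is treated as concurrent and merged (returns false); any
 * other failure opens a new attempt (returns true). Names are hypothetical.
 */
class MergingBackoffStrategySketch {
    private final long backoffMillis;
    // Time until which the current attempt's restart is still pending.
    private long attemptDeadlineMillis = Long.MIN_VALUE;

    MergingBackoffStrategySketch(long backoffMillis) {
        this.backoffMillis = backoffMillis;
    }

    /** Returns true iff this failure starts a new restart attempt. */
    boolean notifyFailure(long nowMillis) {
        if (nowMillis < attemptDeadlineMillis) {
            // Restart for a previous failure is still pending: merge.
            return false;
        }
        attemptDeadlineMillis = nowMillis + backoffMillis;
        return true;
    }
}
```

Per the discussion, the fixed-delay and failure-rate strategies in the PR simply `return true` for every failure, so only the exponential-delay strategy actually triggers exception merging.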
XComp commented on code in PR #24003: URL: https://github.com/apache/flink/pull/24003#discussion_r1455652997

## flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FixedDelayRestartBackoffTimeStrategy.java:

```diff
@@ -61,8 +61,9 @@ public long getBackoffTime() {
     }
 
     @Override
-    public void notifyFailure(Throwable cause) {
+    public boolean notifyFailure(Throwable cause) {
        currentRestartAttempt++;
+        return true;
```

Review Comment:

I understand that it wasn't covered by FLIP-364. But from a conceptual point of view: shouldn't we cover it also for the two other restart strategies? Or am I missing something here? :thinking: I'm also fine with having this covered by follow-up Jira issues.

## flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/ExecutionFailureHandlerTest.java:

```diff
@@ -183,6 +194,55 @@ void testNonRecoverableFailureHandlingResult() throws Exception {
         assertThat(executionFailureHandler.getNumberOfRestarts()).isZero();
     }
 
+    /** Test isNewAttempt of {@link FailureHandlingResult} is expected. */
+    @Test
+    void testNewAttemptAndNumberOfRestarts() throws Exception {
```

Review Comment:

Like in my previous comment: We can avoid using comments in favor of the assert message :shrug:

## flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/exceptionhistory/RootExceptionHistoryEntryTest.java:

```diff
@@ -81,10 +81,14 @@ void testFromFailureHandlingResultSnapshot() throws ExecutionException, Interrup
         final CompletableFuture> rootFailureLabels =
                 CompletableFuture.completedFuture(Collections.singletonMap("key", "value"));
 
-        final Throwable concurrentException = new IllegalStateException("Expected other failure");
-        final ExecutionVertex concurrentlyFailedExecutionVertex = extractExecutionVertex(1);
-        final long concurrentExceptionTimestamp =
-                triggerFailure(concurrentlyFailedExecutionVertex, concurrentException);
+        final Throwable concurrentException1 = new IllegalStateException("Expected other failure1");
+        final ExecutionVertex concurrentlyFailedExecutionVertex1 = extractExecutionVertex(1);
+        Predicate exception1Predicate =
+                getExceptionHistoryEntryPredicate(
+                        concurrentException1, concurrentlyFailedExecutionVertex1);
+
+        final Throwable concurrentException2 = new IllegalStateException("Expected other failure2");
+        final ExecutionVertex concurrentlyFailedExecutionVertex2 = extractExecutionVertex(2);
```

Review Comment:

Fine with me. About reusing the code: You could move the code that's common between tests into a private static method. But others might argue that this is also not good for code readability. Anyway, it's your choice in the end. Most of my comments are discussion points rather than comments where I claim that my proposal is the right one. :-)

## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java:

```diff
@@ -708,27 +712,43 @@ private void archiveGlobalFailure(
             long timestamp,
             CompletableFuture> failureLabels,
             Iterable executions) {
-        exceptionHistory.add(
+        latestRootExceptionEntry =
                 RootExceptionHistoryEntry.fromGlobalFailure(
-                        failure, timestamp, failureLabels, executions));
+                        failure, timestamp, failureLabels, executions);
+        exceptionHistory.add(latestRootExceptionEntry);
         log.debug("Archive global failure.", failure);
     }
 
     protected final void archiveFromFailureHandlingResult(
             FailureHandlingResultSnapshot failureHandlingResult) {
+        // Handle all subsequent exceptions as the concurrent exceptions when it's not a new
+        // attempt.
+        if (!failureHandlingResult.isNewAttempt()) {
+            checkState(
+                    latestRootExceptionEntry != null,
+                    "A root exception entry should exist if failureHandlingResult wasn't "
+                            + "generated as part of a new error handling cycle.");
+            List concurrentlyExecutions = new ArrayList<>();
+            failureHandlingResult.getRootCauseExecution().ifPresent(concurrentlyExecutions::add);
+            concurrentlyExecutions.addAll(failureHandlingResult.getConcurrentlyFailedExecution());
+
+            latestRootExceptionEntry.addConcurrentExceptions(concurrentlyExecutions);
+            return;
+        }
+
        if (failureHandlingResult.getRootCauseExecution().isPresent()) {
```

Review Comment:

We still might want to move the comment into the if block

## flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/ExecutionFailureHandlerTest.java:

```diff
@@ -171,6 +179,9 @@ void
```
1996fanrui commented on code in PR #24003: URL: https://github.com/apache/flink/pull/24003#discussion_r145529

## flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/ExecutionFailureHandlerTest.java:

```diff
@@ -183,6 +194,55 @@ void testNonRecoverableFailureHandlingResult() throws Exception {
         assertThat(executionFailureHandler.getNumberOfRestarts()).isZero();
     }
 
+    /** Test isNewAttempt of {@link FailureHandlingResult} is expected. */
+    @Test
+    void testNewAttemptAndNumberOfRestarts() throws Exception {
```

Review Comment:

> The comment is obsolete because it could be expressed through the test method name(s) (to reduce redundant information in the code).

Removed the comment.

> Additionally, what's your opinion on splitting this big test up into smaller test methods. I see the benefit of having one big one. Because you can check the number of restarts at the same time. But having more specific test methods usually helps with improving readability of the test code.. WDYT?

This test is a whole-scenario test: it checks that numberOfRestarts is as expected when we have a series of exceptions, and that the exception merging logic works well. I added some comments inside the test to make it easier to understand. You can check again.

## flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailureHandlingResult.java:

```diff
@@ -206,6 +214,11 @@ public boolean isGlobalFailure() {
         return globalFailure;
     }
 
+    /** @return Whether this failure is a new attempt. */
+    public boolean isNewAttempt() {
```

Review Comment:

Renaming it to `isRootCause` makes sense to me. Updated.

## flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailureRateRestartBackoffTimeStrategy.java:

```diff
@@ -79,11 +79,12 @@ public long getBackoffTime() {
     }
 
     @Override
-    public void notifyFailure(Throwable cause) {
+    public boolean notifyFailure(Throwable cause) {
         if (isFailureTimestampsQueueFull()) {
             failureTimestamps.remove();
         }
         failureTimestamps.add(clock.absoluteTimeMillis());
+        return true;
```

Review Comment:

See the last comment.

## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/exceptionhistory/RootExceptionHistoryEntry.java:

```diff
@@ -170,11 +176,15 @@ public RootExceptionHistoryEntry(
             CompletableFuture> failureLabels,
             @Nullable String failingTaskName,
             @Nullable TaskManagerLocation taskManagerLocation,
-            Iterable concurrentExceptions) {
+            Collection concurrentExceptions) {
         super(cause, timestamp, failureLabels, failingTaskName, taskManagerLocation);
         this.concurrentExceptions = concurrentExceptions;
     }
 
+    public void addConcurrentExceptions(Iterable concurrentlyExecutions) {
```

Review Comment:

Good catch, added the `@NotThreadSafe`.

## flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/exceptionhistory/RootExceptionHistoryEntryTest.java:

```diff
@@ -81,10 +81,14 @@ void testFromFailureHandlingResultSnapshot() throws ExecutionException, Interrup
         final CompletableFuture> rootFailureLabels =
                 CompletableFuture.completedFuture(Collections.singletonMap("key", "value"));
 
-        final Throwable concurrentException = new IllegalStateException("Expected other failure");
-        final ExecutionVertex concurrentlyFailedExecutionVertex = extractExecutionVertex(1);
-        final long concurrentExceptionTimestamp =
-                triggerFailure(concurrentlyFailedExecutionVertex, concurrentException);
+        final Throwable concurrentException1 = new IllegalStateException("Expected other failure1");
+        final ExecutionVertex concurrentlyFailedExecutionVertex1 = extractExecutionVertex(1);
+        Predicate exception1Predicate =
+                getExceptionHistoryEntryPredicate(
+                        concurrentException1, concurrentlyFailedExecutionVertex1);
+
+        final Throwable concurrentException2 = new IllegalStateException("Expected other failure2");
+        final ExecutionVertex concurrentlyFailedExecutionVertex2 = extractExecutionVertex(2);
```

Review Comment:

My thought is to move the code down instead of creating a new test method, because `testAddConecurrentExceptions` will use all the code of this test. WDYT?

## flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FixedDelayRestartBackoffTimeStrategy.java:

```diff
@@ -61,8 +61,9 @@ public long getBackoffTime() {
     }
 
     @Override
-    public void notifyFailure(Throwable cause) {
+    public boolean notifyFailure(Throwable cause) {
         currentRestartAttempt++;
+        return true;
```

Review Comment:

Only the exponential-delay restart strategy performs exception merging.
XComp commented on code in PR #24003: URL: https://github.com/apache/flink/pull/24003#discussion_r1455102444

## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/exceptionhistory/RootExceptionHistoryEntry.java:

```diff
@@ -170,11 +176,15 @@ public RootExceptionHistoryEntry(
             CompletableFuture> failureLabels,
             @Nullable String failingTaskName,
             @Nullable TaskManagerLocation taskManagerLocation,
-            Iterable concurrentExceptions) {
+            Collection concurrentExceptions) {
         super(cause, timestamp, failureLabels, failingTaskName, taskManagerLocation);
         this.concurrentExceptions = concurrentExceptions;
     }
 
+    public void addConcurrentExceptions(Iterable concurrentlyExecutions) {
```

Review Comment:

The class was previously meant to be immutable. This method is changing that property. I'm wondering whether we should add `@NotThreadSafe` to the class to make that specific change more explicit. The change itself should be still ok because failure handling happens in the `JobMaster`'s main thread which is how we avoid concurrency issues.

## flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FixedDelayRestartBackoffTimeStrategy.java:

```diff
@@ -61,8 +61,9 @@ public long getBackoffTime() {
     }
 
     @Override
-    public void notifyFailure(Throwable cause) {
+    public boolean notifyFailure(Throwable cause) {
         currentRestartAttempt++;
+        return true;
```

Review Comment:

Aren't we also collecting exceptions here? Because the restart might happen with some delay. :thinking:

## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/exceptionhistory/FailureHandlingResultSnapshot.java:

```diff
@@ -150,7 +154,17 @@ public long getTimestamp() {
      *
      * @return The concurrently failed {@code Executions}.
      */
-    public Iterable getConcurrentlyFailedExecution() {
+    public Set getConcurrentlyFailedExecution() {
         return Collections.unmodifiableSet(concurrentlyFailedExecutions);
     }
+
+    /**
+     * @return Whether the current failure is a new attempt. True means that the current failure is
```

Review Comment:

```suggestion
     * @return True means that the current failure is
```

nit: The first sentence does not add any more information. You could also go with the proposal I shared in `FailureHandlingResult` if you think it's a reasonable change.

## flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/ExecutionFailureHandlerTest.java:

```diff
@@ -71,7 +74,10 @@ void setUp() {
         failoverStrategy = new TestFailoverStrategy();
         testingFailureEnricher = new TestingFailureEnricher();
-        backoffTimeStrategy = new TestRestartBackoffTimeStrategy(true, RESTART_DELAY_MS);
+        isNewAttempt = new AtomicBoolean(true);
+        backoffTimeStrategy =
+                new TestRestartBackoffTimeStrategy(
+                        true, RESTART_DELAY_MS, () -> isNewAttempt.get());
```

Review Comment:

```suggestion
                        true, RESTART_DELAY_MS, isNewAttempt::get);
```

nit

## flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/RestartBackoffTimeStrategy.java:

```diff
@@ -38,8 +38,11 @@
      * Notify the strategy about the task failure cause.
      *
      * @param cause of the task failure
+     * @return Whether the current failure is a new attempt. True means that the current failure is
+     *     a new attempt, false means that there has been a failure before and has not been tried
+     *     yet, and the current failure will be merged into the previous attempt.
```

Review Comment:

```suggestion
     * @return True means that the current failure is the first one after the most-recent failure
     *     handling happened, false means that there has been a failure before that was not handled,
     *     yet, and the current failure will be considered in a combined failure handling effort.
```

nit: just another proposal to remove the "attempt" from the description.

## flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailureRateRestartBackoffTimeStrategy.java:

```diff
@@ -79,11 +79,12 @@ public long getBackoffTime() {
     }
 
     @Override
-    public void notifyFailure(Throwable cause) {
+    public boolean notifyFailure(Throwable cause) {
         if (isFailureTimestampsQueueFull()) {
             failureTimestamps.remove();
         }
         failureTimestamps.add(clock.absoluteTimeMillis());
+        return true;
```

Review Comment:

Same here, isn't that also collecting failures before doing the restart? So that we would not have a "first attempt" every single time?
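The `isNewAttempt::get` nit above works because a bound method reference to `AtomicBoolean.get()` returns `boolean`, which auto-boxes to satisfy a `Supplier<Boolean>`-shaped parameter. A minimal sketch (the class below is a hypothetical stand-in for the constructor shape of Flink's `TestRestartBackoffTimeStrategy` test utility, not its real API):

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

/** Hypothetical stand-in: a strategy whose new-attempt decision the test controls. */
class ControllableStrategySketch {
    private final Supplier<Boolean> isNewAttempt;

    ControllableStrategySketch(Supplier<Boolean> isNewAttempt) {
        this.isNewAttempt = isNewAttempt;
    }

    boolean notifyFailure(Throwable cause) {
        return isNewAttempt.get(); // decision is controlled by the test
    }
}
```

Usage: `new ControllableStrategySketch(isNewAttempt::get)` is equivalent to `new ControllableStrategySketch(() -> isNewAttempt.get())`, just shorter.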
1996fanrui commented on PR #24003: URL: https://github.com/apache/flink/pull/24003#issuecomment-1895296744 Hi @XComp , sorry to bother you again; the 1.19 feature freeze is next week.
1996fanrui commented on code in PR #24003: URL: https://github.com/apache/flink/pull/24003#discussion_r1448255898 ## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java: ## @@ -367,17 +367,13 @@ private void restartTasksWithDelay(final FailureHandlingResult failureHandlingRe final CompletableFuture cancelFuture = cancelTasksAsync(verticesToRestart); -final FailureHandlingResultSnapshot failureHandlingResultSnapshot = -createFailureHandlingResultSnapshot(failureHandlingResult); +archiveFromFailureHandlingResult( +createFailureHandlingResultSnapshot(failureHandlingResult)); delayExecutor.schedule( () -> FutureUtils.assertNoException( cancelFuture.thenRunAsync( -() -> { -archiveFromFailureHandlingResult( Review Comment: Thanks for the clarification! The initial motivation might for collecting all concurrent exceptions. In this PR, the solution is save the `latestRootExceptionEntry` as a filed in `SchedulerBase`, when all subsequent non-root exceptions will be added to the `latestRootExceptionEntry`. ## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/exceptionhistory/RootExceptionHistoryEntry.java: ## @@ -140,15 +142,20 @@ private static RootExceptionHistoryEntry createRootExceptionHistoryEntry( failureLabels, failingTaskName, taskManagerLocation, -StreamSupport.stream(executions.spliterator(), false) -.filter(execution -> execution.getFailureInfo().isPresent()) -.map( -execution -> -ExceptionHistoryEntry.create( -execution, - execution.getVertexWithAttempt(), - FailureEnricherUtils.EMPTY_FAILURE_LABELS)) -.collect(Collectors.toList())); +createExceptionHistoryEntries(executions)); +} + +public static List createExceptionHistoryEntries( Review Comment: > we could move the logic into addConcurrentExceptions and call addConcurrentExceptions within createRootExceptionHistoryEntry on the newly created instance. 
It makes sense to me, thanks~ ## flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailureHandlingResult.java: ## @@ -206,6 +214,11 @@ public boolean isGlobalFailure() { return globalFailure; } +/** @return Whether this failure is a new attempt. */ +public boolean isNewAttempt() { Review Comment: I added some comments to explain it. What do you think? ## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/exceptionhistory/RootExceptionHistoryEntry.java: ## @@ -96,7 +98,7 @@ public static RootExceptionHistoryEntry fromGlobalFailure( } public static RootExceptionHistoryEntry fromExceptionHistoryEntry( -ExceptionHistoryEntry entry, Iterable entries) { +ExceptionHistoryEntry entry, List entries) { Review Comment: Changed it to `Collection`. We need to merge more exceptions into the `concurrentExceptions`, and `Iterable` doesn't support adding elements, so I changed it to `Collection`. ## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java: ## @@ -703,27 +707,41 @@ private void archiveGlobalFailure( long timestamp, CompletableFuture> failureLabels, Iterable executions) { -exceptionHistory.add( +latestRootExceptionEntry = RootExceptionHistoryEntry.fromGlobalFailure( -failure, timestamp, failureLabels, executions)); +failure, timestamp, failureLabels, executions); +exceptionHistory.add(latestRootExceptionEntry); log.debug("Archive global failure.", failure); } protected final void archiveFromFailureHandlingResult( FailureHandlingResultSnapshot failureHandlingResult) { +// ALl exceptions as the ConcurrentExceptions when it's not a new attempt. 
+if (!failureHandlingResult.isNewAttempt()) { +checkState(latestRootExceptionEntry != null, "It should have old failure."); +List concurrentlyExecutions = new LinkedList<>(); + failureHandlingResult.getRootCauseExecution().ifPresent(concurrentlyExecutions::add); + concurrentlyExecutions.addAll(failureHandlingResult.getConcurrentlyFailedExecution()); Review Comment: Thanks for the detailed review! Good catch, I changed it to an `ArrayList`.
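The merging flow discussed in this thread (keep a reference to the latest root entry and append failures that are not a new attempt to it as concurrent exceptions) can be sketched as follows. This is a simplified model, not Flink's actual code: `RootEntry`, `archive`, and the string failures are illustrative stand-ins for `RootExceptionHistoryEntry` and the archiving methods in `SchedulerBase`.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified sketch of exception-history merging: a failure that is NOT a
// new attempt is merged into the latest root entry as a concurrent
// exception; a new attempt starts a new root entry in the history.
public class ExceptionMergeSketch {

    static class RootEntry {
        final String rootCause;
        final List<String> concurrentExceptions = new ArrayList<>();

        RootEntry(String rootCause) {
            this.rootCause = rootCause;
        }
    }

    final List<RootEntry> exceptionHistory = new ArrayList<>();
    RootEntry latestRootExceptionEntry;

    void archive(String failure, boolean isNewAttempt) {
        if (!isNewAttempt) {
            // Merge into the ongoing attempt instead of creating a new entry.
            if (latestRootExceptionEntry == null) {
                throw new IllegalStateException("It should have an old failure.");
            }
            latestRootExceptionEntry.concurrentExceptions.add(failure);
        } else {
            latestRootExceptionEntry = new RootEntry(failure);
            exceptionHistory.add(latestRootExceptionEntry);
        }
    }

    public static void main(String[] args) {
        ExceptionMergeSketch scheduler = new ExceptionMergeSketch();
        scheduler.archive("rootFailure", true);
        scheduler.archive("concurrentFailure1", false);
        scheduler.archive("concurrentFailure2", false);
        scheduler.archive("newRootFailure", true);
        // Two root entries; the first one carries two concurrent exceptions.
        System.out.println(scheduler.exceptionHistory.size());
        System.out.println(scheduler.exceptionHistory.get(0).concurrentExceptions.size());
    }
}
```

This also shows why the review comments switched from `Iterable` to a `Collection`/`List` of concurrent exceptions: the container must support adding elements after the root entry was created.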
Re: [PR] [FLINK-33565][Scheduler] ConcurrentExceptions works with exception merging [flink]
1996fanrui commented on PR #24003: URL: https://github.com/apache/flink/pull/24003#issuecomment-1881105410 > Thanks for your efforts @1996fanrui. Overall, looks good. I can do a proper review after you've finalized the PR. I just added a few nitty comments. PTAL Many thanks @XComp for the review! If the solution is fine, I will address your comments and finish the tests as soon as possible!
Re: [PR] [FLINK-33565][Scheduler] ConcurrentExceptions works with exception merging [flink]
XComp commented on code in PR #24003: URL: https://github.com/apache/flink/pull/24003#discussion_r1444703461 ## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/exceptionhistory/FailureHandlingResultSnapshot.java: ## @@ -150,7 +154,11 @@ public long getTimestamp() { * * @return The concurrently failed {@code Executions}. */ -public Iterable getConcurrentlyFailedExecution() { +public Set getConcurrentlyFailedExecution() { Review Comment: I'm not sure whether this change is necessary. See [my other related comment](https://github.com/apache/flink/pull/24003#discussion_r1444695608).
Re: [PR] [FLINK-33565][Scheduler] ConcurrentExceptions works with exception merging [flink]
XComp commented on code in PR #24003: URL: https://github.com/apache/flink/pull/24003#discussion_r1444596677 ## flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailureHandlingResult.java: ## @@ -66,6 +66,9 @@ public class FailureHandlingResult { /** True if the original failure was a global failure. */ private final boolean globalFailure; +/** Tue if current failure is a new attempt. */ Review Comment: ```suggestion /** True if current failure is a new attempt. */ ``` nit ## flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailureHandlingResult.java: ## @@ -85,7 +89,9 @@ private FailureHandlingResult( CompletableFuture> failureLabels, @Nullable Set verticesToRestart, long restartDelayMS, -boolean globalFailure) { +boolean globalFailure, +boolean isNewAttempt) { +this.isNewAttempt = isNewAttempt; Review Comment: nit: usually, contributors kind of stick to the order of the parameter list when setting the fields within the constructor. ## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java: ## @@ -367,17 +367,13 @@ private void restartTasksWithDelay(final FailureHandlingResult failureHandlingRe final CompletableFuture cancelFuture = cancelTasksAsync(verticesToRestart); -final FailureHandlingResultSnapshot failureHandlingResultSnapshot = -createFailureHandlingResultSnapshot(failureHandlingResult); +archiveFromFailureHandlingResult( +createFailureHandlingResultSnapshot(failureHandlingResult)); delayExecutor.schedule( () -> FutureUtils.assertNoException( cancelFuture.thenRunAsync( -() -> { -archiveFromFailureHandlingResult( Review Comment: > The first commit is refactoring; actually, I don't know why the exception was archived when restarting the task instead of immediately. It means that when one task fails, we can see the exception history only after Flink restarts this task. So the first commit is only a refactoring. 
It archives exceptions into the exception history immediately when they occur, instead of archiving them when restarting. I guess the motivation was to be able to collect all concurrent exceptions that happen before triggering the restart. But you're right - it doesn't make a difference because we're creating the `FailureHandlingResultSnapshot` already earlier (before scheduling the restart). I'm just wondering whether we should have created the snapshot in the scheduled task rather than before scheduling it to capture any concurrent exceptions that happened before the restart is triggered. :thinking: ## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java: ## @@ -703,27 +707,41 @@ private void archiveGlobalFailure( long timestamp, CompletableFuture> failureLabels, Iterable executions) { -exceptionHistory.add( +latestRootExceptionEntry = RootExceptionHistoryEntry.fromGlobalFailure( -failure, timestamp, failureLabels, executions)); +failure, timestamp, failureLabels, executions); +exceptionHistory.add(latestRootExceptionEntry); log.debug("Archive global failure.", failure); } protected final void archiveFromFailureHandlingResult( FailureHandlingResultSnapshot failureHandlingResult) { +// ALl exceptions as the ConcurrentExceptions when it's not a new attempt. Review Comment: ```suggestion // handle all subsequent exceptions as the concurrent exceptions when it's not a new attempt. ``` nit: the verb was missing. Additionally, `ConcurrentExceptions` indicates that it's some kind of class. We could use the language casing here in my opinion. 
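For context on the `isNewAttempt` flag reviewed above, here is a minimal sketch of one way a backoff-time restart strategy could classify a failure as a new attempt: a failure arriving after the previous attempt's backoff period has elapsed starts a new attempt, while earlier failures are treated as concurrent with the ongoing one. The class and method names here are hypothetical; Flink's actual `RestartBackoffTimeStrategy` interface differs.

```java
// Hypothetical sketch, not Flink's real RestartBackoffTimeStrategy API:
// decide whether a failure belongs to a new attempt based on whether the
// previous attempt's backoff window has already elapsed.
public class NewAttemptSketch {

    private final long backoffMillis;
    private long lastNewAttemptTimestamp = Long.MIN_VALUE;

    public NewAttemptSketch(long backoffMillis) {
        this.backoffMillis = backoffMillis;
    }

    // Returns true if this failure starts a new attempt; false if it should
    // be merged into the current attempt as a concurrent exception.
    public boolean notifyFailure(long nowMillis) {
        boolean newAttempt = nowMillis >= lastNewAttemptTimestamp + backoffMillis;
        if (newAttempt) {
            lastNewAttemptTimestamp = nowMillis;
        }
        return newAttempt;
    }

    public static void main(String[] args) {
        NewAttemptSketch strategy = new NewAttemptSketch(1000);
        System.out.println(strategy.notifyFailure(0));    // first failure: new attempt
        System.out.println(strategy.notifyFailure(500));  // within backoff: concurrent
        System.out.println(strategy.notifyFailure(1500)); // backoff elapsed: new attempt
    }
}
```

Exposing this decision on the strategy, rather than in the scheduler, is what lets other features depend on the same "new attempt" signal, as the commit-splitting discussion above argues.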
Re: [PR] [FLINK-33565][Scheduler] ConcurrentExceptions works with exception merging [flink]
flinkbot commented on PR #24003: URL: https://github.com/apache/flink/pull/24003#issuecomment-1870783715 ## CI report: * 09522e015163a7c578e6df70a983427233630305 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build
[PR] [FLINK-33565][Scheduler] ConcurrentExceptions works with exception merging [flink]
1996fanrui opened a new pull request, #24003: URL: https://github.com/apache/flink/pull/24003 ## What is the purpose of the change Concurrent exceptions currently don't work. ## Brief change log - [FLINK-33565][Exception] Archive exceptions into the exception history immediately when they occur, instead of archiving them when restarting - [FLINK-33565][Exception] Restart strategy checks whether the current failure is a new attempt - [FLINK-33565][Scheduler] ConcurrentExceptions works with exception merging ## Verifying this change Unit tests are still under development. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no - The serializers: no - The runtime per-record code paths (performance sensitive): no - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? no