Re: [PR] [FLINK-33565][Scheduler] ConcurrentExceptions works with exception merging [flink]

2024-01-23 Thread via GitHub


1996fanrui merged PR #24003:
URL: https://github.com/apache/flink/pull/24003


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33565][Scheduler] ConcurrentExceptions works with exception merging [flink]

2024-01-23 Thread via GitHub


1996fanrui commented on PR #24003:
URL: https://github.com/apache/flink/pull/24003#issuecomment-1905713145

   Thanks again @XComp for the patient review and the series of great 
suggestions! Merging~
   
   A good experience~





Re: [PR] [FLINK-33565][Scheduler] ConcurrentExceptions works with exception merging [flink]

2024-01-23 Thread via GitHub


XComp commented on PR #24003:
URL: https://github.com/apache/flink/pull/24003#issuecomment-1905623368

   ok, fine with me :+1: 





Re: [PR] [FLINK-33565][Scheduler] ConcurrentExceptions works with exception merging [flink]

2024-01-22 Thread via GitHub


1996fanrui commented on PR #24003:
URL: https://github.com/apache/flink/pull/24003#issuecomment-1905208562

   > Fair enough. What about squashing the 
[FLINK-33565](https://issues.apache.org/jira/browse/FLINK-33565)-related 
commits into a single one?
   
   Hi @XComp, that's fine with me. Let me explain why I split them into multiple 
commits:
   
   1. First of all, the Flink code style guide recommends separating refactoring, 
cleanup, and independent changes into separate commits [1]. As I understand it, 
this has a series of advantages:
 - Easy to review
 - Easy to revert
 - Easy to cherry-pick
   - In my daily work, we regularly cherry-pick new features (that our users 
need) into our internal Flink version, because upgrading the Flink version of a 
large number of Flink jobs takes a lot of effort.
   - While cherry-picking, I found that keeping independent changes in 
separate commits is very useful.
   - For example, suppose we need to cherry-pick featureB, and featureA was 
merged before featureB.
   - FeatureB uses some common changes from featureA: featureA added new 
fields to some common classes. (It's not a refactoring, but the change isn't 
huge.)
   - If featureA treats this as a minor change and keeps everything in one 
commit, we must cherry-pick featureA while cherry-picking featureB.
   - If featureA splits the independent changes into separate commits, we can 
cherry-pick only the common class changes of featureA plus the whole featureB.
   2. The first commit changes a minor behavior and is close to a refactoring. 
It's discussed in the last comment: 
https://github.com/apache/flink/pull/24003#issuecomment-1903758592
   3. The second commit is `[FLINK-33565][Exception] Restart strategy checks 
whether current failure is a new attempt`.
   - The concurrent-exception handling depends on the result of 
`RestartBackoffTimeStrategy`.
   - Other features might depend on it as well in the future, so I prefer to 
keep it as an independent change.
   4. The third commit is the core change of this feature.
   5. The fourth commit fixes the `numberOfRestarts` metric; it may be an 
independent change as well.
   
   That's why I split them into 4 commits. 
   
   As I said before, squashing them into one commit is fine with me. Looking 
forward to your feedback, thanks~
   
   [1] 
https://flink.apache.org/how-to-contribute/code-style-and-quality-pull-requests/#3-separate-refactoring-cleanup-and-independent-changes
   
   





Re: [PR] [FLINK-33565][Scheduler] ConcurrentExceptions works with exception merging [flink]

2024-01-22 Thread via GitHub


XComp commented on PR #24003:
URL: https://github.com/apache/flink/pull/24003#issuecomment-1903887636

   Fair enough. What about squashing the FLINK-33565-related commits into a 
single one?





Re: [PR] [FLINK-33565][Scheduler] ConcurrentExceptions works with exception merging [flink]

2024-01-22 Thread via GitHub


1996fanrui commented on PR #24003:
URL: https://github.com/apache/flink/pull/24003#issuecomment-1903758592

   > LGTM  Thanks for your efforts, @1996fanrui, and for keeping up with my 
nitty comments. I enjoyed the discussions in this PR.
   
   Thanks @XComp for the patient review; I definitely learned some skills from 
your comments. I enjoyed the discussions as well.
   
   > About the commits: We can reorganize them a bit, I feel. You could keep 
[6fe8037](https://github.com/apache/flink/commit/6fe8037b40d8458a5524adfb352d96925ced63bd)
 and 
[f422804](https://github.com/apache/flink/commit/f422804e918a8ea3537deeb1c4f4f1c9676e1a8a)
 as separate hotfix commits (using the `[hotfix]` prefix rather than the Jira 
issue) because they improve the code base and we would want to keep them even 
if we decide to revert 
[FLINK-33565](https://issues.apache.org/jira/browse/FLINK-33565) (for whatever 
reason) in the future. WDYT?
   
   The idea makes sense to me. 
   
   I updated the commit message for the last commit, but I didn't update it for 
the first commit, for a few reasons:
   
   1. The first commit changes the behavior:
   - Before this PR, exceptions were archived when the tasks were restarted 
rather than immediately.
   - That means that when a task fails, the exception only shows up in the 
exception history after Flink has restarted the task.
   - The first commit archives exceptions into the exception history 
immediately when they occur, instead of archiving them when restarting (see the 
sketch below).
   2. I hope the first commit is fine, but I'm afraid we may have missed 
something, so it's not a simple refactor or hotfix.
   
   WDYT?
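
   As a side note for readers not familiar with the scheduler internals, the 
ordering change in point 1 can be illustrated with a minimal, self-contained 
sketch using plain JDK classes. This is not the actual 
`DefaultScheduler`/`SchedulerBase` code, just a toy model of "archive inside the 
delayed restart callback" versus "archive immediately, then schedule the 
restart":

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Toy model only: the "exception history" is a list of strings and the restart
// is a delayed task. Before the first commit the failure was archived inside
// the delayed callback; after it, the failure is archived immediately.
public class ArchiveOrderingSketch {
    private static final List<String> exceptionHistory = new ArrayList<>();
    private static final ScheduledExecutorService delayExecutor =
            Executors.newSingleThreadScheduledExecutor();

    static void handleFailureOldBehavior(String failure, long restartDelayMs) {
        delayExecutor.schedule(
                () -> {
                    exceptionHistory.add(failure); // only visible after the restart delay
                    restartTask();
                },
                restartDelayMs,
                TimeUnit.MILLISECONDS);
    }

    static void handleFailureNewBehavior(String failure, long restartDelayMs) {
        exceptionHistory.add(failure); // visible as soon as the failure occurs
        delayExecutor.schedule(
                ArchiveOrderingSketch::restartTask, restartDelayMs, TimeUnit.MILLISECONDS);
    }

    private static void restartTask() {
        System.out.println("restarting task, history: " + exceptionHistory);
    }

    public static void main(String[] args) throws InterruptedException {
        handleFailureNewBehavior("expected test failure", 100);
        System.out.println("history right after the failure: " + exceptionHistory);
        Thread.sleep(200);
        delayExecutor.shutdown();
    }
}
```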





Re: [PR] [FLINK-33565][Scheduler] ConcurrentExceptions works with exception merging [flink]

2024-01-21 Thread via GitHub


1996fanrui commented on code in PR #24003:
URL: https://github.com/apache/flink/pull/24003#discussion_r1461348956


##
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FixedDelayRestartBackoffTimeStrategy.java:
##
@@ -61,8 +61,9 @@ public long getBackoffTime() {
 }
 
 @Override
-public void notifyFailure(Throwable cause) {
+public boolean notifyFailure(Throwable cause) {
 currentRestartAttempt++;
+return true;

Review Comment:
   > I guess, you meant 
[FLIP-364](https://cwiki.apache.org/confluence/display/FLINK/FLIP-364%3A+Improve+the+exponential-delay+restart-strategy)
 here?
   
   Exactly, sorry for my typo.
   
   > What about deprecating that class as part of this PR and coming up with a 
follow-up Jira issue that replaces the strategy?
   
   They are internal classes; IIUC, we can refactor them directly without the 
`@Deprecated` annotation.
   
   
-
   
   Your suggestion is that we still keep the failure-rate and fixed-delay 
restart-strategies, but we can reuse the 
`ExponentialDelayRestartBackoffTimeStrategy` class, right?
   
   I'm wondering whether we could deprecate the failure-rate and fixed-delay 
restart strategies themselves; users would still be able to configure them.
   
   Keeping the failure-rate and fixed-delay restart strategies while reusing the 
`ExponentialDelayRestartBackoffTimeStrategy` class still has the semantic 
problem that I mentioned in the last comment [1].
   
   [1] https://github.com/apache/flink/pull/24003#discussion_r1456807889






Re: [PR] [FLINK-33565][Scheduler] ConcurrentExceptions works with exception merging [flink]

2024-01-19 Thread via GitHub


XComp commented on code in PR #24003:
URL: https://github.com/apache/flink/pull/24003#discussion_r1458870055


##
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/ExecutionFailureHandlerTest.java:
##
@@ -183,6 +194,59 @@ void testNonRecoverableFailureHandlingResult() throws 
Exception {
 assertThat(executionFailureHandler.getNumberOfRestarts()).isZero();
 }
 
+@Test
+void testNewAttemptAndNumberOfRestarts() throws Exception {
+final Set tasksToRestart =
+Collections.singleton(new ExecutionVertexID(new JobVertexID(), 
0));
+failoverStrategy.setTasksToRestart(tasksToRestart);
+
+Execution execution =
+
FailureHandlingResultTest.createExecution(EXECUTOR_RESOURCE.getExecutor());
+final Throwable error = new Exception("expected test failure");
+
+assertHandlerRootException(execution, error);
+
+isNewAttempt.set(false);
+assertHandlerConcurrentException(execution, error);
+assertHandlerConcurrentException(execution, error);
+
+isNewAttempt.set(true);
+assertHandlerRootException(execution, error);
+assertHandlerRootException(execution, error);
+
+isNewAttempt.set(false);
+assertHandlerConcurrentException(execution, error);
+assertHandlerConcurrentException(execution, error);
+}
+
+private void assertHandlerRootException(Execution execution, Throwable 
error) {
+final long originalNumberOfRestarts = 
executionFailureHandler.getNumberOfRestarts();
+FailureHandlingResult result =
+executionFailureHandler.getFailureHandlingResult(
+execution, error, System.currentTimeMillis());
+assertThat(result.isRootCause())
+.as(
+"The FailureHandlingResult shouldn be the root cause 
if exception is new attempt.")

Review Comment:
   ```suggestion
   "The FailureHandlingResult should be the root cause 
if exception is new attempt.")
   ```



##
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/ExecutionFailureHandlerTest.java:
##
@@ -183,6 +194,59 @@ void testNonRecoverableFailureHandlingResult() throws 
Exception {
 assertThat(executionFailureHandler.getNumberOfRestarts()).isZero();
 }
 
+@Test
+void testNewAttemptAndNumberOfRestarts() throws Exception {
+final Set tasksToRestart =
+Collections.singleton(new ExecutionVertexID(new JobVertexID(), 
0));
+failoverStrategy.setTasksToRestart(tasksToRestart);
+
+Execution execution =
+
FailureHandlingResultTest.createExecution(EXECUTOR_RESOURCE.getExecutor());
+final Throwable error = new Exception("expected test failure");
+
+assertHandlerRootException(execution, error);
+
+isNewAttempt.set(false);
+assertHandlerConcurrentException(execution, error);
+assertHandlerConcurrentException(execution, error);
+
+isNewAttempt.set(true);
+assertHandlerRootException(execution, error);
+assertHandlerRootException(execution, error);
+
+isNewAttempt.set(false);
+assertHandlerConcurrentException(execution, error);
+assertHandlerConcurrentException(execution, error);
+}
+
+private void assertHandlerRootException(Execution execution, Throwable 
error) {
+final long originalNumberOfRestarts = 
executionFailureHandler.getNumberOfRestarts();
+FailureHandlingResult result =
+executionFailureHandler.getFailureHandlingResult(
+execution, error, System.currentTimeMillis());
+assertThat(result.isRootCause())
+.as(
+"The FailureHandlingResult shouldn be the root cause 
if exception is new attempt.")
+.isTrue();
+assertThat(executionFailureHandler.getNumberOfRestarts())
+.as("The numberOfRestarts should be increased when it's a root 
exception.")
+.isEqualTo(originalNumberOfRestarts + 1);
+}
+
+private void assertHandlerConcurrentException(Execution execution, 
Throwable error) {

Review Comment:
   ```suggestion
   private void testHandlingConcurrentException(Execution execution, 
Throwable error) {
   ```



##
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/ExecutionFailureHandlerTest.java:
##
@@ -183,6 +194,59 @@ void testNonRecoverableFailureHandlingResult() throws 
Exception {
 assertThat(executionFailureHandler.getNumberOfRestarts()).isZero();
 }
 
+@Test
+void testNewAttemptAndNumberOfRestarts() throws Exception {
+final Set tasksToRestart =
+Collections.singleton(new ExecutionVertexID(new JobVertexID(), 
0));
+failoverStrategy.setTasksToRestart(tasksToRestart);
+
+Execution 

Re: [PR] [FLINK-33565][Scheduler] ConcurrentExceptions works with exception merging [flink]

2024-01-18 Thread via GitHub


1996fanrui commented on PR #24003:
URL: https://github.com/apache/flink/pull/24003#issuecomment-1899532108

   @flinkbot run azure





Re: [PR] [FLINK-33565][Scheduler] ConcurrentExceptions works with exception merging [flink]

2024-01-17 Thread via GitHub


1996fanrui commented on code in PR #24003:
URL: https://github.com/apache/flink/pull/24003#discussion_r1456812210


##
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/ExecutionFailureHandlerTest.java:
##
@@ -183,6 +194,55 @@ void testNonRecoverableFailureHandlingResult() throws 
Exception {
 assertThat(executionFailureHandler.getNumberOfRestarts()).isZero();
 }
 
+/** Test isNewAttempt of {@link FailureHandlingResult} is expected. */
+@Test
+void testNewAttemptAndNumberOfRestarts() throws Exception {

Review Comment:
   Good suggestion!
   
   Also, I extracted the `assertHandlerRootException` and 
`assertHandlerConcurrentException` methods; after that, 
`testNewAttemptAndNumberOfRestarts` is quite simple.



##
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FixedDelayRestartBackoffTimeStrategy.java:
##
@@ -61,8 +61,9 @@ public long getBackoffTime() {
 }
 
 @Override
-public void notifyFailure(Throwable cause) {
+public boolean notifyFailure(Throwable cause) {
 currentRestartAttempt++;
+return true;

Review Comment:
   > But from a conceptual point of view: shouldn't we cover it also for the 
two other restart strategies. Or am I missing something here?
   
   In the beginning, I wanted to improve all restart strategies, but during the 
discussion I received feedback in favor of removing the other restart strategies 
rather than improving them. The core background is this thread: 
https://lists.apache.org/thread/l7wyc7pndpsvh2h7hj3fw2td9yphrlox
   
   In brief, 3 reasons:
   
   1. The semantics of the option
   
   - The failure-rate strategy's restart upper limit option is named 
`restart-strategy.failure-rate.max-failures-per-interval`.
   - It's `max-failures-per-interval` instead of `max-attempts-per-interval`.
   - If we changed the behavior directly, the name and the behaviour would no 
longer match.
   2. We recommend that users use the exponential-delay restart strategy in the 
future; it's more powerful.
   3. After the FLIP-360 discussion, I found that the exponential-delay restart 
strategy can replace the other restart strategies directly if users set 
`restart-strategy.exponential-delay.backoff-multiplier` to 1 (see the config 
sketch below).
   
   Actually, I think the other restart strategies can be deprecated in the 
future.
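
   To make point 3 concrete, here is a hedged configuration sketch in Java. The 
`restart-strategy.exponential-delay.backoff-multiplier` key is the one quoted 
above; the remaining keys (`restart-strategy.type`, `...initial-backoff`, 
`...max-backoff`) are assumptions and should be verified against the Flink 
configuration docs for the version in use.

```java
import org.apache.flink.configuration.Configuration;

// Sketch: configuring the exponential-delay restart strategy so that it
// degenerates to fixed-delay behaviour. With a backoff-multiplier of 1 the
// backoff never grows, i.e. every restart waits the same fixed delay.
public class RestartStrategyConfigSketch {

    public static Configuration fixedDelayViaExponentialDelay() {
        Configuration conf = new Configuration();
        conf.setString("restart-strategy.type", "exponential-delay");
        conf.setString("restart-strategy.exponential-delay.backoff-multiplier", "1.0");
        conf.setString("restart-strategy.exponential-delay.initial-backoff", "10 s");
        conf.setString("restart-strategy.exponential-delay.max-backoff", "10 s");
        return conf;
    }
}
```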



##
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/exceptionhistory/RootExceptionHistoryEntryTest.java:
##
@@ -81,10 +81,14 @@ void testFromFailureHandlingResultSnapshot() throws 
ExecutionException, Interrup
 final CompletableFuture> rootFailureLabels =
 
CompletableFuture.completedFuture(Collections.singletonMap("key", "value"));
 
-final Throwable concurrentException = new 
IllegalStateException("Expected other failure");
-final ExecutionVertex concurrentlyFailedExecutionVertex = 
extractExecutionVertex(1);
-final long concurrentExceptionTimestamp =
-triggerFailure(concurrentlyFailedExecutionVertex, 
concurrentException);
+final Throwable concurrentException1 = new 
IllegalStateException("Expected other failure1");
+final ExecutionVertex concurrentlyFailedExecutionVertex1 = 
extractExecutionVertex(1);
+Predicate exception1Predicate =
+getExceptionHistoryEntryPredicate(
+concurrentException1, 
concurrentlyFailedExecutionVertex1);
+
+final Throwable concurrentException2 = new 
IllegalStateException("Expected other failure2");
+final ExecutionVertex concurrentlyFailedExecutionVertex2 = 
extractExecutionVertex(2);

Review Comment:
   Thanks for the explanation!
   
   > testAddConecurrentExceptions will use all code of this test 
   
   If we have 2 tests, where test1 only calls `testCommon` and test2 calls 
`testCommon` and `testPart2`, then test2 covers test1.
   
   2 solutions:
   - Keep both test1 and test2
   - Only keep test2
   
   In the current scenario, is it enough for us to only keep test2? Looking 
forward to your opinion; either option is fine with me. 






Re: [PR] [FLINK-33565][Scheduler] ConcurrentExceptions works with exception merging [flink]

2024-01-17 Thread via GitHub


XComp commented on code in PR #24003:
URL: https://github.com/apache/flink/pull/24003#discussion_r1455652997


##
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FixedDelayRestartBackoffTimeStrategy.java:
##
@@ -61,8 +61,9 @@ public long getBackoffTime() {
 }
 
 @Override
-public void notifyFailure(Throwable cause) {
+public boolean notifyFailure(Throwable cause) {
 currentRestartAttempt++;
+return true;

Review Comment:
   I understand that it wasn't covered by FLIP-364. But from a conceptual point 
of view: shouldn't we cover it also for the two other restart strategies? Or am 
I missing something here? :thinking:  I'm also fine with having this covered by 
follow-up Jira issues.



##
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/ExecutionFailureHandlerTest.java:
##
@@ -183,6 +194,55 @@ void testNonRecoverableFailureHandlingResult() throws 
Exception {
 assertThat(executionFailureHandler.getNumberOfRestarts()).isZero();
 }
 
+/** Test isNewAttempt of {@link FailureHandlingResult} is expected. */
+@Test
+void testNewAttemptAndNumberOfRestarts() throws Exception {

Review Comment:
   Like in my previous comment: We can avoid using comments in favor of the 
assert message :shrug: 



##
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/exceptionhistory/RootExceptionHistoryEntryTest.java:
##
@@ -81,10 +81,14 @@ void testFromFailureHandlingResultSnapshot() throws 
ExecutionException, Interrup
 final CompletableFuture> rootFailureLabels =
 
CompletableFuture.completedFuture(Collections.singletonMap("key", "value"));
 
-final Throwable concurrentException = new 
IllegalStateException("Expected other failure");
-final ExecutionVertex concurrentlyFailedExecutionVertex = 
extractExecutionVertex(1);
-final long concurrentExceptionTimestamp =
-triggerFailure(concurrentlyFailedExecutionVertex, 
concurrentException);
+final Throwable concurrentException1 = new 
IllegalStateException("Expected other failure1");
+final ExecutionVertex concurrentlyFailedExecutionVertex1 = 
extractExecutionVertex(1);
+Predicate exception1Predicate =
+getExceptionHistoryEntryPredicate(
+concurrentException1, 
concurrentlyFailedExecutionVertex1);
+
+final Throwable concurrentException2 = new 
IllegalStateException("Expected other failure2");
+final ExecutionVertex concurrentlyFailedExecutionVertex2 = 
extractExecutionVertex(2);

Review Comment:
   Fine with me. About reusing the code: You could move the code that's common 
between tests into a private static method. But others might argue that this 
is also not good for code readability.
   
   Anyway, it's your choice in the end. Most of my comments are discussion 
points rather than comments where I claim that my proposal is the right one. :-)



##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java:
##
@@ -708,27 +712,43 @@ private void archiveGlobalFailure(
 long timestamp,
 CompletableFuture> failureLabels,
 Iterable executions) {
-exceptionHistory.add(
+latestRootExceptionEntry =
 RootExceptionHistoryEntry.fromGlobalFailure(
-failure, timestamp, failureLabels, executions));
+failure, timestamp, failureLabels, executions);
+exceptionHistory.add(latestRootExceptionEntry);
 log.debug("Archive global failure.", failure);
 }
 
 protected final void archiveFromFailureHandlingResult(
 FailureHandlingResultSnapshot failureHandlingResult) {
+// Handle all subsequent exceptions as the concurrent exceptions when 
it's not a new
+// attempt.
+if (!failureHandlingResult.isNewAttempt()) {
+checkState(
+latestRootExceptionEntry != null,
+"A root exception entry should exist if 
failureHandlingResult wasn't "
++ "generated as part of a new error handling 
cycle.");
+List concurrentlyExecutions = new ArrayList<>();
+
failureHandlingResult.getRootCauseExecution().ifPresent(concurrentlyExecutions::add);
+
concurrentlyExecutions.addAll(failureHandlingResult.getConcurrentlyFailedExecution());
+
+
latestRootExceptionEntry.addConcurrentExceptions(concurrentlyExecutions);
+return;
+}
+
 if (failureHandlingResult.getRootCauseExecution().isPresent()) {

Review Comment:
   We still might want to move the comment into the if block



##
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/ExecutionFailureHandlerTest.java:
##
@@ -171,6 +179,9 @@ void 

Re: [PR] [FLINK-33565][Scheduler] ConcurrentExceptions works with exception merging [flink]

2024-01-17 Thread via GitHub


1996fanrui commented on code in PR #24003:
URL: https://github.com/apache/flink/pull/24003#discussion_r145529


##
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/ExecutionFailureHandlerTest.java:
##
@@ -183,6 +194,55 @@ void testNonRecoverableFailureHandlingResult() throws 
Exception {
 assertThat(executionFailureHandler.getNumberOfRestarts()).isZero();
 }
 
+/** Test isNewAttempt of {@link FailureHandlingResult} is expected. */
+@Test
+void testNewAttemptAndNumberOfRestarts() throws Exception {

Review Comment:
   > The comment is obsolete because it could be expressed through the test 
method name(s) (to reduce redundant information in the code).
   
   Removed the comment.
   
   > Additionally, what's your opinion on splitting this big test up into 
smaller test methods. I see the benefit of having one big one. Because you can 
check the number of restarts at the same time. But having more specific test 
methods usually helps with improving readability of the test code.. WDYT?
   
   This test is intentionally one whole test: it checks that the number of 
restarts is as expected when we have a series of exceptions, and that the 
exception merging logic works well.
   
   I added some comments inside the test to make it easier to understand. 
Please take another look.



##
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailureHandlingResult.java:
##
@@ -206,6 +214,11 @@ public boolean isGlobalFailure() {
 return globalFailure;
 }
 
+/** @return Whether this failure is a new attempt. */
+public boolean isNewAttempt() {

Review Comment:
   Renaming it to `isRootCause` makes sense to me. Updated.
   



##
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailureRateRestartBackoffTimeStrategy.java:
##
@@ -79,11 +79,12 @@ public long getBackoffTime() {
 }
 
 @Override
-public void notifyFailure(Throwable cause) {
+public boolean notifyFailure(Throwable cause) {
 if (isFailureTimestampsQueueFull()) {
 failureTimestamps.remove();
 }
 failureTimestamps.add(clock.absoluteTimeMillis());
+return true;

Review Comment:
   See the last comment.



##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/exceptionhistory/RootExceptionHistoryEntry.java:
##
@@ -170,11 +176,15 @@ public RootExceptionHistoryEntry(
 CompletableFuture> failureLabels,
 @Nullable String failingTaskName,
 @Nullable TaskManagerLocation taskManagerLocation,
-Iterable concurrentExceptions) {
+Collection concurrentExceptions) {
 super(cause, timestamp, failureLabels, failingTaskName, 
taskManagerLocation);
 this.concurrentExceptions = concurrentExceptions;
 }
 
+public void addConcurrentExceptions(Iterable 
concurrentlyExecutions) {

Review Comment:
   Good catch, added the `@NotThreadSafe`.



##
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/exceptionhistory/RootExceptionHistoryEntryTest.java:
##
@@ -81,10 +81,14 @@ void testFromFailureHandlingResultSnapshot() throws 
ExecutionException, Interrup
 final CompletableFuture> rootFailureLabels =
 
CompletableFuture.completedFuture(Collections.singletonMap("key", "value"));
 
-final Throwable concurrentException = new 
IllegalStateException("Expected other failure");
-final ExecutionVertex concurrentlyFailedExecutionVertex = 
extractExecutionVertex(1);
-final long concurrentExceptionTimestamp =
-triggerFailure(concurrentlyFailedExecutionVertex, 
concurrentException);
+final Throwable concurrentException1 = new 
IllegalStateException("Expected other failure1");
+final ExecutionVertex concurrentlyFailedExecutionVertex1 = 
extractExecutionVertex(1);
+Predicate exception1Predicate =
+getExceptionHistoryEntryPredicate(
+concurrentException1, 
concurrentlyFailedExecutionVertex1);
+
+final Throwable concurrentException2 = new 
IllegalStateException("Expected other failure2");
+final ExecutionVertex concurrentlyFailedExecutionVertex2 = 
extractExecutionVertex(2);

Review Comment:
   My thought is to move the code down instead of creating a new test method.
   
   Because `testAddConecurrentExceptions` will use all the code of this test. WDYT?



##
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FixedDelayRestartBackoffTimeStrategy.java:
##
@@ -61,8 +61,9 @@ public long getBackoffTime() {
 }
 
 @Override
-public void notifyFailure(Throwable cause) {
+public boolean notifyFailure(Throwable cause) {
 currentRestartAttempt++;
+return true;

Review Comment:
   Only the exponential-delay restart strategy performs exception merging.
   
   
   

Re: [PR] [FLINK-33565][Scheduler] ConcurrentExceptions works with exception merging [flink]

2024-01-17 Thread via GitHub


XComp commented on code in PR #24003:
URL: https://github.com/apache/flink/pull/24003#discussion_r1455102444


##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/exceptionhistory/RootExceptionHistoryEntry.java:
##
@@ -170,11 +176,15 @@ public RootExceptionHistoryEntry(
 CompletableFuture> failureLabels,
 @Nullable String failingTaskName,
 @Nullable TaskManagerLocation taskManagerLocation,
-Iterable concurrentExceptions) {
+Collection concurrentExceptions) {
 super(cause, timestamp, failureLabels, failingTaskName, 
taskManagerLocation);
 this.concurrentExceptions = concurrentExceptions;
 }
 
+public void addConcurrentExceptions(Iterable 
concurrentlyExecutions) {

Review Comment:
   The class was previously meant to be immutable. This method is changing that 
property. I'm wondering whether we should add `@NotThreadSafe` to the class to 
make that specific change more explicit. 
   
   The change itself should be still ok because failure handling happens in the 
`JobMaster`'s main thread which is how we avoid concurrency issues.
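
   For illustration only (a sketch with a toy class, not the actual 
`RootExceptionHistoryEntry`), and assuming the JSR-305 
`javax.annotation.concurrent.NotThreadSafe` annotation is available on the 
classpath, the suggestion would look roughly like this:

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

import javax.annotation.concurrent.NotThreadSafe;

// Sketch: a formerly-immutable entry type that is now mutable. @NotThreadSafe
// documents that it must only be mutated from a single thread (here: the
// JobMaster's main thread, as discussed above).
@NotThreadSafe
class MutableEntrySketch {
    private final List<String> concurrentExceptions = new ArrayList<>();

    void addConcurrentExceptions(Collection<String> descriptions) {
        concurrentExceptions.addAll(descriptions);
    }
}
```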



##
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FixedDelayRestartBackoffTimeStrategy.java:
##
@@ -61,8 +61,9 @@ public long getBackoffTime() {
 }
 
 @Override
-public void notifyFailure(Throwable cause) {
+public boolean notifyFailure(Throwable cause) {
 currentRestartAttempt++;
+return true;

Review Comment:
   Aren't we also collecting exceptions here? Because the restart might happen 
with some delay. :thinking: 



##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/exceptionhistory/FailureHandlingResultSnapshot.java:
##
@@ -150,7 +154,17 @@ public long getTimestamp() {
  *
  * @return The concurrently failed {@code Executions}.
  */
-public Iterable getConcurrentlyFailedExecution() {
+public Set getConcurrentlyFailedExecution() {
 return Collections.unmodifiableSet(concurrentlyFailedExecutions);
 }
+
+/**
+ * @return Whether the current failure is a new attempt. True means that 
the current failure is

Review Comment:
   ```suggestion
* @return True means that the current failure is
   ```
   nit: The first sentence does not add any more information. You could also go 
with the proposal I shared in `FailureHandlingResult` if you think it's a 
reasonable change.



##
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/ExecutionFailureHandlerTest.java:
##
@@ -71,7 +74,10 @@ void setUp() {
 
 failoverStrategy = new TestFailoverStrategy();
 testingFailureEnricher = new TestingFailureEnricher();
-backoffTimeStrategy = new TestRestartBackoffTimeStrategy(true, 
RESTART_DELAY_MS);
+isNewAttempt = new AtomicBoolean(true);
+backoffTimeStrategy =
+new TestRestartBackoffTimeStrategy(
+true, RESTART_DELAY_MS, () -> isNewAttempt.get());

Review Comment:
   ```suggestion
   true, RESTART_DELAY_MS, isNewAttempt::get);
   ```
   nit



##
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/RestartBackoffTimeStrategy.java:
##
@@ -38,8 +38,11 @@ public interface RestartBackoffTimeStrategy {
  * Notify the strategy about the task failure cause.
  *
  * @param cause of the task failure
+ * @return Whether the current failure is a new attempt. True means that 
the current failure is
+ * a new attempt, false means that there has been a failure before and 
has not been tried
+ * yet, and the current failure will be merged into the previous 
attempt.

Review Comment:
   ```suggestion
* @return True means that the current failure is the first one after 
the most-recent failure
* handling happened, false means that there has been a failure 
before that was not handled,
* yet, and the current failure will be considered in a combined 
failure handling effort.
   ```
   nit: just another proposal to remove the "attempt" from the description.
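
   To make the proposed contract concrete, here is a minimal, self-contained 
sketch of a strategy that follows it. It uses a toy interface rather than 
Flink's actual `RestartBackoffTimeStrategy`, and a simple time-window criterion; 
the real exponential-delay implementation may decide differently, but the 
return-value semantics are the ones described in the suggestion above.

```java
import java.time.Clock;
import java.time.Duration;

// Toy model of the contract: notifyFailure returns true for the first failure
// after the most recent failure handling, and false for failures that arrive
// while the previous failure is still waiting to be handled (those get merged).
interface ToyRestartBackoffTimeStrategy {
    boolean notifyFailure(Throwable cause);

    long getBackoffTime();
}

final class WindowedMergingStrategy implements ToyRestartBackoffTimeStrategy {
    private final Clock clock;
    private final Duration backoff;
    private long handlingScheduledUntil = Long.MIN_VALUE;

    WindowedMergingStrategy(Clock clock, Duration backoff) {
        this.clock = clock;
        this.backoff = backoff;
    }

    @Override
    public boolean notifyFailure(Throwable cause) {
        long now = clock.millis();
        if (now >= handlingScheduledUntil) {
            // First failure after the previous handling: a new "root" failure.
            handlingScheduledUntil = now + backoff.toMillis();
            return true;
        }
        // A failure handling is already pending; merge this failure into it.
        return false;
    }

    @Override
    public long getBackoffTime() {
        return backoff.toMillis();
    }
}
```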



##
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailureRateRestartBackoffTimeStrategy.java:
##
@@ -79,11 +79,12 @@ public long getBackoffTime() {
 }
 
 @Override
-public void notifyFailure(Throwable cause) {
+public boolean notifyFailure(Throwable cause) {
 if (isFailureTimestampsQueueFull()) {
 failureTimestamps.remove();
 }
 failureTimestamps.add(clock.absoluteTimeMillis());
+return true;

Review Comment:
   Same here, isn't that also collecting failures before doing the restart? So 
we would not have a "first attempt" every single time?



##

Re: [PR] [FLINK-33565][Scheduler] ConcurrentExceptions works with exception merging [flink]

2024-01-17 Thread via GitHub


1996fanrui commented on PR #24003:
URL: https://github.com/apache/flink/pull/24003#issuecomment-1895296744

   Hi @XComp, sorry to bother you again, since the 1.19 feature freeze is next 
week.





Re: [PR] [FLINK-33565][Scheduler] ConcurrentExceptions works with exception merging [flink]

2024-01-10 Thread via GitHub


1996fanrui commented on code in PR #24003:
URL: https://github.com/apache/flink/pull/24003#discussion_r1448255898


##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java:
##
@@ -367,17 +367,13 @@ private void restartTasksWithDelay(final 
FailureHandlingResult failureHandlingRe
 
 final CompletableFuture cancelFuture = 
cancelTasksAsync(verticesToRestart);
 
-final FailureHandlingResultSnapshot failureHandlingResultSnapshot =
-createFailureHandlingResultSnapshot(failureHandlingResult);
+archiveFromFailureHandlingResult(
+createFailureHandlingResultSnapshot(failureHandlingResult));
 delayExecutor.schedule(
 () ->
 FutureUtils.assertNoException(
 cancelFuture.thenRunAsync(
-() -> {
-archiveFromFailureHandlingResult(

Review Comment:
   Thanks for the clarification!
   
   The initial motivation might have been to collect all concurrent exceptions. 
In this PR, the solution is to save the `latestRootExceptionEntry` as a field in 
`SchedulerBase`, and all subsequent non-root exceptions are added to that 
`latestRootExceptionEntry`.
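
   A simplified, self-contained sketch of that flow (toy types, not the actual 
`SchedulerBase`/`RootExceptionHistoryEntry` classes): when the handling result 
is not a root cause, the failures are appended to the most recent root entry 
instead of creating a new entry.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

// Toy model of the archiving flow described above: the scheduler keeps the
// latest root entry and merges non-root failures into it.
final class RootEntry {
    final Throwable cause;
    final List<Throwable> concurrentExceptions = new ArrayList<>();

    RootEntry(Throwable cause) {
        this.cause = cause;
    }

    void addConcurrentExceptions(Collection<Throwable> failures) {
        concurrentExceptions.addAll(failures);
    }
}

final class ExceptionHistorySketch {
    private final List<RootEntry> exceptionHistory = new ArrayList<>();
    private RootEntry latestRootExceptionEntry;

    void archive(boolean isRootCause, Throwable failure, Collection<Throwable> concurrentFailures) {
        if (!isRootCause) {
            if (latestRootExceptionEntry == null) {
                throw new IllegalStateException("a root entry must exist before merging");
            }
            // Not a new attempt: merge everything into the latest root entry.
            List<Throwable> merged = new ArrayList<>();
            merged.add(failure);
            merged.addAll(concurrentFailures);
            latestRootExceptionEntry.addConcurrentExceptions(merged);
            return;
        }
        // A new attempt: start a new root entry and remember it for later merging.
        latestRootExceptionEntry = new RootEntry(failure);
        latestRootExceptionEntry.addConcurrentExceptions(concurrentFailures);
        exceptionHistory.add(latestRootExceptionEntry);
    }
}
```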



##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/exceptionhistory/RootExceptionHistoryEntry.java:
##
@@ -140,15 +142,20 @@ private static RootExceptionHistoryEntry 
createRootExceptionHistoryEntry(
 failureLabels,
 failingTaskName,
 taskManagerLocation,
-StreamSupport.stream(executions.spliterator(), false)
-.filter(execution -> 
execution.getFailureInfo().isPresent())
-.map(
-execution ->
-ExceptionHistoryEntry.create(
-execution,
-
execution.getVertexWithAttempt(),
-
FailureEnricherUtils.EMPTY_FAILURE_LABELS))
-.collect(Collectors.toList()));
+createExceptionHistoryEntries(executions));
+}
+
+public static List createExceptionHistoryEntries(

Review Comment:
   > we could move the logic into addConcurrentExceptions and call 
addConcurrentExceptions within createRootExceptionHistoryEntry on the newly 
created instance. 
   
   It makes sense to me, thanks~



##
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailureHandlingResult.java:
##
@@ -206,6 +214,11 @@ public boolean isGlobalFailure() {
 return globalFailure;
 }
 
+/** @return Whether this failure is a new attempt. */
+public boolean isNewAttempt() {

Review Comment:
   I added some comments to explain it. What do you think?



##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/exceptionhistory/RootExceptionHistoryEntry.java:
##
@@ -96,7 +98,7 @@ public static RootExceptionHistoryEntry fromGlobalFailure(
 }
 
 public static RootExceptionHistoryEntry fromExceptionHistoryEntry(
-ExceptionHistoryEntry entry, Iterable 
entries) {
+ExceptionHistoryEntry entry, List entries) {

Review Comment:
   Changed it to `Collection`.
   
   We need to merge more exceptions into `concurrentExceptions`, and 
`Iterable` doesn't support modification, so I changed it to `Collection`.



##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java:
##
@@ -703,27 +707,41 @@ private void archiveGlobalFailure(
 long timestamp,
 CompletableFuture> failureLabels,
 Iterable executions) {
-exceptionHistory.add(
+latestRootExceptionEntry =
 RootExceptionHistoryEntry.fromGlobalFailure(
-failure, timestamp, failureLabels, executions));
+failure, timestamp, failureLabels, executions);
+exceptionHistory.add(latestRootExceptionEntry);
 log.debug("Archive global failure.", failure);
 }
 
 protected final void archiveFromFailureHandlingResult(
 FailureHandlingResultSnapshot failureHandlingResult) {
+// ALl exceptions as the ConcurrentExceptions when it's not a new 
attempt.
+if (!failureHandlingResult.isNewAttempt()) {
+checkState(latestRootExceptionEntry != null, "It should have old 
failure.");
+List concurrentlyExecutions = new LinkedList<>();
+
failureHandlingResult.getRootCauseExecution().ifPresent(concurrentlyExecutions::add);
+
concurrentlyExecutions.addAll(failureHandlingResult.getConcurrentlyFailedExecution());

Review Comment:
   Thanks for the detailed review! 
   
   Good catch, I changed it to the `ArrayList`, 

Re: [PR] [FLINK-33565][Scheduler] ConcurrentExceptions works with exception merging [flink]

2024-01-08 Thread via GitHub


1996fanrui commented on PR #24003:
URL: https://github.com/apache/flink/pull/24003#issuecomment-1881105410

   > Thanks for your efforts @1996fanrui. Overall, looks good. I can do a 
proper review after you've finalized the PR. I just added a few nitty comments. 
PTAL
   
   Many thanks @XComp for the review! If the solution is fine, I will address 
your comments and finish the tests as soon as possible!





Re: [PR] [FLINK-33565][Scheduler] ConcurrentExceptions works with exception merging [flink]

2024-01-08 Thread via GitHub


XComp commented on code in PR #24003:
URL: https://github.com/apache/flink/pull/24003#discussion_r1444703461


##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/exceptionhistory/FailureHandlingResultSnapshot.java:
##
@@ -150,7 +154,11 @@ public long getTimestamp() {
  *
  * @return The concurrently failed {@code Executions}.
  */
-public Iterable getConcurrentlyFailedExecution() {
+public Set getConcurrentlyFailedExecution() {

Review Comment:
   I'm not sure whether this change is necessary. See [my other related 
comment](https://github.com/apache/flink/pull/24003#discussion_r1444695608).






Re: [PR] [FLINK-33565][Scheduler] ConcurrentExceptions works with exception merging [flink]

2024-01-08 Thread via GitHub


XComp commented on code in PR #24003:
URL: https://github.com/apache/flink/pull/24003#discussion_r1444596677


##
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailureHandlingResult.java:
##
@@ -66,6 +66,9 @@ public class FailureHandlingResult {
 /** True if the original failure was a global failure. */
 private final boolean globalFailure;
 
+/** Tue if current failure is a new attempt. */

Review Comment:
   ```suggestion
   /** True if current failure is a new attempt. */
   ```
   nit



##
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailureHandlingResult.java:
##
@@ -85,7 +89,9 @@ private FailureHandlingResult(
 CompletableFuture> failureLabels,
 @Nullable Set verticesToRestart,
 long restartDelayMS,
-boolean globalFailure) {
+boolean globalFailure,
+boolean isNewAttempt) {
+this.isNewAttempt = isNewAttempt;

Review Comment:
   nit: usually, contributors kind of stick to the order of the parameter list 
when setting the fields within the constructor.



##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java:
##
@@ -367,17 +367,13 @@ private void restartTasksWithDelay(final 
FailureHandlingResult failureHandlingRe
 
 final CompletableFuture cancelFuture = 
cancelTasksAsync(verticesToRestart);
 
-final FailureHandlingResultSnapshot failureHandlingResultSnapshot =
-createFailureHandlingResultSnapshot(failureHandlingResult);
+archiveFromFailureHandlingResult(
+createFailureHandlingResultSnapshot(failureHandlingResult));
 delayExecutor.schedule(
 () ->
 FutureUtils.assertNoException(
 cancelFuture.thenRunAsync(
-() -> {
-archiveFromFailureHandlingResult(

Review Comment:
   > The first commit is refactoring, actually, I don't know why archiving 
exception when restarting task instead of immediately. It means, when one task 
failure, we can see the exception history after flink restart this task. So the 
first commit is only a refactoring. It archives exceptions into the exception 
history immediately when they occur, instead of archiving them when restarting.
   
   I guess the motivation was to be able to collect all concurrent exceptions 
that happen before triggering the restart. But you're right - it doesn't make a 
difference because we're creating the `FailureHandlingResultSnapshot` already 
earlier (before scheduling the restart). I'm just wondering whether we should 
have created the snapshot in the scheduled task rather than before scheduling 
it to capture any concurrent exceptions that happened before the restart is 
triggered. :thinking: 



##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java:
##
@@ -703,27 +707,41 @@ private void archiveGlobalFailure(
 long timestamp,
 CompletableFuture> failureLabels,
 Iterable executions) {
-exceptionHistory.add(
+latestRootExceptionEntry =
 RootExceptionHistoryEntry.fromGlobalFailure(
-failure, timestamp, failureLabels, executions));
+failure, timestamp, failureLabels, executions);
+exceptionHistory.add(latestRootExceptionEntry);
 log.debug("Archive global failure.", failure);
 }
 
 protected final void archiveFromFailureHandlingResult(
 FailureHandlingResultSnapshot failureHandlingResult) {
+// ALl exceptions as the ConcurrentExceptions when it's not a new 
attempt.

Review Comment:
   ```suggestion
   // handle all subsequent exceptions as the concurrent exceptions 
when it's not a new attempt.
   ```
   nit: the verb was missing. Additionally, `ConcurrentExceptions` indicates 
that it's some kind of class. We could use the language casing here in my 
opinion.



##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java:
##
@@ -703,27 +707,41 @@ private void archiveGlobalFailure(
 long timestamp,
 CompletableFuture> failureLabels,
 Iterable executions) {
-exceptionHistory.add(
+latestRootExceptionEntry =
 RootExceptionHistoryEntry.fromGlobalFailure(
-failure, timestamp, failureLabels, executions));
+failure, timestamp, failureLabels, executions);
+exceptionHistory.add(latestRootExceptionEntry);
 log.debug("Archive global failure.", failure);
 }
 
 protected final void archiveFromFailureHandlingResult(
 FailureHandlingResultSnapshot failureHandlingResult) {
+// ALl exceptions as the 

Re: [PR] [FLINK-33565][Scheduler] ConcurrentExceptions works with exception merging [flink]

2023-12-27 Thread via GitHub


flinkbot commented on PR #24003:
URL: https://github.com/apache/flink/pull/24003#issuecomment-1870783715

   
   ## CI report:
   
   * 09522e015163a7c578e6df70a983427233630305 UNKNOWN
   
   
   Bot commands
   The @flinkbot bot supports the following commands:
   
   - `@flinkbot run azure` re-run the last Azure build
   





[PR] [FLINK-33565][Scheduler] ConcurrentExceptions works with exception merging [flink]

2023-12-27 Thread via GitHub


1996fanrui opened a new pull request, #24003:
URL: https://github.com/apache/flink/pull/24003

   ## What is the purpose of the change
   
   The concurrent exceptions in the exception history don't work.
   
   
   ## Brief change log
   
   - [FLINK-33565][Exception] Archive exceptions into the exception history 
immediately when they occur, instead of archiving them when restarting
   - [FLINK-33565][Exception] Restart strategy checks whether current failure 
is a new attempt
   - [FLINK-33565][Scheduler] ConcurrentExceptions works with exception merging
   
   
   ## Verifying this change
   
   Unit tests are still being developed.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? no
   

