Re: [PR] [FLINK-34200][test] Fix the bug that AutoRescalingITCase.testCheckpointRescalingInKeyedState fails occasionally [flink]
XComp merged PR #24286:
URL: https://github.com/apache/flink/pull/24286

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
Re: [PR] [FLINK-34200][test] Fix the bug that AutoRescalingITCase.testCheckpointRescalingInKeyedState fails occasionally [flink]
XComp commented on PR #24286:
URL: https://github.com/apache/flink/pull/24286#issuecomment-1933821366

@flinkbot run azure
Re: [PR] [FLINK-34200][test] Fix the bug that AutoRescalingITCase.testCheckpointRescalingInKeyedState fails occasionally [flink]
XComp commented on PR #24286:
URL: https://github.com/apache/flink/pull/24286#issuecomment-1933821158

The [CI failure](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=57393) is due to another issue, which is documented in FLINK-34411.
Re: [PR] [FLINK-34200][test] Fix the bug that AutoRescalingITCase.testCheckpointRescalingInKeyedState fails occasionally [flink]
flinkbot commented on PR #24286:
URL: https://github.com/apache/flink/pull/24286#issuecomment-1933267143

## CI report:

* 3106d453263661e7a08972882a2aabbdf3a7b20d UNKNOWN

Bot commands
The @flinkbot bot supports the following commands:

- `@flinkbot run azure` re-run the last Azure build
[PR] [FLINK-34200][test] Fix the bug that AutoRescalingITCase.testCheckpointRescalingInKeyedState fails occasionally [flink]
1996fanrui opened a new pull request, #24286:
URL: https://github.com/apache/flink/pull/24286

Backporting FLINK-34200 to 1.19
Re: [PR] [FLINK-34200][test] Fix the bug that AutoRescalingITCase.testCheckpointRescalingInKeyedState fails occasionally [flink]
1996fanrui merged PR #24246:
URL: https://github.com/apache/flink/pull/24246
Re: [PR] [FLINK-34200][test] Fix the bug that AutoRescalingITCase.testCheckpointRescalingInKeyedState fails occasionally [flink]
1996fanrui commented on PR #24246:
URL: https://github.com/apache/flink/pull/24246#issuecomment-1933257619

Thanks @XComp @StefanRRichter for the review, merging~
Re: [PR] [FLINK-34200][test] Fix the bug that AutoRescalingITCase.testCheckpointRescalingInKeyedState fails occasionally [flink]
XComp commented on code in PR #24246:
URL: https://github.com/apache/flink/pull/24246#discussion_r1481446503

## flink-tests/src/test/java/org/apache/flink/test/checkpointing/AutoRescalingITCase.java: ##
@@ -163,6 +163,8 @@ public void setup() throws Exception {
NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_PER_CHANNEL, buffersPerChannel);
config.set(JobManagerOptions.SCHEDULER, JobManagerOptions.SchedulerType.Adaptive);
+// Disable the scaling cooldown to speed up the test
+config.set(JobManagerOptions.SCHEDULER_SCALING_INTERVAL_MIN, Duration.ofMillis(0));

Review Comment:
The commit message needs to be changed to something like `[hotfix][tests] Disables cool down phase for faster test execution`
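The toy model below shows why setting the minimum scaling interval to 0 speeds up the test. It assumes the cooldown acts as a minimum interval between consecutive rescale operations. `ScalingCooldownModel` and `earliestRescaleTime` are hypothetical names, not Flink APIs, and the 30 s value is only an illustrative setting.

```java
import java.time.Duration;

/**
 * Hypothetical toy model (not Flink code) of a scaling cooldown: a rescale is only carried out
 * once at least minInterval has elapsed since the previous rescale.
 */
final class ScalingCooldownModel {
    private final Duration minInterval;
    private final long lastRescaleMillis;

    ScalingCooldownModel(Duration minInterval, long lastRescaleMillis) {
        this.minInterval = minInterval;
        this.lastRescaleMillis = lastRescaleMillis;
    }

    /** Earliest time (ms) at which a rescale requested at requestMillis can actually happen. */
    long earliestRescaleTime(long requestMillis) {
        return Math.max(requestMillis, lastRescaleMillis + minInterval.toMillis());
    }

    public static void main(String[] args) {
        // Previous rescale at t=0 ms; the test asks for the next one at t=1000 ms.
        ScalingCooldownModel withCooldown = new ScalingCooldownModel(Duration.ofSeconds(30), 0);
        ScalingCooldownModel noCooldown = new ScalingCooldownModel(Duration.ofMillis(0), 0);
        System.out.println(withCooldown.earliestRescaleTime(1_000)); // 30000
        System.out.println(noCooldown.earliestRescaleTime(1_000)); // 1000
    }
}
```

With a non-zero interval, every rescale in the test can be delayed until the cooldown expires. With a zero interval, the rescale happens as soon as it is requested.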
Re: [PR] [FLINK-34200][test] Fix the bug that AutoRescalingITCase.testCheckpointRescalingInKeyedState fails occasionally [flink]
1996fanrui commented on code in PR #24246:
URL: https://github.com/apache/flink/pull/24246#discussion_r1479164422

## flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CommonTestUtils.java: ##
@@ -367,9 +367,11 @@ public static void waitForCheckpoint(JobID jobID, MiniCluster miniCluster, int n
});
}
-/** Wait for on more completed checkpoint. */
-public static void waitForOneMoreCheckpoint(JobID jobID, MiniCluster miniCluster)
-throws Exception {
+/**
+ * Wait for a new completed checkpoint. Note: we wait for 2 or more checkpoint to ensure the
+ * latest checkpoint start after waitForTwoOrMoreCheckpoint is called.
+ */
+public static void waitForNewCheckpoint(JobID jobID, MiniCluster miniCluster) throws Exception {

Review Comment:
Thanks for your code; this refactoring makes sense to me. I have updated the PR.

## flink-tests/src/test/java/org/apache/flink/test/checkpointing/AutoRescalingITCase.java: ##
@@ -411,7 +413,7 @@ public void testCheckpointRescalingWithKeyedAndNonPartitionedState() throws Exce
// clear the CollectionSink set for the restarted job
CollectionSink.clearElementsSet();
-waitForOneMoreCheckpoint(jobID, cluster.getMiniCluster());
+waitForNewCheckpoint(jobID, cluster.getMiniCluster());

Review Comment:
This seems out of scope for this JIRA. Would you mind if we did the refactoring in a hotfix PR? Your code is ready; once this PR is in, you can submit an official PR, and I can help review it. WDYT?

## flink-tests/src/test/java/org/apache/flink/test/checkpointing/AutoRescalingITCase.java: ##
@@ -261,7 +263,13 @@ public void testCheckpointRescalingKeyedState(boolean scaleOut) throws Exception
waitForAllTaskRunning(cluster.getMiniCluster(), jobGraph.getJobID(), false);
-waitForOneMoreCheckpoint(jobID, cluster.getMiniCluster());
+// We must wait for a checkpoint that is triggered after calling waitForNewCheckpoint.
+// This test will fail if the job recovers from a checkpoint triggered before
+// `SubtaskIndexFlatMapper.workCompletedLatch.await` and after calling
+// `waitForNewCheckpoint`. Because `SubtaskIndexFlatMapper` expects
+// `ValueState counter` and `ValueState sum` after recovery from
+// the checkpoint to be the count and sum of all data.

Review Comment:
Makes sense. I have updated it.
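The race described in that code comment can be reproduced with a small timeline model. This is a hypothetical sketch, not Flink code:

- `CheckpointWaitModel`, `countBased` and `triggerTimeBased` are illustrative names.
- Times are plain milliseconds.
- Checkpoints are assumed to complete in trigger order.

`countBased` mirrors the old "one more completed checkpoint" check. `triggerTimeBased` mirrors the trigger-time check discussed in this thread.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

/** Hypothetical timeline model (not Flink code) of two ways to wait for a "new" checkpoint. */
final class CheckpointWaitModel {
    /** One checkpoint with its trigger and completion times in ms. */
    static final class Checkpoint {
        final long id;
        final long triggerTs;
        final long completeTs;

        Checkpoint(long id, long triggerTs, long completeTs) {
            this.id = id;
            this.triggerTs = triggerTs;
            this.completeTs = completeTs;
        }
    }

    /** Count-based: returns with the first checkpoint that completes after the call. */
    static Optional<Checkpoint> countBased(List<Checkpoint> checkpoints, long callTs) {
        return checkpoints.stream()
                .filter(cp -> cp.completeTs > callTs)
                .min(Comparator.comparingLong((Checkpoint cp) -> cp.completeTs));
    }

    /** Trigger-time-based: only accepts a checkpoint that was triggered after the call. */
    static Optional<Checkpoint> triggerTimeBased(List<Checkpoint> checkpoints, long callTs) {
        return checkpoints.stream()
                .filter(cp -> cp.triggerTs > callTs)
                .min(Comparator.comparingLong((Checkpoint cp) -> cp.completeTs));
    }

    public static void main(String[] args) {
        // The wait is called at t=100 while checkpoint 1 (triggered at t=90) is still in flight.
        List<Checkpoint> checkpoints =
                Arrays.asList(new Checkpoint(1, 90, 110), new Checkpoint(2, 150, 170));
        System.out.println(countBased(checkpoints, 100).get().id); // 1
        System.out.println(triggerTimeBased(checkpoints, 100).get().id); // 2
    }
}
```

Checkpoint 1 is already in flight when the wait starts. The count-based wait therefore returns with checkpoint 1, whose snapshot may not yet contain all the data the test expects after recovery. The trigger-time wait only returns with checkpoint 2.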
Re: [PR] [FLINK-34200][test] Fix the bug that AutoRescalingITCase.testCheckpointRescalingInKeyedState fails occasionally [flink]
XComp commented on code in PR #24246:
URL: https://github.com/apache/flink/pull/24246#discussion_r1478141469

## flink-tests/src/test/java/org/apache/flink/test/checkpointing/AutoRescalingITCase.java: ##
@@ -411,7 +413,7 @@ public void testCheckpointRescalingWithKeyedAndNonPartitionedState() throws Exce
// clear the CollectionSink set for the restarted job
CollectionSink.clearElementsSet();
-waitForOneMoreCheckpoint(jobID, cluster.getMiniCluster());
+waitForNewCheckpoint(jobID, cluster.getMiniCluster());

Review Comment:
I created a [PR](https://github.com/1996fanrui/flink/pull/9) to show what I had in mind for reducing the code redundancy.

## flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CommonTestUtils.java: ##
@@ -367,9 +367,11 @@ public static void waitForCheckpoint(JobID jobID, MiniCluster miniCluster, int n
});
}
-/** Wait for on more completed checkpoint. */
-public static void waitForOneMoreCheckpoint(JobID jobID, MiniCluster miniCluster)
-throws Exception {
+/**
+ * Wait for a new completed checkpoint. Note: we wait for 2 or more checkpoint to ensure the
+ * latest checkpoint start after waitForTwoOrMoreCheckpoint is called.
+ */
+public static void waitForNewCheckpoint(JobID jobID, MiniCluster miniCluster) throws Exception {

Review Comment:
Good idea. Checking the trigger time is a better solution. I like that. :+1:

## flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CommonTestUtils.java: ##
@@ -367,9 +367,11 @@ public static void waitForCheckpoint(JobID jobID, MiniCluster miniCluster, int n
});
}
-/** Wait for on more completed checkpoint. */
-public static void waitForOneMoreCheckpoint(JobID jobID, MiniCluster miniCluster)
-throws Exception {
+/**
+ * Wait for a new completed checkpoint. Note: we wait for 2 or more checkpoint to ensure the
+ * latest checkpoint start after waitForTwoOrMoreCheckpoint is called.
+ */
+public static void waitForNewCheckpoint(JobID jobID, MiniCluster miniCluster) throws Exception {

Review Comment:
About the redundant code:
```java
/** Wait for (at least) the given number of successful checkpoints. */
public static void waitForCheckpoint(JobID jobID, MiniCluster miniCluster, int numCheckpoints)
        throws Exception {
    waitForCheckpoints(
            jobID,
            miniCluster,
            checkpointStatsSnapshot ->
                    checkpointStatsSnapshot != null
                            && checkpointStatsSnapshot
                                            .getCounts()
                                            .getNumberOfCompletedCheckpoints()
                                    >= numCheckpoints);
}

/**
 * Wait for a new completed checkpoint. The new checkpoint must be triggered after
 * waitForNewCheckpoint is called.
 */
public static void waitForNewCheckpoint(JobID jobID, MiniCluster miniCluster) throws Exception {
    final long startTime = System.currentTimeMillis();
    waitForCheckpoints(
            jobID,
            miniCluster,
            checkpointStatsSnapshot -> {
                if (checkpointStatsSnapshot != null) {
                    final CompletedCheckpointStats latestCompletedCheckpoint =
                            checkpointStatsSnapshot.getHistory().getLatestCompletedCheckpoint();
                    return latestCompletedCheckpoint != null
                            && latestCompletedCheckpoint.getTriggerTimestamp() > startTime;
                }
                return false;
            });
}

private static void waitForCheckpoints(
        JobID jobId, MiniCluster miniCluster, Predicate<CheckpointStatsSnapshot> condition)
        throws Exception {
    waitUntilCondition(
            () -> {
                final AccessExecutionGraph graph = miniCluster.getExecutionGraph(jobId).get();
                final CheckpointStatsSnapshot snapshot = graph.getCheckpointStatsSnapshot();
                if (condition.test(snapshot)) {
                    return true;
                } else if (graph.getState().isGloballyTerminalState()) {
                    checkState(
                            graph.getFailureInfo() != null,
                            "Job terminated (state=%s) before completing the requested checkpoint(s).",
                            graph.getState());
                    throw graph.getFailureInfo().getException();
                }
                return false;
            });
}
```
Re: [PR] [FLINK-34200][test] Fix the bug that AutoRescalingITCase.testCheckpointRescalingInKeyedState fails occasionally [flink]
1996fanrui commented on code in PR #24246:
URL: https://github.com/apache/flink/pull/24246#discussion_r1475680928

## flink-tests/src/test/java/org/apache/flink/test/checkpointing/AutoRescalingITCase.java: ##
@@ -328,7 +330,7 @@ public void testCheckpointRescalingNonPartitionedStateCausesException() throws E
// wait until the operator handles some data
StateSourceBase.workStartedLatch.await();
-waitForOneMoreCheckpoint(jobID, cluster.getMiniCluster());
+waitForNewCheckpoint(jobID, cluster.getMiniCluster());

Review Comment:
IIUC, we can call the new `waitForNewCheckpoint` here.

Semantically, the old `waitForOneMoreCheckpoint` waits for one or more checkpoints, because it checks the checkpoint status every 100 ms by default. So if the job completes 10 checkpoints within 100 ms, `waitForOneMoreCheckpoint` effectively waits for 10 checkpoints. Therefore I don't think `waitForNewCheckpoint` breaks the semantics of `waitForOneMoreCheckpoint`, and it is clearer than before.

Also, once we introduce `waitForNewCheckpoint`, we might not need a `waitForOneMoreCheckpoint` that checks the checkpoint count. That would only be needed if a test had to wait for a specific number of new checkpoints, and I haven't seen a strong requirement for that so far. WDYT?

## flink-tests/src/test/java/org/apache/flink/test/checkpointing/AutoRescalingITCase.java: ##
@@ -411,7 +413,7 @@ public void testCheckpointRescalingWithKeyedAndNonPartitionedState() throws Exce
// clear the CollectionSink set for the restarted job
CollectionSink.clearElementsSet();
-waitForOneMoreCheckpoint(jobID, cluster.getMiniCluster());
+waitForNewCheckpoint(jobID, cluster.getMiniCluster());

Review Comment:
Do you mean that `createJobGraphWithKeyedState` and `createJobGraphWithKeyedAndNonPartitionedOperatorState` contain redundant code? Or `testCheckpointRescalingWithKeyedAndNonPartitionedState` and `testCheckpointRescalingKeyedState`?

I checked them, and they differ in many details, such as:
- The sources are different.
- The `NonPartitionedOperator` uses a fixed parallelism for both its parallelism and its max parallelism.

I will check later whether some common code can be extracted. If so, I can submit a hotfix PR and cc you.

## flink-tests/src/test/java/org/apache/flink/test/checkpointing/AutoRescalingITCase.java: ##
@@ -513,7 +515,7 @@ public void testCheckpointRescalingPartitionedOperatorState(
// wait until the operator handles some data
StateSourceBase.workStartedLatch.await();
-waitForOneMoreCheckpoint(jobID, cluster.getMiniCluster());
+waitForNewCheckpoint(jobID, cluster.getMiniCluster());

Review Comment:
Same as this comment: https://github.com/apache/flink/pull/24246/files#r1474917357

## flink-tests/src/test/java/org/apache/flink/test/checkpointing/AutoRescalingITCase.java: ##
@@ -261,7 +263,7 @@ public void testCheckpointRescalingKeyedState(boolean scaleOut) throws Exception
waitForAllTaskRunning(cluster.getMiniCluster(), jobGraph.getJobID(), false);
-waitForOneMoreCheckpoint(jobID, cluster.getMiniCluster());
+waitForNewCheckpoint(jobID, cluster.getMiniCluster());

Review Comment:
Added; please help double-check, thanks~
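The polling argument above can be checked with a deterministic simulation. `PollingModel` is a hypothetical helper, not Flink code. It assumes the wait samples the completed-checkpoint count at a fixed interval (100 ms in the comment) and returns at the first sample that shows any progress.

```java
/**
 * Hypothetical simulation (not Flink code) of a wait that samples the completed-checkpoint
 * count every pollIntervalMs and returns at the first sample showing progress.
 */
final class PollingModel {
    /** Number of new checkpoints already completed at the sample where the wait returns. */
    static int newCheckpointsSeenAtReturn(
            long[] completeTs, long callTs, long pollIntervalMs, long timeoutMs) {
        int baseline = completedBy(completeTs, callTs);
        for (long t = callTs; t <= callTs + timeoutMs; t += pollIntervalMs) {
            int seen = completedBy(completeTs, t) - baseline;
            if (seen > 0) {
                return seen;
            }
        }
        throw new IllegalStateException("No new checkpoint within the timeout");
    }

    /** Counts checkpoints whose completion time is at or before t. */
    private static int completedBy(long[] completeTs, long t) {
        int n = 0;
        for (long c : completeTs) {
            if (c <= t) {
                n++;
            }
        }
        return n;
    }

    public static void main(String[] args) {
        // Ten checkpoints complete within the first 100 ms after the call at t=0.
        long[] completeTs = {5, 15, 25, 35, 45, 55, 65, 75, 85, 95};
        System.out.println(newCheckpointsSeenAtReturn(completeTs, 0, 100, 1_000)); // 10
        System.out.println(newCheckpointsSeenAtReturn(completeTs, 0, 10, 1_000)); // 1
    }
}
```

With a 100 ms poll interval, the first sample that shows progress already sees all ten checkpoints. With a 10 ms interval, it sees only one. So "one more" was already "one or more" in practice.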
Re: [PR] [FLINK-34200][test] Fix the bug that AutoRescalingITCase.testCheckpointRescalingInKeyedState fails occasionally [flink]
1996fanrui commented on code in PR #24246:
URL: https://github.com/apache/flink/pull/24246#discussion_r1475645896

## flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CommonTestUtils.java: ##
@@ -367,9 +367,11 @@ public static void waitForCheckpoint(JobID jobID, MiniCluster miniCluster, int n
});
}
-/** Wait for on more completed checkpoint. */
-public static void waitForOneMoreCheckpoint(JobID jobID, MiniCluster miniCluster)
-throws Exception {
+/**
+ * Wait for a new completed checkpoint. Note: we wait for 2 or more checkpoint to ensure the
+ * latest checkpoint start after waitForTwoOrMoreCheckpoint is called.
+ */
+public static void waitForNewCheckpoint(JobID jobID, MiniCluster miniCluster) throws Exception {

Review Comment:
> That way, we can pass in `2` in the test implementation analogously to `waitForCheckpoint`. I also feel like we can remove some redundant code within the two methods. :thinking:

IIUC, `waitForCheckpoint` and `waitForOneMoreCheckpoint` have different semantics (`waitForOneMoreCheckpoint` is renamed to `waitForNewCheckpoint` in this PR):
- `waitForCheckpoint` checks the total count of all completed checkpoints.
- `waitForOneMoreCheckpoint` checks whether a new checkpoint has completed after it was called.
  - For example, if the job has 10 completed checkpoints before the call, `waitForOneMoreCheckpoint` waits until checkpoint 11 has completed.

BTW, I have refactored `waitForNewCheckpoint` to check the checkpoint trigger time instead of the checkpoint count. I think checking the trigger time is clearer than `checkpointCount >= 2`. Other developers might not understand why we wait for 2 checkpoints here, and `checkpointCount >= 2` doesn't work when concurrent checkpoints are enabled. WDYT?
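The concurrency caveat can be illustrated with another small timeline model. All names are hypothetical, and the scenario assumes up to two checkpoints may be in flight at once. `triggerCheck` is a simplified form of the trigger-time check ("some completed checkpoint was triggered after the call"), not Flink's exact logic.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical model (not Flink code) of waiting for "2 more" checkpoints with concurrency. */
final class ConcurrentCheckpointModel {
    /** One checkpoint with its trigger and completion times in ms. */
    static final class Cp {
        final long triggerTs;
        final long completeTs;

        Cp(long triggerTs, long completeTs) {
            this.triggerTs = triggerTs;
            this.completeTs = completeTs;
        }
    }

    /** Count check: at least n checkpoints completed in the interval (callTs, now]. */
    static boolean countCheck(List<Cp> cps, long callTs, long now, int n) {
        long completedSinceCall =
                cps.stream().filter(c -> c.completeTs > callTs && c.completeTs <= now).count();
        return completedSinceCall >= n;
    }

    /** Simplified trigger-time check: some checkpoint triggered after callTs completed by now. */
    static boolean triggerCheck(List<Cp> cps, long callTs, long now) {
        return cps.stream().anyMatch(c -> c.triggerTs > callTs && c.completeTs <= now);
    }

    public static void main(String[] args) {
        // Two checkpoints are already in flight when the wait starts at t=100.
        List<Cp> cps = Arrays.asList(new Cp(80, 110), new Cp(95, 120), new Cp(130, 160));
        System.out.println(countCheck(cps, 100, 120, 2)); // true, but both predate the call
        System.out.println(triggerCheck(cps, 100, 120)); // false
        System.out.println(triggerCheck(cps, 100, 160)); // true
    }
}
```

With at most one concurrent checkpoint, the second of two newly completed checkpoints must have been triggered after the call. With two concurrent checkpoints, both can predate the call, so the count check passes too early.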
Re: [PR] [FLINK-34200][test] Fix the bug that AutoRescalingITCase.testCheckpointRescalingInKeyedState fails occasionally [flink]
1996fanrui commented on code in PR #24246:
URL: https://github.com/apache/flink/pull/24246#discussion_r1475645896

## flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CommonTestUtils.java: ##
@@ -367,9 +367,11 @@ public static void waitForCheckpoint(JobID jobID, MiniCluster miniCluster, int n
});
}
-/** Wait for on more completed checkpoint. */
-public static void waitForOneMoreCheckpoint(JobID jobID, MiniCluster miniCluster)
-throws Exception {
+/**
+ * Wait for a new completed checkpoint. Note: we wait for 2 or more checkpoint to ensure the
+ * latest checkpoint start after waitForTwoOrMoreCheckpoint is called.
+ */
+public static void waitForNewCheckpoint(JobID jobID, MiniCluster miniCluster) throws Exception {

Review Comment:
I have refactored `waitForNewCheckpoint` to check the checkpoint trigger time instead of the checkpoint count. I think checking the trigger time is clearer than `checkpointCount >= 2`. WDYT?
Re: [PR] [FLINK-34200][test] Fix the bug that AutoRescalingITCase.testCheckpointRescalingInKeyedState fails occasionally [flink]
XComp commented on code in PR #24246:
URL: https://github.com/apache/flink/pull/24246#discussion_r1474488421

## flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CommonTestUtils.java: ##
@@ -367,9 +367,11 @@ public static void waitForCheckpoint(JobID jobID, MiniCluster miniCluster, int n
});
}
-/** Wait for on more completed checkpoint. */
-public static void waitForOneMoreCheckpoint(JobID jobID, MiniCluster miniCluster)
-throws Exception {
+/**
+ * Wait for a new completed checkpoint. Note: we wait for 2 or more checkpoint to ensure the
+ * latest checkpoint start after waitForTwoOrMoreCheckpoint is called.
+ */
+public static void waitForNewCheckpoint(JobID jobID, MiniCluster miniCluster) throws Exception {

Review Comment:
```suggestion
public static void waitForNewCheckpoint(JobID jobID, MiniCluster miniCluster, int checkpointCount) throws Exception {
```
Can't we make the number of checkpoints to wait for configurable? That way, we can pass in `2` in the test implementation analogously to `waitForCheckpoint`. I also feel like we can remove some redundant code within the two methods. :thinking:

## flink-tests/src/test/java/org/apache/flink/test/checkpointing/AutoRescalingITCase.java: ##
@@ -328,7 +330,7 @@ public void testCheckpointRescalingNonPartitionedStateCausesException() throws E
// wait until the operator handles some data
StateSourceBase.workStartedLatch.await();
-waitForOneMoreCheckpoint(jobID, cluster.getMiniCluster());
+waitForNewCheckpoint(jobID, cluster.getMiniCluster());

Review Comment:
So far, we've only seen the issue in `#testCheckpointRescalingInKeyedState`. We don't actually need the two checkpoints here, because this test doesn't rely on the elements. We could keep the test's functionality if we make `waitForOneMoreCheckpoint` configurable, as suggested in one of my previous comments.

## flink-tests/src/test/java/org/apache/flink/test/checkpointing/AutoRescalingITCase.java: ##
@@ -261,7 +263,7 @@ public void testCheckpointRescalingKeyedState(boolean scaleOut) throws Exception
waitForAllTaskRunning(cluster.getMiniCluster(), jobGraph.getJobID(), false);
-waitForOneMoreCheckpoint(jobID, cluster.getMiniCluster());
+waitForNewCheckpoint(jobID, cluster.getMiniCluster());

Review Comment:
We should add a comment here explaining why we need to wait for 2 checkpoints instead of 1.

## flink-tests/src/test/java/org/apache/flink/test/checkpointing/AutoRescalingITCase.java: ##
@@ -513,7 +515,7 @@ public void testCheckpointRescalingPartitionedOperatorState(
// wait until the operator handles some data
StateSourceBase.workStartedLatch.await();
-waitForOneMoreCheckpoint(jobID, cluster.getMiniCluster());
+waitForNewCheckpoint(jobID, cluster.getMiniCluster());

Review Comment:
AFAIU, we don't need to change it here.

## flink-tests/src/test/java/org/apache/flink/test/checkpointing/AutoRescalingITCase.java: ##
@@ -411,7 +413,7 @@ public void testCheckpointRescalingWithKeyedAndNonPartitionedState() throws Exce
// clear the CollectionSink set for the restarted job
CollectionSink.clearElementsSet();
-waitForOneMoreCheckpoint(jobID, cluster.getMiniCluster());
+waitForNewCheckpoint(jobID, cluster.getMiniCluster());

Review Comment:
I guess the scenario can happen in this test as well, because its implementation is almost the same as in `#testCheckpointRescalingKeyedState` :thinking:

## flink-tests/src/test/java/org/apache/flink/test/checkpointing/AutoRescalingITCase.java: ##
@@ -411,7 +413,7 @@ public void testCheckpointRescalingWithKeyedAndNonPartitionedState() throws Exce
// clear the CollectionSink set for the restarted job
CollectionSink.clearElementsSet();
-waitForOneMoreCheckpoint(jobID, cluster.getMiniCluster());
+waitForNewCheckpoint(jobID, cluster.getMiniCluster());

Review Comment:
I'm wondering whether the redundant code could be removed here. But that's probably a bit out of scope for this issue.
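Below is a minimal sketch of the direction suggested in this review: a single poll loop shared by both waits, with the number of new checkpoints made configurable. It abstracts the cluster behind an `IntSupplier` that returns the completed-checkpoint count. `CheckpointWaits`, `waitForTotal` and `waitForMore` are hypothetical names, not Flink APIs, and the poll interval and timeout are arbitrary. The sketch is still count-based, so the concurrent-checkpoint caveat raised elsewhere in this thread applies to it.

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntPredicate;
import java.util.function.IntSupplier;

/**
 * Hypothetical sketch (not Flink's CommonTestUtils): both waits share one poll loop and differ
 * only in the predicate applied to the completed-checkpoint count.
 */
final class CheckpointWaits {
    private static final Duration POLL_INTERVAL = Duration.ofMillis(1);
    private static final Duration TIMEOUT = Duration.ofSeconds(5);

    /** Shared poll loop: the only place that sleeps and enforces the timeout. */
    static void waitUntil(IntSupplier completedCount, IntPredicate condition) {
        final long deadline = System.nanoTime() + TIMEOUT.toNanos();
        while (!condition.test(completedCount.getAsInt())) {
            if (System.nanoTime() - deadline > 0) {
                throw new IllegalStateException("Timed out waiting for checkpoints");
            }
            try {
                Thread.sleep(POLL_INTERVAL.toMillis());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting for checkpoints", e);
            }
        }
    }

    /** Like waitForCheckpoint: the total number of completed checkpoints reaches n. */
    static void waitForTotal(IntSupplier completedCount, int n) {
        waitUntil(completedCount, count -> count >= n);
    }

    /** Configurable variant: n more checkpoints complete than had completed at call time. */
    static void waitForMore(IntSupplier completedCount, int n) {
        final int baseline = completedCount.getAsInt();
        waitUntil(completedCount, count -> count >= baseline + n);
    }

    public static void main(String[] args) {
        // Fake cluster: every poll observes one more completed checkpoint, starting from 10.
        AtomicInteger completed = new AtomicInteger(10);
        waitForMore(completed::incrementAndGet, 2);
        System.out.println(completed.get()); // 13
    }
}
```

Each public wait only contributes a predicate, so adding a trigger-time based variant would mean supplying a richer snapshot type to the same loop rather than copying it.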
Re: [PR] [FLINK-34200][test] Fix the bug that AutoRescalingITCase.testCheckpointRescalingInKeyedState fails occasionally [flink]
flinkbot commented on PR #24246:
URL: https://github.com/apache/flink/pull/24246#issuecomment-1921304022

## CI report:

* 9eec91beb3f2ac2344454b9402ba2505201f0e49 UNKNOWN

Bot commands
The @flinkbot bot supports the following commands:

- `@flinkbot run azure` re-run the last Azure build