[jira] [Commented] (FLINK-10377) Remove precondition in TwoPhaseCommitSinkFunction.notifyCheckpointComplete
[ https://issues.apache.org/jira/browse/FLINK-10377?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16984469#comment-16984469 ]

Piotr Nowojski commented on FLINK-10377:

As [reported by a user on the user mailing list|http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/UNCHECKED-Error-while-confirming-Checkpoint-td23213.html], {{TwoPhaseCommitSinkFunction#notifyCheckpointComplete}} can fail with the following exception:

{noformat}
java.lang.RuntimeException: Error while confirming checkpoint
    at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1205)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalStateException: checkpoint completed, but no transaction pending
    at org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
    at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.notifyCheckpointComplete(TwoPhaseCommitSinkFunction.java:267)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyCheckpointComplete(AbstractUdfStreamOperator.java:130)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:822)
    at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1200)
    ... 5 more
{noformat}

This can happen in the following scenario:
# savepoint is triggered
# checkpoint is triggered
# checkpoint completes (but it doesn't subsume the savepoint, because checkpoints subsume only other checkpoints).
# savepoint completes

In this case, {{TwoPhaseCommitSinkFunction}} first receives the notification that the later checkpoint completed, and it commits the transactions of both the savepoint and the checkpoint. Later, when the {{notifyCheckpointComplete}} for the savepoint arrives, no transaction is pending anymore and the above error occurs. A possible trivial fix is to remove the failing {{checkState}}.

> Remove precondition in TwoPhaseCommitSinkFunction.notifyCheckpointComplete
> --
>
> Key: FLINK-10377
> URL: https://issues.apache.org/jira/browse/FLINK-10377
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Common
> Affects Versions: 1.5.0, 1.6.0
> Reporter: Stefan Richter
> Assignee: Stefan Richter
> Priority: Major
> Labels: pull-request-available
>
> The precondition {{checkState(pendingTransactionIterator.hasNext(),
> "checkpoint completed, but no transaction pending");}} in
> {{TwoPhaseCommitSinkFunction.notifyCheckpointComplete()}} seems too strict,
> because checkpoints can overtake checkpoints and will fail the precondition.
> In this case the commit was already performed by the first notification and
> subsumes the late checkpoint. I think the check can be removed.

-- This message was sent by Atlassian Jira (v8.3.4#803005)
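The scenario described in this comment can be replayed with a small stand-alone sketch. It is not Flink code: the class `PendingTransactionsSketch`, its `strict` flag, the helper `replayFails`, and the string transaction handles are hypothetical stand-ins, and the sketch only assumes that the sink keeps a map of pending transactions keyed by checkpoint id and commits every entry up to the notified id.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical stand-in for the sink's pending-transaction bookkeeping; not Flink code. */
class PendingTransactionsSketch {
    /** Checkpoint id -> transaction handle, iterated in ascending id order. */
    private final TreeMap<Long, String> pending = new TreeMap<>();
    private final List<String> committed = new ArrayList<>();
    /** true: keep the precondition; false: behave as if it had been removed. */
    private final boolean strict;

    PendingTransactionsSketch(boolean strict) {
        this.strict = strict;
    }

    /** Registers the transaction that belongs to a triggered checkpoint or savepoint. */
    void snapshot(long checkpointId, String transaction) {
        pending.put(checkpointId, transaction);
    }

    /** Commits every pending transaction with an id up to and including checkpointId. */
    void notifyCheckpointComplete(long checkpointId) {
        Iterator<Map.Entry<Long, String>> it = pending.entrySet().iterator();
        if (strict && !it.hasNext()) {
            throw new IllegalStateException("checkpoint completed, but no transaction pending");
        }
        while (it.hasNext()) {
            Map.Entry<Long, String> entry = it.next();
            if (entry.getKey() > checkpointId) {
                continue; // belongs to a later checkpoint, keep it pending
            }
            committed.add(entry.getValue());
            it.remove();
        }
    }

    List<String> committed() {
        return committed;
    }

    /** Replays: savepoint 1 and checkpoint 2 are triggered, 2 completes first, then 1. */
    static boolean replayFails(boolean strict) {
        PendingTransactionsSketch sink = new PendingTransactionsSketch(strict);
        sink.snapshot(1L, "txn-savepoint-1");
        sink.snapshot(2L, "txn-checkpoint-2");
        sink.notifyCheckpointComplete(2L); // commits both transactions
        try {
            sink.notifyCheckpointComplete(1L); // late notification for the savepoint
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("with precondition, late notification fails: " + replayFails(true));
        System.out.println("without precondition, late notification fails: " + replayFails(false));
    }
}
```

In this sketch the late notification is harmless once the precondition is gone: both transactions were already committed by the first notification, so the second call simply finds nothing to do.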
[jira] [Commented] (FLINK-10377) Remove precondition in TwoPhaseCommitSinkFunction.notifyCheckpointComplete
[ https://issues.apache.org/jira/browse/FLINK-10377?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16647633#comment-16647633 ]

ASF GitHub Bot commented on FLINK-10377:

StefanRRichter closed pull request #6723: [FLINK-10377] Remove precondition in TwoPhaseCommitSinkFunction.notif…
URL: https://github.com/apache/flink/pull/6723

This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic):

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
index 2ffb6d5810e..0e7be9b2e46 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
@@ -255,7 +255,6 @@ public final void notifyCheckpointComplete(long checkpointId) throws Exception {
 		//
 		Iterator<Map.Entry<Long, TransactionHolder<TXN>>> pendingTransactionIterator = pendingCommitTransactions.entrySet().iterator();
-		checkState(pendingTransactionIterator.hasNext(), "checkpoint completed, but no transaction pending");
 		while (pendingTransactionIterator.hasNext()) {
 			Map.Entry<Long, TransactionHolder<TXN>> entry = pendingTransactionIterator.next();
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java
index 2970b87789d..b97eb0c43e1 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java
@@ -138,6 +138,22 @@ private void closeTestHarness() throws Exception {
 		harness.close();
 	}
 
+	@Test
+	public void testSubsumedNotificationOfPreviousCheckpoint() throws Exception {
+		harness.open();
+		harness.processElement("42", 0);
+		harness.snapshot(0, 1);
+		harness.processElement("43", 2);
+		harness.snapshot(1, 3);
+		harness.processElement("44", 4);
+		harness.snapshot(2, 5);
+		harness.notifyOfCompletedCheckpoint(2);
+		harness.notifyOfCompletedCheckpoint(1);
+
+		assertExactlyOnce(Arrays.asList("42", "43", "44"));
+		assertEquals(1, tmpDirectory.listFiles().size()); // one for currentTransaction
+	}
+
 	@Test
 	public void testNotifyOfCompletedCheckpoint() throws Exception {
 		harness.open();

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-10377) Remove precondition in TwoPhaseCommitSinkFunction.notifyCheckpointComplete
[ https://issues.apache.org/jira/browse/FLINK-10377?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16623461#comment-16623461 ]

ASF GitHub Bot commented on FLINK-10377:

pnowojski commented on a change in pull request #6723: [FLINK-10377] Remove precondition in TwoPhaseCommitSinkFunction.notif…
URL: https://github.com/apache/flink/pull/6723#discussion_r219471764

##
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
##
@@ -255,7 +255,6 @@ public final void notifyCheckpointComplete(long checkpointId) throws Exception {
 		//

Review comment:
Maybe you can ask the reporter of the bug to provide debug or at least info logs?
[jira] [Commented] (FLINK-10377) Remove precondition in TwoPhaseCommitSinkFunction.notifyCheckpointComplete
[ https://issues.apache.org/jira/browse/FLINK-10377?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16622448#comment-16622448 ]

ASF GitHub Bot commented on FLINK-10377:

StefanRRichter commented on a change in pull request #6723: [FLINK-10377] Remove precondition in TwoPhaseCommitSinkFunction.notif…
URL: https://github.com/apache/flink/pull/6723#discussion_r219264649

##
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
##
@@ -255,7 +255,6 @@ public final void notifyCheckpointComplete(long checkpointId) throws Exception {
 		//

Review comment:
That is a very good point and, IIRC, that is the case. Hmm, I see that an element is always added in the snapshot method, so one should expect that there is always one in the notify part, because no other method ever removes from the map. The method calls should also be mutually exclusive, because they both hold the checkpointing lock. What else is left?
[jira] [Commented] (FLINK-10377) Remove precondition in TwoPhaseCommitSinkFunction.notifyCheckpointComplete
[ https://issues.apache.org/jira/browse/FLINK-10377?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16622362#comment-16622362 ]

ASF GitHub Bot commented on FLINK-10377:

pnowojski commented on a change in pull request #6723: [FLINK-10377] Remove precondition in TwoPhaseCommitSinkFunction.notif…
URL: https://github.com/apache/flink/pull/6723#discussion_r219247717

##
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
##
@@ -255,7 +255,6 @@ public final void notifyCheckpointComplete(long checkpointId) throws Exception {
 		//

Review comment:
One thing: the comment above should be adapted:
>// ==> There should never be a case where we have no pending transaction here

Btw @StefanRRichter, are you sure that subsuming can happen as in the test that you added? Based on this large comment (@StephanEwen wrote it for the Pravega connector), subsuming of notifications should happen on the JobManager, and such a subsumed notification shouldn't reach the task.
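To make the question about JobManager-side subsumption concrete, here is a rough, hypothetical model. The class `SubsumptionSketch` and its methods are invented for illustration and do not mirror Flink's actual coordinator code. The rule that a completed checkpoint discards only older pending checkpoints, and leaves pending savepoints alone, is taken from the scenario described for this issue; that a completed savepoint discards nothing is purely an assumption of the sketch.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Hypothetical model of coordinator-side subsumption; not Flink's CheckpointCoordinator. */
class SubsumptionSketch {
    private static final class Pending {
        final long id;
        final boolean savepoint;

        Pending(long id, boolean savepoint) {
            this.id = id;
            this.savepoint = savepoint;
        }
    }

    private final List<Pending> pending = new ArrayList<>();
    /** Ids for which a completion notification would be sent to the tasks. */
    private final List<Long> notifications = new ArrayList<>();

    void trigger(long id, boolean savepoint) {
        pending.add(new Pending(id, savepoint));
    }

    void complete(long id) {
        Pending completed = null;
        for (Iterator<Pending> it = pending.iterator(); it.hasNext();) {
            Pending candidate = it.next();
            if (candidate.id == id) {
                completed = candidate;
                it.remove();
                break;
            }
        }
        if (completed == null) {
            return; // already subsumed: no notification reaches the task
        }
        notifications.add(id);
        if (!completed.savepoint) {
            // A completed checkpoint subsumes older pending checkpoints, but not savepoints.
            pending.removeIf(p -> !p.savepoint && p.id < id);
        }
    }

    List<Long> notifications() {
        return notifications;
    }

    /** Triggers 1 (checkpoint or savepoint) and checkpoint 2, then completes 2 before 1. */
    static List<Long> replay(boolean firstIsSavepoint) {
        SubsumptionSketch coordinator = new SubsumptionSketch();
        coordinator.trigger(1L, firstIsSavepoint);
        coordinator.trigger(2L, false);
        coordinator.complete(2L);
        coordinator.complete(1L);
        return coordinator.notifications();
    }

    public static void main(String[] args) {
        System.out.println("older checkpoint: " + replay(false));
        System.out.println("older savepoint:  " + replay(true));
    }
}
```

Under these assumptions, an overtaken checkpoint never produces a late notification, while an overtaken savepoint does, which is the only out-of-order case the task would have to tolerate.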
[jira] [Commented] (FLINK-10377) Remove precondition in TwoPhaseCommitSinkFunction.notifyCheckpointComplete
[ https://issues.apache.org/jira/browse/FLINK-10377?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16622196#comment-16622196 ]

ASF GitHub Bot commented on FLINK-10377:

StefanRRichter opened a new pull request #6723: [FLINK-10377] Remove precondition in TwoPhaseCommitSinkFunction.notif…
URL: https://github.com/apache/flink/pull/6723

…yCheckpointComplete

## What is the purpose of the change

The precondition `checkState(pendingTransactionIterator.hasNext(), "checkpoint completed, but no transaction pending");` in `TwoPhaseCommitSinkFunction.notifyCheckpointComplete()` seems too strict, because checkpoints can overtake checkpoints and will fail the precondition. In this case the commit was already performed by the first notification and subsumes the late checkpoint. I think the check can be removed.

## Brief change log

Removed the precondition without replacement and created a test case for out-of-order notification.

## Verifying this change

`TwoPhaseCommitSinkFunctionTest#testSubsumedNotificationOfPreviousCheckpoint`

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)
- The S3 file system connector: (no)

## Documentation

- Does this pull request introduce a new feature? (no)
- If yes, how is the feature documented? (not applicable)
[jira] [Commented] (FLINK-10377) Remove precondition in TwoPhaseCommitSinkFunction.notifyCheckpointComplete
[ https://issues.apache.org/jira/browse/FLINK-10377?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16622197#comment-16622197 ]

ASF GitHub Bot commented on FLINK-10377:

StefanRRichter commented on issue #6723: [FLINK-10377] Remove precondition in TwoPhaseCommitSinkFunction.notif…
URL: https://github.com/apache/flink/pull/6723#issuecomment-423223082

CC @pnowojski