[jira] [Commented] (FLINK-29856) Triggering savepoint does not trigger source operator checkpoint
[ https://issues.apache.org/jira/browse/FLINK-29856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17638247#comment-17638247 ]

Maximilian Michels commented on FLINK-29856:

I think this might be caused by FLINK-25191. It was a conscious design choice, as also explained in [FLIP-193|https://cwiki.apache.org/confluence/display/FLINK/FLIP-193%3A+Snapshots+ownership#FLIP193:Snapshotsownership-SkippingSavepointsforRecovery], to skip side effects like notifyCheckpointComplete() for intermediate savepoints.

The reason is that savepoints can be taken at any point in time by the user, who also controls the lifespan of the savepoint data. Previously, savepoints were used during recovery just like checkpoints, which could lead to issues when the savepoint had already been deleted or an option was used to skip savepoints during recovery. In that case, recovery would fall back to an older checkpoint and could cause side effects more than once, e.g. calling notifyCheckpointComplete() again for records/state that had already been processed.

The new behavior is to treat "intermediate" savepoints independently of checkpoints, which means they won't be used by normal recovery and hence should not be allowed to cause side effects. Intermediate savepoints can still be used to restore a job, but notifyCheckpointComplete() will only be called once the first checkpoint is created.

Note that "final" savepoints, which lead to termination of the job, are still allowed to cause side effects via notifyCheckpointComplete().
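To make the rule above concrete, here is a minimal, self-contained Java sketch. It is not Flink's implementation: the enums, the regex and the helpers `classify` and `notifiesCompletion` are hypothetical. It assumes that `postCheckpointAction=NONE` in the jobmanager log line quoted in the issue marks an intermediate savepoint (the job keeps running), while `SUSPEND`/`TERMINATE` mark a stop-with-savepoint, i.e. a "final" savepoint.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * Hypothetical model of the FLIP-193 behavior discussed above; not Flink code.
 * Only the log-line format is taken from the issue description.
 */
public class SavepointSideEffects {

    /** Assumed values of postCheckpointAction in the jobmanager log line. */
    enum PostCheckpointAction { NONE, SUSPEND, TERMINATE }

    /** Hypothetical classification of a triggered snapshot. */
    enum SnapshotKind { CHECKPOINT, INTERMEDIATE_SAVEPOINT, FINAL_SAVEPOINT }

    // Captures the postCheckpointAction value of a SavepointType entry.
    private static final Pattern SAVEPOINT_TYPE = Pattern.compile(
            "type=SavepointType\\{name='[^']*', postCheckpointAction=(\\w+)");

    /** Classifies a "Triggering checkpoint N (type=...)" log line. */
    static SnapshotKind classify(String logLine) {
        Matcher m = SAVEPOINT_TYPE.matcher(logLine);
        if (!m.find()) {
            // Assumption: a line without a SavepointType describes a regular checkpoint.
            return SnapshotKind.CHECKPOINT;
        }
        // Throws IllegalArgumentException for values outside the assumed enum.
        PostCheckpointAction action = PostCheckpointAction.valueOf(m.group(1));
        // NONE: the job keeps running (intermediate savepoint).
        // SUSPEND/TERMINATE: the savepoint ends the job (final savepoint).
        return action == PostCheckpointAction.NONE
                ? SnapshotKind.INTERMEDIATE_SAVEPOINT
                : SnapshotKind.FINAL_SAVEPOINT;
    }

    /** Whether notifyCheckpointComplete() side effects are expected for this snapshot. */
    static boolean notifiesCompletion(SnapshotKind kind) {
        // Intermediate savepoints are user-owned and may be deleted, so recovery could
        // fall back to an older checkpoint and repeat the side effect; they are skipped.
        return kind != SnapshotKind.INTERMEDIATE_SAVEPOINT;
    }

    public static void main(String[] args) {
        String line = "Triggering checkpoint 3 (type=SavepointType{name='Savepoint', "
                + "postCheckpointAction=NONE, formatType=CANONICAL})";
        SnapshotKind kind = classify(line);
        System.out.println(kind + " notifies=" + notifiesCompletion(kind));
    }
}
```

For the log line from the issue, `main` prints `INTERMEDIATE_SAVEPOINT notifies=false`, which would be consistent with the reported missing notifyCheckpointComplete() calls under this reading of FLIP-193.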
> Triggering savepoint does not trigger source operator checkpoint
> -----------------------------------------------------------------
>
>                 Key: FLINK-29856
>                 URL: https://issues.apache.org/jira/browse/FLINK-29856
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Checkpointing
>    Affects Versions: 1.16.0
>            Reporter: Mason Chen
>            Priority: Major
>
> When I trigger a savepoint with the Flink K8s operator, I verified that two
> sources (KafkaSource and MultiClusterKafkaSource) do not invoke snapshotState
> or notifyCheckpointComplete. This is easily reproducible in a simple pipeline
> (e.g. KafkaSource -> print). In this case, the savepoint is complete and
> successful, as verified by the Flink Checkpoint UI tab and the
> jobmanager logs, e.g.
> `Triggering checkpoint 3 (type=SavepointType{name='Savepoint', postCheckpointAction=NONE, formatType=CANONICAL})`
>
> However, when a checkpoint is triggered by the checkpoint interval, I do see the
> sources checkpointing properly and the expected log output.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
[jira] [Commented] (FLINK-29856) Triggering savepoint does not trigger source operator checkpoint
[ https://issues.apache.org/jira/browse/FLINK-29856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17638190#comment-17638190 ]

Yun Gao commented on FLINK-29856:

Hi [~mason6345], could you please confirm how you verified that snapshotState/notifyCheckpointComplete is not called?
[jira] [Commented] (FLINK-29856) Triggering savepoint does not trigger source operator checkpoint
[ https://issues.apache.org/jira/browse/FLINK-29856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17635693#comment-17635693 ]

Mason Chen commented on FLINK-29856:

Yes, the savepoint completed successfully according to the Flink UI, metrics, and jobmanager logs, but it should have been unsuccessful since the operators did not finish checkpointing. Note that I didn't see any operator failures during the savepoint process. It's not only the source: I also confirmed that the stateful sink operator doesn't call snapshotState/notifyCheckpointComplete either. I haven't checked the savepoint contents, and I didn't notice the effects of corruption (e.g. missing source splits in state) since another checkpoint finished before I shut down the job. I will test again tomorrow.
[jira] [Commented] (FLINK-29856) Triggering savepoint does not trigger source operator checkpoint
[ https://issues.apache.org/jira/browse/FLINK-29856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17634817#comment-17634817 ]

Maximilian Michels commented on FLINK-29856:

I think the symptom described here manifests as a successful savepoint which does not include the mentioned sources, because they were not checkpointed as part of the savepoint. [~mason6345] I'm assuming this would mean that the resulting savepoint is corrupted?
[jira] [Commented] (FLINK-29856) Triggering savepoint does not trigger source operator checkpoint
[ https://issues.apache.org/jira/browse/FLINK-29856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17634602#comment-17634602 ]

Hangxiang Yu commented on FLINK-29856:

Hi, I may be missing something. Do you mean that the savepoint completed successfully, but it should not have been successful according to your analysis of the code?