[jira] [Commented] (FLINK-29856) Triggering savepoint does not trigger source operator checkpoint

2022-11-24 Thread Maximilian Michels (Jira)


[ https://issues.apache.org/jira/browse/FLINK-29856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17638247#comment-17638247 ]

Maximilian Michels commented on FLINK-29856:


I think this might be caused by FLINK-25191. Skipping side effects such as 
notifyCheckpointComplete() for intermediate savepoints was a conscious design 
choice, as also explained in 
[FLIP-193|https://cwiki.apache.org/confluence/display/FLINK/FLIP-193%3A+Snapshots+ownership#FLIP193:Snapshotsownership-SkippingSavepointsforRecovery].

The reason is that savepoints can be taken at any point in time by the user, who 
also controls the lifespan of the savepoint data. During recovery, savepoints 
used to be treated just like checkpoints, which could lead to issues when the 
savepoint had already been deleted or an option was used to skip savepoints 
during recovery. In that case, recovery would fall back to an older checkpoint 
and potentially cause side effects more than once, i.e. call 
notifyCheckpointComplete() again for records/state that had already been processed.
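
To make this failure mode concrete, here is a simplified, hypothetical model in plain Java (not Flink's API) of a sink that only publishes its buffered records to an external system in notifyCheckpointComplete(). It assumes the savepoint is deleted after it completes, so recovery falls back to the older checkpoint and replays the input from there. The names `CommittingSink`, `replay`, `run` and `externalSystem` are illustrative only.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical toy model (not Flink code): a sink that buffers records and
 * only publishes them to an external system in notifyCheckpointComplete().
 */
public class DuplicateCommitSketch {
    /** Records visible in the external system; duplicates show up here. */
    static final List<Integer> externalSystem = new ArrayList<>();

    static class CommittingSink {
        private final List<Integer> pending = new ArrayList<>();

        void write(int record) { pending.add(record); }

        /** The side effect: publish everything buffered so far. */
        void notifyCheckpointComplete(long checkpointId) {
            externalSystem.addAll(pending);
            pending.clear();
        }
    }

    /** Feeds input records with offsets [from, to) into the sink. */
    static void replay(CommittingSink sink, int from, int to) {
        for (int r = from; r < to; r++) sink.write(r);
    }

    /** Returns what the external system contains after a failure and recovery. */
    public static List<Integer> run(boolean notifyOnIntermediateSavepoint) {
        externalSystem.clear();
        CommittingSink sink = new CommittingSink();

        replay(sink, 0, 3);                   // records 0..2
        sink.notifyCheckpointComplete(1);     // checkpoint 1 covers input up to offset 3

        replay(sink, 3, 6);                   // records 3..5
        if (notifyOnIntermediateSavepoint) {
            sink.notifyCheckpointComplete(2); // savepoint 2 at offset 6 also notifies
        }

        // Failure: savepoint 2 has been deleted by the user (or recovery skips
        // savepoints), so the job restarts from checkpoint 1, i.e. offset 3.
        // The in-memory buffer of the old sink instance is lost.
        sink = new CommittingSink();
        replay(sink, 3, 6);
        sink.notifyCheckpointComplete(3);
        return new ArrayList<>(externalSystem);
    }

    public static void main(String[] args) {
        System.out.println("notify on savepoint: " + run(true));
        System.out.println("skip savepoint:      " + run(false));
    }
}
```

With `run(true)` the records 3, 4 and 5 reach the external system twice; with `run(false)` every record appears exactly once.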

The new behavior is to treat "intermediate" savepoints independently of 
checkpoints, which means they won't be used by normal recovery and hence should 
not be allowed to cause side effects. Intermediate savepoints can still be used 
to restore a job, but notifyCheckpointComplete() will only be called when the 
first checkpoint is created. Note that "final" savepoints, which lead to 
termination of the job, are still allowed to cause side effects via 
notifyCheckpointComplete().
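
The rule can be summarized as a filter over completed snapshots. The following is a hypothetical toy model, not Flink's coordinator code: the `PostCheckpointAction` enum is named after the field in the reporter's jobmanager log, its values other than `NONE` are an assumption, and treating a savepoint with `postCheckpointAction=NONE` as "intermediate" is likewise an assumption.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical toy model (not Flink code) of which snapshots trigger side effects. */
public class SavepointNotificationSketch {

    /** Named after the postCheckpointAction log field; values besides NONE are assumed. */
    enum PostCheckpointAction { NONE, SUSPEND, TERMINATE }

    /** A completed snapshot in this toy model. */
    static final class Snapshot {
        final long id;
        final boolean savepoint;
        final PostCheckpointAction action;

        Snapshot(long id, boolean savepoint, PostCheckpointAction action) {
            this.id = id;
            this.savepoint = savepoint;
            this.action = action;
        }

        /** Intermediate savepoint: the job keeps running afterwards. */
        boolean isIntermediateSavepoint() {
            return savepoint && action == PostCheckpointAction.NONE;
        }
    }

    /** IDs of the snapshots for which notifyCheckpointComplete() would be delivered. */
    static List<Long> notifiedIds(List<Snapshot> completed) {
        List<Long> notified = new ArrayList<>();
        for (Snapshot s : completed) {
            // Intermediate savepoints are not used by normal recovery,
            // so they must not trigger side effects.
            if (!s.isIntermediateSavepoint()) {
                notified.add(s.id);
            }
        }
        return notified;
    }

    public static void main(String[] args) {
        List<Snapshot> history = List.of(
                new Snapshot(1, false, PostCheckpointAction.NONE),      // periodic checkpoint
                new Snapshot(2, false, PostCheckpointAction.NONE),      // periodic checkpoint
                new Snapshot(3, true, PostCheckpointAction.NONE),       // savepoint like the one in the report
                new Snapshot(4, false, PostCheckpointAction.NONE),      // next periodic checkpoint
                new Snapshot(5, true, PostCheckpointAction.TERMINATE)); // final savepoint, job stops
        System.out.println(notifiedIds(history)); // prints [1, 2, 4, 5]
    }
}
```

In this model the savepoint with ID 3 completes but notifies nobody; the side effects for its data are deferred to checkpoint 4, while the terminating savepoint 5 still notifies.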

> Triggering savepoint does not trigger source operator checkpoint 
>
> Key: FLINK-29856
> URL: https://issues.apache.org/jira/browse/FLINK-29856
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing
>Affects Versions: 1.16.0
>Reporter: Mason Chen
>Priority: Major
>
> When I trigger a savepoint with the Flink K8s operator, I verified that two 
> sources (KafkaSource and MultiClusterKafkaSource) do not invoke snapshotState 
> or notifyCheckpointComplete. This is easily reproducible in a simple pipeline 
> (e.g. KafkaSource -> print). In this case, the savepoint is complete and 
> successful, as verified by the Flink Checkpoint UI tab and the 
> jobmanager logs, e.g. 
> `Triggering checkpoint 3 (type=SavepointType{name='Savepoint', postCheckpointAction=NONE, formatType=CANONICAL})`
>  
> However, when the checkpoint is triggered by the interval, I do see the sources 
> checkpointing properly and the expected logs in the output. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-29856) Triggering savepoint does not trigger source operator checkpoint

2022-11-24 Thread Yun Gao (Jira)


[ https://issues.apache.org/jira/browse/FLINK-29856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17638190#comment-17638190 ]

Yun Gao commented on FLINK-29856:

Hi [~mason6345], could you double-check and tell me how you verified that 
snapshotState/notifyCheckpointComplete is not called?



[jira] [Commented] (FLINK-29856) Triggering savepoint does not trigger source operator checkpoint

2022-11-17 Thread Mason Chen (Jira)


[ https://issues.apache.org/jira/browse/FLINK-29856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17635693#comment-17635693 ]

Mason Chen commented on FLINK-29856:


Yes, the savepoint completes successfully according to the Flink UI, metrics, and 
jobmanager logs, but it should be unsuccessful since the operators did not finish 
checkpointing. Note that I didn't see any operator failures during the savepoint 
process.

It's not only the source: I also confirmed that the stateful sink operator 
doesn't call snapshotState/notifyCheckpointComplete either.

I haven't checked the savepoint contents, and I didn't notice the effects of 
corruption (e.g. missing source splits in state) since another checkpoint 
finished before I shut down the job. I will test again tomorrow.

 



[jira] [Commented] (FLINK-29856) Triggering savepoint does not trigger source operator checkpoint

2022-11-16 Thread Maximilian Michels (Jira)


[ https://issues.apache.org/jira/browse/FLINK-29856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17634817#comment-17634817 ]

Maximilian Michels commented on FLINK-29856:


I think the symptom described here manifests as a successful savepoint which 
does not include the mentioned sources because they have not been checkpointed 
as part of the savepoint. [~mason6345] I'm assuming this would mean that the 
resulting savepoint is corrupted?



[jira] [Commented] (FLINK-29856) Triggering savepoint does not trigger source operator checkpoint

2022-11-15 Thread Hangxiang Yu (Jira)


[ https://issues.apache.org/jira/browse/FLINK-29856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17634602#comment-17634602 ]

Hangxiang Yu commented on FLINK-29856:

Hi, I may be missing something.

Do you mean that the savepoint completes successfully, but that according to 
your analysis of the code it should not be successful?
