[jira] [Updated] (FLINK-27101) Periodically break the chain of incremental checkpoint (trigger checkpoints via REST API)

2022-09-20 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27101?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski updated FLINK-27101:
---
Component/s: Runtime / REST

> Periodically break the chain of incremental checkpoint (trigger checkpoints 
> via REST API)
> -
>
> Key: FLINK-27101
> URL: https://issues.apache.org/jira/browse/FLINK-27101
> Project: Flink
>  Issue Type: New Feature
>  Components: Runtime / Checkpointing, Runtime / REST
>Reporter: Steven Zhen Wu
>Assignee: Jiale Tan
>Priority: Major
>  Labels: pull-request-available
>
> Incremental checkpointing is almost a must for large-state jobs. It greatly 
> reduces the bytes uploaded to DFS per checkpoint. However, a few implications 
> of incremental checkpointing are problematic for production operations. We 
> will use S3 as the example DFS for the rest of this description.
> 1. Because there is no way to deterministically know how far back an 
> incremental checkpoint can refer to files uploaded to S3, it is very 
> difficult to set an S3 bucket/object TTL. In one application, we have 
> observed Flink checkpoints referring to files uploaded over 6 months ago, so 
> an S3 TTL can corrupt the Flink checkpoints.
> S3 TTL is important for a few reasons:
> - Purging orphaned files (like external checkpoints from previous 
> deployments) to keep the storage cost in check. This problem could be 
> addressed by implementing proper garbage collection (similar to the JVM's): 
> traverse the retained checkpoints of all jobs, follow their file references, 
> and delete everything unreferenced (see the sweep sketch below, after 
> problem 2). But that is an expensive solution from an engineering-cost 
> perspective.
> - Security and privacy. E.g., there may be a requirement that Flink state 
> cannot keep data for more than some duration threshold (hours/days/weeks). 
> The application is expected to purge keys to satisfy the requirement. 
> However, with incremental checkpoints and the way deletion works in RocksDB, 
> it is hard to use an S3 TTL to purge S3 files: even though those old S3 
> files don't contain live keys, they may still be referenced by retained 
> Flink checkpoints.
> 2. Occasionally, corrupted checkpoint files (on S3) are observed, and 
> restoring from the checkpoint then fails. With incremental checkpoints, 
> trying other, older checkpoints usually doesn't help, because they may refer 
> to the same corrupted file. It is unclear whether the corruption happened 
> before or during the S3 upload. This risk can be mitigated with periodic 
> savepoints.
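> As an illustration of the garbage-collection idea from the first bullet 
> above, here is a minimal "sweep" sketch. It assumes the set of referenced 
> object keys has already been collected by walking the retained checkpoint 
> metadata of all jobs (the "mark" phase, not shown); the class name, 
> parameters and safety threshold are made up for illustration, and it uses 
> the AWS SDK v2:
> {code:java}
> import java.time.Duration;
> import java.time.Instant;
> import java.util.Set;
> import software.amazon.awssdk.services.s3.S3Client;
> import software.amazon.awssdk.services.s3.model.DeleteObjectRequest;
> import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
> import software.amazon.awssdk.services.s3.model.S3Object;
>
> public class CheckpointSweeper {
>
>   // Deletes objects under the checkpoint prefix that are not referenced by
>   // any retained checkpoint and are older than a safety threshold.
>   public static void sweep(S3Client s3, String bucket, String prefix,
>                            Set<String> referencedKeys, Duration minAge) {
>     Instant cutoff = Instant.now().minus(minAge);
>     ListObjectsV2Request listRequest =
>         ListObjectsV2Request.builder().bucket(bucket).prefix(prefix).build();
>     for (S3Object object : s3.listObjectsV2Paginator(listRequest).contents()) {
>       boolean unreferenced = !referencedKeys.contains(object.key());
>       boolean oldEnough = object.lastModified().isBefore(cutoff);
>       if (unreferenced && oldEnough) {
>         s3.deleteObject(DeleteObjectRequest.builder()
>             .bucket(bucket).key(object.key()).build());
>       }
>     }
>   }
> }
> {code}
> The expensive part is the mark phase: every retained checkpoint's metadata 
> of every job has to be parsed to build referencedKeys, which is exactly the 
> engineering cost mentioned above.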
> It all boils down to taking a periodic full snapshot (checkpoint or 
> savepoint) to deterministically break the chain of incremental checkpoints. 
> Searching the Jira history, the behavior that FLINK-23949 [1] tries to fix 
> is actually close to what we need here.
> There are a few options:
> 1. Periodically trigger savepoints (via a control plane). This is actually 
> not a bad practice and might be appealing to some people (see the REST 
> sketch after these options). The problem is that it requires a job 
> redeployment to break the chain of incremental checkpoints, and periodic job 
> redeployment may sound hacky. If we made the behavior of taking a full 
> checkpoint after a savepoint (fixed in FLINK-23949) configurable, it might 
> be an acceptable compromise; the benefit is that no job redeployment would 
> be required after savepoints.
> 2. Build the feature into Flink's incremental checkpointing: periodically 
> (with some cron-style config) trigger a full checkpoint to break the 
> incremental chain. If the full checkpoint fails (for whatever reason), the 
> following checkpoints should attempt a full checkpoint as well, until one 
> full checkpoint completes successfully.
> 3. For the security/privacy requirement, the main thing is to apply 
> compaction to the deleted keys, which could probably avoid references to the 
> old files. Is there any RocksDB compaction that can achieve a full 
> compaction removing old delete markers? Recent delete markers are fine (see 
> the RocksDB sketch at the end of this description).
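> For option 1, the control plane does not need any Flink internals; the 
> savepoint can be triggered through the JobManager's REST API (POST 
> /jobs/:jobid/savepoints). A minimal sketch using Java 11's HttpClient; the 
> JobManager address, job id and target directory are placeholders:
> {code:java}
> import java.net.URI;
> import java.net.http.HttpClient;
> import java.net.http.HttpRequest;
> import java.net.http.HttpResponse;
>
> public class SavepointTrigger {
>
>   public static void main(String[] args) throws Exception {
>     String restBase = "http://jobmanager:8081";          // default REST port
>     String jobId = "0123456789abcdef0123456789abcdef";   // placeholder job id
>     String body =
>         "{\"target-directory\": \"s3://my-bucket/savepoints\", \"cancel-job\": false}";
>
>     // Ask the JobManager to take a savepoint; the call is asynchronous and
>     // returns a trigger ("request") id.
>     HttpClient client = HttpClient.newHttpClient();
>     HttpRequest trigger = HttpRequest.newBuilder()
>         .uri(URI.create(restBase + "/jobs/" + jobId + "/savepoints"))
>         .header("Content-Type", "application/json")
>         .POST(HttpRequest.BodyPublishers.ofString(body))
>         .build();
>     HttpResponse<String> response =
>         client.send(trigger, HttpResponse.BodyHandlers.ofString());
>     System.out.println(response.body());  // e.g. {"request-id": "<trigger-id>"}
>
>     // Completion is asynchronous: poll GET /jobs/<jobId>/savepoints/<trigger-id>
>     // until the returned status is COMPLETED.
>   }
> }
> {code}
> Wrapped in an external cron or a ScheduledExecutorService, this is a crude 
> control-plane implementation. The remaining gap is the one described above: 
> as long as the checkpoint after a savepoint stays incremental, a 
> redeployment is still needed to actually break the chain.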
> [1] https://issues.apache.org/jira/browse/FLINK-23949
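> For option 3, one thing RocksDB does offer in this direction is a manual 
> full-range compaction with the bottommost level forced to be rewritten, 
> which is what lets old delete markers (tombstones) be dropped. Below is a 
> standalone RocksJava sketch of that call; it cannot be pointed at Flink's 
> embedded RocksDB instance directly, and the DB path is a placeholder:
> {code:java}
> import org.rocksdb.CompactRangeOptions;
> import org.rocksdb.CompactRangeOptions.BottommostLevelCompaction;
> import org.rocksdb.Options;
> import org.rocksdb.RocksDB;
>
> public class FullCompactionSketch {
>
>   public static void main(String[] args) throws Exception {
>     RocksDB.loadLibrary();
>     try (Options options = new Options().setCreateIfMissing(true);
>          RocksDB db = RocksDB.open(options, "/tmp/rocksdb-compaction-sketch");
>          CompactRangeOptions compaction = new CompactRangeOptions()
>              // Rewrite the bottommost level too, so old tombstones can be dropped.
>              .setBottommostLevelCompaction(BottommostLevelCompaction.kForce)) {
>       // null begin/end = compact the entire key range of the default column family.
>       db.compactRange(db.getDefaultColumnFamily(), null, null, compaction);
>     }
>   }
> }
> {code}
> Whether and how such a forced compaction could be scheduled inside Flink's 
> RocksDB state backend is exactly the open question of this option.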



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-27101) Periodically break the chain of incremental checkpoint (trigger checkpoints via REST API)

2022-09-20 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27101?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski updated FLINK-27101:
---
Summary: Periodically break the chain of incremental checkpoint (trigger 
checkpoints via REST API)  (was: Periodically break the chain of incremental 
checkpoint)



[jira] [Updated] (FLINK-27101) Periodically break the chain of incremental checkpoint

2022-09-19 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27101?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-27101:
---
Labels: pull-request-available  (was: )



[jira] [Updated] (FLINK-27101) Periodically break the chain of incremental checkpoint

2022-04-06 Thread Steven Zhen Wu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27101?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Steven Zhen Wu updated FLINK-27101:
---