[jira] [Commented] (FLINK-27681) Improve the availability of Flink when the RocksDB file is corrupted.

2023-12-14 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27681?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17796843#comment-17796843
 ] 

Piotr Nowojski commented on FLINK-27681:


I could help with reviewing and generally speaking support this effort, including 
discussions around the FLIP (this change would have to modify the `FileSystem` API 
- the DFS has to support checksum verification in some way). The general plan should 
be:
 # Someone would have to investigate what's possible without modifying RocksDB 
and what works at least with S3. Some tests against S3 and raw benchmarks 
without using Flink would be needed - for example a simple standalone Java app 
uploading a file while verifying the checksum at the same time (see the sketch 
below the list). People will definitely ask about other DFSs, so it would be 
nice to do some research on Azure/GCP as well (might be just looking into the 
docs without testing).
 # Create a FLIP, ideally already with this zero-overhead solution. But if 
RocksDB turns out to be problematic, something that's already designed with 
zero overhead in mind for the long term, plus some intermediate temporary 
solution, would probably also be fine.

> Improve the availability of Flink when the RocksDB file is corrupted.
> ---------------------------------------------------------------------
>
> Key: FLINK-27681
> URL: https://issues.apache.org/jira/browse/FLINK-27681
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Reporter: Ming Li
>Assignee: Yue Ma
>Priority: Critical
>  Labels: pull-request-available
> Attachments: image-2023-08-23-15-06-16-717.png
>
>
> We have encountered several cases where the RocksDB checksum does not match or 
> block verification fails when the job is restored. The reason is generally 
> that there is some problem with the machine where the task is located, which 
> causes the files uploaded to HDFS to be incorrect, but by the time we notice 
> the problem a long time has already passed (a dozen minutes to half an hour). 
> I'm not sure if anyone else has had a similar problem.
> Since such a file is referenced by incremental checkpoints for a long time, 
> once the maximum number of retained checkpoints is exceeded, we are stuck with 
> this file until it is no longer referenced. When the job fails, it cannot be 
> recovered.
> Therefore we consider:
> 1. Can RocksDB periodically check whether all files are correct and find 
> problems in time?
> 2. Can Flink automatically roll back to a previous checkpoint when there is a 
> problem with the checkpoint data? Even with manual intervention, one just 
> tries to recover from an existing checkpoint or discards the entire state.
> 3. Can we cap the number of references to a file based on the maximum number 
> of retained checkpoints? When the number of references exceeds the maximum 
> number of retained checkpoints minus 1, the task side would be required to 
> upload a new file for this reference. We are not sure whether this would 
> guarantee that the newly uploaded file is correct.





[jira] [Commented] (FLINK-27681) Improve the availability of Flink when the RocksDB file is corrupted.

2023-12-13 Thread Yue Ma (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27681?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17796527#comment-17796527
 ] 

Yue Ma commented on FLINK-27681:


[~pnowojski] Thanks for your suggestion, I think this is a perfect solution. 
But it sounds like there is still a long way to go to implement it. Do we have 
any concrete plans for doing this?



[jira] [Commented] (FLINK-27681) Improve the availability of Flink when the RocksDB file is corrupted.

2023-12-08 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27681?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17794622#comment-17794622
 ] 

Piotr Nowojski commented on FLINK-27681:


{quote}

So I understand that we can do this manual check in this ticket first. If the 
file is detected to be corrupted, we can fail the job. Is this a good choice? 

{quote}

I would prefer to avoid the approach that was proposed here in the ticket:

[i] before uploading the file, scan it completely block by block for data 
consistency using RocksDB's mechanisms. 

The problems:
 * Performance overhead
 * Works only for RocksDB
 * Doesn't fully protect us from data corruption - corruption can happen just 
after we checked the file locally but before we uploaded it to the DFS

So rephrasing what I was proposing in my messages above:

[ii] 
 # Calculate some checksum *[A]* ON THE FLY, at the same time that the state 
file is being written/created. For RocksDB that would require hooking into 
RocksDB itself; it would be easier for the HashMap state backend. But it would 
have zero additional IO cost and only a minor CPU cost (negligible compared to 
the IO access).
 # Remember the checksum *[A]* until the file is uploaded.
 # Depending on what the DFS supports, either:
 ** preferably, verify against the checksum *[A]* ON THE FLY, while the file is 
being uploaded to the DFS. In principle, if implemented by the DFS properly, 
this should again be basically for free, without any additional IO cost. S3 
might actually support that via [1] or [2].
 ** after uploading, use the DFS API to remotely calculate the checksum of the 
uploaded file and compare it against the checksum *[A]*. S3 does support that 
[3], quoting:

{quote}

After uploading objects, you can get the checksum value and compare it to a 
precalculated or previously stored checksum value calculated using the same 
algorithm.

{quote}

 

Note, we would have to ensure that checksum *[A]* is always calculated the same 
way, both in the state backend (RocksDB) and in the DFS (S3). I have no idea if 
RocksDB supports something like that, but if not:
 * it should be a welcome contribution for the RocksDB maintainers
 * implementing a hook on our side in our FRocksDB fork doesn't sound too 
complicated. I would hope it would only require wrapping some {{OutByteStream}} 
class and that's it (a rough sketch of such a wrapper is shown below).
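As a purely illustrative sketch of that wrapping idea (the class name and the 
choice of SHA-256 are my assumptions, and this is plain JDK code rather than an 
actual FRocksDB hook), computing checksum *[A]* during file creation could look 
roughly like this:

{code:java}
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;

/** Hypothetical wrapper: updates a digest as the state file is being written. */
public class ChecksumTrackingOutputStream extends FilterOutputStream {

    private final MessageDigest digest;

    public ChecksumTrackingOutputStream(OutputStream delegate) throws NoSuchAlgorithmException {
        super(delegate);
        this.digest = MessageDigest.getInstance("SHA-256");
    }

    @Override
    public void write(int b) throws IOException {
        out.write(b);
        digest.update((byte) b);
    }

    @Override
    public void write(byte[] b, int off, int len) throws IOException {
        out.write(b, off, len);     // single write to the underlying file, no extra IO
        digest.update(b, off, len); // checksum [A] is updated on the fly
    }

    /** Checksum [A], to be remembered until the file is verified during/after upload. */
    public String checksum() {
        return Base64.getEncoder().encodeToString(digest.digest());
    }
}
{code}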

 

[1] 
[https://docs.aws.amazon.com/AmazonS3/latest/userguide/checking-object-integrity.html#checking-object-integrity-md5]

[2] 
[https://docs.aws.amazon.com/AmazonS3/latest/userguide/checking-object-integrity.html#trailing-checksums]
 

[3] 
[https://docs.aws.amazon.com/AmazonS3/latest/userguide/checking-object-integrity.html#using-additional-checksums]

 

 



[jira] [Commented] (FLINK-27681) Improve the availability of Flink when the RocksDB file is corrupted.

2023-12-07 Thread Yue Ma (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27681?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17794505#comment-17794505
 ] 

Yue Ma commented on FLINK-27681:


{quote}And the corrupted file maybe just uploaded to remote storage without any 
check like reading block checksum when checkpoint if we don't check it manually.
{quote}
So I understand that we can do this manual check in this ticket first. If the 
file is detected to be corrupted, we can fail the job. Is this a good choice? 



[jira] [Commented] (FLINK-27681) Improve the availability of Flink when the RocksDB file is corrupted.

2023-12-07 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27681?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17794272#comment-17794272
 ] 

Piotr Nowojski commented on FLINK-27681:


[~masteryhx], yes, that would require a new FLIP, as one way or another we 
would have to verify the checksums of the files uploaded to the DFS. Also please 
note my hints/ideas that we might not need any dedicated checksum computation 
pass, as it should be possible to hook in and get/calculate checksums on the fly 
during file creation/file upload.



[jira] [Commented] (FLINK-27681) Improve the availability of Flink when the RocksDB file is corrupted.

2023-12-07 Thread Hangxiang Yu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27681?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17794190#comment-17794190
 ] 

Hangxiang Yu commented on FLINK-27681:
--

{quote}Yes, I think we don't need any extra protection for corruption of the 
local files. From the document you shared RocksDB will throw some error every 
time we try to read a corrupted block
{quote}
Yes, reads of corrupted blocks are always verified, so the read path is safe.

But a write operation (e.g. flush, compaction) may introduce a new corrupted 
file which may not be checked.

And such a corrupted file may just be uploaded to remote storage during a 
checkpoint, without any check like the block checksum verification done on 
reads, if we don't check it manually.
{quote}Now I'm not so sure about it. Now that I think about it more, checksums 
on the filesystem level or the HDD/SSD level wouldn't protect us from a 
corruption happening after reading the bytes from local file, but before those 
bytes are acknowledged by the DFS/object store. 
{quote}
Yes, you're right. That's what I mentioned before about the end-to-end checksum 
(verifying the file's correctness from local to remote via a unified checksum). 
And thanks for sharing the detailed info about S3.

As for "But this may introduce a new API in some public classes like FileSystem 
which is a bigger topic" - maybe that needs a FLIP?

We have also tried to add this end-to-end checksum in our internal Flink 
version, and it is doable for many file systems.

We could also contribute this after we have verified the benefits and the 
performance cost, if it is worth doing.
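To make the "new API in FileSystem" point a bit more concrete, a purely 
hypothetical shape of such an extension could look like the sketch below. None of 
these types or methods exist in Flink today; this is only meant to illustrate why 
the change is FLIP-sized:

{code:java}
import java.io.IOException;

import org.apache.flink.core.fs.Path;

/**
 * Hypothetical optional capability a Flink FileSystem could expose so that a
 * checksum computed while a state file was written locally can be verified
 * against what the remote storage actually persisted (end-to-end verification).
 */
public interface ChecksumVerifiableFileSystem {

    /** Checksum algorithms the remote storage is able to verify on its side. */
    enum ChecksumType {
        CRC32C,
        SHA256
    }

    /**
     * Returns the checksum the remote storage holds for the given path, so the
     * caller can compare it against the locally computed value.
     */
    String getRemoteChecksum(Path path, ChecksumType type) throws IOException;
}
{code}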



[jira] [Commented] (FLINK-27681) Improve the availability of Flink when the RocksDB file is corrupted.

2023-12-06 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27681?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17793626#comment-17793626
 ] 

Piotr Nowojski commented on FLINK-27681:


{quote}

IIUC, checksum in SST level could guarantee the correctness of local file.

{quote}

Yes, I think we don't need any extra protection against corruption of the local 
files. From the document you shared, RocksDB will throw an error every time we 
try to read a corrupted block.

{quote}

And checksum in filesystem level could guarantee the correctness of uploading 
and downloading.

{quote}

Now I'm not so sure about it. Now that I think about it more, checksums on the 
filesystem level or the HDD/SSD level wouldn't protect us from a corruption 
happening after reading the bytes from local file, but before those bytes are 
acknowledged by the DFS/object store. 

A neat way would be to calculate the checksum locally, when writing the SST 
file to the local file system (the "Full File Checksum Design" from the document 
[~masteryhx] shared?), without any significant overhead (the bytes that we want 
to verify would after all already be in RAM). Next, if we could cheaply verify 
that the file uploaded to the DFS still has the same checksum as computed during 
the creation of that file, we could make sure that no matter what, we always 
have valid files in the DFS that we can fall back to every time RocksDB detects 
data corruption when accessing an SST file locally.

It looks like this might be doable in one [1] of two [2] ways, at least for S3. 

[1] 
[https://docs.aws.amazon.com/AmazonS3/latest/userguide/checking-object-integrity.html#checking-object-integrity-md5]

I don't know if AWS's check against the {{Content-MD5}} field is for free. As 
far as I understand it, it could be implemented to be almost free, but the docs 
do not mention that.

[2] 
https://docs.aws.amazon.com/AmazonS3/latest/userguide/checking-object-integrity.html#trailing-checksums
 

Here the docs say that this is free, but it looks like this calculates a new 
checksum during the upload process. So the question would be: could we retrieve 
that checksum and compare it against our locally computed one?
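If the object was uploaded with one of the additional checksum algorithms 
enabled, my understanding is that the stored value can be read back afterwards. A 
hedged sketch with the AWS SDK for Java v2 (bucket, key and localChecksum are 
placeholders, and for multipart uploads the returned value is a 
checksum-of-checksums rather than a checksum over the whole object):

{code:java}
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.GetObjectAttributesRequest;
import software.amazon.awssdk.services.s3.model.ObjectAttributes;

public final class RemoteChecksumCheck {

    /** Returns true if the checksum stored by S3 equals the locally computed one. */
    public static boolean matches(S3Client s3, String bucket, String key, String localChecksum) {
        String remoteChecksum = s3.getObjectAttributes(
                        GetObjectAttributesRequest.builder()
                                .bucket(bucket)
                                .key(key)
                                .objectAttributes(ObjectAttributes.CHECKSUM)
                                .build())
                .checksum()
                .checksumSHA256(); // base64 encoded, as stored by S3
        return localChecksum.equals(remoteChecksum);
    }
}
{code}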

 

[~mayuehappy], if we decide to go in this direction, then the change to fail a 
job after a checksum mismatch during the async phase could easily be implemented 
here: {{org.apache.flink.runtime.checkpoint.CheckpointFailureManager}}. I don't 
think we need an extra ticket for that; a separate commit in the same PR will 
suffice. 



[jira] [Commented] (FLINK-27681) Improve the availability of Flink when the RocksDB file is corrupted.

2023-12-05 Thread Yue Ma (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27681?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17793488#comment-17793488
 ] 

Yue Ma commented on FLINK-27681:


[~masteryhx] [~pnowojski] [~fanrui] Thanks for the discussion.

So, IIUC, the current consensus is that we need to fail the job if file 
corruption is detected during the check, right?

For now, a failure in the asynchronous checkpoint phase will not cause the job 
to fail. Should we open another ticket to support the ability to "fail the job 
if a specific exception occurs during the asynchronous checkpoint phase"?

 

 



[jira] [Commented] (FLINK-27681) Improve the availability of Flink when the RocksDB file is corrupted.

2023-12-05 Thread Hangxiang Yu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27681?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17793189#comment-17793189
 ] 

Hangxiang Yu commented on FLINK-27681:
--

Sorry for the late reply.
{quote}However,  the job must fail in the future(When the corrupted block is 
read or compacted, or checkpoint failed number >= tolerable-failed-checkpoint). 
Then it will rollback to the older checkpoint.

The older checkpoint must be before we found the file is corrupted. Therefore, 
it is useless to run a job between the time it is discovered that the file is 
corrupted and the time it actually fails.

In brief, tolerable-failed-checkpoint can work, but the extra cost isn't 
necessary.

BTW, if failing job directly, this 
[comment|https://github.com/apache/flink/pull/23765#discussion_r1404136470] 
will be solved directly.
{quote}
Thanks for the detailed clarification.

I rethought this; it seems that failing the job is more reasonable than failing 
the current checkpoint. I'm +1 if we can do that.
{quote}That's a non trivial overhead. Prolonging checkpoint for 10s in many 
cases (especially low throughput large state jobs) will be prohibitively 
expensive, delaying rescaling, e2e exactly once latency, etc. 1s+ for 1GB might 
also be less then ideal to enable by default.
{quote}
Cannot agree more.
{quote}Actually, aren't all of the disks basically have some form of CRC these 
days? I'm certain that's true about SSDs. Having said that, can you 
[~masteryhx] rephrase and elaborate on those 3 scenarios that you think we need 
to protect from? Especially where does the corruption happen?
{quote}
IIUC, any IO operation on an SST file may corrupt it, even though this happens 
very rarely.

RocksDB also describes some situations where the full file checksum [1] is 
useful, which are related to our usage:
 # The local file that is about to be uploaded: as [1] puts it, "verify the SST 
file when the whole file is read in DB (e.g., compaction)"; block-level 
checksums at runtime cannot guarantee the correctness of the whole SST, which is 
what we could focus on first.
 # Uploading and downloading: firstly, disk IO and network IO may corrupt the 
data; secondly, remote storage is not always reliable. So the checksum could be 
used when SST files are copied to other places (e.g., backup, move, or 
replication) or stored remotely.

IIUC, a checksum at the SST level could guarantee the correctness of the local 
file, and a checksum at the filesystem level could guarantee the correctness of 
uploading and downloading.

[1] 
https://github.com/facebook/rocksdb/wiki/Full-File-Checksum-and-Checksum-Handoff



[jira] [Commented] (FLINK-27681) Improve the availability of Flink when the RocksDB file is corrupted.

2023-11-30 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27681?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17791599#comment-17791599
 ] 

Piotr Nowojski commented on FLINK-27681:


{quote}

However,  the job must fail in the future(When the corrupted block is read or 
compacted, or checkpoint failed number >= tolerable-failed-checkpoint). Then it 
will rollback to the older checkpoint.

The older checkpoint must be before we found the file is corrupted. Therefore, 
it is useless to run a job between the time it is discovered that the file is 
corrupted and the time it actually fails.

In brief, tolerable-failed-checkpoint can work, but the extra cost isn't 
necessary.

{quote}

^^^ This

{quote}

I did some testing on my local machine. It takes about 60 to 70ms to check a 
64M sst file. Checking a 10GB rocksdb instance takes about 10 seconds. More 
detailed testing may be needed later.

{quote}

That's a non-trivial overhead. Prolonging checkpoints by 10s in many cases 
(especially low-throughput, large-state jobs) would be prohibitively expensive, 
delaying rescaling, e2e exactly-once latency, etc. 1s+ per 1GB might also be 
less than ideal to enable by default.

{quote}

I'm not familiar with how to enable CRC for filesystem/disk? Would you mind 
describing it in detail?

{quote}

Actually, don't basically all disks have some form of CRC these days? I'm 
certain that's true for SSDs. Having said that, could you [~masteryhx] rephrase 
and elaborate on those 3 scenarios that you think we need to protect against? 
Especially, where does the corruption happen?



[jira] [Commented] (FLINK-27681) Improve the availability of Flink when the RocksDB file is corrupted.

2023-11-29 Thread Rui Fan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27681?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17791485#comment-17791485
 ] 

Rui Fan commented on FLINK-27681:
-

Hey [~mayuehappy] [~masteryhx], thanks for your feedback. :)
{quote}The downside is that the job has to rollback to the older checkpoint. 
But there should be some policies for high-quality job just as [~mayuehappy] 
mentioned.
{quote}
My concern is that if we find the file is corrupted and only fail the 
checkpoint, the job will continue to run (if tolerable-failed-checkpoints > 0), 
but no checkpoint can complete from then on.

However, the job must fail eventually (when the corrupted block is read or 
compacted, or when the number of failed checkpoints >= 
tolerable-failed-checkpoints). Then it will roll back to an older checkpoint.

That older checkpoint must predate the point where we found the file was 
corrupted. Therefore, it is useless to keep the job running between the time 
the corruption is discovered and the time the job actually fails.

In brief, tolerable-failed-checkpoints can work, but the extra cost isn't 
necessary.

BTW, if we fail the job directly, this 
[comment|https://github.com/apache/flink/pull/23765#discussion_r1404136470] 
will be resolved directly.
{quote}The check at runtime is block level, whose overhead should be little 
(rocksdb always need to read the block from the disk at runtime, so the 
checksum could be calculated easily).
{quote}
Thanks [~masteryhx] for the clarification.

 
{quote}Wouldn't the much more reliable and faster solution be to enable CRC on 
the local filesystem/disk that Flink's using? Benefits of this approach:
 * no changes to Flink/no increased complexity of our code base
 * would protect from not only errors that happen to occur between writing the 
file and uploading to the DFS, but also from any errors that happen at any 
point of time
 * would amortise the performance hit. Instead of amplifying reads by 100%, 
error correction bits/bytes are a small fraction of the payload, so the 
performance penalty would be at every read/write access but ultimately a very 
small fraction of the total cost of reading{quote}
Would [~pnowojski]'s suggestion also cause the job to fail directly? I'm not 
familiar with how to enable CRC for the filesystem/disk - would you mind 
describing it in detail?



[jira] [Commented] (FLINK-27681) Improve the availability of Flink when the RocksDB file is corrupted.

2023-11-29 Thread Hangxiang Yu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27681?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17791409#comment-17791409
 ] 

Hangxiang Yu commented on FLINK-27681:
--

{quote}Fail job directly is fine for me, but I guess the PR doesn't fail the 
job, it just fails the current checkpoint, right?
{quote}
Yeah, I think failing the checkpoint may also be fine for now. It will not 
affect the correctness of the running job.

The downside is that the job then has to roll back to an older checkpoint. But 
there should be some policies for high-priority jobs, just as [~mayuehappy] 
mentioned.
{quote}If the checksum is called for each reading, can we think the check is 
very quick? If so, could we enable it directly without any option? Hey 
[~mayuehappy]  , could you provide some simple benchmark here?
{quote}
The runtime check is at the block level, so its overhead should be small 
(RocksDB always needs to read the block from disk at runtime, so the checksum 
can be calculated cheaply).

But a file-level checksum always comes with extra overhead, and the overhead 
grows with the state size, which is why I'd like to suggest it as an option. I 
also appreciate and look forward to the benchmark results from [~mayuehappy].



[jira] [Commented] (FLINK-27681) Improve the availability of Flink when the RocksDB file is corrupted.

2023-11-27 Thread Yue Ma (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27681?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17790359#comment-17790359
 ] 

Yue Ma commented on FLINK-27681:


 
{quote}Fail job directly is fine for me, but I guess the PR doesn't fail the 
job, it just fails the current checkpoint, right?
{quote}
I think it may be used together with 
*execution.checkpointing.tolerable-failed-checkpoints*; generally speaking, for 
a high-priority job, users will also pay attention to whether checkpoints are 
produced successfully.
{quote}could you provide some simple benchmark here?
{quote}
I did some testing on my local machine. It takes about 60 to 70 ms to check a 
64 MB SST file; checking a 10 GB RocksDB instance takes about 10 seconds. More 
detailed testing may be needed later.
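For reference, a minimal sketch of how such a per-file check could be timed, 
assuming RocksJava's SstFileReader#verifyChecksum (the comment above does not say 
which API was actually used for these numbers, and the path argument is a 
placeholder):

{code:java}
import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.SstFileReader;

public class SstChecksumTimer {

    public static void main(String[] args) throws RocksDBException {
        RocksDB.loadLibrary();
        try (Options options = new Options();
             SstFileReader reader = new SstFileReader(options)) {
            reader.open(args[0]); // e.g. /path/to/000123.sst (placeholder)
            long start = System.nanoTime();
            reader.verifyChecksum(); // reads every block and verifies its checksum
            System.out.printf("verifyChecksum took %d ms%n",
                    (System.nanoTime() - start) / 1_000_000);
        }
    }
}
{code}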


 



[jira] [Commented] (FLINK-27681) Improve the availability of Flink when the RocksDB file is corrupted.

2023-11-27 Thread Rui Fan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27681?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17790010#comment-17790010
 ] 

Rui Fan commented on FLINK-27681:
-

{quote} * That's why we'd like to focus on not uploading the corruped files 
(Also for just fail the job simply to make job restore from the last complete 
checkpoint).{quote}
Failing the job directly is fine for me, but I guess the PR doesn't fail the 
job, it just fails the current checkpoint, right?

 
{quote}File corruption will not affect the read path because the checksum will 
be checked when reading rocksdb block. The job will failover when read the 
corrupted one.
{quote}
If the checksum is verified on every read, can we assume the check is very 
cheap? If so, could we enable it directly, without any option? Hey 
[~mayuehappy], could you provide a simple benchmark here?



[jira] [Commented] (FLINK-27681) Improve the availability of Flink when the RocksDB file is corrupted.

2023-11-27 Thread Yue Ma (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27681?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17789994#comment-17789994
 ] 

Yue Ma commented on FLINK-27681:


[~masteryhx] Thanks for the explanation, I agree with it.



[jira] [Commented] (FLINK-27681) Improve the availability of Flink when the RocksDB file is corrupted.

2023-11-26 Thread Hangxiang Yu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27681?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17789903#comment-17789903
 ] 

Hangxiang Yu commented on FLINK-27681:
--

[~pnowojski] [~fanrui]  Thanks for joining in the discussion.

Thanks [~mayuehappy] for explaining the case, which we have also seen in our 
production environment.

Let me also try to share my thoughts about your questions.
{quote}I'm worried that flink add check can't completely solve the problem of 
file corruption. Is it possible that file corruption occurs after flink check 
but before uploading the file to hdfs?
{quote}
I think the concern is valid.

Actually, file corruption may occur at any of these stages:
 # File generation at runtime (RocksDB memtable flush or compaction)
 # Uploading during a checkpoint (local file -> memory buffer -> network 
transfer -> DFS)
 # Downloading during recovery (the reverse path of 2)

 

For the first situation:
 * File corruption will not affect the read path because the checksum is 
checked when reading a RocksDB block. The job will fail over when it reads the 
corrupted block.
 * So the core problem is that a corrupted file which is not read at runtime 
will be uploaded to the remote DFS during a checkpoint. It will affect normal 
processing once a failover happens, which can have severe consequences, 
especially for high-priority jobs.
 * That's why we'd like to focus on not uploading the corrupted files.

For the second and third situations:
 * The ideal way is to unify the checksum mechanism of the local DB and the 
remote DFS.
 * Many file systems support passing the file checksum and verifying it on the 
remote server. We could use this to verify the checksum end-to-end.
 * But this may introduce a new API in some public classes like FileSystem, 
which is a bigger topic.
 * As we have also seen many cases like the one [~mayuehappy] mentioned, I 
think maybe we could resolve this case first. I'd also like to see us go the 
ideal way if it is worth doing.



[jira] [Commented] (FLINK-27681) Improve the availability of Flink when the RocksDB file is corrupted.

2023-11-26 Thread Yue Ma (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27681?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17789892#comment-17789892
 ] 

Yue Ma commented on FLINK-27681:


Hi [~pnowojski] and [~fanrui], thanks for your replies.
{quote}If one file is uploaded to hdfs in the previous checkpoint, and it's 
corrupted now
{quote}
If the file uploaded to HDFS is good, but the local copy gets corrupted by the 
local disk after download, during data processing, can't this problem be solved 
by scheduling the TM to another machine after failover? Isn't it more important 
to ensure that the checkpoint data on HDFS is available? BTW, we don't seem to 
have encountered this situation in our actual production environment. I don't 
know if you have actually encountered it, or whether we still need to consider 
this situation.



[jira] [Commented] (FLINK-27681) Improve the availability of Flink when the RocksDB file is corrupted.

2023-11-24 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27681?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17789447#comment-17789447
 ] 

Piotr Nowojski commented on FLINK-27681:


Ahh, got it. In that case I would argue that always failing over the job is
good enough. This whole scenario should only happen very rarely, and it's fine
as long as we have, one way or another, a solution that simply protects against
corrupted state. Again, I would argue that FS/disk-level CRCs should be the way
to go.




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-27681) Improve the availability of Flink when the RocksDB file is corrupted.

2023-11-24 Thread Rui Fan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27681?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17789427#comment-17789427
 ] 

Rui Fan commented on FLINK-27681:
-

{quote}In what scenario could a file that we are verifying before the upload
have been uploaded before?
{quote}
What I mean is that we would cover not only file corruption while uploading
files, but also file corruption during data processing.

If one file was uploaded to HDFS in the previous checkpoint and it's corrupted
now, we can download it from HDFS to reduce the impact on this job.




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-27681) Improve the availability of Flink when the RocksDB file is corrupted.

2023-11-24 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27681?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17789418#comment-17789418
 ] 

Piotr Nowojski commented on FLINK-27681:


Thanks [~fanrui] for pinging me, and double thanks to [~mayuehappy] for 
tackling this issue.

{quote}
In our production environment, most files are damaged due to hardware failures 
on the machine where the file is written
{quote}

Wouldn't the much more reliable and faster solution be to enable CRC on the
local filesystem/disk that Flink is using? Benefits of this approach:
* no changes to Flink and no increased complexity of our code base
* it would protect not only against errors that happen to occur between
writing the file and uploading it to the DFS, but also against errors that
happen at any point in time
* it would amortise the performance hit. Instead of amplifying reads by 100%,
error-correction bits/bytes are a small fraction of the payload, so the
performance penalty would apply to every read/write access but ultimately be a
very small fraction of the total cost of reading

Assuming we indeed want to verify files, doing so during the checkpoint's
async phase and failing the whole checkpoint if verification fails is a good
enough solution that doesn't complicate the code too much.
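For illustration, a minimal sketch of that placement, with placeholder names
(verify/upload are not Flink APIs, just stand-ins for the verification and
upload steps of the async part):
{code:java}
import java.io.IOException;
import java.nio.file.Path;
import java.util.List;
import java.util.concurrent.Callable;

// Sketch only: run the verification inside the async part of the snapshot, so a
// verification failure fails just this checkpoint attempt, not the job.
public class AsyncPhaseVerificationSketch {

    static Callable<Void> asyncSnapshotPart(List<Path> filesToUpload) {
        return () -> {
            for (Path file : filesToUpload) {
                verify(file);        // throws IOException on a corrupted file
            }
            upload(filesToUpload);   // only reached if verification passed
            return null;
        };
    }

    static void verify(Path file) throws IOException {
        // e.g. RocksDB's SstFileReader#verifyChecksum, or a raw checksum over the file
    }

    static void upload(List<Path> files) throws IOException {
        // copy the files to the checkpoint storage (DFS)
    }
}
{code}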

{quote}
If this file was uploaded to HDFS before, Flink can try to download it and let
RocksDB become healthy again.
If this file wasn't uploaded to HDFS, the Flink job should fail directly,
right?

If we only fail the current checkpoint and
execution.checkpointing.tolerable-failed-checkpoints > 0, the job will continue
to run. The Flink job will then fail later (when this file is read), recover
from the latest checkpoint, and consume more duplicate data than if the job had
failed directly.
{quote}
[~fanrui] In what scenario could a file that we are verifying before the
upload have been uploaded before?




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-27681) Improve the availability of Flink when the RocksDB file is corrupted.

2023-11-24 Thread Rui Fan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27681?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17789409#comment-17789409
 ] 

Rui Fan commented on FLINK-27681:
-

Thanks for the quick response.
{quote}Strictly speaking, I think it is possible for file corruption to occur
while uploading to DFS or downloading back to the local disk. It might be
better if Flink added a file verification mechanism to the checkpoint upload
and download paths. But as far as I know, most DFSs have their own data
verification mechanisms, so at least we have not encountered this situation in
our production environment. Most file corruption occurs before the file is
uploaded to HDFS.
{quote}
Sorry, my wording may not have been clear.

Yeah, most DFSs have data verification mechanisms. Please see this comment,
that's my concern:
[https://github.com/apache/flink/pull/23765#discussion_r1404136470]

 
{quote}With the default RocksDB options, after a damaged SST file is created,
the DB can keep running normally as long as no compaction or Get/Iterator
accesses this file.
{quote}
If so, when this situation is discovered, is it reasonable to let the
checkpoint fail?

From a technical perspective, a solution with less impact on the job might be:
 * If this file was uploaded to HDFS before, Flink can try to download it and
let RocksDB become healthy again.
 * If this file wasn't uploaded to HDFS, the Flink job should fail directly,
right?

If we only fail the current checkpoint and
execution.checkpointing.tolerable-failed-checkpoints > 0, the job will continue
to run. The Flink job will then fail later (when this file is read), recover
from the latest checkpoint, and consume more duplicate data than if the job had
failed directly.
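For context, the tolerance referred to above is the regular Flink setting; a
small sketch using the public API:
{code:java}
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TolerableFailuresSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000); // checkpoint every 60 seconds
        // With a value > 0, a checkpoint that fails (e.g. because verification
        // failed) does not fail the job by itself, which is the trade-off above.
        env.getCheckpointConfig().setTolerableCheckpointFailureNumber(3);
    }
}
{code}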

Please correct me if I misunderstood anything, thanks~

Also, I'd like to cc [~pnowojski] , he might also be interested in this JIRA.




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-27681) Improve the availability of Flink when the RocksDB file is corrupted.

2023-11-24 Thread Yue Ma (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27681?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17789390#comment-17789390
 ] 

Yue Ma commented on FLINK-27681:


[~fanrui]
{quote}But I still don't know why the file is corrupted, would you mind 
describing it in detail?
{quote}
In our production environment, most files are damaged due to hardware failures
on the machine where the file is written, such as memory CE errors or SSD disk
hardware failures. With the default RocksDB options, after a damaged SST file
is created, the DB can keep running normally as long as no compaction or
Get/Iterator accesses this file. But when the task fails and recovers from a
checkpoint, other Get requests or compactions may read this file, and the task
will fail at that point.
{quote}Is it possible that file corruption occurs after the Flink check but
before uploading the file to HDFS?
{quote}
Strictly speaking, I think it is possible for file corruption to occur while
uploading to DFS or downloading back to the local disk. It might be better if
Flink added a file verification mechanism to the checkpoint upload and download
paths. But as far as I know, most DFSs have their own data verification
mechanisms, so at least we have not encountered this situation in our
production environment. Most file corruption occurs before the file is
uploaded to HDFS.
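For illustration, a minimal standalone sketch (not Flink code) of such an
upload-side verification: compute a CRC32C while streaming the file, then
compare it against a checksum recorded when the file was created (how that
reference checksum is obtained is an assumption here):
{code:java}
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.zip.CRC32C;
import java.util.zip.CheckedInputStream;

public class UploadChecksumSketch {

    /**
     * Streams a local file to the (already opened) checkpoint output stream and
     * returns the CRC32C of the bytes that were actually sent, so it can be
     * compared with a checksum recorded when the file was created.
     */
    static long uploadWithChecksum(Path localFile, OutputStream checkpointOut) throws IOException {
        CRC32C crc = new CRC32C();
        try (InputStream in = new CheckedInputStream(Files.newInputStream(localFile), crc)) {
            in.transferTo(checkpointOut); // bytes are checksummed as they are read
        }
        return crc.getValue();
    }
}
{code}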




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-27681) Improve the availability of Flink when the RocksDB file is corrupted.

2023-11-23 Thread Rui Fan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27681?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17789355#comment-17789355
 ] 

Rui Fan commented on FLINK-27681:
-

{quote}In our production, the underlying environment may produce some errors, 
resulting in corrupted files. In addition, Flink stores local files in the form 
of a single copy. When a problematic file is uploaded to DFS as a checkpoint, 
this checkpoint will be unavailable.
{quote}
Hi [~Ming Li] [~mayuehappy], I did a quick review of this PR. But I still
don't know why the file is corrupted, would you mind describing it in detail?

I'm worried that the check added in Flink can't completely solve the problem of
file corruption. Is it possible that file corruption occurs after the Flink
check but before uploading the file to HDFS?




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-27681) Improve the availability of Flink when the RocksDB file is corrupted.

2023-11-21 Thread Yue Ma (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27681?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17788339#comment-17788339
 ] 

Yue Ma commented on FLINK-27681:


[~masteryhx] Sorry for the late reply. I have submitted a draft PR; please take
a look when you have time.




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-27681) Improve the availability of Flink when the RocksDB file is corrupted.

2023-11-02 Thread Hangxiang Yu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27681?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17782014#comment-17782014
 ] 

Hangxiang Yu commented on FLINK-27681:
--

[~mayuehappy] Thanks for the reminder. I think it's doable. 
Just assigned to you. Please go ahead.

Just as we discussed, we could introduce a new configuration to enable this,
and we could just verify the incremental SST files.




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-27681) Improve the availability of Flink when the RocksDB file is corrupted.

2023-10-29 Thread Yue Ma (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27681?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17780811#comment-17780811
 ] 

Yue Ma commented on FLINK-27681:


[~masteryhx]
If we want to check the checksum of only the incremental SST files, can we use
SstFileReader.verifyChecksum()?
https://github.com/ververica/frocksdb/blob/FRocksDB-6.20.3/java/src/main/java/org/rocksdb/SstFileReader.java
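For reference, a minimal sketch of how that could be wired up for a single
newly created SST file, assuming the FRocksDB build linked above exposes
SstFileReader#verifyChecksum:
{code:java}
import org.rocksdb.Options;
import org.rocksdb.RocksDBException;
import org.rocksdb.SstFileReader;

public class SstVerifySketch {

    /** Verifies a single local SST file before it is uploaded by a checkpoint. */
    static void verifySstFile(String sstFilePath) throws RocksDBException {
        try (Options options = new Options();
             SstFileReader reader = new SstFileReader(options)) {
            reader.open(sstFilePath);  // open the local SST file
            reader.verifyChecksum();   // throws RocksDBException on a checksum mismatch
        }
    }
}
{code}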




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-27681) Improve the availability of Flink when the RocksDB file is corrupted.

2023-08-28 Thread Hangxiang Yu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27681?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17759475#comment-17759475
 ] 

Hangxiang Yu commented on FLINK-27681:
--

[~mayuehappy] Do you mean calling db.VerifyChecksum() in the async thread of
the checkpoint?

I rethought all the interfaces RocksDB provides; this may also bring too much
cost, which may make checkpoints unavailable when this option is enabled:
{code:java}
the API call may take a significant amount of time to finish{code}
I think the best way is to verify the checksum of only the incremental SST
files to reduce the cost, but it seems RocksDB hasn't provided an interface to
verify at the SST-file level.

 




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-27681) Improve the availability of Flink when the RocksDB file is corrupted.

2023-08-23 Thread Yue Ma (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27681?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17757850#comment-17757850
 ] 

Yue Ma commented on FLINK-27681:


[~masteryhx]
I'd like to contribute it.
I think there might be two ways to check whether the file data is correct. One
is to set DBOptions#setParanoidChecks so that RocksDB re-reads and verifies the
file after it is generated, but this may cause some read amplification and CPU
overhead. The other is to manually call db.VerifyChecksum() to check the
correctness of the files when making a checkpoint. WDYT?
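A rough sketch of both options, assuming the RocksJava binding in use exposes
RocksDB#verifyChecksum() as the Java counterpart of the db.VerifyChecksum()
call mentioned above:
{code:java}
import org.rocksdb.DBOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;

public class VerificationOptionsSketch {

    // Option 1: paranoid checks, RocksDB re-reads and verifies files it generates,
    // at the cost of extra reads and CPU.
    static DBOptions paranoidDbOptions() {
        return new DBOptions().setParanoidChecks(true);
    }

    // Option 2: explicit verification of the whole DB, e.g. triggered while taking
    // a checkpoint (assumes verifyChecksum is available in the RocksJava build).
    static void verifyWholeDb(RocksDB db) throws RocksDBException {
        db.verifyChecksum(); // throws RocksDBException if a live file is corrupted
    }
}
{code}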




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-27681) Improve the availability of Flink when the RocksDB file is corrupted.

2023-08-17 Thread Hangxiang Yu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27681?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17755766#comment-17755766
 ] 

Hangxiang Yu commented on FLINK-27681:
--

Hi, [~Ming Li] Thanks for reporting this.
We also saw some similar cases.

I think the core reason is the lack of a checksum check when checkpointing, so
even if a checksum is incorrect, the corrupted files are still uploaded and the
checkpoint is still marked as completed.
{quote}Can RocksDB periodically check whether all files are correct and find
the problem in time?
{quote}
I think it's better to check every checkpoint file that will be uploaded, so
that we avoid local corrupted files affecting remote checkpoint files. But this
may increase costs and decrease the performance of checkpoints, so maybe a
configuration option could be introduced? WDYT?
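To illustrate, such a switch could look roughly like the following; the key
name and default are purely hypothetical and would be decided in the FLIP/PR:
{code:java}
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

public class SstVerificationOptionSketch {

    // Hypothetical option, only to illustrate the shape of such a configuration.
    public static final ConfigOption<Boolean> CHECKPOINT_VERIFY_SST_FILES =
            ConfigOptions.key("state.backend.rocksdb.checkpoint.verify-sst-files")
                    .booleanType()
                    .defaultValue(false)
                    .withDescription(
                            "Whether to verify the checksum of newly created SST files "
                                    + "before they are uploaded by an incremental checkpoint.");
}
{code}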
{quote}Can Flink automatically roll back to the previous checkpoint when there 
is a problem with the checkpoint data, because even with manual intervention, 
it just tries to recover from the existing checkpoint or discard the entire 
state.
{quote}
I think it makes sense, but just as I said, we must make sure that every
checkpoint is not influenced by the local corrupted files.




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-27681) Improve the availability of Flink when the RocksDB file is corrupted.

2022-11-30 Thread ming li (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27681?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17641648#comment-17641648
 ] 

ming li commented on FLINK-27681:
-

[~Yanfei Lei] thank you for your reply.

In our production, the underlying environment may produce some errors, 
resulting in corrupted files. In addition, Flink stores local files in the form 
of a single copy. When a problematic file is uploaded to DFS as a checkpoint, 
this checkpoint will be unavailable.


Can this question be simplified to whether checkpoint files need to be 
double-checked at the Flink layer to ensure that errors in the underlying 
environment will not cause checkpoint file errors?




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-27681) Improve the availability of Flink when the RocksDB file is corrupted.

2022-11-30 Thread Yanfei Lei (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27681?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17641637#comment-17641637
 ] 

Yanfei Lei commented on FLINK-27681:


I think the correctness of the file should be guaranteed by other systems.
Although Flink could implement the suggestions you listed, these things would
introduce some overhead and increase the complexity of Flink.

I would prefer some external ways to solve this issue, e.g. taking a native
savepoint/full checkpoint periodically to reduce the amount of data to replay.
WDYT?

 




--
This message was sent by Atlassian Jira
(v8.20.10#820010)