[jira] [Commented] (FLINK-27681) Improve the availability of Flink when the RocksDB file is corrupted.
[ https://issues.apache.org/jira/browse/FLINK-27681?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17796843#comment-17796843 ] Piotr Nowojski commented on FLINK-27681:

I could help reviewing and, generally speaking, support this effort, including discussions around the FLIP (this change would have to modify the `FileSystem` API - the DFS has to support checksum verification in some way). The general plan should be:
# Someone would have to investigate what's possible without modifying RocksDB, and what works at least with S3. Some tests against S3 and raw benchmarks without using Flink would be needed - for example, a simple standalone Java app uploading a file while verifying the checksum at the same time. People will definitely ask about other DFSs, so it would be nice to research Azure/GCP as well (might be just looking into the docs without testing).
# Create a FLIP. Ideally already with this zero-overhead solution. But if RocksDB turns out to be problematic, something that's already designed with zero overhead in mind for the long term, plus some intermediate temporary solution, would probably also be fine.

> Improve the availability of Flink when the RocksDB file is corrupted.
> -
>
> Key: FLINK-27681
> URL: https://issues.apache.org/jira/browse/FLINK-27681
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / State Backends
> Reporter: Ming Li
> Assignee: Yue Ma
> Priority: Critical
> Labels: pull-request-available
> Attachments: image-2023-08-23-15-06-16-717.png
>
> We have encountered several times when the RocksDB checksum does not match or the block verification fails when the job is restored. The reason is generally that there are some problems with the machine where the task is located, which causes the files uploaded to HDFS to be incorrect, but it has been a long time (a dozen minutes to half an hour) by the time we find the problem. I'm not sure if anyone else has had a similar problem.
> Since this file is referenced by incremental checkpoints for a long time, when the maximum number of retained checkpoints is exceeded, we can only use this file until it is no longer referenced. When the job fails, it cannot be recovered.
> Therefore we consider:
> 1. Can RocksDB periodically check whether all files are correct and find problems in time?
> 2. Can Flink automatically roll back to the previous checkpoint when there is a problem with the checkpoint data? Because even with manual intervention, it just tries to recover from an existing checkpoint or discard the entire state.
> 3. Can we increase the maximum number of references to a file based on the maximum number of retained checkpoints? When the number of references exceeds the maximum number of checkpoints - 1, the Task side is required to upload a new file for this reference. Not sure if this will ensure that the newly uploaded file is correct.

-- This message was sent by Atlassian Jira (v8.20.10#820010)
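The standalone experiment described in step 1 could start from a sketch like the following, which streams a file while computing a CRC32 on the fly using only the JDK. The actual S3 upload is stood in for by a plain `OutputStream`, and the class/method names are made up for illustration:

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.zip.CRC32;
import java.util.zip.CheckedInputStream;

public class UploadChecksumExperiment {

    // Streams 'in' to 'out' (a stand-in for the S3/DFS upload stream)
    // while computing a CRC32 over the exact bytes handed to the upload,
    // with no second pass over the data.
    static long copyWithChecksum(InputStream in, OutputStream out) throws IOException {
        CheckedInputStream checked = new CheckedInputStream(in, new CRC32());
        byte[] buf = new byte[8192];
        int n;
        while ((n = checked.read(buf)) != -1) {
            out.write(buf, 0, n);
        }
        return checked.getChecksum().getValue();
    }
}
```

A real benchmark would replace `out` with the S3 client's upload stream and compare the returned value against the checksum the object store reports for the stored object.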
[jira] [Commented] (FLINK-27681) Improve the availability of Flink when the RocksDB file is corrupted.
[ https://issues.apache.org/jira/browse/FLINK-27681?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17796527#comment-17796527 ] Yue Ma commented on FLINK-27681:

[~pnowojski] Thanks for your suggestion, I think this is a perfect solution. But it sounds like there is still a long way to go to implement this plan. Do we have any specific plans to do this?
[jira] [Commented] (FLINK-27681) Improve the availability of Flink when the RocksDB file is corrupted.
[ https://issues.apache.org/jira/browse/FLINK-27681?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17794622#comment-17794622 ] Piotr Nowojski commented on FLINK-27681:

{quote}
So I understand that we can do this manual check in this ticket first. If the file is detected to be corrupted, we can fail the job. Is this a good choice?
{quote}
I would prefer to avoid the approach that was proposed here in the ticket:

[i] Before uploading the file, scan it completely, block by block, for data consistency using RocksDB's mechanisms.

The problems:
* Performance overhead
* Works only for RocksDB
* Doesn't protect us fully from data corruption. Corruption can happen just after we checked the file locally, but before we uploaded it to the DFS.

So, rephrasing what I was proposing in my messages above:

[ii]
# Calculate some checksum *[A]* ON THE FLY, at the same time that the state file is being written/created. For RocksDB that would require hooking into RocksDB itself. It would be easier for the HashMap state backend. But it would have zero additional IO cost, and some minor CPU cost (negligible compared to the IO access).
# Remember the checksum *[A]* until:
# Depending on what the DFS supports, either:
** preferably, verify against the checksum *[A]* ON THE FLY, while the file is being uploaded to the DFS. In principle, if implemented by the DFS properly, this should again be basically for free, without any additional IO cost. S3 might actually support that via [1] or [2].
** after uploading, use the DFS API to remotely calculate the checksum of the uploaded file, and compare it against the checksum *[A]*. S3 does support this [3], quoting:
{quote}
After uploading objects, you can get the checksum value and compare it to a precalculated or previously stored checksum value calculated using the same algorithm.
{quote}
Note, we would have to ensure that checksum [A] is always calculated the same way, both in the state backend (RocksDB) and in the DFS (S3).

I have no idea if RocksDB supports something like that, but if not:
* it should be a contribution welcomed by the RocksDB maintainers
* implementing a hook on our side in our FRocksDB fork doesn't sound too complicated. I would hope it would only require wrapping some {{OutByteStream}} class and that's it.

[1] [https://docs.aws.amazon.com/AmazonS3/latest/userguide/checking-object-integrity.html#checking-object-integrity-md5]
[2] [https://docs.aws.amazon.com/AmazonS3/latest/userguide/checking-object-integrity.html#trailing-checksums]
[3] [https://docs.aws.amazon.com/AmazonS3/latest/userguide/checking-object-integrity.html#using-additional-checksums]
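The hook described above - wrapping the stream the state file is written to, so that checksum [A] falls out as a byproduct of writing - can be illustrated with the JDK's `CheckedOutputStream`. This is a sketch only: FRocksDB's actual write path is native code, so a real hook would have to live on the C++/JNI side.

```java
import java.io.OutputStream;
import java.util.zip.CRC32;
import java.util.zip.CheckedOutputStream;

public class ChecksumOnCreate {

    // Wraps the target stream so every byte written to the state file
    // also updates a running CRC32: zero extra IO, minor CPU cost.
    static CheckedOutputStream wrap(OutputStream target) {
        return new CheckedOutputStream(target, new CRC32());
    }
}
```

After the file is closed, `getChecksum().getValue()` on the wrapper yields checksum [A], to be remembered until the upload is verified.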
[jira] [Commented] (FLINK-27681) Improve the availability of Flink when the RocksDB file is corrupted.
[ https://issues.apache.org/jira/browse/FLINK-27681?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17794505#comment-17794505 ] Yue Ma commented on FLINK-27681:

{quote}
And the corrupted file may just be uploaded to remote storage without any check (like reading the block checksums) during a checkpoint, if we don't check it manually.
{quote}
So I understand that we can do this manual check in this ticket first. If the file is detected to be corrupted, we can fail the job. Is this a good choice?
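For context, the "manual check" proposed here - approach [i] in the later discussion - amounts to re-reading the whole local file before upload and comparing against a recorded checksum, as in this illustrative sketch (a plain CRC32 stands in for RocksDB's own block/file verification; the point is that it costs one full extra read of the file):

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.zip.CRC32;

public class PreUploadCheck {

    // Approach [i]: scan the entire local file before uploading and
    // compare against a previously recorded checksum. The extra full
    // read is exactly the overhead the later comments object to.
    static boolean matchesRecordedChecksum(Path file, long recordedCrc32) throws IOException {
        CRC32 crc = new CRC32();
        byte[] buf = new byte[8192];
        try (InputStream in = Files.newInputStream(file)) {
            int n;
            while ((n = in.read(buf)) != -1) {
                crc.update(buf, 0, n);
            }
        }
        return crc.getValue() == recordedCrc32;
    }
}
```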
[jira] [Commented] (FLINK-27681) Improve the availability of Flink when the RocksDB file is corrupted.
[ https://issues.apache.org/jira/browse/FLINK-27681?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17794272#comment-17794272 ] Piotr Nowojski commented on FLINK-27681:

[~masteryhx], yes, that would require a new FLIP, as one way or another we would have to verify the checksums of the files uploaded to the DFS. Also please note my hints/ideas above that we might not need any dedicated checksum computation, as it should be possible to hook in and get/calculate checksums on the fly during file creation/file upload.
[jira] [Commented] (FLINK-27681) Improve the availability of Flink when the RocksDB file is corrupted.
[ https://issues.apache.org/jira/browse/FLINK-27681?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17794190#comment-17794190 ] Hangxiang Yu commented on FLINK-27681:

{quote}
Yes, I think we don't need any extra protection for corruption of the local files. From the document you shared, RocksDB will throw some error every time we try to read a corrupted block.
{quote}
Yes, reading a corrupted block is always checked, which is safe. But a write operation (e.g. flush, compaction) may introduce a new corrupted file, which may not be checked. And the corrupted file may just be uploaded to remote storage without any check (like reading the block checksums) during a checkpoint, if we don't check it manually.
{quote}
Now I'm not so sure about it. Now that I think about it more, checksums on the filesystem level or the HDD/SSD level wouldn't protect us from a corruption happening after reading the bytes from the local file, but before those bytes are acknowledged by the DFS/object store.
{quote}
Yes, you're right. That's what I mentioned before about the end-to-end checksum (verifying the file's correctness from local to remote by a unified checksum). And thanks for sharing the detailed info about S3.

"But this may introduce a new API in some public classes like FileSystem, which is a bigger topic." - maybe this needs a FLIP?

We have also tried adding this end-to-end checksum in our internal Flink version, which is doable for many file systems. We could also contribute this after we have verified the benefits and the performance cost, if it's worth doing.
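The end-to-end checksum idea - one checksum following the bytes from local write to remote acknowledgement - could be expressed as a contract roughly like the following. This is entirely hypothetical (Flink's `FileSystem` has no such method today, which is exactly why a FLIP would be needed); the in-memory "remote storage" below exists only to make the contract concrete:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.Map;
import java.util.zip.CRC32;

// Hypothetical contract, NOT Flink's actual FileSystem API.
interface VerifiedUpload {
    // Upload localFile to remotePath, failing if the bytes the storage
    // layer received do not match the checksum computed while the file
    // was originally written.
    void uploadWithChecksum(Path localFile, String remotePath, long expectedCrc32) throws IOException;
}

// Toy in-memory "remote storage" illustrating the contract.
class InMemoryVerifiedStorage implements VerifiedUpload {
    final Map<String, byte[]> store = new HashMap<>();

    @Override
    public void uploadWithChecksum(Path localFile, String remotePath, long expectedCrc32) throws IOException {
        byte[] bytes = Files.readAllBytes(localFile);
        CRC32 crc = new CRC32();
        crc.update(bytes, 0, bytes.length);
        if (crc.getValue() != expectedCrc32) {
            throw new IOException("End-to-end checksum mismatch for " + remotePath);
        }
        store.put(remotePath, bytes);
    }
}
```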
[jira] [Commented] (FLINK-27681) Improve the availability of Flink when the RocksDB file is corrupted.
[ https://issues.apache.org/jira/browse/FLINK-27681?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17793626#comment-17793626 ] Piotr Nowojski commented on FLINK-27681:

{quote}
IIUC, checksum at the SST level could guarantee the correctness of the local file.
{quote}
Yes, I think we don't need any extra protection for corruption of the local files. From the document you shared, RocksDB will throw some error every time we try to read a corrupted block.
{quote}
And checksum at the filesystem level could guarantee the correctness of uploading and downloading.
{quote}
Now I'm not so sure about it. Now that I think about it more, checksums on the filesystem level or the HDD/SSD level wouldn't protect us from a corruption happening after reading the bytes from the local file, but before those bytes are acknowledged by the DFS/object store.

A neat way would be to calculate the checksum locally, when writing the SST file to the local file system ("Full File Checksum Design" from the document [~masteryhx] shared?), without any significant overhead (the bytes that we want to verify would, after all, already be in RAM). Next, if we could cheaply verify that the file uploaded to the DFS still has the same checksum as computed during the creation of that file, we could make sure that, no matter what, we always have valid files in the DFS that we can fall back to every time RocksDB detects a data corruption when accessing an SST file locally.

It looks like this might be doable in one [1] of two [2] ways, at least for S3.

[1] [https://docs.aws.amazon.com/AmazonS3/latest/userguide/checking-object-integrity.html#checking-object-integrity-md5]
I don't know if AWS's check against the {{Content-MD5}} field is for free. As far as I understand it, it could be implemented to be almost for free, but the docs do not mention that.

[2] [https://docs.aws.amazon.com/AmazonS3/latest/userguide/checking-object-integrity.html#trailing-checksums]
Here the docs say that this is for free, but it looks like this calculates a new checksum during the upload process. So the question would be: could we retrieve that checksum and compare it against our locally computed one?

[~mayuehappy], if we decide to go in this direction, then the change to fail a job after a checksum mismatch during the async phase could be implemented easily in {{org.apache.flink.runtime.checkpoint.CheckpointFailureManager}}. I don't think we need an extra ticket for that; a separate commit in the same PR will suffice.
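For reference on option [1]: per the S3 docs, the {{Content-MD5}} header carries the Base64 encoding of the raw 128-bit MD5 digest of the payload, which S3 compares against the data it received. Computing it locally is a few lines of JDK code (class name made up for illustration):

```java
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;

public class ContentMd5 {

    // The Content-MD5 value S3 verifies on upload: Base64 of the
    // raw 16-byte MD5 digest of the payload.
    static String contentMd5(byte[] payload) throws NoSuchAlgorithmException {
        byte[] digest = MessageDigest.getInstance("MD5").digest(payload);
        return Base64.getEncoder().encodeToString(digest);
    }
}
```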
[jira] [Commented] (FLINK-27681) Improve the availability of Flink when the RocksDB file is corrupted.
[ https://issues.apache.org/jira/browse/FLINK-27681?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17793488#comment-17793488 ] Yue Ma commented on FLINK-27681:

[~masteryhx] [~pnowojski] [~fanrui] Thanks for the discussion. So, IIUC, the current consensus is that we need to make the job fail if file corruption is found during the check, right? For now, a failure in the checkpoint asynchronous phase will not cause the job to fail. Should we open another ticket to support the ability to "fail the job if some special exception occurs during the checkpoint asynchronous phase"?
[jira] [Commented] (FLINK-27681) Improve the availability of Flink when the RocksDB file is corrupted.
[ https://issues.apache.org/jira/browse/FLINK-27681?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17793189#comment-17793189 ] Hangxiang Yu commented on FLINK-27681:

Sorry for the late reply.
{quote}
However, the job must fail in the future (when the corrupted block is read or compacted, or the number of failed checkpoints >= tolerable-failed-checkpoints). Then it will roll back to an older checkpoint. The older checkpoint must be from before we found the corrupted file. Therefore, it is useless to run the job between the time the corruption is discovered and the time the job actually fails. In brief, tolerable-failed-checkpoints can work, but the extra cost isn't necessary. BTW, if we fail the job directly, this [comment|https://github.com/apache/flink/pull/23765#discussion_r1404136470] will be resolved directly.
{quote}
Thanks for the detailed clarification. I rethought this; it seems that failing the job is more reasonable than failing the current checkpoint. I'm +1 if we can do that.
{quote}
That's a non-trivial overhead. Prolonging a checkpoint by 10s in many cases (especially low-throughput, large-state jobs) will be prohibitively expensive, delaying rescaling, e2e exactly-once latency, etc. 1s+ for 1GB might also be less than ideal to enable by default.
{quote}
Cannot agree more.
{quote}
Actually, don't all disks basically have some form of CRC these days? I'm certain that's true for SSDs. Having said that, can you [~masteryhx] rephrase and elaborate on those 3 scenarios that you think we need to protect from? Especially, where does the corruption happen?
{quote}
IIUC, once we have IO operations on the SST files, a file may become corrupted, even if it happens very rarely. RocksDB also describes some situations for using the full-file checksum [1] which are relevant to our usage:
# A local file that is about to be uploaded: see "verify the SST file when the whole file is read in DB (e.g., compaction)" in [1]. The block-level checksums used at runtime cannot guarantee the correctness of the whole SST, which is what we could focus on first.
# Uploading and downloading: firstly, disk IO and network IO may corrupt the data. Secondly, remote storage is not always reliable. So the checksum could be used when SST files are copied to other places (e.g., backup, move, or replicate) or stored remotely.

IIUC, checksum at the SST level could guarantee the correctness of the local file. And checksum at the filesystem level could guarantee the correctness of uploading and downloading.

[1] https://github.com/facebook/rocksdb/wiki/Full-File-Checksum-and-Checksum-Handoff
[jira] [Commented] (FLINK-27681) Improve the availability of Flink when the RocksDB file is corrupted.
[ https://issues.apache.org/jira/browse/FLINK-27681?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17791599#comment-17791599 ] Piotr Nowojski commented on FLINK-27681:

{quote}
However, the job must fail in the future (when the corrupted block is read or compacted, or the number of failed checkpoints >= tolerable-failed-checkpoints). Then it will roll back to an older checkpoint. The older checkpoint must be from before we found the corrupted file. Therefore, it is useless to run the job between the time the corruption is discovered and the time the job actually fails. In brief, tolerable-failed-checkpoints can work, but the extra cost isn't necessary.
{quote}
^^^ This
{quote}
I did some testing on my local machine. It takes about 60 to 70ms to check a 64MB sst file. Checking a 10GB RocksDB instance takes about 10 seconds. More detailed testing may be needed later.
{quote}
That's a non-trivial overhead. Prolonging a checkpoint by 10s in many cases (especially low-throughput, large-state jobs) will be prohibitively expensive, delaying rescaling, e2e exactly-once latency, etc. 1s+ for 1GB might also be less than ideal to enable by default.
{quote}
I'm not familiar with how to enable CRC for the filesystem/disk. Would you mind describing it in detail?
{quote}
Actually, don't all disks basically have some form of CRC these days? I'm certain that's true for SSDs. Having said that, can you [~masteryhx] rephrase and elaborate on those 3 scenarios that you think we need to protect from? Especially, where does the corruption happen?
[ https://issues.apache.org/jira/browse/FLINK-27681?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17791485#comment-17791485 ] Rui Fan commented on FLINK-27681: Hey [~mayuehappy] [~masteryhx], thanks for your feedback. :)

{quote}The downside is that the job has to roll back to the older checkpoint. But there should be some policies for high-quality jobs, just as [~mayuehappy] mentioned.{quote}
My concern is: if we find the file is corrupted and fail the checkpoint, the job will continue to run (if tolerable-failed-checkpoints > 0), and no checkpoint can complete in the future. However, the job must fail in the future (when the corrupted block is read or compacted, or the number of failed checkpoints >= tolerable-failed-checkpoints). Then it will roll back to an older checkpoint. The older checkpoint must be from before we found the corrupted file. Therefore, it is useless to run the job between the time the corruption is discovered and the time the job actually fails. In brief, tolerable-failed-checkpoints can work, but the extra cost isn't necessary. BTW, if we fail the job directly, this [comment|https://github.com/apache/flink/pull/23765#discussion_r1404136470] is resolved directly.

{quote}The check at runtime is block level, whose overhead should be small (RocksDB always needs to read the block from disk at runtime, so the checksum can be calculated cheaply).{quote}
Thanks [~masteryhx] for the clarification.

{quote}Wouldn't the much more reliable and faster solution be to enable CRC on the local filesystem/disk that Flink's using? Benefits of this approach:
* no changes to Flink / no increased complexity of our code base
* would protect not only from errors that happen to occur between writing the file and uploading it to the DFS, but also from any errors that happen at any point in time
* would amortise the performance hit. Instead of amplifying reads by 100%, error-correction bits/bytes are a small fraction of the payload, so the performance penalty would apply to every read/write access but ultimately be a very small fraction of the total cost of reading{quote}
Does [~pnowojski]'s suggestion also directly cause the job to fail? I'm not familiar with how to enable CRC for the filesystem/disk. Would you mind describing it in detail?
[ https://issues.apache.org/jira/browse/FLINK-27681?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17791409#comment-17791409 ] Hangxiang Yu commented on FLINK-27681:

{quote}Fail job directly is fine for me, but I guess the PR doesn't fail the job, it just fails the current checkpoint, right?{quote}
Yeah, I think failing the checkpoint may also be fine currently. It will not affect the correctness of the running job. The downside is that the job has to roll back to an older checkpoint. But there should be some policies for high-quality jobs, just as [~mayuehappy] mentioned.

{quote}If the checksum is verified on every read, can we assume the check is very quick? If so, could we enable it directly without any option? Hey [~mayuehappy], could you provide some simple benchmark here?{quote}
The check at runtime is block level, whose overhead should be small (RocksDB always needs to read the block from disk at runtime, so the checksum can be calculated cheaply). But a file-level checksum always adds extra overhead, and the overhead grows when the state is very large; that's why I'd like to suggest it as an option. I also appreciate and look forward to the benchmark results from [~mayuehappy].
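The block-level vs. file-level distinction above can be sketched in a few lines. This is a toy Python model, not RocksDB code: a fixed block size and `zlib.crc32` stand in for RocksDB's per-block checksums, purely to illustrate why the read-path check is amortised while a full-file scan is a separate, state-size-proportional cost.

```python
import zlib

BLOCK_SIZE = 4096  # stand-in for RocksDB's ~4 KB data blocks

def write_blocks(data: bytes) -> list:
    """Split data into blocks and attach a CRC32 to each, mimicking
    how an SST file stores a checksum per block."""
    blocks = []
    for off in range(0, len(data), BLOCK_SIZE):
        block = data[off:off + BLOCK_SIZE]
        blocks.append((block, zlib.crc32(block)))
    return blocks

def read_block(blocks, idx: int) -> bytes:
    """Block-level check: only the block actually read is verified,
    so the cost is amortised over normal reads."""
    block, stored_crc = blocks[idx]
    if zlib.crc32(block) != stored_crc:
        raise IOError("block %d corrupted" % idx)
    return block

def verify_file(blocks) -> bool:
    """File-level check: every block is scanned up front, which is the
    extra per-checkpoint cost the thread is discussing."""
    return all(zlib.crc32(b) == c for b, c in blocks)
```

Note how a corrupted block that is never read goes undetected by `read_block` alone, which is exactly the case that motivates the optional file-level scan before upload.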
[ https://issues.apache.org/jira/browse/FLINK-27681?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17790359#comment-17790359 ] Yue Ma commented on FLINK-27681:

{quote}Fail job directly is fine for me, but I guess the PR doesn't fail the job, it just fails the current checkpoint, right?{quote}
I think it may be used together with {*}execution.checkpointing.tolerable-failed-checkpoints{*}; or, generally speaking, for a high-priority job, users will also pay attention to whether checkpoints complete successfully.

{quote}could you provide some simple benchmark here?{quote}
I did some testing on my local machine. It takes about 60 to 70 ms to check a 64 MB SST file. Checking a 10 GB RocksDB instance takes about 10 seconds. More detailed testing may be needed later.
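The kind of standalone measurement reported above can be approximated with a toy benchmark. This is a Python sketch, not the actual SST verification path: `zlib.crc32` stands in for whatever checksum the real check uses, and absolute throughput is machine-dependent, but it shows how a measured rate projects to a full-state scan time (e.g. ~10 s for 10 GB at the implied throughput).

```python
import time
import zlib

def checksum_throughput_mb_s(total_mb: int = 64) -> float:
    """Measure CRC32 throughput over total_mb megabytes of data.
    Zero-filled buffers are used for simplicity; real SST bytes
    would not change the checksum cost materially."""
    chunk = bytes(1024 * 1024)  # 1 MB buffer
    start = time.perf_counter()
    crc = 0
    for _ in range(total_mb):
        crc = zlib.crc32(chunk, crc)  # incremental CRC over the stream
    elapsed = time.perf_counter() - start
    return total_mb / elapsed

def projected_scan_seconds(state_gb: float, mb_per_s: float) -> float:
    """Project full-state verification time from measured throughput:
    the cost is linear in state size, which is why it delays
    checkpoints on large-state jobs."""
    return state_gb * 1024 / mb_per_s
```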
[ https://issues.apache.org/jira/browse/FLINK-27681?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17790010#comment-17790010 ] Rui Fan commented on FLINK-27681:

{quote}* That's why we'd like to focus on not uploading the corrupted files (and also on simply failing the job, to make the job restore from the last complete checkpoint).{quote}
Failing the job directly is fine for me, but I guess the PR doesn't fail the job, it just fails the current checkpoint, right?

{quote}File corruption will not affect the read path because the checksum will be checked when reading a RocksDB block. The job will fail over when it reads the corrupted one.{quote}
If the checksum is verified on every read, can we assume the check is very quick? If so, could we enable it directly without any option? Hey [~mayuehappy], could you provide some simple benchmark here?
[ https://issues.apache.org/jira/browse/FLINK-27681?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17789994#comment-17789994 ] Yue Ma commented on FLINK-27681: [~masteryhx] Thanks for the explanation, I agree with it.
[ https://issues.apache.org/jira/browse/FLINK-27681?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17789903#comment-17789903 ] Hangxiang Yu commented on FLINK-27681: [~pnowojski] [~fanrui] Thanks for joining the discussion. Thanks [~mayuehappy] for explaining the case, which we also saw in our production environment. Let me also try to share my thoughts about your questions.

{quote}I'm worried that the check Flink adds can't completely solve the problem of file corruption. Is it possible that file corruption occurs after Flink's check but before uploading the file to HDFS?{quote}
I think the concern is right. Actually, file corruption may occur in all stages:
# File generation at runtime (RocksDB memtable flush or compaction)
# Uploading during checkpoint (local file -> memory buffer -> network transfer -> DFS)
# Downloading during recovery (the reverse path of 2)

For the first situation:
* File corruption will not affect the read path because the checksum will be checked when reading a RocksDB block. The job will fail over when it reads the corrupted one.
* So the core problem is that a corrupted file which is not read at runtime will be uploaded to the remote DFS during checkpoint. It will affect normal processing after failover, which can have severe consequences, especially for high-priority jobs.
* That's why we'd like to focus on not uploading the corrupted files.

For the second and third situations:
* The ideal way is to unify the checksum mechanism of the local DB and the remote DFS.
* Many file systems support passing the file checksum and verifying it on their remote server. We could use this to verify the checksum end-to-end.
* But this may introduce a new API in some public classes like FileSystem, which is a bigger topic.
* As we also saw many cases like [~mayuehappy] mentioned, I think maybe we could resolve this case first. I'd also like to see us take the ideal route if it's worth doing.
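The end-to-end idea for stages 2 and 3 can be sketched as follows. This is a hedged Python mock: `FakeDFS` and `remote_checksum` are illustrative stand-ins, not a real FileSystem API (HDFS exposes something comparable via `FileSystem.getFileChecksum`, but the exact shape of a Flink-side API is precisely the open FLIP question above).

```python
import zlib

class FakeDFS:
    """Stand-in for a DFS client that can report a server-side checksum
    over the bytes it actually stored (hypothetical interface)."""
    def __init__(self):
        self._files = {}

    def upload(self, path: str, data: bytes) -> None:
        self._files[path] = data

    def remote_checksum(self, path: str) -> int:
        # Checksum computed on the remote side, after any buffering
        # and network transfer.
        return zlib.crc32(self._files[path])

def upload_with_e2e_check(dfs, path: str, data: bytes) -> None:
    """Sketch of 'unify local and remote checksums': hash the bytes
    before upload and compare against what landed remotely, so
    corruption introduced in transit (stages 2/3) is caught at
    checkpoint time rather than at recovery time."""
    local_crc = zlib.crc32(data)
    dfs.upload(path, data)
    if dfs.remote_checksum(path) != local_crc:
        raise IOError("upload of %s corrupted in transit" % path)
```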
[ https://issues.apache.org/jira/browse/FLINK-27681?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17789892#comment-17789892 ] Yue Ma commented on FLINK-27681: Hi [~pnowojski] and [~fanrui], thanks for your replies.

{quote}If one file is uploaded to hdfs in the previous checkpoint, and it's corrupted now{quote}
If the file uploaded to HDFS is good but the local copy gets corrupted by the local disk after download, during data processing, can this problem be solved by scheduling the TM to another machine after failover? Isn't it more important to ensure that the checkpoint data on HDFS is available? BTW, we don't seem to have encountered this situation in our actual production environment. I don't know if you have actually encountered it, or whether we still need to consider this situation.
[ https://issues.apache.org/jira/browse/FLINK-27681?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17789447#comment-17789447 ] Piotr Nowojski commented on FLINK-27681: Ahh, got it. In that case I would argue that always failing over the job is good enough. This whole scenario should only happen very, very rarely, and it's fine as long as we have, one way or another, a solution that simply protects from corrupted state. Again, I would argue some FS/disk-level CRCs should be the way to go.
[ https://issues.apache.org/jira/browse/FLINK-27681?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17789427#comment-17789427 ] Rui Fan commented on FLINK-27681:

{quote}In what scenario could a file that we are verifying before the upload have been uploaded before?{quote}
What I mean is that we should not only cover file corruption when uploading files; we can also cover file corruption during data processing. If a file was uploaded to HDFS in a previous checkpoint and it's corrupted now, we can download it from HDFS to reduce the impact on this job.
[ https://issues.apache.org/jira/browse/FLINK-27681?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17789418#comment-17789418 ] Piotr Nowojski commented on FLINK-27681: Thanks [~fanrui] for pinging me, and double thanks to [~mayuehappy] for tackling this issue.

{quote}In our production environment, most files are damaged due to hardware failures on the machine where the file is written{quote}
Wouldn't the much more reliable and faster solution be to enable CRC on the local filesystem/disk that Flink's using? Benefits of this approach:
* no changes to Flink / no increased complexity of our code base
* would protect not only from errors that happen to occur between writing the file and uploading it to the DFS, but also from any errors that happen at any point in time
* would amortise the performance hit. Instead of amplifying reads by 100%, error-correction bits/bytes are a small fraction of the payload, so the performance penalty would apply to every read/write access but ultimately be a very small fraction of the total cost of reading

Assuming we indeed want to verify files, doing so during the checkpoint's async phase and failing the whole checkpoint if verification fails is a good enough solution that doesn't complicate the code too much.

{quote}If this file was uploaded to HDFS before, Flink tries to download it and lets RocksDB become healthy again. If this file wasn't uploaded to HDFS, the Flink job should fail directly, right? If we only fail the current checkpoint, and execution.checkpointing.tolerable-failed-checkpoints > 0, the job will continue to run. The Flink job will then fail later (when this file is read), recover from the latest checkpoint, and consume more duplicate data than failing the job directly.{quote}
[~fanrui] In what scenario could a file that we are verifying before the upload have been uploaded before?

> Improve the availability of Flink when the RocksDB file is corrupted.
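The amortisation argument above can be made concrete with a small calculation. The numbers are illustrative (a 4-byte CRC guarding a 4 KB block, roughly what block-level checksumming costs in space), contrasted with re-reading a whole file to verify it, which doubles the bytes read:

```python
def checksum_space_overhead(block_bytes: int = 4096, crc_bytes: int = 4) -> float:
    """Fraction of extra bytes paid per block for a stored checksum:
    4 bytes per 4 KB block is under 0.1% of the payload."""
    return crc_bytes / block_bytes

def full_scan_read_amplification() -> float:
    """Verifying a file by re-reading it doubles the bytes read,
    i.e. 100% read amplification, paid all at once at checkpoint time."""
    return 1.0
```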
[ https://issues.apache.org/jira/browse/FLINK-27681?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17789409#comment-17789409 ] Rui Fan commented on FLINK-27681: Thanks for the quick response.

{quote}Strictly speaking, I think it is possible for file corruption to occur during the process of uploading, and of downloading to local disk. It might be better if Flink could add a file verification mechanism to the checkpoint upload and download processes. But as far as I know, most DFSs have data verification mechanisms, so at least we have not encountered this situation in our production environment. Most file corruption occurs before the file is uploaded to HDFS.{quote}
Sorry, my expression may not have been clear. Yeah, most DFSs have data verification mechanisms. Please see this comment, that's my concern: [https://github.com/apache/flink/pull/23765#discussion_r1404136470]

{quote}Under the default RocksDB options, after a damaged SST is created, if there is no compaction or Get/Iterator access to this file, the DB can always run normally.{quote}
If so, when this situation is discovered, is it reasonable to let the checkpoint fail? From a technical perspective, a solution with less impact on the job might be:
* If this file was uploaded to HDFS before, Flink tries to download it and lets RocksDB become healthy again.
* If this file wasn't uploaded to HDFS, the Flink job should fail directly, right? If we only fail the current checkpoint, and execution.checkpointing.tolerable-failed-checkpoints > 0, the job will continue to run. The Flink job will then fail later (when this file is read), recover from the latest checkpoint, and consume more duplicate data than failing the job directly.

Please correct me if I misunderstood anything, thanks~ Also, I'd like to cc [~pnowojski], who might also be interested in this JIRA.
> ----
> Key: FLINK-27681
> URL: https://issues.apache.org/jira/browse/FLINK-27681
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / State Backends
> Reporter: Ming Li
> Assignee: Yue Ma
> Priority: Critical
> Labels: pull-request-available
> Attachments: image-2023-08-23-15-06-16-717.png
>
> We have encountered several cases where the RocksDB checksum does not match, or block verification fails, when the job is restored. The cause is generally a problem with the machine hosting the task, which corrupts the files uploaded to HDFS; by the time we noticed, a long time (a dozen minutes to half an hour) had already passed. I'm not sure if anyone else has had a similar problem.
> Since such a file is referenced by incremental checkpoints for a long time, once the maximum number of retained checkpoints is exceeded we can only keep using the file until it is no longer referenced, and when the job fails it cannot be recovered.
> Therefore we consider:
> 1. Can RocksDB periodically check whether all files are correct, so that the problem is found in time?
> 2. Can Flink automatically roll back to the previous checkpoint when there is a problem with the checkpoint data? Even with manual intervention, one can only try to recover from an existing checkpoint or discard the entire state.
> 3. Can we bound the number of references to a file by the maximum number of retained checkpoints? When the number of references exceeds that maximum minus one, the task side would be required to upload a new file for the reference. We are not sure this guarantees that the newly uploaded file is correct.
-- This message was sent by Atlassian Jira (v8.20.10#820010)
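Rui Fan's proposed handling above can be sketched as a small decision function. This is a minimal illustration with hypothetical names, not Flink code: once a corrupted local SST is detected, the action depends only on whether a pristine copy already exists on the DFS.

```java
// Hedged sketch of the repair-or-fail decision described above.
// All names are hypothetical illustrations, not Flink APIs.
public class CorruptionHandling {

    enum Action {
        RE_DOWNLOAD_FROM_DFS, // a good copy exists remotely: fetch it to heal RocksDB
        FAIL_JOB_IMMEDIATELY  // no remote copy: fail fast rather than fail later on read
    }

    /**
     * Failing immediately minimizes duplicate reprocessing: if only the checkpoint
     * failed (with tolerable-failed-checkpoints > 0), the job would keep running and
     * crash later when the corrupted file is read, replaying even more data.
     */
    static Action onCorruptSstDetected(boolean uploadedToDfsBefore) {
        return uploadedToDfsBefore ? Action.RE_DOWNLOAD_FROM_DFS
                                   : Action.FAIL_JOB_IMMEDIATELY;
    }
}
```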
[ https://issues.apache.org/jira/browse/FLINK-27681?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17789390#comment-17789390 ] Yue Ma commented on FLINK-27681: [~fanrui] {quote}But I still don't know why the file is corrupted, would you mind describing it in detail?{quote} In our production environment, most files are damaged by hardware failures on the machine where the file is written, such as memory CEs or SSD hardware faults. Under the default RocksDB options, after a damaged SST is created, the DB can keep running normally as long as no compaction or Get/Iterator accesses that file. But when the task fails and recovers from a checkpoint, a Get request or compaction may read the file, and the task fails at that point. {quote}Is it possible that file corruption occurs after the Flink check but before uploading the file to HDFS?{quote} Strictly speaking, I think it is possible for file corruption to occur while uploading, or while downloading to local disk. It might be better if Flink added a file verification mechanism to the checkpoint upload and download processes. But as far as I know, most DFSs have data verification mechanisms, so at least we have not encountered this situation in our production environment. Most file corruption occurs before the file is uploaded to HDFS.
[ https://issues.apache.org/jira/browse/FLINK-27681?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17789355#comment-17789355 ] Rui Fan commented on FLINK-27681: - {quote}In our production, the underlying environment may produce some errors, resulting in corrupted files. In addition, Flink stores local files as a single copy. When a problematic file is uploaded to DFS as part of a checkpoint, that checkpoint becomes unavailable.{quote} Hi [~Ming Li] [~mayuehappy], I did a quick review of this PR, but I still don't know why the file is corrupted. Would you mind describing it in detail? I'm worried that a check added in Flink can't completely solve the problem of file corruption: is it possible that the corruption occurs after the Flink check but before the file is uploaded to HDFS?
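The race Rui Fan raises (corruption slipping in between a separate verification pass and the upload) can be avoided by computing the checksum over the very bytes that are sent to the DFS, in a single pass. A minimal stdlib sketch of the idea, assuming the uploader exposes the destination as an OutputStream; none of this is actual Flink code:

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.zip.CRC32C;
import java.util.zip.CheckedInputStream;

public class ChecksummedUpload {

    /**
     * Copies the local file bytes to the DFS stream while computing CRC32C over
     * exactly the bytes transferred. Because the checksum and the upload consume
     * the same stream once, there is no window for corruption between "check"
     * and "upload". Returns the checksum so it can be stored alongside the
     * checkpoint metadata and re-verified on download.
     */
    static long uploadWithChecksum(InputStream localFile, OutputStream dfs) throws IOException {
        CheckedInputStream in = new CheckedInputStream(localFile, new CRC32C());
        byte[] buf = new byte[8192];
        int n;
        while ((n = in.read(buf)) != -1) {
            dfs.write(buf, 0, n);
        }
        return in.getChecksum().getValue();
    }
}
```

On restore, the same CRC32C could be recomputed over the downloaded bytes and compared with the stored value before handing the file to RocksDB.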
[ https://issues.apache.org/jira/browse/FLINK-27681?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17788339#comment-17788339 ] Yue Ma commented on FLINK-27681: [~masteryhx] Sorry for the late reply. I submitted a draft PR; please take a look when you have time.
[ https://issues.apache.org/jira/browse/FLINK-27681?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17782014#comment-17782014 ] Hangxiang Yu commented on FLINK-27681: -- [~mayuehappy] Thanks for the reminder. I think it's doable; just assigned it to you, please go ahead. As we discussed: we could introduce a new configuration to enable this, and we could verify only the incremental SSTs.
[ https://issues.apache.org/jira/browse/FLINK-27681?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17780811#comment-17780811 ] Yue Ma commented on FLINK-27681: [~masteryhx] If we want to check the checksums of only the incremental SSTs, could we use SstFileReader.verifyChecksum()? https://github.com/ververica/frocksdb/blob/FRocksDB-6.20.3/java/src/main/java/org/rocksdb/SstFileReader.java
[ https://issues.apache.org/jira/browse/FLINK-27681?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17759475#comment-17759475 ] Hangxiang Yu commented on FLINK-27681: -- [~mayuehappy] Do you mean calling db.VerifyChecksum() in the async thread of the checkpoint? I have rethought all the interfaces RocksDB provides; this may also bring too much cost, which could make checkpoints unavailable when the option is enabled: {code:java} the API call may take a significant amount of time to finish{code} I think the best way is to verify the checksums of only the incremental SSTs to reduce the cost, but RocksDB does not seem to provide an interface for verification at the SST level.
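The cost argument above (verify only the SSTs that are new since the last checkpoint, instead of the whole DB) can be illustrated with a small bookkeeping sketch. This is a hypothetical illustration, not Flink code; the per-file `checksummer` stands in for whatever actual verification is used (e.g. an SST-level checksum check):

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

public class IncrementalVerifier {

    // Checksums of files already verified during earlier checkpoints.
    private final Map<String, Long> verified = new HashMap<>();

    /**
     * Verifies only files not seen before, i.e. the incremental SSTs of this
     * checkpoint. Returns the number of files actually checksummed, so the
     * saved work relative to a full-DB verification is visible.
     */
    public int verifyIncrement(List<String> checkpointFiles, Function<String, Long> checksummer) {
        int checked = 0;
        for (String file : checkpointFiles) {
            if (!verified.containsKey(file)) {
                verified.put(file, checksummer.apply(file));
                checked++;
            }
        }
        return checked;
    }
}
```

Since incremental checkpoints reuse most SST files from the previous one, the verification cost per checkpoint stays proportional to the newly created files only.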
[ https://issues.apache.org/jira/browse/FLINK-27681?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17757850#comment-17757850 ] Yue Ma commented on FLINK-27681: [~masteryhx] I'd like to contribute it. I think there are two ways to check whether the file data is correct. One is to set DBOptions#setParanoidChecks so that a file is read back and verified after it is generated, though this may cause some read amplification and CPU overhead. The other is to manually call db.VerifyChecksum() to check the correctness of the files when taking a checkpoint. WDYT?
[ https://issues.apache.org/jira/browse/FLINK-27681?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17755766#comment-17755766 ] Hangxiang Yu commented on FLINK-27681: -- Hi [~Ming Li], thanks for reporting this. We have also seen some similar cases. I think the core reason is the lack of a checksum check when checkpointing: even if the checksum is incorrect, the corrupted files are still uploaded and the checkpoint is still marked as completed. {quote}Can RocksDB periodically check whether all files are correct and find the problem in time?{quote} I think it's better to check every checkpoint file that will be uploaded, so that local corrupted files cannot affect remote checkpoint files. But this may increase cost and decrease checkpoint performance, so maybe a configuration option could be introduced? WDYT? {quote}Can Flink automatically roll back to the previous checkpoint when there is a problem with the checkpoint data, because even with manual intervention, it just tries to recover from the existing checkpoint or discard the entire state.{quote} I think it makes sense, but as I said, we must make sure that no checkpoint can be influenced by local corrupted files.
[ https://issues.apache.org/jira/browse/FLINK-27681?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17641648#comment-17641648 ] ming li commented on FLINK-27681: - [~Yanfei Lei] Thank you for your reply. In our production, the underlying environment may produce some errors, resulting in corrupted files. In addition, Flink stores local files as a single copy, so when a problematic file is uploaded to DFS as part of a checkpoint, that checkpoint becomes unavailable. Can this question be simplified to whether checkpoint files need to be double-checked at the Flink layer, to ensure that errors in the underlying environment do not cause checkpoint file errors?
[ https://issues.apache.org/jira/browse/FLINK-27681?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17641637#comment-17641637 ] Yanfei Lei commented on FLINK-27681: I think the correctness of the file should be guaranteed by other systems. Although Flink could implement the suggestions you listed, they would introduce some overhead and increase the complexity of Flink. I would prefer some external way to address this issue, e.g. taking a native savepoint/full checkpoint periodically to reduce the amount of data to replay. WDYT?