Re: structured streaming - checkpoint metadata growing indefinitely
For posterity: the problem was in the FileStreamSourceLog class. I needed to override its shouldRetain method, which by default returns true and whose doc says: "Default implementation retains all log entries. Implementations should override the method to change the behavior."

--
Kind regards / Pozdrawiam,
Wojciech Indyk
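A minimal sketch of the kind of override involved, assuming the Spark 3.1.x internals where FileStreamSourceLog extends CompactibleFileStreamLog[FileEntry] and shouldRetain takes the entry plus the current time. The subclass name and the 7-day retention window are purely illustrative, and since FileStreamSource constructs FileStreamSourceLog directly, applying a change like this in practice means patching the class in a custom Spark build rather than plugging in a subclass:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.streaming.FileStreamSource.FileEntry
import org.apache.spark.sql.execution.streaming.FileStreamSourceLog

// Illustrative subclass; the exact shouldRetain signature may differ between Spark 3.x releases.
class RetainingFileStreamSourceLog(
    metadataLogVersion: Int,
    sparkSession: SparkSession,
    path: String)
  extends FileStreamSourceLog(metadataLogVersion, sparkSession, path) {

  // Assumed retention window: keep only entries recorded within the last 7 days.
  private val retentionMs: Long = 7L * 24 * 60 * 60 * 1000

  // The default implementation retains all log entries; overriding it lets compaction
  // age out old entries instead of carrying every entry since batch 0.
  override def shouldRetain(entry: FileEntry, currentTime: Long): Boolean =
    currentTime - entry.timestamp < retentionMs
}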
Re: structured streaming - checkpoint metadata growing indefinitely
Hi Gourav!
I use stateless processing, no watermarking, no aggregations. I don't want any data loss, so changing the checkpoint location is not an option for me.

--
Kind regards / Pozdrawiam,
Wojciech Indyk
Re: structured streaming - checkpoint metadata growing indefinitely
Hi,

This may not solve the problem, but have you tried stopping the job gracefully and then restarting it without much delay, pointing to a new checkpoint location? The approach carries some uncertainty in scenarios where the source system can lose data, or where we cannot accept duplicates being committed, etc.

It would also be good to know what kind of processing you are doing.

Regards,
Gourav
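For what it's worth, a minimal sketch of restarting against a fresh checkpoint location as suggested above; the S3 paths, the query shape and the app name are placeholders, not taken from the original job:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("restart-with-new-checkpoint").getOrCreate()

// Placeholder source: a text file stream over an S3 prefix.
val df = spark.readStream.text("s3://bucket/input/")

// Same sink path as before, but a new, empty checkpoint directory,
// so the metadata log starts again from batch 0.
val query = df.writeStream
  .format("parquet")
  .option("path", "s3://bucket/output/")
  .option("checkpointLocation", "s3://bucket/checkpoints/v2/")
  .start()

query.awaitTermination()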
Re: structured streaming - checkpoint metadata growing indefinitely
Update on the scenario of deleting compact files: the job recovers from the most recent (non-compacted) checkpoint file, but when the next checkpoint compaction is due it fails because the recent compact file is missing. I use Spark 3.1.2.

--
Kind regards / Pozdrawiam,
Wojciech Indyk
structured streaming - checkpoint metadata growing indefinitely
Hello!
I use Spark Structured Streaming. I need to use S3 for storing checkpoint metadata (I know it's not an optimal store for checkpoint metadata). The compaction interval is 10 (the default) and I set "spark.sql.streaming.minBatchesToRetain"=5. After the job had been running for a few weeks, checkpointing time increased significantly (causing a few minutes of delay in processing). I looked at the checkpoint metadata structure. There is one heavy path there: checkpoint/source/0. A single .compact file weighs 25 GB. I looked into its content and it contains all entries since batch 0 (the current batch is around 25000). I tried a few parameters to remove already-processed data from the compact file, namely:
"spark.cleaner.referenceTracking.cleanCheckpoints"=true - does not work. As far as I can see in the code, it relates to the old (DStream) streaming API, doesn't it?
"spark.sql.streaming.fileSource.log.deletion"=true and "spark.sql.streaming.fileSink.log.deletion"=true - do not work either.
The compact file stores the full history even though all the data (except for the most recent checkpoint) has already been processed, so I would expect most of the entries to be deleted. Is there any parameter to remove entries from the compact file, or to remove the compact file gracefully from time to time? I am now testing a scenario where I stop the job, delete most of the checkpoint/source/0/* files, keeping just a few recent (non-compacted) checkpoint files, and rerun the job. The job recovers correctly from the recent checkpoint. This looks like a possible workaround for my problem, but manually deleting checkpoint files is ugly, so I would prefer something managed by Spark.

--
Kind regards / Pozdrawiam,
Wojciech Indyk
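For reference, a sketch of how the settings mentioned above are applied; the keys and values are the ones from this message, while the app name is a placeholder:

import org.apache.spark.sql.SparkSession

// Compaction interval stays at its default of 10 (spark.sql.streaming.fileSource.log.compactInterval).
val spark = SparkSession.builder()
  .appName("file-stream-job")
  .config("spark.sql.streaming.minBatchesToRetain", "5")
  // Tried, but none of these shrink the source compact file:
  .config("spark.cleaner.referenceTracking.cleanCheckpoints", "true")
  .config("spark.sql.streaming.fileSource.log.deletion", "true")
  .config("spark.sql.streaming.fileSink.log.deletion", "true")
  .getOrCreate()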