[Spark SQL] [Bug] Adding `checkpoint()` causes "column [...] cannot be resolved" error
Hi all,

Wondering if anyone has run into this, as I can't find any similar issues in JIRA, the mailing list archives, Stack Overflow, etc. I had a query that was running successfully, but the query planning time was extremely long (4+ hours). To fix this I added `checkpoint()` calls earlier in the code to truncate the query plan. This improved the performance, but now I am getting the error "A column or function parameter with name `B`.`JOIN_KEY` cannot be resolved." Nothing else in the query changed besides the `checkpoint()` calls. The only thing I can surmise is that this is related to a very complex nested query plan where the same table is used multiple times upstream. The general flow is something like this:

```py
df = spark.sql("...")
df = df.checkpoint()
df.createOrReplaceTempView("df")

df2 = spark.sql("SELECT JOIN df ...")
df2.createOrReplaceTempView("df2")

# Error happens here: A column or function parameter with name `a`.`join_key`
# cannot be resolved. Did you mean one of the following?
# [`b`.`join_key`, `a`.`col1`, `b`.`col2`]
spark.sql("""
SELECT *
FROM (
    SELECT a.join_key, a.col1, b.col2
    FROM df2 b
    LEFT JOIN df a ON b.join_key = a.join_key
)
""")
```

In the actual code df and df2 are very complex multi-level nested views built upon other views. If I checkpoint all of the dataframes in the query right before I run it, the error goes away. Unfortunately I have not been able to put together a minimal reproducible example. Any ideas?

Thanks,
Robin
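For anyone hitting the same error, a minimal sketch of the workaround described above (checkpointing every input dataframe and re-registering the views immediately before the final query) might look like this; the view names and checkpoint path are illustrative:

```py
# Minimal sketch of the workaround, assuming the views "df" and "df2"
# already exist; names and paths are illustrative.
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
spark.sparkContext.setCheckpointDir("hdfs:///tmp/checkpoints")  # must be set before checkpoint()

for view_name in ["df", "df2"]:
    truncated = spark.table(view_name).checkpoint()  # eager by default: materializes now
    truncated.createOrReplaceTempView(view_name)     # re-register the view on the truncated plan

result = spark.sql("""
    SELECT a.join_key, a.col1, b.col2
    FROM df2 b
    LEFT JOIN df a ON b.join_key = a.join_key
""")
```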
checkpoint file deletion
Hi all,

I am running a stateful Structured Streaming job, and set
spark.cleaner.referenceTracking.cleanCheckpoints to true
spark.cleaner.periodicGC.interval to 1min
in the config. But the checkpoint files are not deleted and their number keeps growing. Did I miss something?

Lingzhe Sun
Hirain Technology
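A possible explanation, offered as an assumption rather than a confirmed answer: spark.cleaner.referenceTracking.cleanCheckpoints drives the ContextCleaner, which removes files written by RDD-level checkpoint() calls once the RDD goes out of scope; it does not prune a Structured Streaming checkpoint location, whose metadata retention is governed separately. A sketch of the distinction:

```py
# Hedged sketch: the cleaner settings below apply to RDD/DataFrame
# checkpoint() files tracked by the ContextCleaner, NOT to the directory
# passed to writeStream.option("checkpointLocation", ...).
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .config("spark.cleaner.referenceTracking.cleanCheckpoints", "true")  # RDD checkpoints only
    .config("spark.cleaner.periodicGC.interval", "1min")
    # Streaming metadata retention is a separate knob, e.g.:
    .config("spark.sql.streaming.minBatchesToRetain", "100")
    .getOrCreate()
)
```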
Re: structured streaming - checkpoint metadata growing indefinitely
For posterity: the problem was the FileStreamSourceLog class. I needed to override the method shouldRetain, which by default returns true; its doc says: "Default implementation retains all log entries. Implementations should override the method to change the behavior."

--
Kind regards/ Pozdrawiam,
Wojciech Indyk
Re: structured streaming - checkpoint metadata growing indefinitely
Hi Gourav!
I use stateless processing, no watermarking, no aggregations. I don't want any data loss, so changing the checkpoint location is not an option for me.

--
Kind regards/ Pozdrawiam,
Wojciech Indyk
Re: structured streaming - checkpoint metadata growing indefinitely
Hi,

This may not solve the problem, but have you tried to stop the job gracefully, and then restart without much delay by pointing to a new checkpoint location? The approach carries some uncertainty in scenarios where the source system can lose data, or where we do not expect duplicates to be committed, etc.

It would also be good to know what kind of processing you are doing.

Regards,
Gourav
Re: structured streaming - checkpoint metadata growing indefinitely
Update for the scenario of deleting compact files: the job recovers from the most recent (non-compacted) checkpoint file, but when the next checkpoint compaction is due, it fails because the recent compact file is missing. I use Spark 3.1.2.

--
Kind regards/ Pozdrawiam,
Wojciech Indyk
structured streaming - checkpoint metadata growing indefinitely
Hello!

I use Spark Structured Streaming. I need to use S3 for storing checkpoint metadata (I know, it's not optimal storage for checkpoint metadata). The compaction interval is 10 (the default) and I set "spark.sql.streaming.minBatchesToRetain"=5. After the job had been running for a few weeks, checkpointing time increased significantly (causing a few minutes' delay in processing). I looked at the checkpoint metadata structure. There is one heavy path there: checkpoint/source/0. A single .compact file weighs 25 GB. I looked into its content and it contains all entries since batch 0 (the current batch is around 25000). I tried a few parameters to remove already-processed data from the compact file, namely:

"spark.cleaner.referenceTracking.cleanCheckpoints"=true - does not work. As far as I've seen in the code it relates to the previous version of streaming, doesn't it?
"spark.sql.streaming.fileSource.log.deletion"=true and "spark.sql.streaming.fileSink.log.deletion"=true don't work either.

The compact file stores the full history even though all the data has been processed (except for the most recent checkpoint), so I would expect most of the entries to be deleted. Is there any parameter to remove entries from the compact file, or to remove the compact file gracefully from time to time? For now I am testing a scenario where I stop the job, delete most of the checkpoint/source/0/* files, keeping just a few recent checkpoints (not compacted), and rerun the job. The job recovers correctly from the recent checkpoint. It looks like a possible workaround for my problem, but this scenario with manual deletion of checkpoint files looks ugly, so I would prefer something managed by Spark.

--
Kind regards/ Pozdrawiam,
Wojciech Indyk
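For reference, a sketch of the file-source metadata-log settings in play here, with the compaction interval spelled out. As the "For posterity" reply at the top of this thread notes, these alone did not shrink the compact file; the eventual fix was overriding FileStreamSourceLog.shouldRetain. Values are illustrative:

```py
# Hedged sketch of the metadata-log knobs discussed in this thread.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .config("spark.sql.streaming.minBatchesToRetain", "5")
    .config("spark.sql.streaming.fileSource.log.deletion", "true")
    .config("spark.sql.streaming.fileSource.log.compactInterval", "10")  # compact every 10th batch
    .config("spark.sql.streaming.fileSource.log.cleanupDelay", "10m")    # delay before old files go
    .getOrCreate()
)
```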
Re: [Spark Core][Intermediate][How-to]: Measure the time for a checkpoint
1. Are your join and aggregation based on the same keys? You might want to look at the execution plan. It is possible that without checkpointing, Spark puts the join and aggregation into the same stage to eliminate shuffling. With a checkpoint, you might have forced Spark to introduce a shuffle. I am blind-guessing here; you need to look at the execution plan to understand what Spark is doing internally.

2. Are you certain that you are aggregating on the data that you get back from the checkpoint, or are you aggregating on the data frame that you checkpointed? If it's the latter, Spark might be executing your read + join twice. Again, you might want to look at the execution plan.
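On point 2, the distinction matters because DataFrame.checkpoint() returns a new DataFrame rather than modifying the one it is called on. A minimal sketch, with stand-in DataFrames and an assumed checkpoint path:

```py
# Minimal sketch: checkpoint() returns a NEW DataFrame backed by the
# checkpoint files; the original keeps its full lineage. The DataFrames
# here are small stand-ins for the experiment's 20GB/10GB inputs.
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
spark.sparkContext.setCheckpointDir("hdfs:///checkpoints")  # assumed path

df_a = spark.range(100).withColumnRenamed("id", "order_id")
df_b = spark.range(100).withColumnRenamed("id", "order_id")

joined = df_a.join(df_b, "order_id")
checkpointed = joined.checkpoint()  # eager: runs the join and writes it out

# Aggregating `joined` may re-execute the read + join from scratch;
# aggregating `checkpointed` reads back from the checkpoint.
agg = checkpointed.groupBy("order_id").count()
```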
Re: [Spark Core][Intermediate][How-to]: Measure the time for a checkpoint
Hi Felix,

Is this one-off or streaming data? What Kubernetes cluster are you using? What is the actual code for checkpointing? Finally, is this Scala or PySpark?

HTH
[Spark Core][Intermediate][How-to]: Measure the time for a checkpoint
Hi Spark-Users,

my name is Felix and I'm currently working on a master's thesis on adaptive checkpointing in Spark at TU Berlin. As part of this work, I experimented with a Spark Kubernetes cluster to measure the runtime differences of Spark applications with and without reliable checkpoints of DataFrames. Below, I first explain how the experiment is set up and the assumptions I want to verify. Second, I describe how the experiment was conducted and what the outcome was. In the end, I share my open questions.

Experiment setup:
* DataFrame A: 20GB tabular data (e.g. a list of orders)
* DataFrame B: 10GB tabular data (e.g. a list of order items)
* Spark application A: first performs an inner join of DataFrame A and DataFrame B to get the joined DataFrame C, and then aggregates the joined DataFrame C
* Spark application B: does the same as Spark application A, plus it checkpoints the joined DataFrame C before it aggregates it
* Driver configuration: 1 driver instance with 4 CPU cores and 4GB of memory
* Executor configuration: 10 executor instances with 4 CPU cores and 8GB of memory each

Assumptions I want to verify:
1. Spark application A will take less time than Spark application B, because Spark application B needs time to write a checkpoint to reliable storage (HDFS).
2. The mean runtime difference of Spark application A and Spark application B will equal the mean time it takes to write the checkpoint.

Experiment sequence:
1. 5 executions of Spark application A
2. 5 executions of Spark application B
3. Requesting the total duration of each application from Spark's history server API
4. Reading the time it took to checkpoint a DataFrame from the driver log of each execution, e.g.:
   * 21/10/01 09:39:05 INFO ReliableCheckpointRDD: Checkpointing took 10024 ms.
   * 21/10/01 09:39:05 INFO ReliableRDDCheckpointData: Done checkpointing RDD 68 to hdfs://checkpoints/rdd-68, new parent is RDD 77
5. Calculating the mean of the total duration of Spark application A's executions and Spark application B's executions, to account for differences in runtime due to e.g. cluster utilization
6. Calculating the mean of the time it took to checkpoint in Spark application B's executions, for the same reason as in 5.

Experiment results:
* The mean runtime of Spark application A: 11.72 minutes
* The mean runtime of Spark application B: 27.38 minutes
* -> The first assumption can be verified
* The mean time it took to write a checkpoint in Spark application B: 3.41 minutes
* -> The second assumption cannot be verified, because: (27.38 minutes - 11.72 minutes) > 3.41 minutes

Questions:
1. Is this a valid approach to collect the time for a checkpoint, or are there other possible ways to measure this?
2. How can the difference in the mean runtime of Spark application A and Spark application B be explained if it's greater than the mean of the time it took to write a checkpoint in Spark application B?

Feel free to share your thoughts and suggestions on the questions. I'm happy to discuss this topic.

Thanks and kind regards,
Felix
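One alternative to parsing the ReliableCheckpointRDD log lines, sketched here as an assumption rather than an established method: time the eager checkpoint() call directly on the driver. Note this measures materialization plus the write, not the write alone:

```py
# Hedged sketch: timing an eager DataFrame checkpoint from the driver.
# This includes computing the lineage up to the checkpoint, so it is an
# upper bound on the pure write time reported by ReliableCheckpointRDD.
import time
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
spark.sparkContext.setCheckpointDir("hdfs:///checkpoints")  # assumed path

df_c = spark.range(10**6).selectExpr("id", "id % 7 AS key")  # stand-in for the joined DataFrame C

start = time.monotonic()
df_c = df_c.checkpoint()  # eager by default: runs a job and writes to HDFS
print(f"checkpoint took {time.monotonic() - start:.1f} s")
```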
Re: Structured Streaming Checkpoint Error
Thanks Jungtaek! That makes sense. We are currently changing to an HDFS-compatible FS; I was wondering how this change would impact the checkpoint, but after what you said it is much clearer now.
Re: Structured Streaming Checkpoint Error
In theory it would work, but checkpointing becomes very inefficient. If I understand correctly, it will write the content to a temp file on S3, and then "rename" it, which actually reads the temp file back from S3 and writes its content to the final path on S3. Compared to checkpointing on HDFS, that is 1 unnecessary write and 1 unnecessary read. It probably warrants a custom implementation of the checkpoint manager for S3.

Also, atomic rename is still not available on S3, and S3 doesn't support write with overwrite=false. That means there's no barrier if concurrent streaming queries access the same checkpoint and mess it up. With a checkpoint on HDFS, the rename is atomic: only one writer succeeds even in parallel, and the query that loses the race to write the checkpoint file simply fails. That's a caveat you may want to keep in mind.
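If someone does build the custom checkpoint manager suggested above, my understanding is that it can be plugged in via the internal spark.sql.streaming.checkpointFileManagerClass setting; the implementation class named below is hypothetical:

```py
# Hedged sketch: wiring in a custom CheckpointFileManager. The class name
# is hypothetical; it must be a JVM class implementing
# org.apache.spark.sql.execution.streaming.CheckpointFileManager.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .config(
        "spark.sql.streaming.checkpointFileManagerClass",
        "com.example.S3CheckpointFileManager",  # hypothetical implementation
    )
    .getOrCreate()
)
```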
Re: Structured Streaming Checkpoint Error
Hello!

@Gabor Somogyi I wonder whether, now that S3 is *strongly consistent*, it would work fine.

Regards!

https://aws.amazon.com/blogs/aws/amazon-s3-update-strong-read-after-write-consistency/
Re: how to disable replace HDFS checkpoint location in structured streaming in spark3.0.1
Sorry, the mail title is a little problematic. It should be "How to disable or replace ..".
how to disable replace HDFS checkpoint location in structured streaming in spark3.0.1
I have written a demo using spark3.0.0, and the location where the checkpoint file is saved has been explicitly specified like:

> stream.option("checkpointLocation","file:///C:\\Users\\Administrator\\Desktop\\test")

But the app still throws an exception about the HDFS file system. Is it not possible to specify the local file system as a checkpoint location now?
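For comparison, a local-filesystem checkpoint location does generally work in local deployments; here is a hedged sketch using forward slashes in the file:// URI, which may sidestep Windows backslash-escaping issues (untested on Windows):

```py
# Hedged sketch: structured streaming with a local checkpoint location.
# Forward slashes in the file:// URI may avoid Windows escaping problems.
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[*]").getOrCreate()

stream = spark.readStream.format("rate").load()  # built-in test source

query = (
    stream.writeStream
    .format("console")
    .option("checkpointLocation", "file:///C:/Users/Administrator/Desktop/test")
    .start()
)
query.awaitTermination()
```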
Re: Structured Streaming Checkpoint Error
Hi Gabor,

Makes sense, thanks a lot!
Re: Structured Streaming Checkpoint Error
Hi,

Structured Streaming simply does not work when the checkpoint location is on S3, due to S3's lack of read-after-write consistency and atomic rename. Please choose an HDFS-compliant filesystem and it will work like a charm.

BR,
G
Structured Streaming Checkpoint Error
Hi!

I have a Structured Streaming application that reads from Kafka, performs some aggregations, and writes to S3 in Parquet format.

Everything seems to work great except that from time to time I get a checkpoint error. At the beginning I thought it was a random error, but it has happened more than 3 times already in a few days:

Caused by: java.io.FileNotFoundException: No such file or directory: s3a://xxx/xxx/validation-checkpoint/offsets/.140.11adef9a-7636-4752-9e6c-48d605a9cca5.tmp

Does this happen to anyone else?

Thanks in advance.

*This is the full error:*

ERROR streaming.MicroBatchExecution: Query segmentValidation [id = 14edaddf-25bb-4259-b7a2-6107907f962f, runId = 0a757476-94ec-4a53-960a-91f54ce47110] terminated with error
java.io.FileNotFoundException: No such file or directory: s3a://xxx/xxx//validation-checkpoint/offsets/.140.11adef9a-7636-4752-9e6c-48d605a9cca5.tmp
at org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:2310)
at org.apache.hadoop.fs.s3a.S3AFileSystem.innerGetFileStatus(S3AFileSystem.java:2204)
at org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:2143)
at org.apache.hadoop.fs.FileSystem.getFileLinkStatus(FileSystem.java:2664)
at org.apache.hadoop.fs.DelegateToFileSystem.getFileLinkStatus(DelegateToFileSystem.java:131)
at org.apache.hadoop.fs.AbstractFileSystem.renameInternal(AbstractFileSystem.java:726)
at org.apache.hadoop.fs.AbstractFileSystem.rename(AbstractFileSystem.java:699)
at org.apache.hadoop.fs.FileContext.rename(FileContext.java:1032)
at org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.renameTempFile(CheckpointFileManager.scala:329)
at org.apache.spark.sql.execution.streaming.CheckpointFileManager$RenameBasedFSDataOutputStream.close(CheckpointFileManager.scala:147)
at org.apache.spark.sql.execution.streaming.HDFSMetadataLog.writeBatchToFile(HDFSMetadataLog.scala:134)
at org.apache.spark.sql.execution.streaming.HDFSMetadataLog.$anonfun$add$3(HDFSMetadataLog.scala:120)
at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23)
at scala.Option.getOrElse(Option.scala:189)
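A hedged sketch of the fix suggested earlier in the thread: keep the Parquet output on S3 but point the checkpoint at an HDFS-compatible filesystem. The brokers, topic, and paths below are placeholders:

```py
# Hedged sketch: sink stays on S3, checkpoint moves to HDFS.
# Brokers, topic, and paths are placeholders.
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

events = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", "broker:9092")
    .option("subscribe", "events")
    .load()
)

query = (
    events.selectExpr("CAST(value AS STRING) AS value")
    .writeStream
    .format("parquet")
    .option("path", "s3a://bucket/output/")                          # data on S3
    .option("checkpointLocation", "hdfs:///checkpoints/validation")  # metadata on HDFS
    .start()
)
```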
Re: Appropriate checkpoint interval in a spark streaming application
Any inputs explaining the rationale behind the question below would really help. Requesting some expert opinion.

Regards,
Sheel
Appropriate checkpoint interval in a spark streaming application
Hello,

I am trying to figure out an appropriate checkpoint interval for my Spark Streaming application. It's a Spark-Kafka integration based on direct streams.

If my *micro-batch interval is 2 mins*, and let's say *each micro-batch takes only 15 secs to process*, then shouldn't my checkpoint interval also be exactly 2 mins?

Assuming my Spark Streaming application starts at t=0, the following will be the state of my checkpoint:

*Case 1: checkpoint interval is less than the microbatch interval*
If I keep my *checkpoint interval at say 1 minute*, then:
*t=1m:* no incomplete batches in this checkpoint
*t=2m:* the first microbatch is included as an incomplete microbatch in the checkpoint, and microbatch execution then begins
*t=3m:* no incomplete batches in the checkpoint, as the first microbatch finished processing in just 15 secs
*t=4m:* the second microbatch is included as an incomplete microbatch in the checkpoint, and microbatch execution then begins
*t=4m30s:* the system breaks down; on restarting, the streaming application finds the checkpoint at t=4m with the second microbatch as the incomplete microbatch and processes it again. But what's the point of reprocessing it, since the second microbatch's processing was already completed at t=4m15s?

*Case 2: checkpoint interval is more than the microbatch interval*
If I keep my *checkpoint interval at say 4 minutes*, then:
*t=2m:* first microbatch execution begins
*t=4m:* first checkpoint, with the second microbatch included as the only incomplete batch; second microbatch processing begins

*Sub-case 1: the system breaks down at t=2m30s:* the first microbatch execution was completed at t=2m15s, but there is no checkpoint information about this microbatch since the first checkpoint only happens at t=4m. Consequently, when the streaming app restarts it will re-execute the batch by fetching the offsets from Kafka.

*Sub-case 2: the system breaks down at t=5m:* the second microbatch was already completed in 15 secs, i.e. at t=4m15s, which means at t=5m there should ideally be no incomplete batches. When I restart my application, the streaming application finds the second microbatch as incomplete in the checkpoint made at t=4m, and re-executes that microbatch.

Is my understanding right? If yes, then isn't my checkpoint interval incorrectly set, resulting in duplicate processing in both of the cases above? If yes, then how do I choose an appropriate checkpoint interval?

Regards,
Sheel
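For reference, in the DStream API the checkpoint directory and the per-stream data-checkpoint interval are set separately; the programming guide suggests a checkpoint interval of 5-10 sliding intervals of a DStream as a starting point. A hedged sketch with illustrative values and a stand-in source:

```py
# Hedged sketch of the DStream checkpointing knobs (values illustrative).
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext(appName="checkpoint-interval-demo")
ssc = StreamingContext(sc, batchDuration=120)  # 2-minute micro-batches

ssc.checkpoint("hdfs:///checkpoints/app")      # metadata + data checkpoint dir

# Stand-in for the Kafka direct stream in the question:
lines = ssc.socketTextStream("localhost", 9999)
lines.checkpoint(600)  # assumption: 10 min, i.e. 5x the batch interval

lines.count().pprint()
ssc.start()
ssc.awaitTermination()
```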
[Apache Spark][Streaming Job][Checkpoint]Spark job failed on Checkpoint recovery with Batch not found error
Hi,

We have a Spark 2.4 job that fails on checkpoint recovery every few hours with the following errors (from the driver log):

driver spark-kubernetes-driver ERROR 20:38:51 ERROR MicroBatchExecution: Query impressionUpdate [id = 54614900-4145-4d60-8156-9746ffc13d1f, runId = 3637c2f3-49b6-40c2-b6d0-7edb28361c5d] terminated with error
java.lang.IllegalStateException: batch 946 doesn't exist
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$apply$mcZ$sp$3.apply$mcV$sp(MicroBatchExecution.scala:406)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$apply$mcZ$sp$3.apply(MicroBatchExecution.scala:381)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$apply$mcZ$sp$3.apply(MicroBatchExecution.scala:381)
at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply$mcZ$sp(MicroBatchExecution.scala:381)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply(MicroBatchExecution.scala:337)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply(MicroBatchExecution.scala:337)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.withProgressLocked(MicroBatchExecution.scala:557)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch(MicroBatchExecution.scala:337)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:183)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:166)
at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:160)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:281)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:193)

And the executor logs show this error:

ERROR CoarseGrainedExecutorBackend: RECEIVED SIGNAL TERM

How should I fix this?
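Not a fix, but a way to narrow it down, offered as a hedged diagnostic: a "batch N doesn't exist" error during recovery generally points at a missing entry in the checkpoint's offsets log, so listing the offsets and commits logs and comparing the latest entries shows what recovery expected to find. The checkpoint root below is a placeholder:

```py
# Hedged diagnostic sketch: list the offsets and commits logs to see which
# batch files exist. The checkpoint root is a placeholder; swap in the
# right CLI for your checkpoint store (this assumes HDFS).
import subprocess

root = "hdfs:///checkpoints/impressionUpdate"  # placeholder
for sub in ("offsets", "commits"):
    subprocess.run(["hdfs", "dfs", "-ls", f"{root}/{sub}"], check=True)
```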
Re: spark on k8s - can driver and executor have separate checkpoint location?
Hello,

If you do so, then I am wondering whether all your executor pods would have to run on the same Kubernetes worker node, since you mount a single volume with a ReadWriteOnce policy. By design this seems not to be good, I assume. You may need a ReadWriteMany policy associated with the volume, and then pod anti-affinity to make sure the pods are not running on the same node. You could achieve this by running an NFS filesystem and then creating a PV/PVC that mounts that shared filesystem. The persistentVolumeClaim defined in your YAML should reference the PVC you created.

Best regards,
Ali Gouta.
> at > > org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:193) > Caused by: org.apache.spark.SparkException: Job aborted due to stage > failure: Task 0 in stage 11.0 failed 4 times, most recent failure: Lost > task > 0.3 in stage 11.0 (TID 420, 10.233.124.162, executor 1): > java.io.IOException: mkdir of > file:/opt/window-data/baseline_windower/state/0/0 failed > > > So I tried giving separate checkpoint location to driver and executor: > > In spark application helm chart I have a checkpointlocation configuration: > > spec: >sparkConf: > "spark.sql.streaming.checkpointLocation": > "file:///opt/checkpoint-data" > > I created two checkpoint pvc and mount the volume for driver and executor > pod: > > volumes: > - name: checkpoint-driver-volume > persistentVolumeClaim: > claimName: checkpoint-driver-pvc > - name: checkpoint-executor-volume > persistentVolumeClaim: > claimName: checkpoint-executor-pvc > > driver: > volumeMounts: > - name: checkpoint-driver-volume > mountPath: "/opt/checkpoint-data" > ... > executor: > volumeMounts: > - name: checkpoint-executor-volume > mountPath: "/opt/checkpoint-data" > > After deployment it seems to be working. I tried restarted driver pod and > it > did recover from checkpoint directory. But I'm not sure if this is actually > supported by design. > > Thanks, > wzhan > > > > -- > Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/ > > - > To unsubscribe e-mail: user-unsubscr...@spark.apache.org > >
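For illustration, a minimal PySpark sketch of the setup the reply above describes (the paths, mount points, and the rate source are hypothetical): the checkpoint location is set once per query, and it has to live on storage that the driver and every executor can read and write - e.g. an NFS-backed ReadWriteMany volume mounted at the same path in all pods.

```py
from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("shared-checkpoint-sketch")
         .getOrCreate())

# Built-in test source, just to have a stream to run.
df = (spark.readStream
      .format("rate")
      .option("rowsPerSecond", 10)
      .load())

query = (df.writeStream
         .format("parquet")
         .option("path", "/mnt/shared/output")                    # hypothetical shared mount
         .option("checkpointLocation", "/mnt/shared/checkpoint")  # same volume on every pod
         .start())

query.awaitTermination()
```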
spark on k8s - can driver and executor have separate checkpoint location?
Hi guys, I'm running spark applications on kubernetes. According to the spark documentation https://spark.apache.org/docs/latest/streaming-programming-guide.html#checkpointing Spark needs a distributed file system to store its checkpoint data, so that in case of failure it can recover from the checkpoint directory.

My question is: can the driver and executors have separate checkpoint locations? I'm asking this because the driver and executors might be deployed on different nodes, and a shared checkpoint location would require ReadWriteMany access mode. Since I only have a storage class that supports ReadWriteOnce access mode, I'm trying to find some workaround.

The Spark Streaming Guide mentions "Failed driver can be restarted from checkpoint information" and, when an executor fails, "Tasks and receivers restarted by Spark automatically, no config needed".

Given this I tried configuring a checkpoint location only for the driver pod. It immediately failed with the below exception:

2020-05-15T14:20:17.142 [stream execution thread for baseline_windower [id = b14190ed-0fb2-4d0e-82d3-b3a3bf101712, runId = f246774e-5a20-4bfb-b997-4d06c344bb0f]hread] ERROR org.apache.spark.sql.execution.streaming.MicroBatchExecution - Query baseline_windower [id = b14190ed-0fb2-4d0e-82d3-b3a3bf101712, runId = f246774e-5a20-4bfb-b997-4d06c344bb0f] terminated with error
org.apache.spark.SparkException: Writing job aborted.
at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec.doExecute(WriteToDataSourceV2Exec.scala:92)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
...
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:193)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 11.0 failed 4 times, most recent failure: Lost task 0.3 in stage 11.0 (TID 420, 10.233.124.162, executor 1): java.io.IOException: mkdir of file:/opt/window-data/baseline_windower/state/0/0 failed

So I tried giving separate checkpoint locations to the driver and executors. In the spark application helm chart I have a checkpoint location configuration:

spec:
  sparkConf:
    "spark.sql.streaming.checkpointLocation": "file:///opt/checkpoint-data"

I created two checkpoint PVCs and mounted the volumes for the driver and executor pods:

volumes:
  - name: checkpoint-driver-volume
    persistentVolumeClaim:
      claimName: checkpoint-driver-pvc
  - name: checkpoint-executor-volume
    persistentVolumeClaim:
      claimName: checkpoint-executor-pvc

driver:
  volumeMounts:
    - name: checkpoint-driver-volume
      mountPath: "/opt/checkpoint-data"
  ...
executor:
  volumeMounts:
    - name: checkpoint-executor-volume
      mountPath: "/opt/checkpoint-data"

After deployment it seems to be working. I tried restarting the driver pod and it did recover from the checkpoint directory. But I'm not sure if this is actually supported by design.

Thanks, wzhan
Re: [spark streaming] checkpoint location feature for batch processing
Replied inline: On Sun, May 3, 2020 at 6:25 PM Magnus Nilsson wrote: > Thank you, so that would mean spark gets the current latest offset(s) when > the trigger fires and then process all available messages in the topic upto > and including that offset as long as maxOffsetsPerTrigger is the default of > None (or large enought to handle all available messages). > Yes, it starts from the offset of the latest batch. `maxOffsetsPerTrigger` will be ignored starting from Spark 3.0.0, which means that on Spark 2.x it still takes effect even when Trigger.Once is used, I guess. > > I think the word micro-batch confused me (more like mega-batch in some > cases). It makes sense though, this makes Trigger.Once a fixed interval > trigger that's only fired once and not repeatedly. > "micro" is relative - though Spark by default processes all available inputs per batch, in most cases you'll want to make the batch size (interval) as small as possible, as it defines the latency of the output. Trigger.Once is an unusual case in a streaming workload - it's more like a continuous execution of a "batch". I use "continuous" to mean picking up the latest context, which is the characteristic of a streaming query - hence a hybrid. > > > On Sun, May 3, 2020 at 3:20 AM Jungtaek Lim > wrote: > >> If I understand correctly, Trigger.once executes only one micro-batch and >> terminates, that's all. Your understanding of structured streaming applies >> there as well. >> >> It's like a hybrid approach as bringing incremental processing from >> micro-batch but having processing interval as batch. That said, while it >> enables to get both sides of benefits, it's basically structured streaming, >> inheriting all the limitations on the structured streaming, compared to the >> batch query. >> >> Spark 3.0.0 will bring some change on Trigger.once (SPARK-30669 [1]) - >> Trigger.once will "ignore" the read limit per micro-batch on data source >> (like maxOffsetsPerTrigger) and process all available input as possible. >> (Data sources should migrate to the new API to take effect, but works for >> built-in data sources like file and Kafka.) >> >> 1. https://issues.apache.org/jira/browse/SPARK-30669 >> >> On Sat, May 2, 2020 at 5:35 PM, Magnus Nilsson wrote: >> >>> I've always had a question about Trigger.Once that I never got around to >>> ask or test for myself. If you have a 24/7 stream to a Kafka topic. >>> >>> Will Trigger.Once get the last offset(s) when it starts and then quit >>> once it hits this offset(s) or will the job run until no new messages is >>> added to the topic for a particular amount of time? >>> >>> br, >>> >>> Magnus >>> >>> On Sat, May 2, 2020 at 1:22 AM Burak Yavuz wrote: >>> >>>> Hi Rishi, >>>> >>>> That is exactly why Trigger.Once was created for Structured Streaming. >>>> The way we look at streaming is that it doesn't have to be always real >>>> time, or 24-7 always on. We see streaming as a workflow that you have to >>>> repeat indefinitely. See this blog post for more details! >>>> >>>> https://databricks.com/blog/2017/05/22/running-streaming-jobs-day-10x-cost-savings.html >>>> >>>> Best, >>>> Burak >>>> >>>> On Fri, May 1, 2020 at 2:55 PM Rishi Shah >>>> wrote: >>>> >>>>> Hi All, >>>>> >>>>> I recently started playing with spark streaming, and checkpoint >>>>> location feature looks very promising. I wonder if anyone has an opinion >>>>> about using spark streaming with checkpoint location option as a slow >>>>> batch >>>>> processing solution.
What would be the pros and cons of utilizing >>>>> streaming >>>>> with checkpoint location feature to achieve fault tolerance in batch >>>>> processing application? >>>>> >>>>> -- >>>>> Regards, >>>>> >>>>> Rishi Shah >>>>> >>>>
Re: [spark streaming] checkpoint location feature for batch processing
Thank you, so that would mean spark gets the current latest offset(s) when the trigger fires and then processes all available messages in the topic up to and including that offset, as long as maxOffsetsPerTrigger is the default of None (or large enough to handle all available messages). I think the word micro-batch confused me (more like mega-batch in some cases). It makes sense though; this makes Trigger.Once a fixed-interval trigger that's only fired once and not repeatedly. On Sun, May 3, 2020 at 3:20 AM Jungtaek Lim wrote: > If I understand correctly, Trigger.once executes only one micro-batch and > terminates, that's all. Your understanding of structured streaming applies > there as well. > > It's like a hybrid approach as bringing incremental processing from > micro-batch but having processing interval as batch. That said, while it > enables to get both sides of benefits, it's basically structured streaming, > inheriting all the limitations on the structured streaming, compared to the > batch query. > > Spark 3.0.0 will bring some change on Trigger.once (SPARK-30669 [1]) - > Trigger.once will "ignore" the read limit per micro-batch on data source > (like maxOffsetsPerTrigger) and process all available input as possible. > (Data sources should migrate to the new API to take effect, but works for > built-in data sources like file and Kafka.) > > 1. https://issues.apache.org/jira/browse/SPARK-30669 > > On Sat, May 2, 2020 at 5:35 PM, Magnus Nilsson wrote: > >> I've always had a question about Trigger.Once that I never got around to >> ask or test for myself. If you have a 24/7 stream to a Kafka topic. >> >> Will Trigger.Once get the last offset(s) when it starts and then quit >> once it hits this offset(s) or will the job run until no new messages is >> added to the topic for a particular amount of time? >> >> br, >> >> Magnus >> >> On Sat, May 2, 2020 at 1:22 AM Burak Yavuz wrote: >> >>> Hi Rishi, >>> >>> That is exactly why Trigger.Once was created for Structured Streaming. >>> The way we look at streaming is that it doesn't have to be always real >>> time, or 24-7 always on. We see streaming as a workflow that you have to >>> repeat indefinitely. See this blog post for more details! >>> >>> https://databricks.com/blog/2017/05/22/running-streaming-jobs-day-10x-cost-savings.html >>> >>> Best, >>> Burak >>> >>> On Fri, May 1, 2020 at 2:55 PM Rishi Shah >>> wrote: >>> >>>> Hi All, >>>> >>>> I recently started playing with spark streaming, and checkpoint >>>> location feature looks very promising. I wonder if anyone has an opinion >>>> about using spark streaming with checkpoint location option as a slow batch >>>> processing solution. What would be the pros and cons of utilizing streaming >>>> with checkpoint location feature to achieve fault tolerance in batch >>>> processing application? >>>> >>>> -- >>>> Regards, >>>> >>>> Rishi Shah >>>> >>>
Re: [spark streaming] checkpoint location feature for batch processing
If I understand correctly, Trigger.once executes only one micro-batch and terminates; that's all. Your understanding of structured streaming applies there as well. It's like a hybrid approach, bringing incremental processing from micro-batch but having the processing interval of a batch. That said, while it enables you to get the benefits of both sides, it's basically structured streaming, inheriting all the limitations of structured streaming compared to a batch query. Spark 3.0.0 will bring some changes to Trigger.once (SPARK-30669 [1]) - Trigger.once will "ignore" the read limit per micro-batch on the data source (like maxOffsetsPerTrigger) and process all available input. (Data sources should migrate to the new API for this to take effect, but it works for built-in data sources like file and Kafka.) 1. https://issues.apache.org/jira/browse/SPARK-30669 On Sat, May 2, 2020 at 5:35 PM, Magnus Nilsson wrote: > I've always had a question about Trigger.Once that I never got around to > ask or test for myself. If you have a 24/7 stream to a Kafka topic. > > Will Trigger.Once get the last offset(s) when it starts and then quit once > it hits this offset(s) or will the job run until no new messages is added > to the topic for a particular amount of time? > > br, > > Magnus > > On Sat, May 2, 2020 at 1:22 AM Burak Yavuz wrote: > >> Hi Rishi, >> >> That is exactly why Trigger.Once was created for Structured Streaming. >> The way we look at streaming is that it doesn't have to be always real >> time, or 24-7 always on. We see streaming as a workflow that you have to >> repeat indefinitely. See this blog post for more details! >> >> https://databricks.com/blog/2017/05/22/running-streaming-jobs-day-10x-cost-savings.html >> >> Best, >> Burak >> >> On Fri, May 1, 2020 at 2:55 PM Rishi Shah >> wrote: >> >>> Hi All, >>> >>> I recently started playing with spark streaming, and checkpoint location >>> feature looks very promising. I wonder if anyone has an opinion about using >>> spark streaming with checkpoint location option as a slow batch processing >>> solution. What would be the pros and cons of utilizing streaming with >>> checkpoint location feature to achieve fault tolerance in batch processing >>> application? >>> >>> -- >>> Regards, >>> >>> Rishi Shah >>> >>
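To make the hybrid idea above concrete, here is a minimal sketch of a Trigger.Once job (the broker address, topic and paths are hypothetical, and the Kafka source assumes the spark-sql-kafka package is on the classpath). Each invocation picks up where the checkpoint left off, processes everything available, and stops:

```py
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("trigger-once-sketch").getOrCreate()

df = (spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker:9092")
      .option("subscribe", "events")
      .load())

query = (df.writeStream
         .format("parquet")
         .option("path", "/data/events")
         .option("checkpointLocation", "/checkpoints/events")
         .trigger(once=True)   # run one micro-batch, then terminate
         .start())

query.awaitTermination()
```

Scheduled from cron or a workflow tool, this behaves like an incremental batch job: the checkpoint supplies the "continuous" context between runs.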
Re: [spark streaming] checkpoint location feature for batch processing
I've always had a question about Trigger.Once that I never got around to asking or testing for myself. Say you have a 24/7 stream to a Kafka topic. Will Trigger.Once get the last offset(s) when it starts and then quit once it hits those offset(s), or will the job run until no new messages are added to the topic for a particular amount of time? br, Magnus On Sat, May 2, 2020 at 1:22 AM Burak Yavuz wrote: > Hi Rishi, > > That is exactly why Trigger.Once was created for Structured Streaming. The > way we look at streaming is that it doesn't have to be always real time, or > 24-7 always on. We see streaming as a workflow that you have to repeat > indefinitely. See this blog post for more details! > > https://databricks.com/blog/2017/05/22/running-streaming-jobs-day-10x-cost-savings.html > > Best, > Burak > > On Fri, May 1, 2020 at 2:55 PM Rishi Shah > wrote: > >> Hi All, >> >> I recently started playing with spark streaming, and checkpoint location >> feature looks very promising. I wonder if anyone has an opinion about using >> spark streaming with checkpoint location option as a slow batch processing >> solution. What would be the pros and cons of utilizing streaming with >> checkpoint location feature to achieve fault tolerance in batch processing >> application? >> >> -- >> Regards, >> >> Rishi Shah >> >
Re: [spark streaming] checkpoint location feature for batch processing
Thanks Burak! Appreciate it. This makes sense. How do you suggest we make sure the resulting data doesn't produce tiny files, if we are not on Databricks yet and cannot leverage Delta Lake features? Also, about the checkpointing feature: do you have an active blog/article I can take a look at to try out an example? On Fri, May 1, 2020 at 7:22 PM Burak Yavuz wrote: > Hi Rishi, > > That is exactly why Trigger.Once was created for Structured Streaming. The > way we look at streaming is that it doesn't have to be always real time, or > 24-7 always on. We see streaming as a workflow that you have to repeat > indefinitely. See this blog post for more details! > > https://databricks.com/blog/2017/05/22/running-streaming-jobs-day-10x-cost-savings.html > > Best, > Burak > > On Fri, May 1, 2020 at 2:55 PM Rishi Shah > wrote: > >> Hi All, >> >> I recently started playing with spark streaming, and checkpoint location >> feature looks very promising. I wonder if anyone has an opinion about using >> spark streaming with checkpoint location option as a slow batch processing >> solution. What would be the pros and cons of utilizing streaming with >> checkpoint location feature to achieve fault tolerance in batch processing >> application? >> >> -- >> Regards, >> >> Rishi Shah >> > -- Regards, Rishi Shah
Re: [spark streaming] checkpoint location feature for batch processing
Hi Rishi, That is exactly why Trigger.Once was created for Structured Streaming. The way we look at streaming is that it doesn't have to be always real time, or 24-7 always on. We see streaming as a workflow that you have to repeat indefinitely. See this blog post for more details! https://databricks.com/blog/2017/05/22/running-streaming-jobs-day-10x-cost-savings.html Best, Burak On Fri, May 1, 2020 at 2:55 PM Rishi Shah wrote: > Hi All, > > I recently started playing with spark streaming, and checkpoint location > feature looks very promising. I wonder if anyone has an opinion about using > spark streaming with checkpoint location option as a slow batch processing > solution. What would be the pros and cons of utilizing streaming with > checkpoint location feature to achieve fault tolerance in batch processing > application? > > -- > Regards, > > Rishi Shah >
[spark streaming] checkpoint location feature for batch processing
Hi All, I recently started playing with spark streaming, and checkpoint location feature looks very promising. I wonder if anyone has an opinion about using spark streaming with checkpoint location option as a slow batch processing solution. What would be the pros and cons of utilizing streaming with checkpoint location feature to achieve fault tolerance in batch processing application? -- Regards, Rishi Shah
Re: [Structured Streaming] Checkpoint file compact file grows big
Deleting the latest .compact file would lose the exactly-once guarantee and lead Spark to fail to read from the output directory. If you're reading the output directory from non-Spark tools then the metadata on the output directory doesn't matter, but there's no exactly-once (exactly-once is achieved by leveraging the metadata, which only Spark can read).

Btw, what you've encountered is one of the known issues with the file stream sink - there are two different JIRA issues filed for the same problem so far (reported by end users):

https://issues.apache.org/jira/browse/SPARK-24295
https://issues.apache.org/jira/browse/SPARK-29995

I've proposed retention of output files in the file stream sink, but it hasn't gotten much love (which means it's not guaranteed to be addressed):

https://issues.apache.org/jira/browse/SPARK-27188

Given the patch is stale, I'm planning to rework it on top of the latest master soon. Btw, I've also proposed other improvements to help address latency issues in the file stream source & file stream sink, but these haven't gotten much love from committers either (no guarantee they'll be addressed):

https://issues.apache.org/jira/browse/SPARK-30804
https://issues.apache.org/jira/browse/SPARK-30866
https://issues.apache.org/jira/browse/SPARK-30900
https://issues.apache.org/jira/browse/SPARK-30915
https://issues.apache.org/jira/browse/SPARK-30946

SPARK-30946 is closely related to your issue - it would make the checkpoint file much smaller and the compaction much faster. Efficiency depends on the compression ratio, but it could make compaction about 5 times faster and the file about 80% smaller (1/5 of the original), which would greatly delay the point where you reach the bad state, even without a TTL. Say, if you reached the bad state in 2 weeks, the patch would delay it by 8 more weeks (10 weeks to reach the bad state). That said, it doesn't completely remove the need for a TTL, but it opens the chance to have a longer TTL without encountering the bad state. If you're adventurous you can apply these patches on your version of Spark and see whether they help.

Hope this helps.

Thanks, Jungtaek Lim (HeartSaVioR)

On Thu, Apr 16, 2020 at 9:24 AM Ahn, Daniel wrote: > Are Spark Structured Streaming checkpoint files expected to grow over time > indefinitely? Is there a recommended way to safely age-off old checkpoint > data? > > Currently we have a Spark Structured Streaming process reading from Kafka > and writing to an HDFS sink, with checkpointing enabled and writing to a > location on HDFS. This streaming application has been running for 4 months > and over time we have noticed that with every 10th job within the > application there is about a 5 minute delay between when a job finishes and > the next job starts which we have attributed to the checkpoint compaction > process. At this point the .compact file that is written is about 2GB in > size and the contents of the file show it keeps track of files it processed > at the very origin of the streaming application. > > This issue can be reproduced with any Spark Structured Streaming process > that writes checkpoint files. > > Is the best approach for handling the growth of these files to simply > delete the latest .compact file within the checkpoint directory, and are > there any associated risks with doing so? >
Re: [Structured Streaming] Checkpoint file compact file grows big
SEE: http://spark.apache.org/docs/2.3.1/streaming-programming-guide.html#checkpointing

"Note that checkpointing of RDDs incurs the cost of saving to reliable storage. This may cause an increase in the processing time of those batches where RDDs get checkpointed."

As far as I know, the official documentation states that the checkpoint data of a spark streaming application will continue to increase over time, whereas data or RDD checkpointing is necessary even for basic functioning if stateful transformations are used. So, for applications that require long-term aggregation, I choose to use third-party caches in production, such as redis. Maybe you can also try Alluxio.

Best wishes!

On 2020-04-16 08:19:24, "Ahn, Daniel" wrote:

Are Spark Structured Streaming checkpoint files expected to grow over time indefinitely? Is there a recommended way to safely age-off old checkpoint data? Currently we have a Spark Structured Streaming process reading from Kafka and writing to an HDFS sink, with checkpointing enabled and writing to a location on HDFS. This streaming application has been running for 4 months and over time we have noticed that with every 10th job within the application there is about a 5 minute delay between when a job finishes and the next job starts which we have attributed to the checkpoint compaction process. At this point the .compact file that is written is about 2GB in size and the contents of the file show it keeps track of files it processed at the very origin of the streaming application. This issue can be reproduced with any Spark Structured Streaming process that writes checkpoint files. Is the best approach for handling the growth of these files to simply delete the latest .compact file within the checkpoint directory, and are there any associated risks with doing so?
[Structured Streaming] Checkpoint file compact file grows big
Are Spark Structured Streaming checkpoint files expected to grow over time indefinitely? Is there a recommended way to safely age-off old checkpoint data? Currently we have a Spark Structured Streaming process reading from Kafka and writing to an HDFS sink, with checkpointing enabled and writing to a location on HDFS. This streaming application has been running for 4 months and over time we have noticed that with every 10th job within the application there is about a 5 minute delay between when a job finishes and the next job starts which we have attributed to the checkpoint compaction process. At this point the .compact file that is written is about 2GB in size and the contents of the file show it keeps track of files it processed at the very origin of the streaming application. This issue can be reproduced with any Spark Structured Streaming process that writes checkpoint files. Is the best approach for handling the growth of these files to simply delete the latest .compact file within the checkpoint directory, and are there any associated risks with doing so?
[Spark MicroBatchExecution] Error fetching kafka/checkpoint/state/0/0/1.delta does not exist
Hi community, I'm having this error in some kafka streams: Caused by: java.io.FileNotFoundException: File file:/efs/.../kafka/checkpoint/state/0/0/1.delta does not exist Because of this I have some streams down. How can I fix this? Thank you. -- Miguel Silvestre
Spark checkpoint problem for python api
Hi, My code is below:

from pyspark.conf import SparkConf
from pyspark.context import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

def test(record_list):
    print(list(record_list))
    return record_list

def functionToCreateContext():
    conf = SparkConf().setAppName("model_event").setMaster("spark://172.22.9.181:7077") \
        .set("spark.executor.memory", '6g') \
        .set("spark.executor.cores", '8') \
        .set("spark.deploy.defaultCores", '8') \
        .set("spark.cores.max", '16') \
        .set("spark.streaming.kafka.maxRatePerPartition", 1) \
        .set("spark.streaming.blockInterval", 1) \
        .set("spark.default.parallelism", 8) \
        .set("spark.driver.host", '172.22.9.181')
    sc = SparkContext(conf=conf)
    ssc = StreamingContext(sc, 5)
    ssc.checkpoint("/spark/checkpoints/model_event_spark")
    return ssc

if __name__ == '__main__':
    ssc = StreamingContext.getOrCreate("/spark/checkpoints/model_event_spark", functionToCreateContext)
    record_dstream = KafkaUtils.createDirectStream(ssc, topics=["installmentdb_t_bill"],
                                                   kafkaParams={"bootstrap.servers": "xxx:9092",
                                                                "auto.offset.reset": "smallest"})
    record_dstream.checkpoint(5).mapPartitions(test).pprint()
    ssc.start()
    ssc.awaitTermination()

When the script starts for the first time, it works well. But when it is started from the checkpointDirectory the second time, it has a problem like:

2019-07-30 02:48:50,290 ERROR streaming.StreamingContext: Error starting the context, marking it as stopped
org.apache.spark.SparkException: org.apache.spark.streaming.api.python.PythonTransformedDStream@319b7bed has not been initialized
at org.apache.spark.streaming.dstream.DStream.isTimeValid(DStream.scala:313)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:334)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:334)
at scala.Option.orElse(Option.scala:289)
at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:331)
at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:48)
at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:122)
at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:121)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:121)
at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:234)
at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:229)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
at org.apache.spark.streaming.scheduler.JobGenerator.restart(JobGenerator.scala:229)
at org.apache.spark.streaming.scheduler.JobGenerator.start(JobGenerator.scala:98)
at org.apache.spark.streaming.scheduler.JobScheduler.start(JobScheduler.scala:103)
at org.apache.spark.streaming.StreamingContext$$anonfun$liftedTree1$1$1.apply$mcV$sp(StreamingContext.scala:583)
at org.apache.spark.streaming.StreamingContext$$anonfun$liftedTree1$1$1.apply(StreamingContext.scala:578)
at org.apache.spark.streaming.StreamingContext$$anonfun$liftedTree1$1$1.apply(StreamingContext.scala:578)
at ... run in separate thread using org.apache.spark.util.ThreadUtils ... ()
at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:578)
at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:572)
at org.apache.spark.streaming.api.java.JavaStreamingContext.start(JavaStreamingContext.scala:556)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invo
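For posterity, a likely cause here: when recovering from a checkpoint, the whole DStream graph is rebuilt from the checkpoint, so every source, transformation and output operation must be defined inside the function passed to getOrCreate. A sketch of the recovery-safe structure, reusing the names from the post above (details illustrative):

```py
from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

CHECKPOINT = "/spark/checkpoints/model_event_spark"

def test(record_list):
    print(list(record_list))
    return record_list

def functionToCreateContext():
    conf = SparkConf().setAppName("model_event")
    sc = SparkContext(conf=conf)
    ssc = StreamingContext(sc, 5)
    ssc.checkpoint(CHECKPOINT)
    # The DStream graph must be built here, not after getOrCreate:
    record_dstream = KafkaUtils.createDirectStream(
        ssc, topics=["installmentdb_t_bill"],
        kafkaParams={"bootstrap.servers": "xxx:9092",
                     "auto.offset.reset": "smallest"})
    record_dstream.checkpoint(5).mapPartitions(test).pprint()
    return ssc

if __name__ == '__main__':
    # On restart this deserializes the graph from the checkpoint and
    # skips functionToCreateContext entirely.
    ssc = StreamingContext.getOrCreate(CHECKPOINT, functionToCreateContext)
    ssc.start()
    ssc.awaitTermination()
```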
Checkpointing and accessing the checkpoint data
Hi Sparkians, A few questions around checkpointing.

1. Checkpointing “dump” file / persisting to disk: is the file encrypted, or is it a standard parquet file?
2. If the file is not encrypted, can I use it with another app? (I know it's kind of a weird stretch case.)
3. Have you seen, or do you know of, any performance comparison between the two? On small datasets, caching seems more performant, but I can imagine that there is a sweet spot…

Thanks! jgp

Jean-Georges Perrin j...@jgp.net
Spark Streaming: Checkpoint, Recovery and Idempotency
Hello, I am trying to understand the content of a checkpoint and the corresponding recovery.

*My understanding of Spark Checkpointing:*

If you have really long DAGs and your spark cluster fails, checkpointing helps by persisting intermediate state, e.g. to HDFS. So, a DAG of 50 transformations can be reduced to 4-5 transformations with the help of checkpointing. It breaks the DAG though.

*Checkpointing in Streaming:*

My Spark Streaming job has a microbatch of 5 seconds. As I understand, a new job is submitted every 5 secs on the Eventloop that invokes the JobGenerator to generate the RDD DAG for the new microbatch from the DStreamGraph, while the receiver in the meantime keeps collecting the data for the next new microbatch for the next cycle. If I enable checkpointing, as I understand, it will periodically keep checkpointing the "current state".

*Question:*

What is this "state"? Is this the combination of the base RDD and the state of the operators/transformations of the DAG for the present microbatch only? So I have the following:

ubatch 0 at T=0  ---> SUCCESS
ubatch 1 at T=5  ---> SUCCESS
ubatch 2 at T=10 ---> SUCCESS
  ---> Checkpointing kicks in now at T=12
ubatch 3 at T=15 ---> SUCCESS
ubatch 4 at T=20 ---> Spark Cluster DOWN at T=23 => ubatch 4 FAILS!!!
...
  ---> Spark Cluster is restarted at T=100

What specifically goes and sits on the disk as a result of checkpointing at T=12? Will it just store the present state of the operators of the DAG for ubatch 2?

a. If yes, then during recovery at T=100, the last checkpoint available is at T=12. What happens to ubatch 3 at T=15 which was already processed successfully? Does the application reprocess ubatch 3 and handle idempotency here? If yes, do we go to the streaming source, e.g. Kafka, and rewind the offset to be able to replay the contents starting from ubatch 3?

b. If no, then what exactly goes into the checkpoint directory at T=12?

https://stackoverflow.com/questions/56362347/spark-checkpointing-content-recovery-and-idempotency

Regards
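As an aside, the "truncate the DAG" half of the question is easy to see in batch terms; a small sketch (local paths, purely illustrative):

```py
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("lineage-sketch").getOrCreate()
sc = spark.sparkContext
sc.setCheckpointDir("/tmp/ckpt")   # reliable storage (HDFS) on a real cluster

rdd = sc.parallelize(range(1000))
for _ in range(50):                # build a long lineage
    rdd = rdd.map(lambda x: x + 1)

print(rdd.toDebugString().decode())  # long chain of MapPartitionsRDDs

rdd.checkpoint()
rdd.count()                          # materializes the RDD and writes the checkpoint

print(rdd.toDebugString().decode())  # lineage now rooted at the checkpoint files
```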
PySpark Streaming “PicklingError: Could not serialize object” when use transform operator and checkpoint enabled
In PySpark streaming, if checkpointing is enabled and I use a stream.transform operator to join with another rdd, "PicklingError: Could not serialize object" is thrown. I have asked the same question at stackoverflow: https://stackoverflow.com/questions/56267591/pyspark-streaming-picklingerror-could-not-serialize-object-when-checkpoint-an

After some investigation, I found the problem is that checkpointing serializes the lambda and then serializes the rdd referenced inside the lambda. So I changed the code to something like below; the purpose is to use a static transient variable to avoid serializing the rdd.

class DocInfoHolder:
    doc_info = None

line.transform(lambda rdd: rdd.join(DocInfoHolder.doc_info)).pprint(10)

But the problem still exists. Then I found pyspark uses a special pickle module called cloudpickle.py; it looks like it serializes any referenced class, function or lambda code, and there is no documentation about how to skip that serialization. Could anyone help with how to work around this issue?
spark checkpoint between 2 jobs and HDFS ramfs with storage policy
Hi, I am looking for a setup that would allow me to split a single spark processing job into 2 jobs (operational constraints) without wasting too much time persisting the data between the two jobs during spark checkpoint/writes. I have a config with a lot of ram and I'm willing to configure a few hundred GB in ramfs, but I cannot find any feedback on this kind of configuration... and the hadoop doc that tells me "network replication negates the benefits of writing to memory" doesn't inspire much confidence regarding performance improvement. My HDFS is configured with replication 3, so if the LAZY_PERSIST writes still imply waiting for the three replicas to be written to the ramfs of three different datanodes, I can understand that the performance improvement will be really small. In addition, if my second job is not provisioned on the same datanodes, the ramfs might not be of any help. Any advice regarding the use of HDFS ramfs and Lazy persist with spark checkpoint? Dead end? Would a larger linux pagecache be more helpful? Regards, JL
Redeploying spark streaming application aborts because of checkpoint issue
Hello all, I am using spark-2.3.0 and hadoop-2.7.4. I have a spark streaming application which listens to a kafka topic, does some transformation and writes to an Oracle database using a JDBC client.

I read events from Kafka as shown below:

m_oKafkaEvents = getSparkSession().readStream().format("kafka")
    .option("kafka.bootstrap.servers", strKafkaAddress)
    .option("assign", strSubscription)
    .option("maxOffsetsPerTrigger", "10")
    .option("startingOffsets", "latest")
    .option("failOnDataLoss", false)
    .load()
    .filter(strFilter)
    .select(functions.from_json(functions.col("value").cast("string"), oSchema).alias("events"))
    .select("events.*");

The checkpoint is used as shown below:

DataStreamWriter oMilestoneStream = oAggregation
    .writeStream()
    .queryName(strQueryName)
    .outputMode("update")
    .trigger(Trigger.ProcessingTime(getInsightDeployment().getInstanceSummary().getTriggerInterval()))
    .option("checkpointLocation", strCheckpointLocation)
    .foreach(oForeachWriter);

strCheckpointLocation is something like /insight/tenant-0001/instance_summary/checkpoint/Opportunity_UHIgcccj. This is an hdfs location. With this, when I redeploy the application I get the exception below. The only workaround I have currently is to delete the checkpoint location and recreate the topic. I also see a couple of JIRA tasks which say RESOLVED, but the problem is still seen:

https://issues.apache.org/jira/browse/SPARK-20894
https://issues.apache.org/jira/browse/SPARK-22262

Can someone tell me what the best solution for this is?

thanks, Robin Kuttaiah

Exception
---
18/10/14 03:19:16 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 1)
java.lang.IllegalStateException: Error reading delta file hdfs://hdfs-name:9000/insight/tenant-0001/instance_summary/checkpoint/Opportunity_UHIgcccj/state/1/0/1.delta of HDFSStateStoreProvider[id = (op=1,part=0),dir = hdfs://hdfs-name:9000/insight/tenant-0001/instance_summary/checkpoint/Opportunity_UHIgcccj/state/1/0]: hdfs://hdfs-name:9000/insight/tenant-0001/instance_summary/checkpoint/Opportunity_UHIgcccj/state/1/0/1.delta does not exist
at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$updateFromDeltaFile(HDFSBackedStateStoreProvider.scala:371)
at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$loadMap$1.apply$mcVJ$sp(HDFSBackedStateStoreProvider.scala:333)
at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$loadMap$1.apply(HDFSBackedStateStoreProvider.scala:332)
at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$loadMap$1.apply(HDFSBackedStateStoreProvider.scala:332)
at scala.collection.immutable.NumericRange.foreach(NumericRange.scala:73)
at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.loadMap(HDFSBackedStateStoreProvider.scala:332)
at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.getStore(HDFSBackedStateStoreProvider.scala:196)
at org.apache.spark.sql.execution.streaming.state.StateStore$.get(StateStore.scala:369)
at org.apache.spark.sql.execution.streaming.state.StateStoreRDD.compute(StateStoreRDD.scala:74)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
Caused by: java.io.FileNotFoundException: File does not exist: /insight/tenant-0001/instance_summary/checkpoint/Opportunity_UHIgcccj/state/1/0/1.delta
at org.apache.hadoop.hdfs.server.nam
Re: Structured Streaming doesn't write checkpoint log when I use coalesce
Which version do you use? The app below works with Spark 2.3.1; 200 partitions are stored for state.

val queryStatusFile = conf.queryStatusFile()
val rateRowPerSecond = conf.rateRowPerSecond()
val rateRampUpTimeSecond = conf.rateRampUpTimeSecond()

val ss = SparkSession
  .builder()
  .master("local[3]")
  .appName("state coalesce test")
  .getOrCreate()

ss.streams.addListener(new QueryListenerWriteProgressToFile(queryStatusFile))

import ss.implicits._

val df = ss.readStream
  .format("rate")
  .option("rowsPerSecond", rateRowPerSecond)
  .option("rampUpTime", s"${rateRampUpTimeSecond}s")
  .load()

df.printSchema()

val outDf = df.withWatermark("timestamp", "10 seconds")
  .selectExpr(
    "timestamp",
    "mod(value, 100) as mod",
    "value",
    BenchmarkQueryHelper.createCaseExprStr(
      "mod(CAST(RANDN(0) * 1000 as INTEGER), 50)", 50, 10) + " as word")
  .groupBy(
    window($"timestamp", "1 minute", "10 seconds"),
    $"mod", $"word")
  .agg(max("value").as("max_value"), min("value").as("min_value"),
    avg("value").as("avg_value"))
  .coalesce(8)

val query = outDf.writeStream
  .format("memory")
  .option("queryName", "stateCoalesceTest")
  .option("checkpointLocation", "/tmp/state-coalesce-test")
  .trigger(Trigger.ProcessingTime("5 seconds"))
  .outputMode(OutputMode.Update())
  .start()

query.awaitTermination()

-Jungtaek Lim (HeartSaVioR)

On Thu, Aug 9, 2018 at 8:38 PM, WangXiaolong wrote: > Hi, > > Lately, I encountered a problem, when I was writing as structured > streaming job to write things into opentsdb. > The write-stream part looks something like > > outputDs > .coalesce(14) > .writeStream > .outputMode("append") > .trigger(Trigger.ProcessingTime(s"$triggerSeconds seconds")) > .option("checkpointLocation",s"$checkpointDir/$appName/tsdb") > .foreach { > TsdbWriter( > tsdbUrl, > MongoProp(mongoUrl, mongoPort, mongoUser, mongoPassword, > mongoDatabase, mongoCollection,mongoAuthenticationDatabase) > )(createMetricBuilder(tsdbMetricPrefix)) > } > .start() > > And when I check the checkpoint dir, I discover that the > "/checkpoint/state" dir is empty. I looked into the executor's log and > found that the HDFSBackedStateStoreProvider didn't write anything on the > checkpoint dir. > > Strange thing is, when I replace the "coalesce" function into > "repartition" function, the problem solved. Is there a difference between > these two functions when using structured streaming? > > Looking forward to you help, thanks. > > > > >
Structured Streaming doesn't write checkpoint log when I use coalesce
Hi, Lately I encountered a problem when I was writing a structured streaming job that writes into opentsdb. The write-stream part looks something like:

outputDs
  .coalesce(14)
  .writeStream
  .outputMode("append")
  .trigger(Trigger.ProcessingTime(s"$triggerSeconds seconds"))
  .option("checkpointLocation", s"$checkpointDir/$appName/tsdb")
  .foreach {
    TsdbWriter(
      tsdbUrl,
      MongoProp(mongoUrl, mongoPort, mongoUser, mongoPassword,
        mongoDatabase, mongoCollection, mongoAuthenticationDatabase)
    )(createMetricBuilder(tsdbMetricPrefix))
  }
  .start()

And when I check the checkpoint dir, I discover that the "/checkpoint/state" dir is empty. I looked into the executor's log and found that the HDFSBackedStateStoreProvider didn't write anything to the checkpoint dir. The strange thing is, when I replace the "coalesce" call with "repartition", the problem is solved. Is there a difference between these two functions when using structured streaming? Looking forward to your help, thanks.
Re: [Structured Streaming] Reading Checkpoint data
thanks
Re: [Structured Streaming] Reading Checkpoint data
Only the stream metadata (e.g., stream id, offsets) is stored as JSON. The stream state data is stored in an internal binary format. On Mon, Jul 9, 2018 at 4:07 PM, subramgr wrote: > Hi, > > I read somewhere that with Structured Streaming all the checkpoint data is > more readable (Json) like. Is there any documentation on how to read the > checkpoint data. > > If I do `hadoop fs -ls` on the `state` directory I get some encoded data. > > Thanks > Girish
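For the curious, a sketch of peeking at the readable part (the path is hypothetical and the exact layout can vary across Spark versions): in Spark 2.x each file under checkpoint/offsets/ holds a version marker line followed by JSON lines - batch metadata, then one entry per source.

```py
import json

with open("/checkpoints/myquery/offsets/42") as f:
    lines = f.read().splitlines()

print(lines[0])                 # version marker, e.g. "v1"
for line in lines[1:]:          # batch metadata + per-source offsets
    print(json.dumps(json.loads(line), indent=2))
```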
[Structured Streaming] Reading Checkpoint data
Hi, I read somewhere that with Structured Streaming all the checkpoint data is more readable (JSON-like). Is there any documentation on how to read the checkpoint data? If I do `hadoop fs -ls` on the `state` directory I get some encoded data. Thanks Girish
making query state checkpoint compatible in structured streaming
Consider a spark query (A) which depends on Kafka topics t1 and t2. After running this query in streaming mode, a checkpoint (C1) directory for the query gets created with offsets and sources directories. Now I add a third topic (t3) on which the query depends. If I restart spark with the same checkpoint C1, Spark crashes as expected, as it cannot find an entry for the third topic (t3). So, just as a hack, I tried to add the topic t3 manually to the sources and offsets directories of the query in the checkpoint, but spark still crashed. What's the correct way to solve this problem? How should such upgrade paths be handled in structured streaming?
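One pragmatic approach (an untested sketch; topic names, broker and offsets are hypothetical): start the upgraded query with a brand-new checkpoint location and seed the Kafka source with explicit per-partition starting offsets - the old topics' offsets copied from the last committed batch of the old checkpoint, and -2 (earliest) for the new topic t3.

```py
import json
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("upgrade-sketch").getOrCreate()

starting_offsets = json.dumps({
    "t1": {"0": 123456, "1": 123001},  # carried over from the old checkpoint
    "t2": {"0": 98765},
    "t3": {"0": -2},                   # -2 means "earliest" for the new topic
})

df = (spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker:9092")
      .option("subscribe", "t1,t2,t3")
      .option("startingOffsets", starting_offsets)
      .load())
```

From there the query writes to a new checkpoint directory and manages offsets itself; startingOffsets is only consulted when that checkpoint does not exist yet.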
Re: Apache Spark Structured Streaming - Kafka Streaming - Option to ignore checkpoint
If you are using the Kafka direct connect API it might be committing the offsets back to Kafka itself. On Thu, Jun 7, 2018 at 4:10, licl wrote: > I met the same issue and I have try to delete the checkpoint dir before the > job , > > But spark seems can read the correct offset even though after the > checkpoint dir is deleted , > > I don't know how spark do this without checkpoint's metadata.
Re: Apache Spark Structured Streaming - Kafka Streaming - Option to ignore checkpoint
I met the same issue, and I have tried deleting the checkpoint dir before the job. But spark still seems to read the correct offset even after the checkpoint dir is deleted. I don't know how spark does this without the checkpoint's metadata.
Using checkpoint much, much faster than cache. Why?
Hi, folks. I am using Spark 2.2.0 and a combination of Spark ML's LinearSVC and OneVsRest to classify some documents that are originally read from HDFS using sc.wholeTextFiles. When I use it on small documents, I get the results in a few minutes. When I use it on the same number of large documents, it takes hours. NOTE: I munge every document into a fixed-length vector which is the same size irrespective of the size of the document. Using jstat, I see all my executor threads in serialization code, even though all the data easily fits into the memory of the cluster ("Fraction cached: 100%" everywhere). I have called cache() on all my DataFrames with no effect. However, calling checkpoint() on the DF fed to Spark's ML code solved the problem. So, although the problem is fixed, I'd like to know why cache() did not work when checkpoint() did. Can anybody explain? Thanks, Phill
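A plausible explanation is that cache() keeps the data but also the full lineage/plan, which iterative ML code keeps re-analyzing and serializing, whereas checkpoint() materializes the data and cuts the plan off. A small sketch of the difference (illustrative only):

```py
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.appName("cache-vs-checkpoint").getOrCreate()
spark.sparkContext.setCheckpointDir("/tmp/ckpt")  # HDFS on a real cluster

df = spark.range(1000000)
for i in range(50):                  # simulate a deep transformation chain
    df = df.withColumn("id", col("id") + 1)

cached = df.cache()          # keeps data in memory, but keeps the 50-step plan too
truncated = df.checkpoint()  # eager: writes data out, returns a DataFrame with a flat plan

cached.explain()             # long plan
truncated.explain()          # short plan: a scan over the checkpointed data
```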
Apache Spark Structured Streaming - Kafka Streaming - Option to ignore checkpoint
Hi: I am working on a realtime application using spark structured streaming (v2.2.1). The application reads data from Kafka and, if there is a failure, I would like to ignore the checkpoint. Is there any configuration to just read from the last Kafka offset after a failure and ignore any offset checkpoints? Also, I believe that the checkpoint also saves state, and aggregations will continue from it after recovery. Is there any way to ignore checkpointed state? Also, is there a way to selectively save only the state or only the offset checkpoint? Thanks
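There is no supported "ignore the checkpoint" switch in Structured Streaming; the usual sketch (paths and topic hypothetical) is to start the query with a brand-new checkpoint location - which also discards any aggregation state - and control the first offsets explicitly:

```py
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("fresh-start-sketch").getOrCreate()

df = (spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker:9092")
      .option("subscribe", "events")
      .option("startingOffsets", "latest")  # applies only when no checkpoint exists
      .load())

query = (df.writeStream
         .format("console")
         .option("checkpointLocation", "/checkpoints/app-v2")  # new dir = no old offsets/state
         .start())

query.awaitTermination()
```

As far as I know, selectively keeping the offsets while dropping the state (or vice versa) is not exposed through configuration.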
The last successful batch before stop re-execute after restart the DStreams with checkpoint
Experts, I see that the last batch before a stop (graceful shutdown) always re-executes after restarting the DStream from a checkpoint; is this expected behavior? I see a bug in JIRA: https://issues.apache.org/jira/browse/SPARK-20050, which reports duplicates on Kafka; I also see this with HDFS files. Regards - Terry
Issue with EFS checkpoint
Hello, We have a Spark cluster with 3 worker nodes running as EC2 instances on AWS. The Spark application runs in cluster mode and the checkpoints are stored in EFS. The Spark version used is 2.2.0. We noticed the below error coming up - our understanding was that this intermittent checkpoint issue would be resolved with EFS once we moved away from S3.

Caused by: java.io.FileNotFoundException: File file:/efs/checkpoint/UPDATE_XXX/offsets/.3ff13bc6-3eeb-4b87-be87-5d1106efcd62.tmp does not exist
at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:611)
at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:824)

Please help me understand the issue and let me know if there is any fix available for this. Regards, Rehman
Spark Streaming checkpoint
Hi, I have written a spark streaming job that uses checkpointing. I stopped the streaming job for 5 days and then restarted it today. I have encountered a weird issue where it shows zero records for all cycles to date. Is this causing data loss? Thanks, Asmath
Null pointer exception in checkpoint directory
Hi, I keep getting a null pointer exception in a spark streaming job with checkpointing. Any suggestions to resolve this?

Exception in thread "pool-22-thread-9" java.lang.NullPointerException
at org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler.run(Checkpoint.scala:233)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Exception in thread "pool-22-thread-10" java.lang.NullPointerException
at org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler.run(Checkpoint.scala:233)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Exception in thread "pool-22-thread-11" java.lang.NullPointerException
at org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler.run(Checkpoint.scala:233)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Exception in thread "pool-22-thread-12" java.lang.NullPointerException
at org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler.run(Checkpoint.scala:233)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

Thanks, Asmath
Re: Structured Streaming + Kafka - Corrupted Checkpoint Offsets / Commits
The root cause is probably that HDFSMetadataLog ignores exceptions thrown by "output.close". I think this should be fixed by this line in Spark 2.2.1 and 3.0.0: https://github.com/apache/spark/commit/6edfff055caea81dc3a98a6b4081313a0c0b0729#diff-aaeb546880508bb771df502318c40a99L126 Could you try 2.2.1? On Thu, Jan 4, 2018 at 9:08 AM, William Briggs wrote: > I am running a Structured Streaming job (Spark 2.2.0) using EMR 5.9. The > job sources data from a Kafka topic, performs a variety of filters and > transformations, and sinks data back into a different Kafka topic. > > Once per day, we stop the query in order to merge the namenode edit logs > with the fsimage, because Structured Streaming creates and destroys a > significant number of HDFS files, and EMR doesn't support a secondary or HA > namenode for fsimage compaction (AWS support directed us to do this, as > Namenode edit logs were filling the disk). > > Occasionally, the Structured Streaming query will not restart because the > most recent file in the "commits" or "offsets" checkpoint subdirectory is > empty. This seems like an undesirable behavior, as it requires manual > intervention to remove the empty files in order to force the job to fall > back onto the last good values. Has anyone run into this behavior? The only > similar issue I can find is SPARK-21760 > <https://issues.apache.org/jira/browse/SPARK-21760>, which appears to > have no fix or workaround. > > Any assistance would be greatly appreciated! > > Regards, > Will >
Structured Streaming + Kafka - Corrupted Checkpoint Offsets / Commits
I am running a Structured Streaming job (Spark 2.2.0) using EMR 5.9. The job sources data from a Kafka topic, performs a variety of filters and transformations, and sinks data back into a different Kafka topic. Once per day, we stop the query in order to merge the namenode edit logs with the fsimage, because Structured Streaming creates and destroys a significant number of HDFS files, and EMR doesn't support a secondary or HA namenode for fsimage compaction (AWS support directed us to do this, as Namenode edit logs were filling the disk). Occasionally, the Structured Streaming query will not restart because the most recent file in the "commits" or "offsets" checkpoint subdirectory is empty. This seems like an undesirable behavior, as it requires manual intervention to remove the empty files in order to force the job to fall back onto the last good values. Has anyone run into this behavior? The only similar issue I can find is SPARK-21760 <https://issues.apache.org/jira/browse/SPARK-21760>, which appears to have no fix or workaround. Any assistance would be greatly appreciated! Regards, Will
Spark 2.1.2 Spark Streaming checkpoint interval not respected
Hi, In the following example using mapWithState, I set the checkpoint interval to 1 minute. From the log, Spark still writes to the checkpoint directory every second. I would appreciate it if someone could point out what I have done wrong.

import java.io.File
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Minutes, Seconds, State, StateSpec, StreamingContext}
import org.apache.spark.streaming.dstream.MapWithStateDStream

object MapWithStateDemo {
  def main(args: Array[String]) {
    if (args.length < 2) {
      System.err.println("Usage: MapWithStateDemo <hostname> <port>")
      System.exit(1)
    }

    val sparkConf = new SparkConf().setAppName("MapWithStateDemo")
      .setIfMissing("spark.master", "local[*]")

    // Create the context with a 1 second batch size
    val ssc = new StreamingContext(sparkConf, Seconds(1))

    // Initial state RDD for mapWithState operation
    val initialRDD = ssc.sparkContext.parallelize(List(("hello", 1), ("world", 1)))

    // Create a ReceiverInputDStream on target ip:port and count the
    // words in input stream of \n delimited test (eg. generated by 'nc')
    val lines = ssc.socketTextStream(args(0), args(1).toInt)
    val words = lines.flatMap(_.split(" "))
    val wordDstream = words.map(x => (x, 1))

    // Update the cumulative count using mapWithState
    // This will give a DStream made of state (which is the cumulative count of the words)
    val mappingFunc = (word: String, one: Option[Int], state: State[Int]) => {
      val sum = one.getOrElse(0) + state.getOption.getOrElse(0)
      val output = (word, sum)
      state.update(sum)
      output
    }

    val stateDstream: MapWithStateDStream[String, Int, Int, (String, Int)] =
      wordDstream.mapWithState(
        StateSpec.function(mappingFunc).timeout(Seconds(10)).initialState(initialRDD))
    stateDstream.checkpoint(Minutes(1L))
    stateDstream.print()

    val targetDir = new File(getClass.getResource("/").toURI).getParentFile.getParentFile
    val checkpointDir = targetDir + "/checkpoint"
    ssc.checkpoint(checkpointDir)

    ssc.start()
    ssc.awaitTermination()
  }
}

Thanks in advance for any assistance! Shing
Re: Kafka 010 Spark 2.2.0 Streaming / Custom checkpoint strategy
HDFS can be replaced by other filesystem plugins (e.g. ignitefs, s3, etc.), so the easiest approach is to write a file system plugin. This is not a plug-in for Spark but part of the Hadoop functionality used by Spark. > On 13. Oct 2017, at 17:41, Anand Chandrashekar wrote: > > Greetings! > > I would like to accomplish a custom Kafka checkpoint strategy (instead of > HDFS, I would like to use Redis). Is there a strategy I can use to change > this behavior? Any advice will help. Thanks! > > Regards, > Anand.
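As a rough illustration of that suggestion, here is a minimal sketch of pointing Spark Streaming checkpoints at a custom Hadoop filesystem. The `com.example.redisfs.RedisFileSystem` class and the `redis://` scheme are assumptions for illustration; the real work is implementing org.apache.hadoop.fs.FileSystem for the backing store:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Hypothetical: com.example.redisfs.RedisFileSystem extends
// org.apache.hadoop.fs.FileSystem. Hadoop resolves the implementation
// class for a URI scheme from the "fs.<scheme>.impl" configuration key.
val conf = new SparkConf()
  .setAppName("CustomCheckpointFs")
  .set("spark.hadoop.fs.redis.impl", "com.example.redisfs.RedisFileSystem")

val ssc = new StreamingContext(conf, Seconds(30))

// Checkpoint writes now go through the custom filesystem instead of HDFS.
ssc.checkpoint("redis://checkpoint-host/spark/checkpoints")
```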
Kafka 010 Spark 2.2.0 Streaming / Custom checkpoint strategy
Greetings! I would like to accomplish a custom Kafka checkpoint strategy (instead of HDFS, I would like to use Redis). Is there a strategy I can use to change this behavior? Any advice will help. Thanks! Regards, Anand.
Re: Cases when to clear the checkpoint directories.
Any changes in the Java code (to be specific, the generated bytecode) in the functions you pass to Spark (i.e., map functions, reduce functions, as well as their closure dependencies) count as an "application code change", and will break the recovery from checkpoints. On Sat, Oct 7, 2017 at 11:53 AM, John, Vishal (Agoda) wrote: > > > Hello TD, > > You had replied to one of the questions about checkpointing – > > This is an unfortunate design on my part when I was building DStreams :) > > Fortunately, we learnt from our mistakes and built Structured Streaming > the correct way. Checkpointing in Structured Streaming stores only the > progress information (offsets, etc.), and the user can change their > application code (within certain constraints, of course) and still restart > from checkpoints (unlike DStreams). If you are just building out your > streaming applications, then I highly recommend you to try out Structured > Streaming instead of DStreams (which is effectively in maintenance mode). > > Can you please elaborate on what you mean by application code change in > DStream applications? > > If I add a couple of println statements in my application code will that > become an application code change? or do you mean, changing method > signatures or adding new methods etc. > Could you please point to relevant source code in Spark, which does this > type of code validation/de-serialisation in case of DStreams? > > We are using mapWithState in our application and it builds its state from > checkpointed RDDs. I would like to understand the cases where we can avoid > clearing the checkpoint directories. > > > thanks in advance, > Vishal >
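To make the recovery mechanics concrete, here is a minimal sketch of the standard getOrCreate pattern (the path and intervals are placeholders). On a clean start the factory function builds the graph; on restart the entire graph, including the serialized closures, is deserialized from the checkpoint, which is why changed bytecode breaks it:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val checkpointDir = "hdfs:///tmp/app-checkpoint" // assumed location

// All DStream setup must live inside this factory: it only runs when no
// checkpoint exists. Otherwise the graph is restored from the checkpoint.
def createContext(): StreamingContext = {
  val conf = new SparkConf().setAppName("RecoverableApp")
  val ssc = new StreamingContext(conf, Seconds(30))
  ssc.checkpoint(checkpointDir)
  // ... define sources, stateful transformations and outputs here ...
  ssc
}

val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
ssc.start()
ssc.awaitTermination()
```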
Cases when to clear the checkpoint directories.
Hello TD, You had replied to one of the questions about checkpointing – This is an unfortunate design on my part when I was building DStreams :) Fortunately, we learnt from our mistakes and built Structured Streaming the correct way. Checkpointing in Structured Streaming stores only the progress information (offsets, etc.), and the user can change their application code (within certain constraints, of course) and still restart from checkpoints (unlike DStreams). If you are just building out your streaming applications, then I highly recommend you to try out Structured Streaming instead of DStreams (which is effectively in maintenance mode). Can you please elaborate on what you mean by application code change in DStream applications? If I add a couple of println statements in my application code will that become an application code change? Or do you mean changing method signatures, adding new methods, etc.? Could you please point to the relevant source code in Spark which does this type of code validation/de-serialisation in case of DStreams? We are using mapWithState in our application and it builds its state from checkpointed RDDs. I would like to understand the cases where we can avoid clearing the checkpoint directories. thanks in advance, Vishal
ReduceByKeyAndWindow checkpoint recovery issues in Spark Streaming
Hi, ReduceByKeyAndWindow checkpoint recovery has issues when trying to recover for the second time. Basically it loses the reduced value of the previous window, even though the key is present in the old values that need to be inverse-reduced, resulting in the following error. Does anyone have any idea as to why it does not recover properly the second time? Neither previous window has value for key, nor new values found. Are you sure your key class hashes consistently? at org.apache.spark.streaming.dstream.ReducedWindowedDStream$$anonfun$4.apply(ReducedWindowedDStream.scala:143) at org.apache.spark.streaming.dstream.ReducedWindowedDStream$$anonfun$4.apply(ReducedWindowedDStream.scala:130)
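For reference, a sketch of the reduceByKeyAndWindow variant under discussion. This is not a confirmed fix for the recovery problem above, just the API shape; `pairs` is an assumed input DStream, and the optional filterFunc drops keys whose windowed value has fallen to zero so stale entries do not linger in state:

```scala
import org.apache.spark.streaming.{Minutes, Seconds}

// assuming `pairs: DStream[(String, Int)]` and ssc.checkpoint(...) already set
val windowedCounts = pairs.reduceByKeyAndWindow(
  (a: Int, b: Int) => a + b,   // reduce values entering the window
  (a: Int, b: Int) => a - b,   // inverse-reduce values leaving the window
  windowDuration = Minutes(10),
  slideDuration = Seconds(30),
  filterFunc = (kv: (String, Int)) => kv._2 > 0 // retain only non-zero keys
)
```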
Issues when trying to recover a textFileStream from checkpoint in Spark streaming
Hi, I am facing issues while trying to recover a textFileStream from checkpoint. Basically it is trying to load the files from the beginning of the job start, whereas I am deleting the files after processing them. I have the following configs set, so I was thinking that it should not look for files beyond 2 minutes when trying to recover from checkpoint. Any suggestions on this would be of great help. sparkConf.set("spark.streaming.minRememberDuration","120s") sparkConf.set("spark.streaming.fileStream.minRememberDuration","120s") Thanks, Swetha
Re: Spark streaming: java.lang.ClassCastException: org.apache.spark.util.SerializableConfiguration ... on restart from checkpoint
Considering the @transient annotations and the work done in the instance initializer, not much state is really being broadcast to the executors. It might be simpler to just create these instances on the executors, rather than trying to broadcast them?
Re: [SS] Console sink not supporting recovering from checkpoint location? Why?
Hi Michael, That reflects my sentiments so well. Thanks for having confirmed my thoughts! https://issues.apache.org/jira/browse/SPARK-21667 Pozdrawiam, Jacek Laskowski https://medium.com/@jaceklaskowski/ Mastering Apache Spark 2 https://bit.ly/mastering-apache-spark Follow me at https://twitter.com/jaceklaskowski On Tue, Aug 8, 2017 at 12:37 AM, Michael Armbrust wrote: > I think there is really no good reason for this limitation. > > On Mon, Aug 7, 2017 at 2:58 AM, Jacek Laskowski wrote: >> >> Hi, >> >> While exploring checkpointing with kafka source and console sink I've >> got the exception: >> >> // today's build from the master >> scala> spark.version >> res8: String = 2.3.0-SNAPSHOT >> >> scala> val q = records. >> | writeStream. >> | format("console"). >> | option("truncate", false). >> | option("checkpointLocation", "/tmp/checkpoint"). // <-- >> checkpoint directory >> | trigger(Trigger.ProcessingTime(10.seconds)). >> | outputMode(OutputMode.Update). >> | start >> org.apache.spark.sql.AnalysisException: This query does not support >> recovering from checkpoint location. Delete /tmp/checkpoint/offsets to >> start over.; >> at >> org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:222) >> at >> org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:278) >> at >> org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:284) >> ... 61 elided >> >> The "trigger" is the change >> https://issues.apache.org/jira/browse/SPARK-16116 and this line in >> particular >> https://github.com/apache/spark/pull/13817/files#diff-d35e8fce09686073f81de598ed657de7R277. >> >> Why is this needed? I can't think of a use case where console sink >> could not recover from checkpoint location (since all the information >> is available). I'm lost on it and would appreciate some help (to >> recover :)) >> >> Pozdrawiam, >> Jacek Laskowski >> >> https://medium.com/@jaceklaskowski/ >> Mastering Apache Spark 2 https://bit.ly/mastering-apache-spark >> Follow me at https://twitter.com/jaceklaskowski
Re: [SS] Console sink not supporting recovering from checkpoint location? Why?
I think there is really no good reason for this limitation. On Mon, Aug 7, 2017 at 2:58 AM, Jacek Laskowski wrote: > Hi, > > While exploring checkpointing with kafka source and console sink I've > got the exception: > > // today's build from the master > scala> spark.version > res8: String = 2.3.0-SNAPSHOT > > scala> val q = records. > | writeStream. > | format("console"). > | option("truncate", false). > | option("checkpointLocation", "/tmp/checkpoint"). // <-- > checkpoint directory > | trigger(Trigger.ProcessingTime(10.seconds)). > | outputMode(OutputMode.Update). > | start > org.apache.spark.sql.AnalysisException: This query does not support > recovering from checkpoint location. Delete /tmp/checkpoint/offsets to > start over.; > at org.apache.spark.sql.streaming.StreamingQueryManager.createQuery( > StreamingQueryManager.scala:222) > at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery( > StreamingQueryManager.scala:278) > at org.apache.spark.sql.streaming.DataStreamWriter. > start(DataStreamWriter.scala:284) > ... 61 elided > > The "trigger" is the change > https://issues.apache.org/jira/browse/SPARK-16116 and this line in > particular https://github.com/apache/spark/pull/13817/files#diff- > d35e8fce09686073f81de598ed657de7R277. > > Why is this needed? I can't think of a use case where console sink > could not recover from checkpoint location (since all the information > is available). I'm lost on it and would appreciate some help (to > recover :)) > > Pozdrawiam, > Jacek Laskowski > > https://medium.com/@jaceklaskowski/ > Mastering Apache Spark 2 https://bit.ly/mastering-apache-spark > Follow me at https://twitter.com/jaceklaskowski
[SS] Console sink not supporting recovering from checkpoint location? Why?
Hi, While exploring checkpointing with kafka source and console sink I've got the exception: // today's build from the master scala> spark.version res8: String = 2.3.0-SNAPSHOT scala> val q = records. | writeStream. | format("console"). | option("truncate", false). | option("checkpointLocation", "/tmp/checkpoint"). // <-- checkpoint directory | trigger(Trigger.ProcessingTime(10.seconds)). | outputMode(OutputMode.Update). | start org.apache.spark.sql.AnalysisException: This query does not support recovering from checkpoint location. Delete /tmp/checkpoint/offsets to start over.; at org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:222) at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:278) at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:284) ... 61 elided The "trigger" is the change https://issues.apache.org/jira/browse/SPARK-16116 and this line in particular https://github.com/apache/spark/pull/13817/files#diff-d35e8fce09686073f81de598ed657de7R277. Why is this needed? I can't think of a use case where console sink could not recover from checkpoint location (since all the information is available). I'm lost on it and would appreciate some help (to recover :)) Pozdrawiam, Jacek Laskowski https://medium.com/@jaceklaskowski/ Mastering Apache Spark 2 https://bit.ly/mastering-apache-spark Follow me at https://twitter.com/jaceklaskowski
RE: underlying checkpoint
Actually, show is an action. The issue is that unless you have some aggregations, show will only go over part of the dataframe, not all of it, and therefore the caching won't occur (similar to what happens with cache). You need an action which requires going over the entire dataframe (which count does).

Thanks,
Assaf.

From: Bernard Jesop [mailto:bernard.je...@gmail.com]
Sent: Thursday, July 13, 2017 6:58 PM
To: Vadim Semenov
Cc: user
Subject: Re: underlying checkpoint

Thank you, one of my mistakes was to think that show() was an action.

2017-07-13 17:52 GMT+02:00 Vadim Semenov <vadim.seme...@datadoghq.com>:

You need to trigger an action on that rdd to checkpoint it.

```
scala> spark.sparkContext.setCheckpointDir(".")

scala> val df = spark.createDataFrame(List(("Scala", 35), ("Python", 30), ("R", 15), ("Java", 20)))
df: org.apache.spark.sql.DataFrame = [_1: string, _2: int]

scala> df.rdd.checkpoint()

scala> df.rdd.isCheckpointed
res2: Boolean = false

scala> df.show()
+------+---+
|    _1| _2|
+------+---+
| Scala| 35|
|Python| 30|
|     R| 15|
|  Java| 20|
+------+---+

scala> df.rdd.isCheckpointed
res4: Boolean = false

scala> df.rdd.count()
res5: Long = 4

scala> df.rdd.isCheckpointed
res6: Boolean = true
```

On Thu, Jul 13, 2017 at 11:35 AM, Bernard Jesop <bernard.je...@gmail.com> wrote:

Hi everyone, I just tried this simple program:

```
import org.apache.spark.sql.SparkSession

object CheckpointTest extends App {
  val spark = SparkSession
    .builder()
    .appName("Toto")
    .getOrCreate()
  spark.sparkContext.setCheckpointDir(".")
  val df = spark.createDataFrame(List(("Scala", 35), ("Python", 30), ("R", 15), ("Java", 20)))
  df.show()
  df.rdd.checkpoint()
  println(if (df.rdd.isCheckpointed) "checkpointed" else "not checkpointed")
}
```

But the result is still "not checkpointed". Do you have any idea why? (knowing that the checkpoint file is created)

Best regards,
Bernard JESOP
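As a side note for readers of this thread: since Spark 2.1 the Dataset API has its own checkpoint method, which is eager by default, so it runs the materializing job itself and no separate count() is needed. A minimal sketch (the checkpoint directory is a placeholder):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("EagerCheckpoint").getOrCreate()
spark.sparkContext.setCheckpointDir("/tmp/checkpoints") // placeholder path

val df = spark.createDataFrame(List(("Scala", 35), ("Python", 30)))

// eager = true (the default) runs a job immediately and returns a new
// Dataset backed by the checkpoint files, truncating the lineage.
val checkpointed = df.checkpoint(eager = true)
println(checkpointed.count()) // reads back from the checkpoint
```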
Re: underlying checkpoint
Thank you, one of my mistakes was to think that show() was an action.

2017-07-13 17:52 GMT+02:00 Vadim Semenov :
> You need to trigger an action on that rdd to checkpoint it.
>
> ```
> scala> spark.sparkContext.setCheckpointDir(".")
>
> scala> val df = spark.createDataFrame(List(("Scala", 35), ("Python", 30), ("R", 15), ("Java", 20)))
> df: org.apache.spark.sql.DataFrame = [_1: string, _2: int]
>
> scala> df.rdd.checkpoint()
>
> scala> df.rdd.isCheckpointed
> res2: Boolean = false
>
> scala> df.show()
> +------+---+
> |    _1| _2|
> +------+---+
> | Scala| 35|
> |Python| 30|
> |     R| 15|
> |  Java| 20|
> +------+---+
>
> scala> df.rdd.isCheckpointed
> res4: Boolean = false
>
> scala> df.rdd.count()
> res5: Long = 4
>
> scala> df.rdd.isCheckpointed
> res6: Boolean = true
> ```
>
> On Thu, Jul 13, 2017 at 11:35 AM, Bernard Jesop wrote:
>
>> Hi everyone, I just tried this simple program :
>>
>> import org.apache.spark.sql.SparkSession
>>
>> object CheckpointTest extends App {
>>   val spark = SparkSession
>>     .builder()
>>     .appName("Toto")
>>     .getOrCreate()
>>   spark.sparkContext.setCheckpointDir(".")
>>   val df = spark.createDataFrame(List(("Scala", 35), ("Python", 30), ("R", 15), ("Java", 20)))
>>   df.show()
>>   df.rdd.checkpoint()
>>   println(if (df.rdd.isCheckpointed) "checkpointed" else "not checkpointed")
>> }
>>
>> But the result is still "not checkpointed".
>> Do you have any idea why? (knowing that the checkpoint file is created)
>>
>> Best regards,
>> Bernard JESOP
Re: underlying checkpoint
You need to trigger an action on that rdd to checkpoint it.

```
scala> spark.sparkContext.setCheckpointDir(".")

scala> val df = spark.createDataFrame(List(("Scala", 35), ("Python", 30), ("R", 15), ("Java", 20)))
df: org.apache.spark.sql.DataFrame = [_1: string, _2: int]

scala> df.rdd.checkpoint()

scala> df.rdd.isCheckpointed
res2: Boolean = false

scala> df.show()
+------+---+
|    _1| _2|
+------+---+
| Scala| 35|
|Python| 30|
|     R| 15|
|  Java| 20|
+------+---+

scala> df.rdd.isCheckpointed
res4: Boolean = false

scala> df.rdd.count()
res5: Long = 4

scala> df.rdd.isCheckpointed
res6: Boolean = true
```

On Thu, Jul 13, 2017 at 11:35 AM, Bernard Jesop wrote:
> Hi everyone, I just tried this simple program :
>
> import org.apache.spark.sql.SparkSession
>
> object CheckpointTest extends App {
>   val spark = SparkSession
>     .builder()
>     .appName("Toto")
>     .getOrCreate()
>   spark.sparkContext.setCheckpointDir(".")
>   val df = spark.createDataFrame(List(("Scala", 35), ("Python", 30), ("R", 15), ("Java", 20)))
>   df.show()
>   df.rdd.checkpoint()
>   println(if (df.rdd.isCheckpointed) "checkpointed" else "not checkpointed")
> }
>
> But the result is still "not checkpointed".
> Do you have any idea why? (knowing that the checkpoint file is created)
>
> Best regards,
> Bernard JESOP
underlying checkpoint
Hi everyone, I just tried this simple program:

```
import org.apache.spark.sql.SparkSession

object CheckpointTest extends App {
  val spark = SparkSession
    .builder()
    .appName("Toto")
    .getOrCreate()
  spark.sparkContext.setCheckpointDir(".")
  val df = spark.createDataFrame(List(("Scala", 35), ("Python", 30), ("R", 15), ("Java", 20)))
  df.show()
  df.rdd.checkpoint()
  println(if (df.rdd.isCheckpointed) "checkpointed" else "not checkpointed")
}
```

But the result is still "not checkpointed". Do you have any idea why? (knowing that the checkpoint file is created)

Best regards,
Bernard JESOP
Re: How to reduce the amount of data that is getting written to the checkpoint from Spark Streaming
Using a long period between checkpoints may cause a long lineage of graph computations to build up, since Spark uses checkpointing to cut it, and that can also delay the streaming job.
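A minimal sketch of that trade-off, assuming a 10-second batch interval; the commonly cited rule of thumb is a checkpoint interval of roughly 5-10 batches:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val ssc = new StreamingContext(new SparkConf().setAppName("CheckpointTuning"), Seconds(10))
ssc.checkpoint("hdfs:///tmp/checkpoints") // placeholder directory

// assuming `stateStream` is a stateful DStream (e.g. from updateStateByKey):
// checkpointing every 5 batches keeps the lineage short without writing
// state to the checkpoint directory on every single batch
stateStream.checkpoint(Seconds(50))
```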
Re: How to reduce the amount of data that is getting written to the checkpoint from Spark Streaming
You can't. Spark doesn't let you fiddle with the data being checkpointed, as it's an internal implementation detail.
How to reduce the amount of data that is getting written to the checkpoint from Spark Streaming
Hi, I have checkpoints enabled in Spark streaming, and I use updateStateByKey and reduceByKeyAndWindow with inverse functions. How do I reduce the amount of data that I am writing to the checkpoint, or clear out the data that I don't care about? Thanks!
Re: Spark Streaming Checkpoint and Exactly Once Guarantee on Kafka Direct Stream
In either case, an end-to-end exactly-once guarantee can be ensured only if the output sink is updated transactionally. The engine has to re-execute data on failure. An exactly-once guarantee means that the external storage is updated as if each data record was computed exactly once. That's why you need to update it transactionally to handle possible recomputations. This is true for both Spark Streaming and Structured Streaming. Hope this helps. On Jun 6, 2017 5:56 AM, "ALunar Beach" wrote: > Thanks TD. > In pre-structured streaming, exactly once guarantee on input is not > guaranteed. is it? > > On Tue, Jun 6, 2017 at 4:30 AM, Tathagata Das > wrote: > >> This is the expected behavior. There are some confusing corner cases. >> If you are starting to play with Spark Streaming, i highly recommend >> learning Structured Streaming >> <http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html> >> instead. >> >> On Mon, Jun 5, 2017 at 11:16 AM, anbucheeralan >> wrote: >> >>> I am using Spark Streaming Checkpoint and Kafka Direct Stream. >>> It uses a 30 sec batch duration and normally the job is successful in >>> 15-20 sec. >>> >>> If the spark application fails after the successful completion >>> (149668428ms in the log below) and restarts, it's duplicating the last >>> batch again. >>> >>> Is this the expected behavior? I was expecting this to start a new batch >>> window. >>> >>> >>> Here are some logs: >>> >>> Last successful run: >>> 17/06/05 13:38:00 INFO JobScheduler: Total delay: 0.040 s for time >>> 149668428 ms (execution: 0.029 s) >>> 17/06/05 13:38:00 INFO KafkaRDD: Removing RDD 0 from persistence list >>> 17/06/05 13:38:00 INFO BlockManager: Removing RDD 0 >>> 17/06/05 13:38:00 INFO JobGenerator: Checkpointing graph for time >>> 149668428 ms >>> 17/06/05 13:38:00 INFO DStreamGraph: Updating checkpoint data for time >>> 149668428 ms >>> 17/06/05 13:38:00 INFO DStreamGraph: Updated checkpoint data for time >>> 149668428 ms >>> 17/06/05 13:38:00 INFO CheckpointWriter: Submitted checkpoint of time >>> 149668428 ms to writer queue >>> 17/06/05 13:38:00 INFO CheckpointWriter: Saving checkpoint for time >>> 149668428 ms to file 'file:/Users/anbucheeralan/Ide >>> aProjects/Spark2Example/ckpt/checkpoint-149668428' >>> 17/06/05 13:38:00 INFO CheckpointWriter: Checkpoint for time >>> 149668428 ms saved to file >>> 'file:/Users/anbucheeralan/IdeaProjects/Spark2Example/ckpt/checkpoint-149668428', >>> took 4032 bytes and 9 ms >>> 17/06/05 13:38:00 INFO DStreamGraph: Clearing checkpoint data for time >>> 149668428 ms >>> 17/06/05 13:38:00 INFO DStreamGraph: Cleared checkpoint data for time >>> 149668428 ms >>> >>> After the restart, >>> >>> 17/06/05 13:42:31 INFO DirectKafkaInputDStream$Direct >>> KafkaInputDStreamCheckpointData: Restoring KafkaRDD for time >>> 149668428 ms [(my_test,0,2000,2000)] >>> 17/06/05 13:42:31 INFO DirectKafkaInputDStream: Restored checkpoint data >>> 17/06/05 13:42:31 INFO JobGenerator: Batches during down time (10 >>> batches): 149668428 ms, 149668431 ms, 149668434 ms, >>> 149668437 ms, 149668440 ms, 149668443 ms, 149668446 ms, >>> 149668449 ms, 149668452 ms, 149668455 ms >>> 17/06/05 13:42:31 INFO JobGenerator: Batches pending processing (0 >>> batches): >>> 17/06/05 13:42:31 INFO JobGenerator: Batches to reschedule (10 >>> batches): 149668428 ms, 149668431 ms, 149668434 ms, >>> 149668437 ms, 149668440 ms, 149668443 ms, 149668446 ms, >>> 149668449 ms, 149668452 ms, 149668455 ms >>> 17/06/05 13:42:31 INFO JobScheduler: Added jobs for time 149668428 ms >>> 17/06/05 13:42:31 INFO JobScheduler: Starting job streaming job >>> 149668428 ms.0 from job set of time 149668428 ms >>> >>> >>> -- >>> View this message in context: Fwd: Spark Streaming Checkpoint and >>> Exactly Once Guarantee on Kafka Direct Stream >>> <http://apache-spark-user-list.1001560.n3.nabble.com/Fwd-Spark-Streaming-Checkpoint-and-Exactly-Once-Guarantee-on-Kafka-Direct-Stream-tp28743.html> >>> Sent from the Apache Spark User List mailing list archive >>> <http://apache-spark-user-list.1001560.n3.nabble.com/> at Nabble.com. >>> >> >> >
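To make the transactional-update idea concrete, here is a hedged sketch using Structured Streaming's foreachBatch (available from Spark 2.4). The output path, partition layout, and checkpoint location are assumptions; the point is that keying writes by batchId makes a replayed batch overwrite rather than duplicate:

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.lit

// Assumed: `streamingDf` is a streaming DataFrame read from some source.
def writeBatch(batch: DataFrame, batchId: Long): Unit = {
  batch
    .withColumn("batch_id", lit(batchId)) // tag rows with the micro-batch id
    .write
    .mode("overwrite")
    .option("partitionOverwriteMode", "dynamic") // overwrite only this batch's partition
    .partitionBy("batch_id")
    .parquet("/tmp/output") // placeholder sink path
}

val query = streamingDf.writeStream
  .foreachBatch(writeBatch _)
  .option("checkpointLocation", "/tmp/query-checkpoint") // placeholder
  .start()
```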
Re: Spark Streaming Checkpoint and Exactly Once Guarantee on Kafka Direct Stream
Thanks TD. In pre-structured streaming, exactly once guarantee on input is not guaranteed. is it? On Tue, Jun 6, 2017 at 4:30 AM, Tathagata Das wrote: > This is the expected behavior. There are some confusing corner cases. > If you are starting to play with Spark Streaming, i highly recommend > learning Structured Streaming > <http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html> > instead. > > On Mon, Jun 5, 2017 at 11:16 AM, anbucheeralan > wrote: > >> I am using Spark Streaming Checkpoint and Kafka Direct Stream. >> It uses a 30 sec batch duration and normally the job is successful in >> 15-20 sec. >> >> If the spark application fails after the successful completion >> (149668428ms in the log below) and restarts, it's duplicating the last >> batch again. >> >> Is this the expected behavior? I was expecting this to start a new batch >> window. >> >> >> Here are some logs: >> >> Last successful run: >> 17/06/05 13:38:00 INFO JobScheduler: Total delay: 0.040 s for time >> 149668428 ms (execution: 0.029 s) >> 17/06/05 13:38:00 INFO KafkaRDD: Removing RDD 0 from persistence list >> 17/06/05 13:38:00 INFO BlockManager: Removing RDD 0 >> 17/06/05 13:38:00 INFO JobGenerator: Checkpointing graph for time >> 149668428 ms >> 17/06/05 13:38:00 INFO DStreamGraph: Updating checkpoint data for time >> 149668428 ms >> 17/06/05 13:38:00 INFO DStreamGraph: Updated checkpoint data for time >> 149668428 ms >> 17/06/05 13:38:00 INFO CheckpointWriter: Submitted checkpoint of time >> 149668428 ms to writer queue >> 17/06/05 13:38:00 INFO CheckpointWriter: Saving checkpoint for time >> 149668428 ms to file 'file:/Users/anbucheeralan/Ide >> aProjects/Spark2Example/ckpt/checkpoint-149668428' >> 17/06/05 13:38:00 INFO CheckpointWriter: *Checkpoint for time >> 149668428 ms saved to file >> 'file:/Users/anbucheeralan/IdeaProjects/Spark2Example/ckpt/checkpoint-149668428', >> took 4032 bytes and 9 ms* >> 17/06/05 13:38:00 INFO DStreamGraph: Clearing checkpoint data for time >> 149668428 ms >> 17/06/05 13:38:00 INFO DStreamGraph: Cleared checkpoint data for time >> 149668428 ms >> >> After the restart, >> >> 17/06/05 13:42:31 INFO DirectKafkaInputDStream$Direct >> KafkaInputDStreamCheckpointData: Restoring KafkaRDD for time >> 149668428 ms [(my_test,0,2000,2000)] >> 17/06/05 13:42:31 INFO DirectKafkaInputDStream: Restored checkpoint data >> *17/06/05 13:42:31 INFO JobGenerator: Batches during down time (10 >> batches): 149668428 ms, 149668431 ms, 149668434 ms, >> 149668437 ms, 149668440 ms, 149668443 ms, 149668446 ms, >> 149668449 ms, 149668452 ms, 149668455 ms* >> *17/06/05 13:42:31 INFO JobGenerator: Batches pending processing (0 >> batches): * >> *17/06/05 13:42:31 INFO JobGenerator: Batches to reschedule (10 batches): >> *149668428 ms, 149668431 ms, 149668434 ms, 149668437 ms, >> 149668440 ms, 1496684430000 ms, 149668446 ms, 149668449 ms, >> 149668452 ms, 149668455 ms >> 17/06/05 13:42:31 INFO JobScheduler: Added jobs for time 149668428 ms >> 17/06/05 13:42:31 INFO JobScheduler: Starting job streaming job >> 149668428 ms.0 from job set of time 149668428 ms >> >> >> >> -- >> View this message in context: Fwd: Spark Streaming Checkpoint and >> Exactly Once Guarantee on Kafka Direct Stream >> <http://apache-spark-user-list.1001560.n3.nabble.com/Fwd-Spark-Streaming-Checkpoint-and-Exactly-Once-Guarantee-on-Kafka-Direct-Stream-tp28743.html> >> Sent from the Apache Spark User List mailing list archive >> <http://apache-spark-user-list.1001560.n3.nabble.com/> at Nabble.com. >> > >
Re: Spark Streaming Checkpoint and Exactly Once Guarantee on Kafka Direct Stream
This is the expected behavior. There are some confusing corner cases. If you are starting to play with Spark Streaming, i highly recommend learning Structured Streaming <http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html> instead. On Mon, Jun 5, 2017 at 11:16 AM, anbucheeralan wrote: > I am using Spark Streaming Checkpoint and Kafka Direct Stream. > It uses a 30 sec batch duration and normally the job is successful in > 15-20 sec. > > If the spark application fails after the successful completion > (149668428ms in the log below) and restarts, it's duplicating the last > batch again. > > Is this the expected behavior? I was expecting this to start a new batch > window. > > > Here are some logs: > > Last successful run: > 17/06/05 13:38:00 INFO JobScheduler: Total delay: 0.040 s for time > 149668428 ms (execution: 0.029 s) > 17/06/05 13:38:00 INFO KafkaRDD: Removing RDD 0 from persistence list > 17/06/05 13:38:00 INFO BlockManager: Removing RDD 0 > 17/06/05 13:38:00 INFO JobGenerator: Checkpointing graph for time > 149668428 ms > 17/06/05 13:38:00 INFO DStreamGraph: Updating checkpoint data for time > 149668428 ms > 17/06/05 13:38:00 INFO DStreamGraph: Updated checkpoint data for time > 149668428 ms > 17/06/05 13:38:00 INFO CheckpointWriter: Submitted checkpoint of time > 149668428 ms to writer queue > 17/06/05 13:38:00 INFO CheckpointWriter: Saving checkpoint for time > 149668428 ms to file 'file:/Users/anbucheeralan/Ide > aProjects/Spark2Example/ckpt/checkpoint-149668428' > 17/06/05 13:38:00 INFO CheckpointWriter: *Checkpoint for time > 149668428 ms saved to file > 'file:/Users/anbucheeralan/IdeaProjects/Spark2Example/ckpt/checkpoint-149668428', > took 4032 bytes and 9 ms* > 17/06/05 13:38:00 INFO DStreamGraph: Clearing checkpoint data for time > 149668428 ms > 17/06/05 13:38:00 INFO DStreamGraph: Cleared checkpoint data for time > 149668428 ms > > After the restart, > > 17/06/05 13:42:31 INFO DirectKafkaInputDStream$Direct > KafkaInputDStreamCheckpointData: Restoring KafkaRDD for time > 149668428 ms [(my_test,0,2000,2000)] > 17/06/05 13:42:31 INFO DirectKafkaInputDStream: Restored checkpoint data > *17/06/05 13:42:31 INFO JobGenerator: Batches during down time (10 > batches): 149668428 ms, 149668431 ms, 149668434 ms, > 149668437 ms, 149668440 ms, 149668443 ms, 149668446 ms, > 149668449 ms, 149668452 ms, 149668455 ms* > *17/06/05 13:42:31 INFO JobGenerator: Batches pending processing (0 > batches): * > *17/06/05 13:42:31 INFO JobGenerator: Batches to reschedule (10 batches): > *149668428 > ms, 149668431 ms, 149668434 ms, 149668437 ms, 149668440 ms, > 149668443 ms, 149668446 ms, 149668449 ms, 149668452 ms, > 149668455 ms > 17/06/05 13:42:31 INFO JobScheduler: Added jobs for time 149668428 ms > 17/06/05 13:42:31 INFO JobScheduler: Starting job streaming job > 149668428 ms.0 from job set of time 149668428 ms > > > > ------ > View this message in context: Fwd: Spark Streaming Checkpoint and Exactly > Once Guarantee on Kafka Direct Stream > <http://apache-spark-user-list.1001560.n3.nabble.com/Fwd-Spark-Streaming-Checkpoint-and-Exactly-Once-Guarantee-on-Kafka-Direct-Stream-tp28743.html> > Sent from the Apache Spark User List mailing list archive > <http://apache-spark-user-list.1001560.n3.nabble.com/> at Nabble.com. >
Spark Streaming Checkpoint and Exactly Once Guarantee on Kafka Direct Stream
I am using Spark Streaming Checkpoint and Kafka Direct Stream. It uses a 30 sec batch duration and normally the job is successful in 15-20 sec.

If the spark application fails after the successful completion (149668428ms in the log below) and restarts, it's duplicating the last batch again.

Is this the expected behavior? I was expecting this to start a new batch window.

Here are some logs:

Last successful run:
17/06/05 13:38:00 INFO JobScheduler: Total delay: 0.040 s for time 149668428 ms (execution: 0.029 s)
17/06/05 13:38:00 INFO KafkaRDD: Removing RDD 0 from persistence list
17/06/05 13:38:00 INFO BlockManager: Removing RDD 0
17/06/05 13:38:00 INFO JobGenerator: Checkpointing graph for time 149668428 ms
17/06/05 13:38:00 INFO DStreamGraph: Updating checkpoint data for time 149668428 ms
17/06/05 13:38:00 INFO DStreamGraph: Updated checkpoint data for time 149668428 ms
17/06/05 13:38:00 INFO CheckpointWriter: Submitted checkpoint of time 149668428 ms to writer queue
17/06/05 13:38:00 INFO CheckpointWriter: Saving checkpoint for time 149668428 ms to file 'file:/Users/anbucheeralan/IdeaProjects/Spark2Example/ckpt/checkpoint-149668428'
17/06/05 13:38:00 INFO CheckpointWriter: Checkpoint for time 149668428 ms saved to file 'file:/Users/anbucheeralan/IdeaProjects/Spark2Example/ckpt/checkpoint-149668428', took 4032 bytes and 9 ms
17/06/05 13:38:00 INFO DStreamGraph: Clearing checkpoint data for time 149668428 ms
17/06/05 13:38:00 INFO DStreamGraph: Cleared checkpoint data for time 149668428 ms

After the restart,

17/06/05 13:42:31 INFO DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData: Restoring KafkaRDD for time 149668428 ms [(my_test,0,2000,2000)]
17/06/05 13:42:31 INFO DirectKafkaInputDStream: Restored checkpoint data
17/06/05 13:42:31 INFO JobGenerator: Batches during down time (10 batches): 149668428 ms, 149668431 ms, 149668434 ms, 149668437 ms, 149668440 ms, 149668443 ms, 149668446 ms, 149668449 ms, 149668452 ms, 149668455 ms
17/06/05 13:42:31 INFO JobGenerator: Batches pending processing (0 batches):
17/06/05 13:42:31 INFO JobGenerator: Batches to reschedule (10 batches): 149668428 ms, 149668431 ms, 149668434 ms, 149668437 ms, 149668440 ms, 149668443 ms, 149668446 ms, 149668449 ms, 149668452 ms, 149668455 ms
17/06/05 13:42:31 INFO JobScheduler: Added jobs for time 149668428 ms
17/06/05 13:42:31 INFO JobScheduler: Starting job streaming job 149668428 ms.0 from job set of time 149668428 ms
Re: Temp checkpoint directory for EMR (S3 or HDFS)
You should actually be able to get to the underlying filesystem from your SparkContext:

```
String originalFs = sparkContext.hadoopConfiguration().get("fs.defaultFS");
```

and then you could just use that:

```
String checkpointPath = String.format("%s/%s/", originalFs, checkpointDirectory);
sparkContext.setCheckpointDir(checkpointPath);
```

Asher Krim
Senior Software Engineer

On Tue, May 30, 2017 at 12:37 PM, Everett Anderson wrote: > Still haven't found a --conf option. > > Regarding a temporary HDFS checkpoint directory, it looks like when using > --master yarn, spark-submit supplies a SPARK_YARN_STAGING_DIR environment > variable. Thus, one could do the following when creating a SparkSession: > > val checkpointPath = new Path(System.getenv("SPARK_YARN_STAGING_DIR"), > "checkpoints").toString > sparkSession.sparkContext.setCheckpointDir(checkpointPath) > > The staging directory is in an HDFS path like > > /user/[user]/.sparkStaging/[YARN application ID] > > and is deleted at the end of the application > <https://github.com/apache/spark/blob/branch-2.0/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala#L184> > . > > So this is one option, though certainly abusing the staging directory. > > A more general one might be to find where Dataset.persist(DISK_ONLY) > writes. > > > On Fri, May 26, 2017 at 9:08 AM, Everett Anderson > wrote: > >> Hi, >> >> I need to set a checkpoint directory as I'm starting to use GraphFrames. >> (Also, occasionally my regular DataFrame lineages get too long so it'd be >> nice to use checkpointing to squash the lineage.) >> >> I don't actually need this checkpointed data to live beyond the life of >> the job, however. I'm running jobs on AWS EMR (so on YARN + HDFS) and >> reading and writing non-transient data to S3. >> >> Two questions: >> >> 1. Is there a Spark --conf option to set the checkpoint directory? >> Somehow I couldn't find it, but surely it exists. >> >> 2. What's a good checkpoint directory for this use case? I imagine it'd >> be on HDFS and presumably in a YARN application-specific temporary path >> that gets cleaned up afterwards. Does anyone have a recommendation? >> >> Thanks! >> >> - Everett >> >> >
Re: Temp checkpoint directory for EMR (S3 or HDFS)
Still haven't found a --conf option.

Regarding a temporary HDFS checkpoint directory, it looks like when using --master yarn, spark-submit supplies a SPARK_YARN_STAGING_DIR environment variable. Thus, one could do the following when creating a SparkSession:

```
val checkpointPath = new Path(System.getenv("SPARK_YARN_STAGING_DIR"), "checkpoints").toString
sparkSession.sparkContext.setCheckpointDir(checkpointPath)
```

The staging directory is in an HDFS path like

/user/[user]/.sparkStaging/[YARN application ID]

and is deleted at the end of the application <https://github.com/apache/spark/blob/branch-2.0/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala#L184>.

So this is one option, though certainly abusing the staging directory.

A more general one might be to find where Dataset.persist(DISK_ONLY) writes.

On Fri, May 26, 2017 at 9:08 AM, Everett Anderson wrote: > Hi, > > I need to set a checkpoint directory as I'm starting to use GraphFrames. > (Also, occasionally my regular DataFrame lineages get too long so it'd be > nice to use checkpointing to squash the lineage.) > > I don't actually need this checkpointed data to live beyond the life of > the job, however. I'm running jobs on AWS EMR (so on YARN + HDFS) and > reading and writing non-transient data to S3. > > Two questions: > > 1. Is there a Spark --conf option to set the checkpoint directory? Somehow > I couldn't find it, but surely it exists. > > 2. What's a good checkpoint directory for this use case? I imagine it'd be > on HDFS and presumably in a YARN application-specific temporary path that > gets cleaned up afterwards. Does anyone have a recommendation? > > Thanks! > > - Everett > >
Temp checkpoint directory for EMR (S3 or HDFS)
Hi, I need to set a checkpoint directory as I'm starting to use GraphFrames. (Also, occasionally my regular DataFrame lineages get too long so it'd be nice to use checkpointing to squash the lineage.) I don't actually need this checkpointed data to live beyond the life of the job, however. I'm running jobs on AWS EMR (so on YARN + HDFS) and reading and writing non-transient data to S3. Two questions: 1. Is there a Spark --conf option to set the checkpoint directory? Somehow I couldn't find it, but surely it exists. 2. What's a good checkpoint directory for this use case? I imagine it'd be on HDFS and presumably in a YARN application-specific temporary path that gets cleaned up afterwards. Does anyone have a recommendation? Thanks! - Everett
Re: Spark checkpoint - nonstreaming
Just load it as you would from any other directory. > On 26. May 2017, at 17:26, Priya PM wrote: > > > -- Forwarded message -- > From: Priya PM > Date: Fri, May 26, 2017 at 8:54 PM > Subject: Re: Spark checkpoint - nonstreaming > To: Jörn Franke > > > Oh, how do I do it? I don't see it mentioned anywhere in the documentation. > > I have followed this link > https://github.com/JerryLead/SparkInternals/blob/master/markdown/english/6-CacheAndCheckpoint.md > to understand the checkpoint workflow. > > But it doesn't seem to work the way it was mentioned below during the second > run to read from the checkpointed RDD. > > > > Q: How to read checkpointed RDD ? > > runJob() will call finalRDD.partitions() to determine how many tasks there > will be. rdd.partitions() checks if the RDD has been checkpointed via > RDDCheckpointData which manages checkpointed RDD. If yes, return the > partitions of the RDD (Array[Partition]). When rdd.iterator() is called to > compute RDD's partition, computeOrReadCheckpoint(split: Partition) is also > called to check if the RDD is checkpointed. If yes, the parent RDD's > iterator(), a.k.a CheckpointRDD.iterator() will be called. CheckpointRDD > reads files on file system to produce RDD partition. That's why a parent > CheckpointRDD is added to the checkpointed RDD. > > >> On Fri, May 26, 2017 at 8:48 PM, Jörn Franke wrote: >> Did you explicitly tell the application to read from the checkpoint >> directory ? >> This you have to do in non-streaming scenarios. >> >>> On 26. May 2017, at 16:52, Priya PM wrote: >>> >>> yes, i did set the checkpoint directory. I could see the checkpointed RDD >>> too. >>> >>> [root@ rdd-28]# pwd >>> /root/checkpointDir/9dd1acf0-bef8-4a4f-bf0e-f7624334abc5/rdd-28 >>> >>> I am using the MovieLens application to check spark checkpointing feature. >>> >>> code: MovieLensALS.scala >>> >>> def main(args: Array[String]) { >>> .. >>> .. >>> sc.setCheckpointDir("/root/checkpointDir") >>> } >>> >>> >>> >>>> On Fri, May 26, 2017 at 8:09 PM, Jörn Franke wrote: >>>> Do you have some source code? >>>> Did you set the checkpoint directory ? >>>> >>>> > On 26. May 2017, at 16:06, Priya wrote: >>>> > >>>> > Hi, >>>> > >>>> > With nonstreaming spark application, did checkpoint the RDD and I could >>>> > see >>>> > the RDD getting checkpointed. I have killed the application after >>>> > checkpointing the RDD and restarted the same application again >>>> > immediately, >>>> > but it doesn't seem to pick from checkpoint and it again checkpoints the >>>> > RDD. Could anyone please explain why am I seeing this behavior, why it is >>>> > not picking from the checkpoint and proceeding further from there on the >>>> > second run of the same application. Would really help me understand spark >>>> > checkpoint work flow if I can get some clarity on the behavior. Please >>>> > let >>>> > me know if I am missing something. >>>> > >>>> > [root@checkpointDir]# ls >>>> > 9dd1acf0-bef8-4a4f-bf0e-f7624334abc5 >>>> > a4f14f43-e7c3-4f64-a980-8483b42bb11d >>>> > >>>> > [root@9dd1acf0-bef8-4a4f-bf0e-f7624334abc5]# ls -la >>>> > total 0 >>>> > drwxr-xr-x. 3 root root 20 May 26 16:26 . >>>> > drwxr-xr-x. 4 root root 94 May 26 16:24 .. >>>> > drwxr-xr-x. 2 root root 133 May 26 16:26 rdd-28 >>>> > >>>> > [root@priya-vm 9dd1acf0-bef8-4a4f-bf0e-f7624334abc5]# cd rdd-28/ >>>> > [root@priya-vm rdd-28]# ls >>>> > part-0 part-1 _partitioner >>>> > >>>> > Thanks >>> > >
Fwd: Spark checkpoint - nonstreaming
-- Forwarded message -- From: Priya PM Date: Fri, May 26, 2017 at 8:54 PM Subject: Re: Spark checkpoint - nonstreaming To: Jörn Franke

Oh, how do I do it? I don't see it mentioned anywhere in the documentation.

I have followed this link https://github.com/JerryLead/SparkInternals/blob/master/markdown/english/6-CacheAndCheckpoint.md to understand the checkpoint workflow.

But it doesn't seem to work the way it was mentioned below during the second run to read from the checkpointed RDD.

Q: How to read checkpointed RDD?

runJob() will call finalRDD.partitions() to determine how many tasks there will be. rdd.partitions() checks if the RDD has been checkpointed via RDDCheckpointData, which manages checkpointed RDDs. If yes, it returns the partitions of the RDD (Array[Partition]). When rdd.iterator() is called to compute an RDD's partition, computeOrReadCheckpoint(split: Partition) is also called to check if the RDD is checkpointed. If yes, the parent RDD's iterator(), a.k.a. CheckpointRDD.iterator(), will be called. CheckpointRDD reads files on the file system to produce the RDD partition. That's why a parent CheckpointRDD is added to the checkpointed RDD.

On Fri, May 26, 2017 at 8:48 PM, Jörn Franke wrote: > Did you explicitly tell the application to read from the checkpoint > directory ? > This you have to do in non-streaming scenarios. > > On 26. May 2017, at 16:52, Priya PM wrote: > > yes, i did set the checkpoint directory. I could see the checkpointed RDD > too. > > [root@ rdd-28]# pwd > /root/checkpointDir/9dd1acf0-bef8-4a4f-bf0e-f7624334abc5/rdd-28 > > I am using the MovieLens application to check spark checkpointing feature. > > code: MovieLensALS.scala > > def main(args: Array[String]) { > .. > .. > sc.setCheckpointDir("/root/checkpointDir") > } > > > > On Fri, May 26, 2017 at 8:09 PM, Jörn Franke wrote: > >> Do you have some source code? >> Did you set the checkpoint directory ? >> >> > On 26. May 2017, at 16:06, Priya wrote: >> > >> > Hi, >> > >> > With nonstreaming spark application, did checkpoint the RDD and I could >> see >> > the RDD getting checkpointed. I have killed the application after >> > checkpointing the RDD and restarted the same application again >> immediately, >> > but it doesn't seem to pick from checkpoint and it again checkpoints the >> > RDD. Could anyone please explain why am I seeing this behavior, why it >> is >> > not picking from the checkpoint and proceeding further from there on the >> > second run of the same application. Would really help me understand >> spark >> > checkpoint work flow if I can get some clarity on the behavior. Please >> let >> > me know if I am missing something. >> > >> > [root@checkpointDir]# ls >> > 9dd1acf0-bef8-4a4f-bf0e-f7624334abc5 a4f14f43-e7c3-4f64-a980-8483b4 >> 2bb11d >> > >> > [root@9dd1acf0-bef8-4a4f-bf0e-f7624334abc5]# ls -la >> > total 0 >> > drwxr-xr-x. 3 root root 20 May 26 16:26 . >> > drwxr-xr-x. 4 root root 94 May 26 16:24 .. >> > drwxr-xr-x. 2 root root 133 May 26 16:26 rdd-28 >> > >> > [root@priya-vm 9dd1acf0-bef8-4a4f-bf0e-f7624334abc5]# cd rdd-28/ >> > [root@priya-vm rdd-28]# ls >> > part-0 part-1 _partitioner >> > >> > Thanks >> >
Re: Spark checkpoint - nonstreaming
Do you have some source code? Did you set the checkpoint directory ? > On 26. May 2017, at 16:06, Priya wrote: > > Hi, > > With nonstreaming spark application, did checkpoint the RDD and I could see > the RDD getting checkpointed. I have killed the application after > checkpointing the RDD and restarted the same application again immediately, > but it doesn't seem to pick from checkpoint and it again checkpoints the > RDD. Could anyone please explain why am I seeing this behavior, why it is > not picking from the checkpoint and proceeding further from there on the > second run of the same application. Would really help me understand spark > checkpoint work flow if I can get some clarity on the behavior. Please let > me know if I am missing something. > > [root@checkpointDir]# ls > 9dd1acf0-bef8-4a4f-bf0e-f7624334abc5 a4f14f43-e7c3-4f64-a980-8483b42bb11d > > [root@9dd1acf0-bef8-4a4f-bf0e-f7624334abc5]# ls -la > total 0 > drwxr-xr-x. 3 root root 20 May 26 16:26 . > drwxr-xr-x. 4 root root 94 May 26 16:24 .. > drwxr-xr-x. 2 root root 133 May 26 16:26 rdd-28 > > [root@priya-vm 9dd1acf0-bef8-4a4f-bf0e-f7624334abc5]# cd rdd-28/ > [root@priya-vm rdd-28]# ls > part-0 part-1 _partitioner > > Thanks >
Re: Spark checkpoint - nonstreaming
In non-streaming Spark, checkpoints aren't for inter-application recovery; rather, you can think of them as doing persist but to HDFS rather than each node's local memory/storage. On Fri, May 26, 2017 at 3:06 PM Priya wrote: > Hi, > > With nonstreaming spark application, did checkpoint the RDD and I could see > the RDD getting checkpointed. I have killed the application after > checkpointing the RDD and restarted the same application again immediately, > but it doesn't seem to pick from checkpoint and it again checkpoints the > RDD. Could anyone please explain why am I seeing this behavior, why it is > not picking from the checkpoint and proceeding further from there on the > second run of the same application. Would really help me understand spark > checkpoint work flow if I can get some clarity on the behavior. Please let > me know if I am missing something. > > [root@checkpointDir]# ls > 9dd1acf0-bef8-4a4f-bf0e-f7624334abc5 a4f14f43-e7c3-4f64-a980-8483b42bb11d > > [root@9dd1acf0-bef8-4a4f-bf0e-f7624334abc5]# ls -la > total 0 > drwxr-xr-x. 3 root root 20 May 26 16:26 . > drwxr-xr-x. 4 root root 94 May 26 16:24 .. > drwxr-xr-x. 2 root root 133 May 26 16:26 rdd-28 > > [root@priya-vm 9dd1acf0-bef8-4a4f-bf0e-f7624334abc5]# cd rdd-28/ > [root@priya-vm rdd-28]# ls > part-0 part-1 _partitioner > > Thanks > -- Cell : 425-233-8271 Twitter: https://twitter.com/holdenkarau
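A minimal sketch of that point, with placeholder paths; within a single application the checkpoint is written by the first action and then reused, with the lineage above it truncated:

```scala
// assuming an existing SparkContext `sc`
sc.setCheckpointDir("hdfs:///tmp/checkpoints") // placeholder directory

val cleaned = sc.textFile("hdfs:///data/input").filter(_.nonEmpty)
cleaned.checkpoint()  // only marks the RDD; nothing is written yet

cleaned.count()       // first action materializes the checkpoint files
println(cleaned.isCheckpointed) // true; later jobs read from those files
```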
Spark checkpoint - nonstreaming
Hi, With a non-streaming Spark application, I checkpointed the RDD and could see the RDD getting checkpointed. I killed the application after checkpointing the RDD and restarted the same application again immediately, but it doesn't seem to pick up from the checkpoint and it checkpoints the RDD again. Could anyone please explain why I am seeing this behavior, and why it is not picking up from the checkpoint and proceeding further from there on the second run of the same application? It would really help me understand the Spark checkpoint workflow if I can get some clarity on this behavior. Please let me know if I am missing something. [root@checkpointDir]# ls 9dd1acf0-bef8-4a4f-bf0e-f7624334abc5 a4f14f43-e7c3-4f64-a980-8483b42bb11d [root@9dd1acf0-bef8-4a4f-bf0e-f7624334abc5]# ls -la total 0 drwxr-xr-x. 3 root root 20 May 26 16:26 . drwxr-xr-x. 4 root root 94 May 26 16:24 .. drwxr-xr-x. 2 root root 133 May 26 16:26 rdd-28 [root@priya-vm 9dd1acf0-bef8-4a4f-bf0e-f7624334abc5]# cd rdd-28/ [root@priya-vm rdd-28]# ls part-0 part-1 _partitioner Thanks
Spark Streaming: NullPointerException when restoring Spark Streaming job from hdfs/s3 checkpoint
I'm having some difficulty reliably restoring a streaming job from a checkpoint. When restoring a streaming job constructed from the following snippet, I receive NullPointerExceptions when `map` is called on the restored RDD.

```
lazy val ssc = StreamingContext.getOrCreate(checkpointDir, createStreamingContext _)

private def createStreamingContext: StreamingContext = {
  val ssc = new StreamingContext(spark.sparkContext, batchInterval)
  ssc.checkpoint(checkpointDir)
  consumeStreamingContext(ssc)
  ssc
}

def consumeStreamingContext(ssc: StreamingContext) = {
  //... create dstreams
  val dstream = KinesisUtil.createStream( ...
  dstream.checkpoint(batchInterval)
  dstream.foreachRDD(process)
}

def process(events: RDD[Event]) = {
  if (!events.isEmpty()) {
    logger.info("Transforming events for processing")
    // rdd seems to support some operations?
    logger.info(s"RDD LENGTH: ${events.count}")
    // null pointer exception on call to .map
    val df = events.map(e => { ... }
  }
}
```

Richard Moorhead
Software Engineer
richard.moorh...@c2fo.com
C2FO: The World's Market for Working Capital®
checkpoint on spark standalone
Hi, I am processing multiple CSV files, 2 GB each, with my Spark application, which also does unions and aggregation across all the input files. I am currently stuck with the error below:

java.lang.StackOverflowError
  at org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression.dataType(interfaces.scala:81)
  at org.apache.spark.sql.catalyst.expressions.Alias.toAttribute(namedExpressions.scala:149)
  at org.apache.spark.sql.catalyst.plans.logical.Aggregate$$anonfun$output$7.apply(basicOperators.scala:228)
  at org.apache.spark.sql.catalyst.plans.logical.Aggregate$$anonfun$output$7.apply(basicOperators.scala:228)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
  at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
  at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
  at scala.collection.AbstractTraversable.map(Traversable.scala:105)
  at org.apache.spark.sql.catalyst.plans.logical.Aggregate.output(basicOperators.scala:228)
  at org.apache.spark.sql.catalyst.planning.PhysicalOperation$$anonfun$unapply$1.apply(patterns.scala:38)
  at org.apache.spark.sql.catalyst.planning.PhysicalOperation$$anonfun$unapply$1.apply(patterns.scala:38)
  at scala.Option.getOrElse(Option.scala:120)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
  at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
  at scala.collection.AbstractTraversable.map(Traversable.scala:105)
  at org.apache.spark.sql.catalyst.plans.logical.Aggregate.output(basicOperators.scala:228)
  at org.apache.spark.sql.catalyst.planning.PhysicalOperation$$anonfun$unapply$1.apply(patterns.scala:38)
  at org.apache.spark.sql.catalyst.planning.PhysicalOperation$$anonfun$unapply$1.apply(patterns.scala:38)
  at scala.Option.getOrElse(Option.scala:120)
  at org.apache.spark.sql.catalyst.planning.PhysicalOperation$.unapply(patterns.scala:38)
  at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.apply(DataSourceStrategy.scala:44)
  at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
  at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
  at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
  at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:59)
  at org.apache.spark.sql.catalyst.planning.QueryPlanner.planLater(QueryPlanner.scala:54)
  at org.apache.spark.sql.execution.SparkStrategies$BasicOperators$.apply(SparkStrategies.scala:336)
  at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
  at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
  at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
  at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:59)
  at org.apache.spark.sql.catalyst.planning.QueryPlanner.planLater(QueryPlanner.scala:54)
  at org.apache.spark.sql.execution.SparkStrategies$BasicOperators$.apply(SparkStrategies.scala:334)
  at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
  at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
  at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
  at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:59)
  at org.apache.spark.sql.catalyst.planning.QueryPlanner.planLater(QueryPlanner.scala:54)
  at org.apache.spark.sql.execution.SparkStrategies$BasicOperators$.apply(SparkStrategies.scala:327)
  at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
  at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
  at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)

I understand that it could be due to RDD lineage and tried to resolve it via checkpoint, but no luck. Any help?

Sincerely,
-Vivek
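Note for anyone hitting this: the StackOverflowError is thrown while Catalyst recurses over a very deep logical plan, so the lineage has to be cut at the plan level; checkpointing the RDD alone does not shrink the SQL plan unless the DataFrame is rebuilt from the checkpointed RDD. A hedged sketch of periodically truncating the plan while unioning many inputs (written against the Spark 2.x API; `input_paths`, the paths, and the interval of 10 are illustrative assumptions):

```py
spark.sparkContext.setCheckpointDir("hdfs:///tmp/checkpoints")

result = None
for i, path in enumerate(input_paths):
    df = spark.read.csv(path, header=True)
    result = df if result is None else result.union(df)
    if (i + 1) % 10 == 0:
        rdd = result.rdd      # RDD of Rows behind the accumulated plan
        rdd.checkpoint()      # mark for reliable checkpointing
        rdd.count()           # an action forces the checkpoint write
        # Rebuilding from the checkpointed RDD starts a fresh, shallow
        # plan instead of the deep union tree built so far.
        result = spark.createDataFrame(rdd, result.schema)
```

If I remember correctly, on Spark 2.1+ DataFrame.checkpoint() packages up this round trip directly.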
Re: SPARK-20325 - Spark Structured Streaming documentation Update: checkpoint configuration
Thank you for your reply; I will open a pull request for this doc issue. The logic is clear.

On Fri, Apr 14, 2017 at 23:34, Michael Armbrust wrote:

>> 1) could we update the documentation for Structured Streaming and describe
>> that the checkpoint location can be specified via
>> spark.sql.streaming.checkpointLocation at the SparkSession level, so that
>> checkpoint dirs are created automatically per query?
>
> Sure, please open a pull request.
>
>> 2) Do we really need to specify the checkpoint dir per query? What is the
>> reason for this? Eventually we will be forced to write some checkpointDir
>> name generator, for example associating it with a particular named query,
>> and so on.
>
> Every query needs to have a unique checkpoint as this is how we track what
> has been processed. If we don't have this, we can't restart the query
> where it left off. In your example, I would suggest including the metric
> name in the checkpoint location path.

-- 
*Yours faithfully, *
*Kate Eri.*
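For anyone following along, a hedged PySpark sketch of the session-level variant discussed above, assuming `df` is a streaming DataFrame and with an illustrative path. As far as I can tell, when only the session-level config is set, Spark derives a subdirectory per query under it, using the query name when one is given and a random UUID otherwise, which is why a stable name (e.g. including the metric name) matters for restarts:

```py
# Session-level default checkpoint root (illustrative path).
spark.conf.set("spark.sql.streaming.checkpointLocation", "hdfs:///tmp/checkpoints")

# With a stable query name the derived checkpoint subdirectory is
# deterministic, so the query can be restarted where it left off.
query = (df.writeStream
           .outputMode("complete")
           .queryName("metric_clicks")  # illustrative, per-metric name
           .format("console")
           .start())
```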
Re: checkpoint
Sorry - I can't help with PySpark, but here is some Java code which you may be able to transform to Python: http://jgp.net/2017/02/02/what-are-spark-checkpoints-on-dataframes/

jg

> On Apr 14, 2017, at 07:18, issues solution wrote:
>
> Hi,
> can someone give me a complete example of working with checkpoint under
> PySpark 1.6?
>
> thx
> regards
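Not authoritative, but here is a minimal self-contained PySpark sketch of the RDD checkpoint workflow, which should apply to 1.6 as well; the application name and checkpoint directory are illustrative:

```py
from pyspark import SparkContext

sc = SparkContext(appName="checkpoint-example")

# The checkpoint dir should live on a reliable filesystem (HDFS on a cluster).
sc.setCheckpointDir("hdfs:///tmp/checkpoints")

rdd = sc.parallelize(range(1000)).map(lambda x: (x % 10, x))
rdd.checkpoint()        # only marks the RDD; nothing is written yet
print(rdd.count())      # the first action writes the checkpoint files
# After the action, the RDD's lineage is replaced by a read of the
# checkpoint files, so earlier stages are not recomputed again.
```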
checkpoint
Hi, can someone give me a complete example of working with checkpoint under PySpark 1.6?

thx
regards
SPARK-20325 - Spark Structured Streaming documentation Update: checkpoint configuration
Hello, guys. I have opened the ticket https://issues.apache.org/jira/browse/SPARK-20325.

My case was: I launch two streams from one source stream *streamToProcess* like this:

streamToProcess
  .groupBy(metric)
  .agg(count(metric))
  .writeStream
  .outputMode("complete")
  .option("checkpointLocation", checkpointDir)
  .foreach(kafkaWriter)
  .start()

After that I got the exception:

Cannot start query with id bf6a1003-6252-4c62-8249-c6a189701255 as another query with same id is already active. Perhaps you are attempting to restart a query from checkpoint that is already active.

This is caused by StreamingQueryManager.scala taking the checkpoint dir from the stream's configuration: because my streams have equal checkpointDirs, the second stream tries to recover instead of creating a new one. For more details see the ticket: SPARK-20325.

1) Could we update the documentation for Structured Streaming and describe that the checkpoint location can be specified via spark.sql.streaming.checkpointLocation at the SparkSession level, so that checkpoint dirs are created automatically per query?

2) Do we really need to specify the checkpoint dir per query? What is the reason for this? Eventually we will be forced to write some checkpointDir name generator, for example associating it with a particular named query, and so on.

-- 
*Yours faithfully, *
*Kate Eri.*
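For readers hitting the same exception: giving every query its own checkpoint location avoids the id collision. A hedged PySpark sketch of the per-query variant, assuming `stream_to_process` is a streaming DataFrame and with illustrative paths and metric names:

```py
from pyspark.sql.functions import count

base = "hdfs:///tmp/checkpoints"

for metric in ["clicks", "views"]:
    (stream_to_process
        .groupBy(metric)
        .agg(count(metric))
        .writeStream
        .outputMode("complete")
        # A unique location per query, keyed here by the metric name,
        # as suggested in the replies above.
        .option("checkpointLocation", "{}/{}".format(base, metric))
        .queryName("agg_{}".format(metric))
        .start())
```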
checkpoint: how to use checkpoint correctly with udf
Hi, can someone explain how I can use checkpoint in PySpark (not in Scala)? I have a lot of UDFs to apply on a large DataFrame and I don't understand how I can use checkpoint to break the lineage and prevent java.lang.StackOverflowError.

regards
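A hedged sketch of the lineage-truncation pattern for PySpark 1.6, where DataFrame.checkpoint() does not exist yet: checkpoint the DataFrame's underlying RDD of Rows, force it with an action, then rebuild the DataFrame from the checkpointed RDD. The path, the `to_num` UDF, and the `sqlContext` usage are illustrative assumptions:

```py
from pyspark.sql import functions as F
from pyspark.sql.types import DoubleType

sc.setCheckpointDir("hdfs:///tmp/checkpoints")  # illustrative path

# Illustrative UDF standing in for the many UDFs mentioned above.
to_num = F.udf(lambda v: 0.0 if v == ' ' else float(v), DoubleType())

df = df.select([to_num(F.col(c)).alias(c) for c in df.columns])

rdd = df.rdd         # RDD of Rows computed through the UDF-heavy plan
rdd.checkpoint()     # mark for checkpointing
rdd.count()          # the action materializes the checkpoint; UDFs run here
# The rebuilt DataFrame's plan starts at the checkpointed RDD, so the
# UDF-heavy lineage above it is cut off.
df = sqlContext.createDataFrame(rdd, df.schema)
```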
Re: checkpoint
Looks like your UDF expects numeric data but you are sending string type. I suggest casting to numeric.

On Thu, 13 Apr 2017 at 7:03 pm, issues solution wrote:

> Hi,
> I am new to Spark and I want to ask what is wrong with checkpoint on
> PySpark 1.6.0. I don't understand what happens when I try to use it on a
> DataFrame:
>
> dfTotaleNormalize24 = dfTotaleNormalize23.select([i if i not in
> listrapcot else udf_Grappra(F.col(i)).alias(i) for i in
> dfTotaleNormalize23.columns])
>
> dfTotaleNormalize24.cache()           <- cache in memory
> dfTotaleNormalize24.count()           <- materialize the DataFrame (the RDD too?)
> dfTotaleNormalize24.rdd.checkpoint()  <- cut the DAG (the RDD is not saved yet)
> dfTotaleNormalize24.rdd.count()       <- materialize to file
>
> But why do I get the following error:
>
> java.lang.UnsupportedOperationException: Cannot evaluate expression:
> PythonUDF#Grappra(input[410, StringType])
>
> Thanks for explaining all the details and steps to save and checkpoint.
> My DataFrame is huge, more than 5 million rows and 1000 columns, and the
> UDFs are applied on more than 150 columns; they just replace ' ' by 0.0,
> that's all.
>
> regards
-- 
Best Regards,
Ayan Guha
checkpoint
Hi, I am new to Spark and I want to ask what is wrong with checkpoint on PySpark 1.6.0. I don't understand what happens when I try to use it on a DataFrame:

dfTotaleNormalize24 = dfTotaleNormalize23.select([i if i not in listrapcot else udf_Grappra(F.col(i)).alias(i) for i in dfTotaleNormalize23.columns])

dfTotaleNormalize24.cache()           <- cache in memory
dfTotaleNormalize24.count()           <- materialize the DataFrame (the RDD too?)
dfTotaleNormalize24.rdd.checkpoint()  <- cut the DAG (the RDD is not saved yet)
dfTotaleNormalize24.rdd.count()       <- materialize to file

But why do I get the following error:

java.lang.UnsupportedOperationException: Cannot evaluate expression: PythonUDF#Grappra(input[410, StringType])

Thanks for explaining all the details and steps to save and checkpoint. My DataFrame is huge, more than 5 million rows and 1000 columns, and the UDFs are applied on more than 150 columns; they just replace ' ' by 0.0, that's all.

regards
[Spark Streaming] Checkpoint backup (.bk) file purpose
Hello,

I'm currently studying the metadata checkpoint implementation in Spark Streaming and I was wondering about the purpose of the so-called "backup files". A CheckpointWriter snippet:

> // We will do checkpoint when generating a batch and completing a batch. When the processing
> // time of a batch is greater than the batch interval, checkpointing for completing an old
> // batch may run after checkpointing of a new batch. If this happens, checkpoint of an old
> // batch actually has the latest information, so we want to recovery from it. Therefore, we
> // also use the latest checkpoint time as the file name, so that we can recover from the
> // latest checkpoint file.
> //
> // Note: there is only one thread writing the checkpoint files, so we don't need to worry
> // about thread-safety.
> val checkpointFile = Checkpoint.checkpointFile(checkpointDir, latestCheckpointTime)
> val backupFile = Checkpoint.checkpointBackupFile(checkpointDir, latestCheckpointTime)
>
> // ... some lines further
> // If the checkpoint file exists, back it up
> // If the backup exists as well, just delete it, otherwise rename will fail
> if (fs.exists(checkpointFile)) {
>   fs.delete(backupFile, true) // just in case it exists
>   if (!fs.rename(checkpointFile, backupFile)) {
>     logWarning(s"Could not rename $checkpointFile to $backupFile")
>   }
> }

What is the role of this *backupFile*? I understand that it is generated when a checkpoint file for the given timestamp already exists, but how can that situation arise? Is it a protection against different Spark applications checkpointing to the same directory? Or is it tied to the case described in the comment above (an old batch finishing after a new batch has started)?

Best regards,
Bartosz.
Re: Spark standalone cluster on EC2 error .. Checkpoint..
Thanks TD and Marco for the feedback. The directory referenced by SPARK_LOCAL_DIRS did not exist. After creating that directory, it worked. This was the first time I was trying to run Spark on a standalone cluster, so I missed it. Thanks

On Fri, Feb 17, 2017 at 12:35 PM, Tathagata Das wrote:

> Seems like an issue with the HDFS you are using for checkpointing. It's
> not able to write data properly.
>
> On Thu, Feb 16, 2017 at 2:40 PM, shyla deshpande wrote:
>
>> Caused by: org.apache.hadoop.ipc.RemoteException(java.io.IOException):
>> File /checkpoint/11ea8862-122c-4614-bc7e-f761bb57ba23/rdd-347/.part-1-attempt-3
>> could only be replicated to 0 nodes instead of minReplication (=1). There
>> are 0 datanode(s) running and no node(s) are excluded in this operation.
>>
>> This is the error I get when I run my Spark Streaming app on a 2-node EC2
>> cluster, with 1 master and 1 worker.
>>
>> Works fine in local mode. Please help.
>>
>> Thanks