Yes, I agree. But apart from maintaining this state internally (in memory, or in memory+disk in the case of RocksDB), Spark saves some information about this state to the checkpoint location on every trigger. I'm afraid we can't do much about this checkpointing operation. I'll continue looking into how I can decrease the number of LIST requests (ListBucket operations) made in this process.
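One knob that can shrink the checkpoint directory itself (a sketch, not a verified fix for LIST volume — how much it helps depends on how the checkpoint file manager lists files) is Spark's retention of per-micro-batch metadata, controlled by `spark.sql.streaming.minBatchesToRetain` (default 100):

```python
# Sketch: retain metadata for fewer micro-batches so the checkpoint
# location holds fewer files to enumerate. "20" is an illustrative value,
# not a recommendation; it also bounds how far back a query can recover.
spark.conf.set("spark.sql.streaming.minBatchesToRetain", "20")
```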
Thank you for your input so far!
Andrzej

Wed, 10 Jan 2024 at 16:33 Mich Talebzadeh <mich.talebza...@gmail.com> wrote:

> Hi,
>
> You may have a point on scenario 2.
>
> Caching Streaming DataFrames: In Spark Streaming, each batch of data is processed incrementally, and it may not fit the typical caching we discussed. Instead, Spark Streaming has its own mechanisms to manage and optimize the processing of streaming data. Case in point: for caching partial results, one often relies on maintaining state by using stateful operations (see below) on Structured Streaming DataFrames. In such scenarios, Spark maintains state internally based on the operations performed. For example, if you are doing a groupBy followed by an aggregation, Spark Streaming will manage the state of the keys and update them incrementally.
>
> Just to clarify, in the context of Spark Structured Streaming, a stateful operation refers to an operation that maintains and updates some form of state across batches of streaming data. Unlike stateless operations, which process each batch independently, stateful operations retain information from previous batches and use it to produce results for the current batch.
>
> So, bottom line, while one may not explicitly cache a streaming data frame, Spark internally optimizes the processing by maintaining the necessary state.
>
> HTH
>
> Mich Talebzadeh,
> Dad | Technologist | Solutions Architect | Engineer
> London
> United Kingdom
>
> view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
> https://en.everybodywiki.com/Mich_Talebzadeh
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction.
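To make the batch-over-batch state update concrete, here is a Spark-free sketch in plain Python (the function and variable names are invented for illustration) of what a stateful groupBy-plus-count conceptually maintains between triggers:

```python
# Toy model of a stateful streaming aggregation: running per-key counts are
# carried across micro-batches, and each batch updates them incrementally
# rather than recomputing from scratch.
from collections import Counter

state = Counter()  # stands in for the state a stateful operator keeps between triggers

def process_micro_batch(batch, state):
    """Update running per-key counts with one micro-batch of events."""
    state.update(batch)
    return dict(state)  # the incrementally updated result for this batch

print(process_micro_batch(["a", "b", "a"], state))  # {'a': 2, 'b': 1}
print(process_micro_batch(["b", "c"], state))       # {'a': 2, 'b': 2, 'c': 1}
```

The second call only sees `["b", "c"]` yet still produces totals covering both batches — that carried-over `state` is what Spark checkpoints on every trigger.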
>
> On Wed, 10 Jan 2024 at 14:20, Andrzej Zera <andrzejz...@gmail.com> wrote:
>
>> Hey,
>>
>> Yes, that's how I understood it (scenario 1). However, I'm not sure if scenario 2 is possible. I think cache on a streaming DataFrame is supported only in forEachBatch (in which it's actually no longer a streaming DF).
>>
>> Wed, 10 Jan 2024 at 15:01 Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> With regard to your point:
>>>
>>> - Caching: Can you please explain what you mean by caching? I know that when you have batch and streaming sources in a streaming query, then you can try to cache batch ones to save on reads. But I'm not sure if it's what you mean, and I don't know how to apply what you suggest to streaming data.
>>>
>>> Let us visit this.
>>>
>>> The purpose of caching in Structured Streaming is to store frequently accessed data in memory or on disk for faster retrieval, reducing repeated reads from sources.
>>>
>>> Types:
>>>
>>> - Memory Caching: Stores data in memory for extremely fast access.
>>> - Disk Caching: Stores data on disk for larger datasets or persistence across triggers.
>>>
>>> Scenarios:
>>>
>>> - Joining Streaming Data with Static Data: Cache static datasets (e.g., reference tables) to avoid repeated reads for each micro-batch.
>>> - Reusing Intermediate Results: Cache intermediate dataframes that are expensive to compute and used multiple times within the query.
>>> - Window Operations: Cache data within a window to avoid re-reading for subsequent aggregations or calculations within that window.
>>>
>>> Benefits:
>>>
>>> - Performance: Faster query execution by reducing I/O operations and computation overhead.
>>> - Cost Optimization: Reduced reads from external sources can lower costs, especially for cloud-based sources.
>>> - Scalability: Helps handle higher data volumes and throughput by minimizing expensive re-computations.
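The "Joining Streaming Data with Static Data" scenario above can be modeled without a cluster. This plain-Python toy (all names hypothetical, not Spark API) only illustrates the read-count difference between re-loading a reference table per micro-batch and caching it once:

```python
# Count simulated storage reads with and without caching a static table.
reads = {"count": 0}

def load_static_table():
    reads["count"] += 1  # each call stands in for one read from storage
    return {"id1": "ref-a", "id2": "ref-b"}

# Without caching: the reference table is re-read for every micro-batch.
for _ in range(5):
    table = load_static_table()
assert reads["count"] == 5

# With caching: one read up front, then reuse across micro-batches.
reads["count"] = 0
cached = load_static_table()
for _ in range(5):
    table = cached
assert reads["count"] == 1
```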
>>>
>>> Example code:
>>>
>>> Scenario 1:
>>>
>>> static_data = spark.read.load("path/to/static/data")
>>> static_data.cache()
>>> streaming_data = spark.readStream.format("...").load()
>>> # Static data is cached for efficient joins
>>> joined_data = streaming_data.join(static_data, ...)
>>>
>>> Scenario 2:
>>>
>>> intermediate_df = streaming_data.groupBy(...).count()
>>> intermediate_df.cache()
>>> # Use cached intermediate_df for further transformations or actions
>>>
>>> HTH
>>>
>>> Mich Talebzadeh
>>>
>>> On Wed, 10 Jan 2024 at 13:10, Andrzej Zera <andrzejz...@gmail.com> wrote:
>>>
>>>> Thank you very much for your suggestions. Yes, my main concern is checkpointing costs.
>>>>
>>>> I went through your suggestions and here are my comments:
>>>>
>>>> - Caching: Can you please explain what you mean by caching? I know that when you have batch and streaming sources in a streaming query, then you can try to cache batch ones to save on reads. But I'm not sure if it's what you mean, and I don't know how to apply what you suggest to streaming data.
>>>>
>>>> - Optimize Checkpointing Frequency: I'm already using changelog checkpointing with RocksDB and have increased the trigger interval to the maximum acceptable value.
>>>>
>>>> - Minimize LIST Requests: That's where I can get the most savings.
>>>> My LIST requests account for ~70% of checkpointing costs. From what I see, LIST requests are ~2.5x the number of PUT requests. Unfortunately, when I changed the checkpoint location to DBFS, it didn't help with minimizing LIST requests; they are roughly at the same level. From what I see, the S3 Optimized Committer is EMR-specific, so I can't use it in Databricks. The fact that I don't see a difference between S3 and DBFS checkpoint locations suggests that both must implement the same or a similar committer.
>>>>
>>>> - Optimizing RocksDB: I still need to do this, but I don't suspect it will help much. From what I understand, these settings shouldn't have a significant impact on the number of requests to S3.
>>>>
>>>> Any other ideas on how to limit the number of LIST requests are appreciated.
>>>>
>>>> Sun, 7 Jan 2024 at 15:38 Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
>>>>
>>>>> OK, I assume that your main concern is checkpointing costs.
>>>>>
>>>>> - Caching: If your queries read the same data multiple times, caching the data might reduce the amount of data that needs to be checkpointed.
>>>>>
>>>>> - Optimize Checkpointing Frequency, i.e.:
>>>>>
>>>>>   - Consider Changelog Checkpointing with RocksDB. This can potentially reduce checkpoint size and duration by only storing state changes, rather than the entire state.
>>>>>   - Adjust Trigger Interval (if possible): While not ideal for your near-real-time requirement, even a slight increase in the trigger interval (e.g., to 7-8 seconds) can reduce checkpoint frequency and costs.
>>>>>
>>>>> - Minimize LIST Requests:
>>>>>
>>>>>   - Enable the S3 Optimized Committer, or, as you stated, consider DBFS.
>>>>>
>>>>> You can also optimise RocksDB. Set your state backend to RocksDB, if not already.
>>>>> Here is what I use:
>>>>>
>>>>> # Add RocksDB configurations here
>>>>> spark.conf.set("spark.sql.streaming.stateStore.providerClass",
>>>>>     "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")
>>>>> spark.conf.set("spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled", "true")
>>>>> spark.conf.set("spark.sql.streaming.stateStore.rocksdb.writeBufferSizeMB", "64")  # Example configuration
>>>>> spark.conf.set("spark.sql.streaming.stateStore.rocksdb.compaction.style", "level")
>>>>> spark.conf.set("spark.sql.streaming.stateStore.rocksdb.compaction.level.targetFileSizeBase", "67108864")
>>>>>
>>>>> These configurations provide a starting point for tuning RocksDB. Depending on your specific use case and requirements, of course, your mileage may vary.
>>>>>
>>>>> HTH
>>>>>
>>>>> Mich Talebzadeh
>>>>>
>>>>> On Sun, 7 Jan 2024 at 08:07, Andrzej Zera <andrzejz...@gmail.com> wrote:
>>>>>
>>>>>> Usually one or two topics per query. Each query has its own checkpoint directory. Each topic has a few partitions.
>>>>>>
>>>>>> Performance-wise, I don't experience any bottlenecks in terms of checkpointing. It's all about the number of requests (including a high number of LIST requests) and the associated cost.
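As a sanity check on the figures quoted earlier in the thread (~70% of checkpointing cost from LIST requests, with ~2.5x as many LIST as PUT requests): assuming LIST and PUT are billed at the same per-request rate, as in published S3 Standard request pricing, the two numbers are consistent with each other:

```python
# With LIST billed at the same per-request rate as PUT and ~2.5x as many
# LIST requests, LIST's share of the combined LIST+PUT request cost lands
# near the observed ~70%.
list_per_put = 2.5
list_share = list_per_put / (list_per_put + 1.0)
print(f"LIST share of request cost: {list_share:.0%}")  # 71%
```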
>>>>>>
>>>>>> Sat, 6 Jan 2024 at 13:30 Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
>>>>>>
>>>>>>> How many topics and checkpoint directories are you dealing with?
>>>>>>>
>>>>>>> Does each topic have its own checkpoint on S3?
>>>>>>>
>>>>>>> All these checkpoints are sequential writes, so even an SSD would not really help.
>>>>>>>
>>>>>>> HTH
>>>>>>>
>>>>>>> Mich Talebzadeh
>>>>>>>
>>>>>>> On Sat, 6 Jan 2024 at 08:19, Andrzej Zera <andrzejz...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hey,
>>>>>>>>
>>>>>>>> I'm running a few Structured Streaming jobs (with Spark 3.5.0) that require near-real-time accuracy, with trigger intervals on the level of 5-10 seconds. I usually run 3-6 streaming queries as part of the job, and each query includes at least one stateful operation (and usually two or more). My checkpoint location is an S3 bucket and I use RocksDB as the state store. Unfortunately, checkpointing costs are quite high. It's the main cost item of the system, roughly 4-5 times the cost of compute.
>>>>>>>>
>>>>>>>> To save on compute costs, the following things are usually recommended:
>>>>>>>>
>>>>>>>> - increase the trigger interval (as mentioned, I don't have much space here)
>>>>>>>> - decrease the number of shuffle partitions (I have 2x the number of workers)
>>>>>>>>
>>>>>>>> I'm looking for some other recommendations that I can use to save on checkpointing costs. I saw that most requests are LIST requests. Can we cut them down somehow? I'm using Databricks. If I replace the S3 bucket with DBFS, will it help in any way?
>>>>>>>>
>>>>>>>> Thank you!
>>>>>>>> Andrzej
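For reference, the two levers in the list above could be set roughly as follows. This is a hedged sketch, not code from the thread: it assumes an existing SparkSession `spark` and streaming DataFrame `df`, and the values and S3 path are made up. For a stateful query, shuffle partitions also determine the number of state-store partitions, so fewer of them means fewer state files written (and listed) per commit.

```python
# Fewer shuffle partitions -> fewer state-store partitions -> fewer files
# per commit under the checkpoint location. "8" is illustrative (e.g. 2x a
# 4-worker cluster); note this is fixed at the first run of a stateful query.
spark.conf.set("spark.sql.shuffle.partitions", "8")

# A longer trigger interval means fewer commits per hour, and therefore
# fewer PUT/LIST requests. The path below is hypothetical.
query = (
    df.writeStream
      .trigger(processingTime="10 seconds")  # longest latency the use case tolerates
      .option("checkpointLocation", "s3://my-bucket/checkpoints/q1")
      .start()
)
```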