Yes, I agree. But apart from maintaining this state internally (in memory
or in memory+disk as in case of RocksDB), every trigger it saves some
information about this state in a checkpoint location. I'm afraid we can't
do much about this checkpointing operation. I'll continue looking for
information on how I can decrease the number of LIST requests (ListBucket
operations) made in this process.

Thank you for your input so far!
Andrzej

śr., 10 sty 2024 o 16:33 Mich Talebzadeh <mich.talebza...@gmail.com>
napisał(a):

> Hi,
>
> You may have a point on scenario 2.
>
> Caching Streaming DataFrames: In Spark Streaming, each batch of data is
> processed incrementally, and it may not fit the typical caching we
> discussed. Instead, Spark Streaming has its mechanisms to manage and
> optimize the processing of streaming data. Case in point for caching
> partial results, one often relies on maintaining state by using stateful
> operations (see below) on Structured Streaming DataFrames. In such
> scenarios, Spark maintains state internally based on the operations
> performed. For example, if you are doing a groupBy followed by an
> aggregation, Spark Streaming will manage the state of the keys and update
> them incrementally.
>
> Just to clarify, in the context of Spark Structured Streaming stateful
> operation refers to an operation that maintains and updates some form of
> state across batches of streaming data. Unlike stateless operations, which
> process each batch independently, stateful operations retain information
> from previous batches and use it to produce results for the current batch.
>
> So, bottom line, while one may not explicitly cache a streaming data
> frame, Spark internally optimizes the processing by maintaining the
> necessary state.
> HTH
>
> Mich Talebzadeh,
> Dad | Technologist | Solutions Architect | Engineer
> London
> United Kingdom
>
>
>    view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Wed, 10 Jan 2024 at 14:20, Andrzej Zera <andrzejz...@gmail.com> wrote:
>
>> Hey,
>>
>> Yes, that's how I understood it (scenario 1). However, I'm not sure if
>> scenario 2 is possible. I think cache on streaming DataFrame is supported
>> only in forEachBatch (in which it's actually no longer a streaming DF).
>>
>> śr., 10 sty 2024 o 15:01 Mich Talebzadeh <mich.talebza...@gmail.com>
>> napisał(a):
>>
>>> Hi,
>>>
>>>  With regard to your point
>>>
>>> - Caching: Can you please explain what you mean by caching? I know that
>>> when you have batch and streaming sources in a streaming query, then you
>>> can try to cache batch ones to save on reads. But I'm not sure if it's what
>>> you mean, and I don't know how to apply what you suggest to streaming data.
>>>
>>> Let us visit this
>>>
>>> Caching purpose in Structured Streaming is to store frequently accessed
>>> data in memory or disk for faster retrieval, reducing repeated reads from
>>> sources.
>>>
>>> - Types:
>>>
>>>    - Memory Caching: Stores data in memory for extremely fast access.
>>>    - Disk Caching: Stores data on disk for larger datasets or
>>>    persistence across triggers
>>>
>>>
>>> - Scenarios:
>>>
>>> Joining Streaming Data with Static Data: Cache static datasets
>>> (e.g., reference tables) to avoid repeated reads for each micro-batch.
>>>
>>>    -
>>>    - Reusing Intermediate Results: Cache intermediate dataframes that
>>>    are expensive to compute and used multiple times within the query.
>>>    - Window Operations: Cache data within a window to avoid re-reading
>>>    for subsequent aggregations or calculations within that window.
>>>
>>> - Benefits:
>>>
>>>    - Performance: Faster query execution by reducing I/O operations and
>>>    computation overhead.
>>>    - Cost Optimization: Reduced reads from external sources can lower
>>>    costs, especially for cloud-based sources.
>>>    - Scalability: Helps handle higher data volumes and throughput by
>>>    minimizing expensive re-computations.
>>>
>>>
>>> Example codec
>>>
>>> scenario 1
>>>
>>> static_data = spark.read.load("path/to/static/data")
>>> static_data.cache() streaming_data = spark.readStream.format("...").load()
>>> joined_data = streaming_data.join(static_data, ...) # Static data is
>>> cached for efficient joins
>>>
>>> scenario 2
>>>
>>> intermediate_df = streaming_data.groupBy(...).count()
>>> intermediate_df.cache()
>>> # Use cached intermediate_df for further transformations or actions
>>>
>>> HTH
>>>
>>> Mich Talebzadeh,
>>> Dad | Technologist | Solutions Architect | Engineer
>>> London
>>> United Kingdom
>>>
>>>
>>>    view my Linkedin profile
>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>
>>>
>>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>>
>>>
>>>
>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>> any loss, damage or destruction of data or any other property which may
>>> arise from relying on this email's technical content is explicitly
>>> disclaimed. The author will in no case be liable for any monetary damages
>>> arising from such loss, damage or destruction.
>>>
>>>
>>>
>>>
>>> On Wed, 10 Jan 2024 at 13:10, Andrzej Zera <andrzejz...@gmail.com>
>>> wrote:
>>>
>>>> Thank you very much for your suggestions. Yes, my main concern is
>>>> checkpointing costs.
>>>>
>>>> I went through your suggestions and here're my comments:
>>>>
>>>> - Caching: Can you please explain what you mean by caching? I know that
>>>> when you have batch and streaming sources in a streaming query, then you
>>>> can try to cache batch ones to save on reads. But I'm not sure if it's what
>>>> you mean, and I don't know how to apply what you suggest to streaming data.
>>>>
>>>> - Optimize Checkpointing Frequency: I'm already using changelog
>>>> checkpointing with RocksDB and increased trigger interval to a maximum
>>>> acceptable value.
>>>>
>>>> - Minimize LIST Request: That's where I can get most savings. My LIST
>>>> requests account for ~70% of checkpointing costs. From what I see, LIST
>>>> requests are ~2.5x the number of PUT requests. Unfortunately, when I
>>>> changed to checkpoting location DBFS, it didn't help with minimizing LIST
>>>> requests. They are roughly at the same level. From what I see, S3 Optimized
>>>> Committer is EMR-specific so I can't use it in Databricks. The fact that I
>>>> don't see a difference between S3 and DBFS checkpoint location suggests
>>>> that both must implement the same or similar committer.
>>>>
>>>> - Optimizing RocksDB: I still need to do this but I don't suspect it
>>>> will help much. From what I understand, these settings shouldn't have a
>>>> significant impact on the number of requests to S3.
>>>>
>>>> Any other ideas how to limit the number of LIST requests are appreciated
>>>>
>>>> niedz., 7 sty 2024 o 15:38 Mich Talebzadeh <mich.talebza...@gmail.com>
>>>> napisał(a):
>>>>
>>>>> OK I assume that your main concern is checkpointing costs.
>>>>>
>>>>> - Caching: If your queries read the same data multiple times, caching
>>>>> the data might reduce the amount of data that needs to be checkpointed.
>>>>>
>>>>>
>>>>> - Optimize Checkpointing Frequency i.e
>>>>>
>>>>>    - Consider Changelog Checkpointing with RocksDB.  This can
>>>>>    potentially reduce checkpoint size and duration by only storing state
>>>>>    changes, rather than the entire state.
>>>>>    - Adjust Trigger Interval (if possible): While not ideal for your
>>>>>    near-real time requirement, even a slight increase in the trigger 
>>>>> interval
>>>>>    (e.g., to 7-8 seconds) can reduce checkpoint frequency and costs.
>>>>>
>>>>> - Minimize LIST Requests:
>>>>>
>>>>>    - Enable S3 Optimized Committer: or as you stated consider DBFS
>>>>>
>>>>> You can also optimise RocksDB. Set your state backend to RocksDB, if
>>>>> not already. Here are what I use
>>>>>
>>>>>               # Add RocksDB configurations here
>>>>>         spark.conf.set("spark.sql.streaming.stateStore.providerClass",
>>>>> "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")
>>>>>
>>>>> spark.conf.set("spark.sql.streaming.stateStore.rocksdb.changelog", "true")
>>>>>
>>>>> spark.conf.set("spark.sql.streaming.stateStore.rocksdb.writeBufferSizeMB",
>>>>> "64")  # Example configuration
>>>>>
>>>>>  spark.conf.set("spark.sql.streaming.stateStore.rocksdb.compaction.style",
>>>>> "level")
>>>>>
>>>>> spark.conf.set("spark.sql.streaming.stateStore.rocksdb.compaction.level.targetFileSizeBase",
>>>>> "67108864")
>>>>>
>>>>> These configurations provide a starting point for tuning RocksDB.
>>>>> Depending on your specific use case and requirements, of course, your
>>>>> mileage varies.
>>>>>
>>>>> HTH
>>>>>
>>>>> Mich Talebzadeh,
>>>>> Dad | Technologist | Solutions Architect | Engineer
>>>>> London
>>>>> United Kingdom
>>>>>
>>>>>
>>>>>    view my Linkedin profile
>>>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>>>
>>>>>
>>>>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>>>>
>>>>>
>>>>>
>>>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>>>> any loss, damage or destruction of data or any other property which may
>>>>> arise from relying on this email's technical content is explicitly
>>>>> disclaimed. The author will in no case be liable for any monetary damages
>>>>> arising from such loss, damage or destruction.
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Sun, 7 Jan 2024 at 08:07, Andrzej Zera <andrzejz...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Usually one or two topics per query. Each query has its own
>>>>>> checkpoint directory. Each topic has a few partitions.
>>>>>>
>>>>>> Performance-wise I don't experience any bottlenecks in terms of
>>>>>> checkpointing. It's all about the number of requests (including a high
>>>>>> number of LIST requests) and the associated cost.
>>>>>>
>>>>>> sob., 6 sty 2024 o 13:30 Mich Talebzadeh <mich.talebza...@gmail.com>
>>>>>> napisał(a):
>>>>>>
>>>>>>> How many topics and checkpoint directories are you dealing with?
>>>>>>>
>>>>>>> Does each topic has its own checkpoint  on S3?
>>>>>>>
>>>>>>> All these checkpoints are sequential writes so even SSD would not
>>>>>>> really help
>>>>>>>
>>>>>>> HTH
>>>>>>>
>>>>>>> Mich Talebzadeh,
>>>>>>> Dad | Technologist | Solutions Architect | Engineer
>>>>>>> London
>>>>>>> United Kingdom
>>>>>>>
>>>>>>>
>>>>>>>    view my Linkedin profile
>>>>>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>>>>>
>>>>>>>
>>>>>>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> *Disclaimer:* Use it at your own risk. Any and all responsibility
>>>>>>> for any loss, damage or destruction of data or any other property which 
>>>>>>> may
>>>>>>> arise from relying on this email's technical content is explicitly
>>>>>>> disclaimed. The author will in no case be liable for any monetary 
>>>>>>> damages
>>>>>>> arising from such loss, damage or destruction.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Sat, 6 Jan 2024 at 08:19, Andrzej Zera <andrzejz...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hey,
>>>>>>>>
>>>>>>>> I'm running a few Structured Streaming jobs (with Spark 3.5.0) that
>>>>>>>> require near-real time accuracy with trigger intervals in the level of 
>>>>>>>> 5-10
>>>>>>>> seconds. I usually run 3-6 streaming queries as part of the job and 
>>>>>>>> each
>>>>>>>> query includes at least one stateful operation (and usually two or 
>>>>>>>> more).
>>>>>>>> My checkpoint location is S3 bucket and I use RocksDB as a state store.
>>>>>>>> Unfortunately, checkpointing costs are quite high. It's the main cost 
>>>>>>>> item
>>>>>>>> of the system and it's roughly 4-5 times the cost of compute.
>>>>>>>>
>>>>>>>> To save on compute costs, the following things are usually
>>>>>>>> recommended:
>>>>>>>>
>>>>>>>>    - increase trigger interval (as mentioned, I don't have much
>>>>>>>>    space here)
>>>>>>>>    - decrease the number of shuffle partitions (I have 2x the
>>>>>>>>    number of workers)
>>>>>>>>
>>>>>>>> I'm looking for some other recommendations that I can use to save
>>>>>>>> on checkpointing costs. I saw that most requests are LIST requests. 
>>>>>>>> Can we
>>>>>>>> cut them down somehow? I'm using Databricks. If I replace S3 bucket 
>>>>>>>> with
>>>>>>>> DBFS, will it help in any way?
>>>>>>>>
>>>>>>>> Thank you!
>>>>>>>> Andrzej
>>>>>>>>
>>>>>>>>

Reply via email to