Hi,

With regard to your point:

- Caching: Can you please explain what you mean by caching? I know that
when you have batch and streaming sources in a streaming query, then you
can try to cache batch ones to save on reads. But I'm not sure if it's what
you mean, and I don't know how to apply what you suggest to streaming data.

Let us go through this.

The purpose of caching in Structured Streaming is to keep frequently
accessed data in memory or on disk for faster retrieval, reducing repeated
reads from sources.

- Types:

   - Memory Caching: Stores data in memory for extremely fast access.
   - Disk Caching: Stores data on disk for larger datasets or persistence
   across triggers


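- Scenarios:

   - Joining Streaming Data with Static Data: Cache static datasets
   (e.g., reference tables) to avoid repeated reads for each micro-batch.
   - Reusing Intermediate Results: Cache intermediate DataFrames that are
   expensive to compute and used multiple times within a micro-batch. For
   streaming data this is done inside foreachBatch, where each micro-batch
   arrives as an ordinary DataFrame that can be persisted.
   - Window Operations: If several aggregations or calculations run over the
   same windowed micro-batch data, cache that data once inside foreachBatch
   rather than recomputing it for each of them. The window state itself is
   kept by the state store, not by the cache.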

- Benefits:

   - Performance: Faster query execution by reducing I/O operations and
   computation overhead.
   - Cost Optimization: Reduced reads from external sources can lower
   costs, especially for cloud-based sources.
   - Scalability: Helps handle higher data volumes and throughput by
   minimizing expensive re-computations.


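Example code

Scenario 1: cache the static side of a stream-static join

    static_data = spark.read.load("path/to/static/data")
    static_data.cache()  # Static data is cached for efficient joins
    streaming_data = spark.readStream.format("...").load()
    joined_data = streaming_data.join(static_data, ...)

Scenario 2: reuse an intermediate result within each micro-batch

As far as I know, calling cache() directly on a streaming DataFrame does
not cache the stream's data, so the caching is done per micro-batch inside
foreachBatch:

    def process_batch(batch_df, batch_id):
        intermediate_df = batch_df.groupBy(...).count()
        intermediate_df.cache()
        # Use cached intermediate_df for further transformations or actions
        intermediate_df.unpersist()
    streaming_data.writeStream.foreachBatch(process_batch).start()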
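
To make the streaming case concrete, here is a minimal sketch of caching
inside foreachBatch, where each micro-batch is handed over as an ordinary
DataFrame that can be persisted, used more than once, and then released.
The function names, the "key" column and the /tmp/example/... paths are
my own assumptions for illustration, not something from your job, so treat
it as a starting point rather than a definitive implementation.

```python
def process_batch(batch_df, batch_id):
    """Handle one micro-batch; batch_df is an ordinary (non-streaming) DataFrame."""
    batch_df.persist()  # keep this micro-batch around for the two uses below
    try:
        # Use 1: aggregate the micro-batch (the "key" column is hypothetical).
        batch_df.groupBy("key").count().write.mode("append").parquet("/tmp/example/counts")
        # Use 2: store the same rows unaggregated (the path is hypothetical).
        batch_df.write.mode("append").parquet("/tmp/example/raw")
    finally:
        batch_df.unpersist()  # release the cache even if a write fails


def start_query(streaming_df, checkpoint_dir):
    """Attach process_batch to a streaming DataFrame and start the query."""
    return (
        streaming_df.writeStream
        .foreachBatch(process_batch)
        .option("checkpointLocation", checkpoint_dir)
        .start()
    )
```

Without the persist() call, each of the two writes would recompute the
micro-batch from the source. Note that this saves reads and computation;
it does not by itself change how often the checkpoint is written.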

HTH

Mich Talebzadeh,
Dad | Technologist | Solutions Architect | Engineer
London
United Kingdom


   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>


 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Wed, 10 Jan 2024 at 13:10, Andrzej Zera <andrzejz...@gmail.com> wrote:

> Thank you very much for your suggestions. Yes, my main concern is
> checkpointing costs.
>
> I went through your suggestions and here're my comments:
>
> - Caching: Can you please explain what you mean by caching? I know that
> when you have batch and streaming sources in a streaming query, then you
> can try to cache batch ones to save on reads. But I'm not sure if it's what
> you mean, and I don't know how to apply what you suggest to streaming data.
>
> - Optimize Checkpointing Frequency: I'm already using changelog
> checkpointing with RocksDB and increased trigger interval to a maximum
> acceptable value.
>
> - Minimize LIST Request: That's where I can get most savings. My LIST
> requests account for ~70% of checkpointing costs. From what I see, LIST
> requests are ~2.5x the number of PUT requests. Unfortunately, when I
> changed the checkpoint location to DBFS, it didn't help with minimizing LIST
> requests. They are roughly at the same level. From what I see, S3 Optimized
> Committer is EMR-specific so I can't use it in Databricks. The fact that I
> don't see a difference between S3 and DBFS checkpoint location suggests
> that both must implement the same or similar committer.
>
> - Optimizing RocksDB: I still need to do this but I don't suspect it will
> help much. From what I understand, these settings shouldn't have a
> significant impact on the number of requests to S3.
>
> Any other ideas on how to limit the number of LIST requests are appreciated.
>
> On Sun, 7 Jan 2024 at 15:38, Mich Talebzadeh <mich.talebza...@gmail.com>
> wrote:
>
>> OK I assume that your main concern is checkpointing costs.
>>
>> - Caching: If your queries read the same data multiple times, caching
>> the data might reduce the amount of data that needs to be checkpointed.
>>
>> - Optimize Checkpointing Frequency, i.e.
>>
>>    - Consider Changelog Checkpointing with RocksDB.  This can
>>    potentially reduce checkpoint size and duration by only storing state
>>    changes, rather than the entire state.
>>    - Adjust Trigger Interval (if possible): While not ideal for your
>>    near-real time requirement, even a slight increase in the trigger interval
>>    (e.g., to 7-8 seconds) can reduce checkpoint frequency and costs.
>>
>> - Minimize LIST Requests:
>>
>>    - Enable the S3 Optimized Committer or, as you stated, consider DBFS.
>>
>> You can also optimise RocksDB. Set your state backend to RocksDB, if not
>> already. Here is what I use:
>>
>>     # Add RocksDB configurations here
>>     spark.conf.set("spark.sql.streaming.stateStore.providerClass",
>>         "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")
>>     spark.conf.set("spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled", "true")
>>     spark.conf.set("spark.sql.streaming.stateStore.rocksdb.writeBufferSizeMB", "64")  # Example configuration
>>     # Check the two compaction keys below against the RocksDB options documented for your Spark version
>>     spark.conf.set("spark.sql.streaming.stateStore.rocksdb.compaction.style", "level")
>>     spark.conf.set("spark.sql.streaming.stateStore.rocksdb.compaction.level.targetFileSizeBase", "67108864")
>>
>> These configurations provide a starting point for tuning RocksDB.
>> Depending on your specific use case and requirements, of course, your
>> mileage may vary.
>>
>> HTH
>>
>> On Sun, 7 Jan 2024 at 08:07, Andrzej Zera <andrzejz...@gmail.com> wrote:
>>
>>> Usually one or two topics per query. Each query has its own checkpoint
>>> directory. Each topic has a few partitions.
>>>
>>> Performance-wise I don't experience any bottlenecks in terms of
>>> checkpointing. It's all about the number of requests (including a high
>>> number of LIST requests) and the associated cost.
>>>
>>> On Sat, 6 Jan 2024 at 13:30, Mich Talebzadeh <mich.talebza...@gmail.com>
>>> wrote:
>>>
>>>> How many topics and checkpoint directories are you dealing with?
>>>>
>>>> Does each topic have its own checkpoint on S3?
>>>>
>>>> All these checkpoints are sequential writes so even SSD would not
>>>> really help
>>>>
>>>> HTH
>>>>
>>>> On Sat, 6 Jan 2024 at 08:19, Andrzej Zera <andrzejz...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hey,
>>>>>
>>>>> I'm running a few Structured Streaming jobs (with Spark 3.5.0) that
>>>>> require near-real time accuracy with trigger intervals on the order of
>>>>> 5-10 seconds. I usually run 3-6 streaming queries as part of the job and
>>>>> each query includes at least one stateful operation (and usually two or
>>>>> more). My checkpoint location is an S3 bucket and I use RocksDB as a state
>>>>> store. Unfortunately, checkpointing costs are quite high. It's the main
>>>>> cost item of the system and it's roughly 4-5 times the cost of compute.
>>>>>
>>>>> To save on compute costs, the following things are usually recommended:
>>>>>
>>>>>    - increase trigger interval (as mentioned, I don't have much space
>>>>>    here)
>>>>>    - decrease the number of shuffle partitions (I have 2x the number
>>>>>    of workers)
>>>>>
>>>>> I'm looking for some other recommendations that I can use to save on
>>>>> checkpointing costs. I saw that most requests are LIST requests. Can we
>>>>> cut them down somehow? I'm using Databricks. If I replace the S3 bucket
>>>>> with DBFS, will it help in any way?
>>>>>
>>>>> Thank you!
>>>>> Andrzej
>>>>>
>>>>>
