Hi Anil,

OK, let us put the complete picture here.

*Current DStreams Setup:*

   - Data Source: Kafka
   - Processing Engine: Spark DStreams
   - Data Transformation with Spark
   - Sink: S3
   - Data Format: Parquet
   - Exactly-Once Delivery (Attempted): You're attempting exactly-once
   delivery by recording micro-batch offsets in an external storage using
   foreachRDD at the end of each micro-batch. This allows you to potentially
   restart the job from the last processed offset in case of failures?
   - Challenges with DStreams for Exactly-Once Delivery: Spark DStreams
   offer limited built-in support for exactly-once delivery guarantees.


*Moving to Spark Structured Streaming (SSS):*

Everything stays the same, except the points below:


   - Exactly-Once Delivery: SSS provides end-to-end exactly-once semantics
   when the source is replayable (Kafka is) and the sink is idempotent or
   transactional. The built-in file sink qualifies; custom logic in
   foreachBatch is at-least-once by default, so use the batch id to
   de-duplicate writes.
   - Checkpointing: Enable checkpointing by setting the checkpointLocation
   option in writeStream. Spark records the state of the streaming query,
   including offsets, for every micro-batch in that location, which should
   be on a fault-tolerant, HDFS-compatible file system (e.g., HDFS or cloud
   object storage).
   - Offset Tracking with Structured Streaming: While storing offsets in
   external storage was necessary with DStreams, SSS handles this
   automatically through checkpointing. The checkpoints include the offsets
   processed by each micro-batch. However, you can still read the most
   recent offsets for monitoring purposes, if you need them, from
   StreamingQuery.lastProgress: each entry under "sources" carries a
   startOffset and an endOffset.
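
As a rough illustration of the monitoring point above, here is a minimal sketch that pulls the Kafka end offsets out of a progress report. It assumes the report is the dict that PySpark returns from StreamingQuery.lastProgress on Spark 3.x. The helper name kafka_end_offsets and the sample values are made up for illustration and are not part of Spark.

```python
import json


def kafka_end_offsets(progress):
    """Return {(topic, partition): end_offset} from a progress report.

    Assumption: `progress` is shaped like the dict PySpark returns from
    StreamingQuery.lastProgress. `endOffset` may arrive either as a nested
    dict or as a JSON string, so both forms are handled.
    """
    offsets = {}
    if not progress:
        # lastProgress is None until the first micro-batch has completed
        return offsets
    for source in progress.get("sources", []):
        end = source.get("endOffset")
        if isinstance(end, str):
            end = json.loads(end)
        if not isinstance(end, dict):
            continue
        for topic, partitions in end.items():
            for partition, offset in partitions.items():
                offsets[(topic, int(partition))] = offset
    return offsets


# Hand-written sample, not output captured from a real job
sample = {
    "batchId": 7,
    "sources": [
        {
            "description": "KafkaV2[Subscribe[topic_name]]",
            "startOffset": {"topic_name": {"0": 100, "1": 40}},
            "endOffset": {"topic_name": {"0": 150, "1": 55}},
        }
    ],
}

print(kafka_end_offsets(sample))
# {('topic_name', 0): 150, ('topic_name', 1): 55}
```

In a live job you would pass query.lastProgress instead of the sample. Note that the progress report is updated after a micro-batch finishes, so it is suited to monitoring rather than to committing offsets inside the batch itself.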

Have a look at this article of mine about Structured Streaming and
checkpointing:

Processing Change Data Capture with Spark Structured Streaming
<https://www.linkedin.com/pulse/processing-change-data-capture-spark-structured-talebzadeh-ph-d-/>

In your case, briefly:

from pyspark.sql import functions as F

def store_offsets_to_checkpoint(df, batchId):
    if len(df.take(1)) > 0:
        # Cache the micro-batch because it is scanned more than once
        df.persist()
        # The Kafka source exposes 'topic', 'partition' and 'offset' columns.
        # Reduce them to one offset range per topic-partition on the
        # executors instead of collecting every row to the driver.
        offset_rows = (df.groupBy("topic", "partition")
                         .agg(F.min("offset").alias("fromOffset"),
                              (F.max("offset") + 1).alias("untilOffset"))
                         .collect())
        # Plain dicts stand in for the DStreams OffsetRange class, which is
        # not part of the Structured Streaming API
        offsets = [row.asDict() for row in offset_rows]
        # Logic to store offsets in your external checkpoint store
        ...
        df.unpersist()
    else:
        print("DataFrame is empty")

# Define your Structured Streaming application with a Kafka source and a
# custom sink.
#
# "foreach" performs custom write logic on each row, whereas "foreachBatch"
# performs custom write logic on each micro-batch, here through the
# store_offsets_to_checkpoint function.
# foreachBatch(store_offsets_to_checkpoint) expects a function with 2
# parameters: first the micro-batch as a DataFrame or Dataset, and second
# the unique id of the batch.
# Using foreachBatch, we write each micro-batch to the storage defined in
# our custom logic.

streaming = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "topic_name") \
    .load()

# Custom sink function to store offsets in checkpoint.
# foreachBatch is itself the sink, so no format(...) call is needed.
query = streaming.writeStream \
    .option("checkpointLocation", "/path/to/checkpoint/store") \
    .foreachBatch(store_offsets_to_checkpoint) \
    .start()
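
On the store logic left open in the function: after a failure, foreachBatch can invoke the function again with the same batch id, so the write to the external store is safest when it is idempotent. Below is a minimal sketch of that idea, assuming a local JSON file as the store. FileOffsetStore and its file layout are hypothetical, so substitute whatever store you use today.

```python
import json
import os
import tempfile


class FileOffsetStore:
    """Hypothetical offset store that keeps one JSON record on local disk."""

    def __init__(self, path):
        self.path = path

    def last_committed(self):
        """Return the last committed record, or None if nothing is stored."""
        try:
            with open(self.path) as f:
                return json.load(f)
        except FileNotFoundError:
            return None

    def commit(self, batch_id, offsets):
        """Store offsets for batch_id unless that batch is already recorded.

        Returns True if a new record was written, False if the call was a
        replay of an already committed batch.
        """
        last = self.last_committed()
        if last is not None and last["batchId"] >= batch_id:
            return False
        record = {"batchId": batch_id, "offsets": offsets}
        directory = os.path.dirname(os.path.abspath(self.path))
        # Write to a temporary file first, then rename it over the target,
        # so a crash cannot leave a half-written record behind
        fd, tmp_path = tempfile.mkstemp(dir=directory)
        with os.fdopen(fd, "w") as f:
            json.dump(record, f)
        os.replace(tmp_path, self.path)
        return True
```

Inside the foreachBatch function you would call something like store.commit(batchId, offsets) after the data itself has been written, so a replayed batch does not move the stored offsets backwards or record them twice.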

HTH

Mich Talebzadeh,
Technologist | Architect | Data Engineer  | Generative AI | FinCrime
London
United Kingdom


   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>


 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* The information provided is correct to the best of my
knowledge but of course cannot be guaranteed. It is essential to note
that, as with any advice, "one test result is worth one-thousand
expert opinions" (Wernher von Braun
<https://en.wikipedia.org/wiki/Wernher_von_Braun>).


On Wed, 22 May 2024 at 16:27, Anil Dasari <adas...@guidewire.com> wrote:

> Thanks Das, Mich.
>
> Mich,
> We process data from Kafka and write it to S3 in Parquet format using
> DStreams. To ensure exactly-once delivery and prevent data loss, our
> process records micro-batch offsets to external storage at the end of
> each micro-batch in foreachRDD; these are then used when the job restarts.
>
> Das,
> Thanks for sharing the details. I will look into them.
> Unfortunately, the listener process is asynchronous and can't guarantee a
> happens-before association between a micro-batch and the commit of its
> offsets to external storage. It would still work, though. Is there a way
> to access lastProgress in foreachBatch?
>
>
> On Wed, May 22, 2024 at 7:35 AM Tathagata Das <tathagata.das1...@gmail.com>
> wrote:
>
>> If you want to find what offset ranges are present in a microbatch in
>> Structured Streaming, you have to look at StreamingQuery.lastProgress or
>> use the StreamingQueryListener
>> <https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/streaming/StreamingQueryListener.html>.
>> Both of these approaches give you access to the SourceProgress
>> <https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/streaming/SourceProgress.html>,
>> which gives Kafka offsets as a JSON string.
>>
>> Hope this helps!
>>
>> On Wed, May 22, 2024 at 10:04 AM Mich Talebzadeh <
>> mich.talebza...@gmail.com> wrote:
>>
>>> OK, to understand better: your current model relies on streaming data
>>> input through a Kafka topic, Spark does some ETL, and you send the result
>>> to a sink, a database or file storage like HDFS etc.?
>>>
>>> Your current architecture relies on Direct Streams (DStream) and RDDs,
>>> and you want to move to Spark Structured Streaming based on DataFrames
>>> and Datasets?
>>>
>>> You have not specified your sink.
>>>
>>> With regard to your question:
>>>
>>> "Is there an equivalent of DStream HasOffsetRanges in Structured
>>> Streaming to get the microbatch end offsets to the checkpoint in our
>>> external checkpoint store?"
>>>
>>> There is not a direct equivalent of DStream HasOffsetRanges in Spark
>>> Structured Streaming. However, Structured Streaming provides mechanisms to
>>> achieve similar functionality:
>>>
>>> HTH
>>>
>>> On Wed, 22 May 2024 at 10:32, ashok34...@yahoo.com.INVALID
>>> <ashok34...@yahoo.com.invalid> wrote:
>>>
>>>> Hello,
>>>>
>>>> what options are you considering yourself?
>>>>
>>>> On Wednesday 22 May 2024 at 07:37:30 BST, Anil Dasari <
>>>> adas...@guidewire.com> wrote:
>>>>
>>>>
>>>> Hello,
>>>>
>>>> We are on Spark 3.x, using Spark DStreams + Kafka, and are planning to
>>>> move to Structured Streaming + Kafka.
>>>> Is there an equivalent of DStream HasOffsetRanges in Structured
>>>> Streaming to get the microbatch end offsets to the checkpoint in our
>>>> external checkpoint store? Thanks in advance.
>>>>
>>>> Regards
>>>>
>>>>
