Hi Anil,

OK, let us put the complete picture here.
*Current DStreams setup:*

- Data source: Kafka
- Processing engine: Spark DStreams
- Data transformation with Spark
- Sink: S3
- Data format: Parquet
- Exactly-once delivery (attempted): you record micro-batch offsets in an
external store using foreachRDD at the end of each micro-batch, so that the
job can be restarted from the last processed offset after a failure.
- Challenge: DStreams offer limited built-in support for exactly-once
delivery guarantees.

*Moving to Spark Structured Streaming (SSS):*

Everything stays the same, except the following:

- Exactly-once delivery: SSS gives you end-to-end exactly-once guarantees
with a replayable source such as Kafka and an idempotent sink such as the
Parquet file sink on S3.
- Checkpointing: enable it by setting the checkpointLocation option in
writeStream. Spark periodically checkpoints the state of the streaming
query, including offsets, to the designated location (e.g. HDFS or cloud
storage).
- Offset tracking: storing offsets in an external store was necessary with
DStreams, but SSS handles this automatically through checkpointing. The
checkpoints include the offsets processed by each micro-batch. If you still
need them, say for monitoring, you can read the most recent offsets from
StreamingQuery.lastProgress (see the sketch after the code below).

Have a look at this article of mine about structured streaming and
checkpointing: Processing Change Data Capture with Spark Structured
Streaming
<https://www.linkedin.com/pulse/processing-change-data-capture-spark-structured-talebzadeh-ph-d-/>

In your case, briefly:

from pyspark.sql.functions import col

def store_offsets_to_checkpoint(df, batchId):
    if len(df.take(1)) > 0:
        df.persist()
        # The Kafka source exposes 'partition' and 'offset' columns for every record
        offset_rows = df.select(col('partition'), col('offset')).rdd.collect()
        # Build (partition, offset) pairs from the extracted rows
        offsets = [(row.partition, row.offset) for row in offset_rows]
        # Logic to store offsets in your external checkpoint store
        # ......
        df.unpersist()
    else:
        print("DataFrame is empty")

# Define your Structured Streaming application with Kafka source and sink
"""
"foreach" performs custom write logic on each row, while "foreachBatch"
performs custom write logic on each micro-batch, here through the
store_offsets_to_checkpoint function.
foreachBatch(store_offsets_to_checkpoint) expects a function with two
parameters: first, the micro-batch as a DataFrame or Dataset, and second, a
unique id for each batch.
Using foreachBatch, we write each micro-batch to storage defined in our
custom logic.
"""
streaming = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "topic_name") \
    .load()

# Custom sink function to store offsets in the external checkpoint store
query = streaming.writeStream \
    .option("checkpointLocation", "/path/to/checkpoint/store") \
    .foreachBatch(store_offsets_to_checkpoint) \
    .start()
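If you also want to see those offsets for monitoring, you can poll
StreamingQuery.lastProgress on the query handle, as Das mentioned further
down the thread. A minimal sketch under the assumptions of the snippet
above (the names query and topic_name are just the placeholders used
there); in PySpark, lastProgress comes back as a plain dict:

import time

# Sketch only: poll the running query and print the Kafka offsets reported
# for the last completed micro-batch. This is for monitoring; recovery is
# still driven by the checkpointLocation set above.
while query.isActive:
    progress = query.lastProgress        # None until the first batch completes
    if progress is not None:
        for source in progress["sources"]:
            # For the Kafka source, startOffset/endOffset hold per-partition
            # offsets, e.g. {"topic_name": {"0": 42, "1": 17}}
            print(f"batch {progress['batchId']}: "
                  f"start={source['startOffset']} end={source['endOffset']}")
    time.sleep(10)

Worth noting that lastProgress is only updated once a micro-batch
completes, so reading it from inside foreachBatch would give you the
previous batch rather than the one currently being processed.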
format("memory") \ * .option("checkpointLocation", "/path/to/checkpoint/store") \ * .foreachBatch(*store_offsets_to_checkpoint*) \ .start() HTH <https://www.linkedin.com/pulse/processing-change-data-capture-spark-structured-talebzadeh-ph-d-/> <https://www.linkedin.com/pulse/processing-change-data-capture-spark-structured-talebzadeh-ph-d-/> Mich Talebzadeh, Technologist | Architect | Data Engineer | Generative AI | FinCrime London United Kingdom view my Linkedin profile <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/> https://en.everybodywiki.com/Mich_Talebzadeh *Disclaimer:* The information provided is correct to the best of my knowledge but of course cannot be guaranteed . It is essential to note that, as with any advice, quote "one test result is worth one-thousand expert opinions (Werner <https://en.wikipedia.org/wiki/Wernher_von_Braun>Von Braun <https://en.wikipedia.org/wiki/Wernher_von_Braun>)". On Wed, 22 May 2024 at 16:27, Anil Dasari <adas...@guidewire.com> wrote: > Thanks Das, Mtich. > > Mitch, > We process data from Kafka and write it to S3 in Parquet format using > Dstreams. To ensure exactly-once delivery and prevent data loss, our > process records micro-batch offsets to an external storage at the end of > each micro-batch in foreachRDD, which is then used when the job restarts. > > Das, > Thanks for sharing the details. I will look into them. > Unfortunately, the listeners process is async and can't guarantee happens > before association with microbatch to commit offsets to external storage. > But still they will work. Is there a way to access lastProgress in > foreachBatch ? > > > On Wed, May 22, 2024 at 7:35 AM Tathagata Das <tathagata.das1...@gmail.com> > wrote: > >> If you want to find what offset ranges are present in a microbatch in >> Structured Streaming, you have to look at the StreamingQuery.lastProgress or >> use the QueryProgressListener >> <https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/streaming/StreamingQueryListener.html>. >> Both of these approaches gives you access to the SourceProgress >> <https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/streaming/SourceProgress.html> >> which gives Kafka offsets as a JSON string. >> >> Hope this helps! >> >> On Wed, May 22, 2024 at 10:04 AM Mich Talebzadeh < >> mich.talebza...@gmail.com> wrote: >> >>> OK to understand better your current model relies on streaming data >>> input through Kafka topic, Spark does some ETL and you send to a sink, a >>> database for file storage like HDFS etc? >>> >>> Your current architecture relies on Direct Streams (DStream) and RDDs >>> and you want to move to Spark sStructured Streaming based on dataframes and >>> datasets? >>> >>> You have not specified your sink >>> >>> With regard to your question? >>> >>> "Is there an equivalent of Dstream HasOffsetRanges in structure >>> streaming to get the microbatch end offsets to the checkpoint in our >>> external checkpoint store ?" >>> >>> There is not a direct equivalent of DStream HasOffsetRanges in Spark >>> Structured Streaming. 
>>> However, Structured Streaming provides mechanisms to achieve similar
>>> functionality:
>>>
>>> HTH
>>>
>>> Mich Talebzadeh,
>>> Technologist | Architect | Data Engineer | Generative AI | FinCrime
>>> London
>>> United Kingdom
>>>
>>>
>>> view my Linkedin profile
>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>
>>>
>>> https://en.everybodywiki.com/Mich_Talebzadeh
>>>
>>>
>>> *Disclaimer:* The information provided is correct to the best of my
>>> knowledge but of course cannot be guaranteed. It is essential to note
>>> that, as with any advice, "one test result is worth one-thousand expert
>>> opinions" (Wernher von Braun
>>> <https://en.wikipedia.org/wiki/Wernher_von_Braun>).
>>>
>>>
>>> On Wed, 22 May 2024 at 10:32, ashok34...@yahoo.com.INVALID
>>> <ashok34...@yahoo.com.invalid> wrote:
>>>
>>>> Hello,
>>>>
>>>> What options are you considering yourself?
>>>>
>>>> On Wednesday 22 May 2024 at 07:37:30 BST, Anil Dasari <
>>>> adas...@guidewire.com> wrote:
>>>>
>>>>
>>>> Hello,
>>>>
>>>> We are on Spark 3.x, using Spark DStream + Kafka, and planning to move
>>>> to Structured Streaming + Kafka.
>>>> Is there an equivalent of DStream HasOffsetRanges in Structured
>>>> Streaming to get the micro-batch end offsets to checkpoint in our
>>>> external checkpoint store? Thanks in advance.
>>>>
>>>> Regards
>>>>