It appears that Structured Streaming and DStream have entirely different micro-batch metadata representations. Can someone help me find the Structured Streaming equivalents of the following DStream micro-batch metadata?
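1. Micro-batch timestamp: Structured Streaming foreachBatch gives a batchId, which is not a timestamp. Is there a way to get the micro-batch timestamp?
2. Micro-batch start event?
3. Scheduling delay of a micro-batch?
4. Pending micro-batches in the case of fixed-interval micro-batches?

Thanks

There is no single field that maps one-to-one to the DStream batch time, but (as discussed further down this thread) each completed micro-batch is described by a StreamingQueryProgress carrying a trigger timestamp, the same batchId that foreachBatch receives, and per-phase durations. A minimal sketch of collecting these with a StreamingQueryListener; the helper object and class names are illustrative, and progress is only reported after a batch completes, so this suits metrics rather than in-batch lookups:

```scala
import java.util.concurrent.ConcurrentHashMap

import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener.{QueryProgressEvent, QueryStartedEvent, QueryTerminatedEvent}

// Illustrative helper: remembers the trigger timestamp of each completed micro-batch,
// keyed by the same batchId that foreachBatch receives.
object BatchTimestamps {
  val byBatchId = new ConcurrentHashMap[java.lang.Long, String]()
}

class BatchMetadataListener extends StreamingQueryListener {

  override def onQueryStarted(event: QueryStartedEvent): Unit = ()

  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    val p = event.progress
    // p.timestamp  - trigger time of the micro-batch (ISO-8601 string)
    // p.batchId    - the id that foreachBatch sees
    // p.durationMs - per-phase durations (e.g. "addBatch", "triggerExecution"),
    //                usable as a rough stand-in for scheduling/processing delay metrics
    BatchTimestamps.byBatchId.put(p.batchId, p.timestamp)
  }

  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = ()
}

// Registration on the session (assumes an existing SparkSession named `spark`):
// spark.streams.addListener(new BatchMetadataListener)
```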
On Wed, May 22, 2024 at 5:23 PM Anil Dasari <adas...@guidewire.com> wrote:

> You are right.
> - Another question on the migration: is there a way to get the micro-batch id
> during the micro-batch dataset `transform` operation, like in the RDD transform?
> I am attempting to implement the following pseudo functionality with
> structured streaming. In this approach, recordCategoriesMetadata is fetched,
> and RDD metrics (such as RDD size) are emitted using the micro-batch id in the
> transform operation.
> ```code
> val rddQueue = new mutable.Queue[RDD[Int]]()
> // source components
> val sources = Seq.empty[String]
> val consolidatedDstream = sources
>   .map(source => {
>     val inputStream = ssc.queueStream(rddQueue)
>     inputStream.transform((rdd, ts) => {
>       // emit metrics of micro-batch ts: rdd size etc.
>       val recordCategories = rdd.map(..).collect()
>       val recordCategoriesMetadata = ...
>       rdd
>         .map { r =>
>           val customRecord = CustomRecord(r, SomeMetadataOfRecordSchema)
>           (source, customRecord)
>         }
>     })
>   })
>   .reduceLeft(_ union _)
>
> consolidatedDstream
>   .foreachRDD((rdd, ts) => {
>     // get pipes for each source
>     val pipes = Seq.empty[String] // pipes of the given source
>     pipes.foreach(pipe => {
>       val pipeSource = null // get from the pipe variable
>       val psRDD = rdd
>         .filter {
>           case (src, _) => src.equals(pipeSource)
>         }
>       // apply pipe transformation and sink
>     })
>   })
> ```
>
> In structured streaming, it could look like:
>
> ```code
> val consolidatedDstream = sources
>   .map(source => {
>     val inputStream = ... // (for each source)
>     inputStream
>   })
>   .reduceLeft(_ union _)
>
> consolidatedDstream
>   .writeStream
>   .foreachBatch((ds, batchId) => { // foreachBatch supplies a batch id, not a timestamp
>     val newDS = ds.transform(internalDS => {
>       // emit metrics of micro-batch batchId: size etc.
>       val recordCategories = internalDS.map(..).collect()
>       val recordCategoriesMetadata = ...
>       internalDS
>         .map { r =>
>           val customRecord = CustomRecord(r, SomeMetadataOfRecordSchema)
>           (source, customRecord)
>         }(... <encoder>)
>     })
>     // get pipes for each source
>     val pipes = Seq.empty[String] // pipes of the given source
>     pipes.foreach(pipe => {
>       val pipeSource = null // get from the pipe variable
>       val psDS = newDS
>         .filter {
>           case (src, _) => src.equals(pipeSource)
>         }
>       // apply pipe transformation and sink
>     })
>   })
>   .start()
> ```
> ^ This is just pseudo code and I am still not sure whether it works. Let me
> know your suggestions, if any. Thanks.
>
> On Wed, May 22, 2024 at 8:34 AM Tathagata Das <tathagata.das1...@gmail.com>
> wrote:
>
>> The right way to associate micro-batches when committing to external
>> storage is to use the micro-batch id that you can get in foreachBatch. That
>> micro-batch id guarantees that the data produced in the batch is always
>> the same, no matter how many recomputations happen (assuming all processing
>> logic is deterministic). So you can commit the batch id + batch data
>> together, and then asynchronously commit the batch id + offsets.
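A minimal sketch of the pattern described in that reply, assuming `df` is the streaming DataFrame from the Kafka source; the S3 paths and the offset-commit helper are hypothetical placeholders, not part of any Spark API:

```scala
import org.apache.spark.sql.DataFrame

// Hypothetical: persist batchId (and the corresponding source offsets) to the
// team's external offset store, asynchronously if desired.
def commitOffsetsAsync(batchId: Long): Unit = {
  // e.g. write a record {batchId -> offsets} to DynamoDB/JDBC/S3
}

val query = df.writeStream
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    // Writing under a directory keyed by batchId makes a replayed batch overwrite
    // the same output, so reprocessing after a failure stays idempotent.
    batchDF.write
      .mode("overwrite")
      .parquet(s"s3://my-bucket/output/batch_id=$batchId") // placeholder path
    commitOffsetsAsync(batchId)
  }
  .option("checkpointLocation", "s3://my-bucket/checkpoints/") // placeholder path
  .start()
```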
>> On Wed, May 22, 2024 at 11:27 AM Anil Dasari <adas...@guidewire.com>
>> wrote:
>>
>>> Thanks Das, Mitch.
>>>
>>> Mitch,
>>> We process data from Kafka and write it to S3 in Parquet format using
>>> DStreams. To ensure exactly-once delivery and prevent data loss, our
>>> process records the micro-batch offsets to an external store at the end of
>>> each micro-batch in foreachRDD; those offsets are then used when the job
>>> restarts.
>>>
>>> Das,
>>> Thanks for sharing the details. I will look into them.
>>> Unfortunately, the listener callbacks are async and can't guarantee a
>>> happens-before association with the micro-batch when committing offsets to
>>> external storage. But they will still work. Is there a way to access
>>> lastProgress in foreachBatch?
>>>
>>> On Wed, May 22, 2024 at 7:35 AM Tathagata Das <
>>> tathagata.das1...@gmail.com> wrote:
>>>
>>>> If you want to find what offset ranges are present in a micro-batch in
>>>> Structured Streaming, you have to look at StreamingQuery.lastProgress
>>>> or use the StreamingQueryListener
>>>> <https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/streaming/StreamingQueryListener.html>.
>>>> Both of these approaches give you access to the SourceProgress
>>>> <https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/streaming/SourceProgress.html>,
>>>> which gives Kafka offsets as a JSON string.
>>>>
>>>> Hope this helps!
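For illustration, a minimal sketch of pulling the batch id and the Kafka end offsets (JSON strings) of the most recently completed micro-batch out of lastProgress. Note that lastProgress describes a batch only after it has finished, so it is not a way to read the current batch's offsets from inside foreachBatch:

```scala
import org.apache.spark.sql.streaming.StreamingQuery

// Sketch only: returns None until the first micro-batch has completed.
def lastBatchOffsets(query: StreamingQuery): Option[(Long, Seq[String])] = {
  Option(query.lastProgress).map { progress =>
    // Each SourceProgress.endOffset is a JSON string, e.g. {"my-topic":{"0":1234,"1":5678}}
    val endOffsets = progress.sources.toSeq.map(_.endOffset)
    (progress.batchId, endOffsets)
  }
}

// Usage (assumes `query` is the StreamingQuery returned by writeStream.start()):
// lastBatchOffsets(query).foreach { case (batchId, offsets) => println(s"$batchId -> $offsets") }
```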
>>>>
>>>> On Wed, May 22, 2024 at 10:04 AM Mich Talebzadeh <
>>>> mich.talebza...@gmail.com> wrote:
>>>>
>>>>> OK, to understand better: your current model relies on streaming data
>>>>> input through a Kafka topic, Spark does some ETL, and you send the
>>>>> results to a sink such as a database or file storage like HDFS?
>>>>>
>>>>> Your current architecture relies on Direct Streams (DStream) and RDDs,
>>>>> and you want to move to Spark Structured Streaming based on DataFrames
>>>>> and Datasets?
>>>>>
>>>>> You have not specified your sink.
>>>>>
>>>>> With regard to your question:
>>>>>
>>>>> "Is there an equivalent of DStream HasOffsetRanges in Structured
>>>>> Streaming to get the micro-batch end offsets to checkpoint in our
>>>>> external checkpoint store?"
>>>>>
>>>>> There is not a direct equivalent of DStream HasOffsetRanges in Spark
>>>>> Structured Streaming. However, Structured Streaming provides mechanisms
>>>>> to achieve similar functionality (e.g. StreamingQuery.lastProgress and
>>>>> StreamingQueryListener, discussed earlier in the thread).
>>>>>
>>>>> HTH
>>>>>
>>>>> Mich Talebzadeh,
>>>>> Technologist | Architect | Data Engineer | Generative AI | FinCrime
>>>>> London
>>>>> United Kingdom
>>>>>
>>>>> view my LinkedIn profile
>>>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>>>
>>>>> https://en.everybodywiki.com/Mich_Talebzadeh
>>>>>
>>>>> *Disclaimer:* The information provided is correct to the best of my
>>>>> knowledge but of course cannot be guaranteed. It is essential to note
>>>>> that, as with any advice, "one test result is worth one-thousand expert
>>>>> opinions" (Wernher von Braun
>>>>> <https://en.wikipedia.org/wiki/Wernher_von_Braun>).
>>>>>
>>>>> On Wed, 22 May 2024 at 10:32, ashok34...@yahoo.com.INVALID
>>>>> <ashok34...@yahoo.com.invalid> wrote:
>>>>>
>>>>>> Hello,
>>>>>>
>>>>>> What options are you considering yourself?
>>>>>>
>>>>>> On Wednesday 22 May 2024 at 07:37:30 BST, Anil Dasari <
>>>>>> adas...@guidewire.com> wrote:
>>>>>>
>>>>>> Hello,
>>>>>>
>>>>>> We are on Spark 3.x, using Spark DStream + Kafka, and planning to move
>>>>>> to Structured Streaming + Kafka.
>>>>>> Is there an equivalent of DStream HasOffsetRanges in Structured
>>>>>> Streaming to get the micro-batch end offsets to checkpoint in our
>>>>>> external checkpoint store? Thanks in advance.
>>>>>>
>>>>>> Regards
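One possible way to use the externally checkpointed offsets on restart, sketched with placeholder topic and broker values: the Kafka source accepts a per-partition JSON string in startingOffsets, so the offsets committed from foreachBatch can seed a fresh query. Note that startingOffsets is only honoured when the query starts without an existing checkpoint; once a checkpointLocation exists, recovery resumes from the checkpoint.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("RestartFromExternalOffsets").getOrCreate()

// Hypothetical: read back the offsets JSON previously saved from foreachBatch,
// e.g. {"my-topic":{"0":1234,"1":5678}} (-2 = earliest, -1 = latest per partition).
def loadOffsetsFromExternalStore(): String =
  """{"my-topic":{"0":1234,"1":5678}}""" // placeholder

val kafkaStream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker1:9092") // placeholder
  .option("subscribe", "my-topic")                   // placeholder
  .option("startingOffsets", loadOffsetsFromExternalStore())
  .load()
```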