It appears that Structured Streaming and DStream have entirely different
microbatch metadata representations.
Can someone help me find the Structured Streaming equivalents of the
following DStream microbatch metadata?

1. Microbatch timestamp: Structured Streaming's foreachBatch provides a batch
ID, which is not a timestamp. Is there a way to get the microbatch timestamp?
(see the listener sketch below)
2. Microbatch start event?
3. Scheduling delay of a microbatch?
4. Pending microbatches, in the case of fixed-interval microbatches?
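
For context, the closest I have found so far is the StreamingQueryListener /
StreamingQueryProgress API; below is a rough sketch of what I am reading from
it (field names taken from StreamingQueryProgress; I am not sure it covers 2-4):

```code
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

val spark: SparkSession = SparkSession.builder().getOrCreate()

spark.streams.addListener(new StreamingQueryListener {
  override def onQueryStarted(event: QueryStartedEvent): Unit = {
    // fired once per query start/restart, not per microbatch
  }

  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    val p = event.progress
    // p.timestamp  -> trigger timestamp of the microbatch (ISO-8601 string)
    // p.batchId    -> the same id that foreachBatch receives
    // p.durationMs -> per-phase timings such as "triggerExecution", "addBatch"
    println(s"batch=${p.batchId} ts=${p.timestamp} durations=${p.durationMs}")
  }

  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
})
```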

Thanks

On Wed, May 22, 2024 at 5:23 PM Anil Dasari <adas...@guidewire.com> wrote:

> You are right.
> - Another question on the migration: is there a way to get the microbatch id
> during the microbatch Dataset `transform` operation, like in the RDD transform?
> I am attempting to implement the following pseudo functionality with
> Structured Streaming. In this approach, recordCategoriesMetadata is fetched,
> and RDD metrics such as RDD size are emitted, using the microbatch id in the
> transform operation.
> ```code
> val rddQueue = new mutable.Queue[RDD[Int]]()
> // source components
> val sources = Seq.empty[String]
>
> val consolidatedDstream = sources
>   .map { source =>
>     val inputStream = ssc.queueStream(rddQueue)
>     inputStream.transform { (rdd, ts) =>
>       // emit metrics of microbatch ts: rdd size etc.
>       val recordCategories = rdd.map(..).collect()
>       val recordCategoriesMetadata = ...
>       rdd.map { r =>
>         val customRecord = CustomRecord(r, SomeMetadataOfRecordSchema)
>         (source, customRecord)
>       }
>     }
>   }
>   .reduceLeft(_ union _)
>
> consolidatedDstream.foreachRDD { (rdd, ts) =>
>   // get pipes for each source
>   val pipes = Seq.empty[String] // pipes of the given source
>   pipes.foreach { pipe =>
>     val pipeSource = null // derive from the pipe variable
>     val psRDD = rdd.filter {
>       case (source, customRecord) => source.equals(pipeSource)
>     }
>     // apply pipe transformation and sink
>   }
> }
> ```
>
> In Structured Streaming, it could look like this:
>
> ```code
> val consolidatedStream = sources
>   .map { source =>
>     val inputStream = ... // (for each source)
>     inputStream
>   }
>   .reduceLeft(_ union _)
>
> consolidatedStream
>   .writeStream
>   .foreachBatch { (ds, batchId) =>
>     val newDS = ds.transform { internalDS =>
>       // emit metrics of microbatch batchId: dataset size etc.
>       val recordCategories = internalDS.map(..).collect()
>       val recordCategoriesMetadata = ...
>       internalDS.map { r =>
>         val customRecord = CustomRecord(r, SomeMetadataOfRecordSchema)
>         (source, customRecord)
>       }(... /* encoder */)
>     }
>
>     // get pipes for each source
>     val pipes = Seq.empty[String] // pipes of the given source
>     pipes.foreach { pipe =>
>       val pipeSource = null // derive from the pipe variable
>       val psDS = newDS.filter {
>         case (source, customRecord) => source.equals(pipeSource)
>       }
>       // apply pipe transformation and sink
>     }
>   }
> ```
> The above is just pseudo code and I am still not sure whether it works; see
> also the sketch below. Let me know if you have any suggestions. Thanks.
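>
> One simplification I am considering: since foreachBatch already hands over the
> batch id, I could capture it in the closure and use it wherever I used the
> DStream batch time. A rough sketch only; consolidatedStream is from the pseudo
> code above and emitMetrics is a placeholder:
>
> ```code
> import org.apache.spark.sql.DataFrame
>
> consolidatedStream
>   .writeStream
>   .foreachBatch { (ds: DataFrame, batchId: Long) =>
>     val newDS = ds.transform { internalDS =>
>       // batchId is visible here via the closure, playing the role of the ts
>       // argument of DStream.transform
>       // emitMetrics(batchId, internalDS)   // placeholder
>       internalDS
>     }
>     // per-pipe filtering and sinks as in the pseudo code above
>   }
>   .start()
> ```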
>
> On Wed, May 22, 2024 at 8:34 AM Tathagata Das <tathagata.das1...@gmail.com>
> wrote:
>
>> The right way to associate microbatches when committing to external
>> storage is to use the microbatch id that you can get in foreachBatch. That
>> microbatch id guarantees that the data produced in the batch is always the
>> same regardless of recomputations (assuming all processing logic is
>> deterministic). So you can commit the batch id + batch data together, and
>> then asynchronously commit the batch id + offsets.
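>>
>> For example, a minimal sketch of that pattern (streamingDF, the paths and the
>> offset store are placeholders, not a prescription):
>>
>> ```code
>> import org.apache.spark.sql.DataFrame
>>
>> streamingDF.writeStream
>>   .option("checkpointLocation", "s3://bucket/checkpoints/query1") // placeholder
>>   .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
>>     // 1. Write the batch data keyed by its id. Because a given batch id
>>     //    reproduces the same data on retry, overwriting the same path keeps
>>     //    the sink idempotent.
>>     batchDF.write
>>       .mode("overwrite")
>>       .parquet(s"s3://bucket/table/batch_id=$batchId") // placeholder sink
>>
>>     // 2. Then (possibly asynchronously) record batchId -> offsets in the
>>     //    external store; on restart, skip any batch id already recorded.
>>     // offsetStore.commit(batchId, offsetsJson)        // placeholder
>>   }
>>   .start()
>> ```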
>>
>> On Wed, May 22, 2024 at 11:27 AM Anil Dasari <adas...@guidewire.com>
>> wrote:
>>
>>> Thanks Das, Mich.
>>>
>>> Mich,
>>> We process data from Kafka and write it to S3 in Parquet format using
>>> DStreams. To ensure exactly-once delivery and prevent data loss, our
>>> process records micro-batch offsets to an external store at the end of
>>> each micro-batch in foreachRDD, which is then used when the job restarts.
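>>>
>>> For reference, our current pattern looks roughly like this (simplified; the
>>> Parquet write and the offset store are placeholders):
>>>
>>> ```code
>>> import org.apache.spark.streaming.kafka010.{HasOffsetRanges, OffsetRange}
>>>
>>> stream.foreachRDD { rdd =>
>>>   // the Kafka RDD carries the offset ranges of this micro-batch
>>>   val offsetRanges: Array[OffsetRange] =
>>>     rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>>>
>>>   // write the micro-batch to S3 as Parquet (placeholder)
>>>   // spark.createDataFrame(...).write.parquet("s3://bucket/out/")
>>>
>>>   // then persist the offsets to the external store at the end of the batch
>>>   // offsetStore.save(offsetRanges)   // placeholder
>>> }
>>> ```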
>>>
>>> Das,
>>> Thanks for sharing the details. I will look into them.
>>> Unfortunately, the listener callbacks are asynchronous and cannot guarantee
>>> a happens-before association with the microbatch when committing offsets to
>>> external storage. Still, they should work. Is there a way to access
>>> lastProgress in foreachBatch?
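>>>
>>> Something along these lines is what I was thinking of trying (the query name
>>> and the lookup by name are guesses on my part):
>>>
>>> ```code
>>> import org.apache.spark.sql.DataFrame
>>>
>>> df.writeStream
>>>   .queryName("kafka-to-s3") // assumed name
>>>   .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
>>>     // look up the running query and read its lastProgress; this reflects the
>>>     // previously completed micro-batch, not the one currently being processed
>>>     val progress = batchDF.sparkSession.streams.active
>>>       .find(_.name == "kafka-to-s3")
>>>       .flatMap(q => Option(q.lastProgress))
>>>     progress.foreach(p => println(p.sources.map(_.endOffset).mkString(",")))
>>>   }
>>>   .start()
>>> ```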
>>>
>>>
>>> On Wed, May 22, 2024 at 7:35 AM Tathagata Das <
>>> tathagata.das1...@gmail.com> wrote:
>>>
>>>> If you want to find what offset ranges are present in a microbatch in
>>>> Structured Streaming, you have to look at
>>>> StreamingQuery.lastProgress or use the StreamingQueryListener
>>>> <https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/streaming/StreamingQueryListener.html>.
>>>> Both of these approaches give you access to the SourceProgress
>>>> <https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/streaming/SourceProgress.html>,
>>>> which gives you the Kafka offsets as a JSON string.
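>>>>
>>>> For example, something along these lines with the lastProgress approach,
>>>> given the StreamingQuery handle returned by start() (the JSON parsing is
>>>> up to you):
>>>>
>>>> ```code
>>>> // query is the StreamingQuery returned by writeStream...start()
>>>> val progress = query.lastProgress // last completed microbatch; may be null early on
>>>> if (progress != null) {
>>>>   progress.sources.foreach { s =>
>>>>     // startOffset / endOffset are JSON strings; for Kafka they look like
>>>>     // {"topic-name":{"0":1234,"1":5678}}
>>>>     println(s"batch=${progress.batchId} start=${s.startOffset} end=${s.endOffset}")
>>>>   }
>>>> }
>>>> ```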
>>>>
>>>> Hope this helps!
>>>>
>>>> On Wed, May 22, 2024 at 10:04 AM Mich Talebzadeh <
>>>> mich.talebza...@gmail.com> wrote:
>>>>
>>>>> OK, to understand better: your current model relies on streaming data
>>>>> input through a Kafka topic, Spark does some ETL, and you send the result
>>>>> to a sink, such as a database or file storage like HDFS?
>>>>>
>>>>> Your current architecture relies on Direct Streams (DStream) and RDDs,
>>>>> and you want to move to Spark Structured Streaming based on DataFrames and
>>>>> Datasets?
>>>>>
>>>>> You have not specified your sink.
>>>>>
>>>>> With regard to your question:
>>>>>
>>>>> "Is there an equivalent of DStream HasOffsetRanges in Structured
>>>>> Streaming to get the microbatch end offsets to checkpoint in our
>>>>> external checkpoint store?"
>>>>>
>>>>> There is not a direct equivalent of DStream HasOffsetRanges in Spark
>>>>> Structured Streaming. However, Structured Streaming provides mechanisms to
>>>>> achieve similar functionality, such as built-in checkpointing, the batch id
>>>>> exposed in foreachBatch, and the StreamingQuery.lastProgress /
>>>>> StreamingQueryListener APIs.
>>>>>
>>>>> HTH
>>>>>
>>>>> Mich Talebzadeh,
>>>>> Technologist | Architect | Data Engineer  | Generative AI | FinCrime
>>>>> London
>>>>> United Kingdom
>>>>>
>>>>>
>>>>>    View my LinkedIn profile
>>>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>>>
>>>>>    https://en.everybodywiki.com/Mich_Talebzadeh
>>>>>
>>>>>
>>>>>
>>>>> *Disclaimer:* The information provided is correct to the best of my
>>>>> knowledge but of course cannot be guaranteed. It is essential to note
>>>>> that, as with any advice, quote "one test result is worth one-thousand
>>>>> expert opinions (Werner Von Braun
>>>>> <https://en.wikipedia.org/wiki/Wernher_von_Braun>)".
>>>>>
>>>>>
>>>>> On Wed, 22 May 2024 at 10:32, ashok34...@yahoo.com.INVALID
>>>>> <ashok34...@yahoo.com.invalid> wrote:
>>>>>
>>>>>> Hello,
>>>>>>
>>>>>> What options are you considering yourself?
>>>>>>
>>>>>> On Wednesday 22 May 2024 at 07:37:30 BST, Anil Dasari <
>>>>>> adas...@guidewire.com> wrote:
>>>>>>
>>>>>>
>>>>>> Hello,
>>>>>>
>>>>>> We are on Spark 3.x, using Spark DStream + Kafka, and planning to
>>>>>> move to Structured Streaming + Kafka.
>>>>>> Is there an equivalent of DStream HasOffsetRanges in Structured
>>>>>> Streaming to get the microbatch end offsets to checkpoint in our
>>>>>> external checkpoint store? Thanks in advance.
>>>>>>
>>>>>> Regards
>>>>>>
>>>>>>
