Great, so SSS also provides an API that allows handling RDDs through
DataFrames using foreachBatch. Still, I am not sure this is good
practice in general, right? Well, it depends on the use case
anyway.

Thank you so much for the hints !

Best regards,
Ali Gouta.

On Sun, Apr 4, 2021 at 6:11 PM Mich Talebzadeh <mich.talebza...@gmail.com>
wrote:

> Hi Ali,
>
>
> On a practical side, I have used both the old DStreams and the newer Spark
> structured streaming (SSS).
>
>
> SSS does a good job at micro-batch level in the form of
>
>
> foreachBatch(SendToSink)
>
>
>  "foreach" performs custom write logic on each row, and "foreachBatch"
> performs custom write logic on each micro-batch through the SendToSink
> function. The function passed to foreachBatch (SendToSink here) expects two
> parameters: first, the micro-batch as a DataFrame or Dataset, and second, a
> unique id for each batch. Using foreachBatch, we eventually write each
> micro-batch to the storage defined in our custom logic. In this case, we
> store the output of our streaming application in Redis, a Google BigQuery
> table, or any other sink.
>
>
>
> In the DStream world you would have done something like the below:
>
>     // Work on every RDD of the stream
>     dstream.foreachRDD { pricesRDD =>
>       if (!pricesRDD.isEmpty) {  // data exists in RDD
>         // ... work on pricesRDD here ...
>       }
>     }
>
> and after some work on that RDD you would have created a DataFrame (df).
>
> With regard to SSS, it allows you to use the passed DataFrame for your
> work. However, if, as in my case, you are interested in the individual rows
> of the micro-batch (say, a different collection of prices for different
> tickers (securities)), you could create an RDD from the DataFrame:
>
> for row in df.rdd.collect():
>     ticker = row.ticker
>     price = row.price
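
Note that collect() brings the whole micro-batch to the driver at once. A hedged alternative sketch, assuming the same ticker and price columns, uses DataFrame.toLocalIterator(), which fetches rows one partition at a time. The helper name latest_price_per_ticker is made up for this example.

```python
def latest_price_per_ticker(df):
    """Walk the rows of one micro-batch on the driver and keep the last
    price seen for each ticker."""
    latest = {}
    # toLocalIterator() yields Row objects; columns are read by name.
    for row in df.toLocalIterator():
        latest[row["ticker"]] = row["price"]
    return latest
```

This still processes rows on the driver, so it only suits micro-batches that are small enough for that; heavier per-row work is usually better expressed as DataFrame operations.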
>
>
> With regard to foreach(process_row), I have not really tried it as we
> don't have a use case for it, so your mileage may vary, as usual.
>
>
> HTH
>
>
>
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Sun, 4 Apr 2021 at 16:27, Ali Gouta <ali.go...@gmail.com> wrote:
>
>> Thank you guys for your answers. I will dig more into this new way of doing
>> things, and why not consider leaving the old DStreams and using Structured
>> Streaming instead. I hope that Structured Streaming + Spark on Kubernetes
>> works well and that the combination is production-ready.
>>
>> Best regards,
>> Ali Gouta.
>>
>> On Sun, Apr 4, 2021 at 12:52, Jacek Laskowski <ja...@japila.pl> wrote:
>>
>>> Hi,
>>>
>>> Just to add to Gabor's excellent answer: checkpointing and
>>> offsets are infrastructure-related and should not really be in the hands of
>>> Spark devs, who should instead focus on the business purpose of the code
>>> (not offsets, which are very low-level and not really important).
>>>
>>> BTW That's what happens in Kafka Streams too
>>>
>>> Regards,
>>> Jacek Laskowski
>>> ----
>>> https://about.me/JacekLaskowski
>>> "The Internals Of" Online Books <https://books.japila.pl/>
>>> Follow me on https://twitter.com/jaceklaskowski
>>>
>>>
>>> On Sun, Apr 4, 2021 at 12:28 PM Gabor Somogyi <gabor.g.somo...@gmail.com>
>>> wrote:
>>>
>>>> There is no way to store offsets in Kafka and restart from the stored
>>>> offsets. Structured Streaming stores offsets in the checkpoint and restarts
>>>> from there without any user code.
>>>>
>>>> Offsets can be stored with a listener, but they can only be used for lag
>>>> calculation.
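
A small sketch of the lag calculation mentioned above. It assumes the listener receives the offsets the query has reached as a JSON string shaped like {"topic": {"partition": offset}}, and that the latest offsets in Kafka are obtained separately in the same shape. Both the shape and the function name kafka_lag are assumptions to check against your Spark version.

```python
import json


def kafka_lag(end_offset_json, latest_offset_json):
    """Per-partition lag: latest offset in Kafka minus the offset the
    streaming query has reached.

    Both arguments are JSON strings of the assumed form
    {"topic": {"partition": offset}}.
    """
    end = json.loads(end_offset_json)
    latest = json.loads(latest_offset_json)
    lag = {}
    for topic, partitions in latest.items():
        for partition, latest_offset in partitions.items():
            # A partition the query has not reported yet counts from 0.
            reached = end.get(topic, {}).get(partition, 0)
            lag[(topic, int(partition))] = latest_offset - reached
    return lag


# Example: partition 0 is 20 records behind, partition 1 is caught up.
print(kafka_lag('{"prices": {"0": 100, "1": 40}}',
                '{"prices": {"0": 120, "1": 40}}'))
```

This only reports lag for monitoring; it does not change where the query restarts, which is still driven by the checkpoint.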
>>>>
>>>> BR,
>>>> G
>>>>
>>>>
>>>> On Sat, 3 Apr 2021, 21:09 Ali Gouta, <ali.go...@gmail.com> wrote:
>>>>
>>>>> Hello,
>>>>>
>>>>> I was reading the Spark docs about Spark Structured Streaming, since
>>>>> we are thinking about updating our code base, which today uses DStreams
>>>>> (hence Spark Streaming). One main reason for this change is that
>>>>> reading headers in Kafka messages is only supported in
>>>>> Spark Structured Streaming and not in DStreams.
>>>>>
>>>>> I was surprised not to see an obvious way to handle the offsets
>>>>> manually by committing them to Kafka. In Spark Streaming we used to
>>>>> do it with something similar to these lines of code:
>>>>>
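>>>>> import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}
>>>>>
>>>>> stream.foreachRDD { rdd =>
>>>>>   // offset ranges covered by this batch
>>>>>   val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>>>>>
>>>>>   // some time later, after outputs have completed
>>>>>   stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
>>>>> }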
>>>>>
>>>>>
>>>>> And this works perfectly! In particular, it works very nicely in case of
>>>>> job failure/restart... I am wondering how this can be achieved in Spark
>>>>> Structured Streaming?
>>>>>
>>>>> I read about checkpoints, and this reminds me of the old way of doing
>>>>> things in Spark 1.5/Kafka 0.8, which is not perfect since we are not
>>>>> deciding when to commit offsets ourselves.
>>>>>
>>>>> Did I miss anything? What would be the best way of committing offsets
>>>>> to Kafka for the concerned consumer group with Spark Structured Streaming?
>>>>>
>>>>> Best regards,
>>>>> Ali Gouta.
>>>>>
>>>>
