Thank you guys for your answers. I will dig more into this new way of
doing things and consider leaving the old DStreams and using Structured
Streaming instead. I hope that Structured Streaming + Spark on
Kubernetes works well and that the combination is production ready.

Best regards,
Ali Gouta.

On Sun, Apr 4, 2021 at 12:52 PM Jacek Laskowski <ja...@japila.pl> wrote:

> Hi,
>
> Just to add to Gabor's excellent answer: checkpointing and offsets are
> infrastructure-related and should not really be in the hands of Spark
> devs, who should instead focus on the business purpose of the code (not
> offsets, which are very low-level and not really important).
>
> BTW That's what happens in Kafka Streams too
>
> Best regards,
> Jacek Laskowski
> ----
> https://about.me/JacekLaskowski
> "The Internals Of" Online Books <https://books.japila.pl/>
> Follow me on https://twitter.com/jaceklaskowski
>
>
> On Sun, Apr 4, 2021 at 12:28 PM Gabor Somogyi <gabor.g.somo...@gmail.com>
> wrote:
>
>> There is no way to store offsets in Kafka and restart from the stored
>> offset. Structured Streaming stores offsets in its checkpoint and restarts
>> from there without any user code.
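>>
>> A rough sketch of what that looks like (the paths, servers and topic names
>> below are just placeholders): point the Kafka source at the cluster and
>> give the sink a checkpointLocation, and the engine tracks the processed
>> offsets there across restarts.
>>
>> import org.apache.spark.sql.SparkSession
>>
>> val spark = SparkSession.builder().appName("kafka-structured").getOrCreate()
>>
>> // Read from Kafka; no consumer-group offset management is involved
>> val df = spark.readStream
>>   .format("kafka")
>>   .option("kafka.bootstrap.servers", "broker:9092")
>>   .option("subscribe", "my-topic")
>>   .load()
>>
>> // Offsets (and any state) are persisted under the checkpoint location,
>> // so a restarted query resumes exactly where it left off.
>> df.writeStream
>>   .format("parquet")
>>   .option("path", "/data/out")
>>   .option("checkpointLocation", "/chk/my-query")
>>   .start()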
>>
>> Offsets can be captured with a listener, but that can only be used for lag
>> calculation.
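>>
>> For the lag-monitoring part, a minimal sketch (the listener class name is
>> made up here) is to register a StreamingQueryListener and read the offsets
>> out of the progress events:
>>
>> import org.apache.spark.sql.streaming.StreamingQueryListener
>> import org.apache.spark.sql.streaming.StreamingQueryListener.{
>>   QueryProgressEvent, QueryStartedEvent, QueryTerminatedEvent}
>>
>> class OffsetReportingListener extends StreamingQueryListener {
>>   override def onQueryStarted(event: QueryStartedEvent): Unit = ()
>>   override def onQueryTerminated(event: QueryTerminatedEvent): Unit = ()
>>
>>   override def onQueryProgress(event: QueryProgressEvent): Unit = {
>>     event.progress.sourceProgress.foreach { source =>
>>       // endOffset is a JSON string such as {"my-topic":{"0":42,"1":17}}.
>>       // It can be parsed and pushed to a metrics system, or written back
>>       // to the consumer group with the Kafka AdminClient, purely for lag
>>       // monitoring; the query itself never reads it back.
>>       println(s"Processed up to: ${source.endOffset}")
>>     }
>>   }
>> }
>>
>> spark.streams.addListener(new OffsetReportingListener)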
>>
>> BR,
>> G
>>
>>
>> On Sat, 3 Apr 2021, 21:09 Ali Gouta, <ali.go...@gmail.com> wrote:
>>
>>> Hello,
>>>
>>> I was reading the Spark docs about Spark Structured Streaming, since we
>>> are thinking about updating our code base that today uses DStreams, hence
>>> Spark Streaming. One main reason for this change is that reading headers
>>> in Kafka messages is only supported in Structured Streaming and not in
>>> DStreams.
>>>
>>> I was surprised not to see an obvious way to manually handle the offsets
>>> by committing them to Kafka. In Spark Streaming we used to do it with
>>> something similar to these lines of code:
>>>
>>> import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}
>>>
>>> stream.foreachRDD { rdd =>
>>>   // Capture the Kafka offset ranges of this batch before any shuffle
>>>   val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>>>
>>>   // Some time later, after outputs have completed
>>>   stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
>>> }
>>>
>>>
>>> And this works perfectly! In particular, it works very nicely in case of
>>> job failure/restart... I am wondering how this can be achieved in Spark
>>> Structured Streaming?
>>>
>>> I read about checkpoints, and this reminds me of the old way of doing
>>> things in Spark 1.5/Kafka 0.8, which is not perfect since we are not
>>> deciding ourselves when to commit offsets.
>>>
>>> Did I miss anything? What would be the best way of committing offsets to
>>> Kafka for the concerned consumer group with Spark Structured Streaming?
>>>
>>> Best regards,
>>> Ali Gouta.
>>>
>>
