There is no built-in way to store offsets in Kafka and restart from the
stored offsets. Structured Streaming stores offsets in its checkpoint and
restarts from there without any user code.
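
For example, this is all it takes (a minimal sketch; the broker address,
topic name, and paths are placeholders):

import org.apache.spark.sql.SparkSession

// Minimal sketch: the checkpointLocation is where Structured Streaming
// persists its offsets, so a restarted query resumes from the last
// completed batch automatically.
val spark = SparkSession.builder().appName("kafka-checkpoint-demo").getOrCreate()

val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "events")
  .option("includeHeaders", "true") // the Kafka headers feature you mentioned
  .load()

df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "headers")
  .writeStream
  .format("parquet")
  .option("path", "/data/events")
  .option("checkpointLocation", "/checkpoints/events")
  .start()
  .awaitTermination()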

Offsets can be committed to Kafka from a StreamingQueryListener, but those
commits can only be used for lag monitoring; the query itself always
resumes from the checkpoint, never from them.
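
If you want those commits anyway, a rough sketch of the pattern looks like
this (assuming Spark 3.x and the plain Kafka client on the classpath; the
class name, group id, and per-batch consumer are illustrative, not an
official API):

import java.util.Properties
import org.apache.kafka.clients.consumer.{KafkaConsumer, OffsetAndMetadata}
import org.apache.kafka.common.TopicPartition
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._
import org.json4s._
import org.json4s.jackson.JsonMethods.parse

// Illustrative listener: mirrors each batch's end offsets into a Kafka
// consumer group so external tools can compute lag. The query itself
// still restarts from the checkpoint, never from these commits.
class KafkaOffsetCommitListener(bootstrapServers: String, groupId: String)
    extends StreamingQueryListener {

  private implicit val formats: Formats = DefaultFormats

  override def onQueryStarted(event: QueryStartedEvent): Unit = ()
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = ()

  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    event.progress.sources.foreach { source =>
      // For the Kafka source, endOffset is a JSON string like
      // {"events":{"0":42,"1":17}} (topic -> partition -> offset).
      Option(source.endOffset).foreach { json =>
        val toCommit = new java.util.HashMap[TopicPartition, OffsetAndMetadata]()
        parse(json).extract[Map[String, Map[String, Long]]].foreach {
          case (topic, partitions) =>
            partitions.foreach { case (partition, offset) =>
              toCommit.put(new TopicPartition(topic, partition.toInt),
                new OffsetAndMetadata(offset))
            }
        }
        // A fresh consumer per batch keeps the sketch simple; a real
        // implementation would reuse one.
        val props = new Properties()
        props.put("bootstrap.servers", bootstrapServers)
        props.put("group.id", groupId)
        props.put("key.deserializer",
          "org.apache.kafka.common.serialization.ByteArrayDeserializer")
        props.put("value.deserializer",
          "org.apache.kafka.common.serialization.ByteArrayDeserializer")
        val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](props)
        try consumer.commitSync(toCommit)
        finally consumer.close()
      }
    }
  }
}

// Register it before starting the query:
// spark.streams.addListener(new KafkaOffsetCommitListener("localhost:9092", "lag-monitor"))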

BR,
G


On Sat, 3 Apr 2021, 21:09 Ali Gouta, <ali.go...@gmail.com> wrote:

> Hello,
>
> I was reading the Spark docs about Spark Structured Streaming, since we
> are thinking about updating our code base that today uses DStreams, hence
> Spark Streaming. One main reason for this change is that reading headers
> in Kafka messages is only supported in Spark Structured Streaming and not
> in DStreams.
>
> I was surprised not to see an obvious way to manually handle offsets by
> committing them to Kafka. In Spark Streaming we used to do it with
> something similar to these lines of code:
>
> stream.foreachRDD { rdd =>
>   val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>
>   // some time later, after outputs have completed
>   stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
> }
>
>
> And this works perfectly! In particular, it behaves very nicely on job
> failure/restart... I am wondering how this can be achieved in Spark
> Structured Streaming?
>
> I read about checkpoints, and this reminds me of the old way of doing
> things in Spark 1.5/Kafka 0.8, which is not ideal since we are not
> deciding ourselves when to commit offsets.
>
> Did I miss anything? What would be the best way of committing offsets to
> Kafka for the relevant consumer group with Spark Structured Streaming?
>
> Best regards,
> Ali Gouta.
>
