Thank you guys for your answers. I will dig more into this new way of doing things and consider leaving the old DStreams behind in favour of Structured Streaming. I hope that Structured Streaming + Spark on Kubernetes works well and that the combination is production ready.
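For reference, here is roughly the shape I expect the migrated pipeline to take. This is only a minimal sketch on my side: the broker address, topic name, console sink, checkpoint path and trigger interval are placeholders of mine.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

val spark = SparkSession.builder().appName("structured-kafka-sketch").getOrCreate()

// Kafka source; includeHeaders exposes the message headers we need (Spark 3.0+)
val messages = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker1:9092")  // placeholder
  .option("subscribe", "my-topic")                    // placeholder
  .option("includeHeaders", "true")
  .option("startingOffsets", "latest")
  .load()

// ... business transformations would go here ...

// No manual offset handling: progress is tracked in the checkpoint directory,
// and a restarted query resumes from the last committed batch.
val query = messages.writeStream
  .format("console")                                          // placeholder sink
  .option("checkpointLocation", "/tmp/checkpoints/my-topic")  // placeholder
  .trigger(Trigger.ProcessingTime("30 seconds"))
  .start()

query.awaitTermination()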
Best regards,
Ali Gouta.

On Sun, Apr 4, 2021 at 12:52 PM Jacek Laskowski <ja...@japila.pl> wrote:

> Hi,
>
> Just to add to Gabor's excellent answer: checkpointing and offsets are
> infrastructure-related and should not really be in the hands of Spark
> devs, who should instead focus on the business purpose of the code (not
> offsets, which are very low-level and not really important).
>
> BTW, that's what happens in Kafka Streams too.
>
> Best regards,
> Jacek Laskowski
> ----
> https://about.me/JacekLaskowski
> "The Internals Of" Online Books <https://books.japila.pl/>
> Follow me on https://twitter.com/jaceklaskowski
>
>
> On Sun, Apr 4, 2021 at 12:28 PM Gabor Somogyi <gabor.g.somo...@gmail.com>
> wrote:
>
>> There is no way to store offsets in Kafka and restart from the stored
>> offset. Structured Streaming stores offsets in the checkpoint and restarts
>> from there without any user code.
>>
>> Offsets can be stored with a listener, but that can only be used for lag
>> calculation.
>>
>> BR,
>> G
>>
>>
>> On Sat, 3 Apr 2021, 21:09 Ali Gouta, <ali.go...@gmail.com> wrote:
>>
>>> Hello,
>>>
>>> I was reading the Spark docs about Spark Structured Streaming, since we
>>> are thinking about updating our code base that today uses DStreams, hence
>>> Spark Streaming. Also, one main reason for this change is that reading
>>> headers in Kafka messages is only supported in Structured Streaming and
>>> not in DStreams.
>>>
>>> I was surprised not to see an obvious way to handle the offsets manually
>>> by committing them to Kafka. In Spark Streaming we used to do it with
>>> something similar to these lines of code:
>>>
>>> stream.foreachRDD { rdd =>
>>>   val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>>>
>>>   // some time later, after outputs have completed
>>>   stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
>>> }
>>>
>>> And this works perfectly! In particular, it behaves very nicely in case
>>> of job failure/restart... I am wondering how this can be achieved in
>>> Spark Structured Streaming?
>>>
>>> I read about checkpoints, and this reminds me of the old way of doing
>>> things in Spark 1.5/Kafka 0.8, which is not perfect since we are not
>>> deciding when to commit offsets ourselves.
>>>
>>> Did I miss anything? What would be the best way of committing offsets
>>> to Kafka with Spark Structured Streaming for the concerned consumer group?
>>>
>>> Best regards,
>>> Ali Gouta.
>>>
>>
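PS: If I read Gabor's remark about the listener correctly, something along these lines could expose the end offsets of each micro-batch for lag monitoring. Again only a rough sketch of mine: the class name and the println are placeholders, and as Gabor says this does not control where the query restarts from.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener.{QueryProgressEvent, QueryStartedEvent, QueryTerminatedEvent}

// Logs the per-source end offsets after every micro-batch; useful for lag
// monitoring, not for choosing where a restarted query resumes.
class OffsetLoggingListener extends StreamingQueryListener {
  override def onQueryStarted(event: QueryStartedEvent): Unit = ()
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = ()
  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    event.progress.sources.foreach { source =>
      // endOffset is a JSON string mapping topic -> partition -> offset
      println(s"source=${source.description} endOffset=${source.endOffset}")
    }
  }
}

val spark = SparkSession.builder().getOrCreate()
spark.streams.addListener(new OffsetLoggingListener)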