Thanks Mich!
Ali Gouta.
On Sun, Apr 4, 2021 at 6:44 PM Mich Talebzadeh <mich.talebza...@gmail.com> wrote:

> Hi Ali,
>
> The old saying that one experiment is worth a hundred hypotheses still
> stands.
>
> As per the test-driven approach, have a go at it and see what comes out.
> Forum members, including myself, have reported on SSS in the Spark user
> group, so you are at home on this.
>
> HTH,
>
>
> On Sun, 4 Apr 2021 at 17:28, Ali Gouta <ali.go...@gmail.com> wrote:
>
>> Great, so SSS also provides an API that allows handling RDDs through
>> DataFrames using foreachBatch. Still, I am not sure this is a good
>> practice in general, right? Well, it depends on the use case in any
>> case.
>>
>> Thank you so much for the hints!
>>
>> Best regards,
>> Ali Gouta.
>>
>> On Sun, Apr 4, 2021 at 6:11 PM Mich Talebzadeh
>> <mich.talebza...@gmail.com> wrote:
>>
>>> Hi Ali,
>>>
>>> On the practical side, I have used both the old DStreams and the newer
>>> Spark Structured Streaming (SSS).
>>>
>>> SSS does a good job at the micro-batch level in the form of
>>>
>>> foreachBatch(SendToSink)
>>>
>>> "foreach" performs custom write logic on each row, and "foreachBatch"
>>> performs custom write logic on each micro-batch through the SendToSink
>>> function. foreachBatch(SendToSink) expects two parameters: first, the
>>> micro-batch as a DataFrame or Dataset, and second, a unique id for each
>>> batch. Using foreachBatch, we eventually write each micro-batch to the
>>> storage defined in our custom logic. In this case, we store the output
>>> of our streaming application in Redis, a Google BigQuery table, or any
>>> other sink.
>>>
>>> In the DStream world you would have done something like below:
>>>
>>> // Work on every stream
>>> dstream.foreachRDD { pricesRDD =>
>>>   if (!pricesRDD.isEmpty) // data exists in RDD
>>>   {
>>>
>>> and after some work on that RDD you would have created a DataFrame (df).
>>>
>>> With regard to SSS, it allows you to use the passed DataFrame for your
>>> work. However, say in my case, if you were interested in individual
>>> rows of the micro-batch (say different collections of prices for
>>> different tickers (securities)), you could create an RDD from the
>>> DataFrame:
>>>
>>> for row in df.rdd.collect():
>>>     ticker = row.ticker
>>>     price = row.price
>>>
>>> With regard to foreach(process_row), I have not really tried it as we
>>> don't have a use case for it, so I assume your mileage may vary as
>>> usual.
>>>
>>> HTH
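
[Editor's note: a minimal sketch of the foreachBatch pattern described above,
for reference only; it is not code from the thread. The broker address, topic
name, checkpoint path and the sendToSink body (appending to a table rather
than writing to Redis or BigQuery) are illustrative assumptions.]

import org.apache.spark.sql.{DataFrame, SparkSession}

val spark = SparkSession.builder.appName("sss-foreachBatch-sketch").getOrCreate()

// Read a stream of price events from Kafka (broker and topic are assumptions)
val prices = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")
  .option("subscribe", "prices")
  .load()
  .selectExpr("CAST(key AS STRING) AS ticker", "CAST(value AS STRING) AS payload")

// Custom write logic per micro-batch: the first parameter is the micro-batch
// as a DataFrame, the second is the unique batch id. A Redis or BigQuery
// writer would go here; appending to a table keeps the sketch self-contained.
def sendToSink(df: DataFrame, batchId: Long): Unit = {
  df.write.mode("append").saveAsTable("prices_out")
}

val query = prices.writeStream
  .foreachBatch(sendToSink _)
  .option("checkpointLocation", "/tmp/checkpoints/prices")  // assumed path
  .start()

query.awaitTermination()

The batch id passed as the second parameter can be used to de-duplicate
writes in the sink if a micro-batch is replayed after a failure.
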
>>>
>>> On Sun, 4 Apr 2021 at 16:27, Ali Gouta <ali.go...@gmail.com> wrote:
>>>
>>>> Thank you guys for your answers. I will dig more into this new way of
>>>> doing things and, why not, consider leaving the old DStreams and using
>>>> structured streaming instead. I hope that structured streaming + Spark
>>>> on Kubernetes works well and the combination is production ready.
>>>>
>>>> Best regards,
>>>> Ali Gouta.
>>>>
>>>> On Sun, 4 Apr 2021 at 12:52, Jacek Laskowski <ja...@japila.pl> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> Just to add to Gabor's excellent answer: checkpointing and offsets
>>>>> are infrastructure-related and should not really be in the hands of
>>>>> Spark devs, who should instead focus on the business purpose of the
>>>>> code (not offsets, which are very low-level and not really important).
>>>>>
>>>>> BTW, that's what happens in Kafka Streams too.
>>>>>
>>>>> Regards,
>>>>> Jacek Laskowski
>>>>> ----
>>>>> https://about.me/JacekLaskowski
>>>>> "The Internals Of" Online Books <https://books.japila.pl/>
>>>>> Follow me on https://twitter.com/jaceklaskowski
>>>>>
>>>>> On Sun, Apr 4, 2021 at 12:28 PM Gabor Somogyi
>>>>> <gabor.g.somo...@gmail.com> wrote:
>>>>>
>>>>>> There is no way to store offsets in Kafka and restart from the
>>>>>> stored offset. Structured Streaming stores offsets in the checkpoint
>>>>>> and restarts from there without any user code.
>>>>>>
>>>>>> Offsets can be stored with a listener, but that can only be used for
>>>>>> lag calculation.
>>>>>>
>>>>>> BR,
>>>>>> G
>>>>>>
>>>>>> On Sat, 3 Apr 2021, 21:09 Ali Gouta, <ali.go...@gmail.com> wrote:
>>>>>>
>>>>>>> Hello,
>>>>>>>
>>>>>>> I was reading the Spark docs about Spark Structured Streaming,
>>>>>>> since we are thinking about updating our code base, which today
>>>>>>> uses DStreams, hence Spark Streaming. Also, one main reason for
>>>>>>> this change is that reading headers in Kafka messages is only
>>>>>>> supported in Spark Structured Streaming and not in DStreams.
>>>>>>>
>>>>>>> I was surprised not to see an obvious way to handle the offsets
>>>>>>> manually by committing them to Kafka. In Spark Streaming we used to
>>>>>>> do it with something similar to these lines of code:
>>>>>>>
>>>>>>> stream.foreachRDD { rdd =>
>>>>>>>   val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>>>>>>>
>>>>>>>   // some time later, after outputs have completed
>>>>>>>   stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
>>>>>>> }
>>>>>>>
>>>>>>> And this works perfectly! In particular, it works very nicely in
>>>>>>> case of job failure/restart... I am wondering how this can be
>>>>>>> achieved in Spark Structured Streaming?
>>>>>>>
>>>>>>> I read about checkpoints, and this reminds me of the old way of
>>>>>>> doing things in Spark 1.5/Kafka 0.8, and it is not perfect since we
>>>>>>> are not deciding when to commit offsets ourselves.
>>>>>>>
>>>>>>> Did I miss anything? What would be the best way of committing
>>>>>>> offsets to Kafka, for the concerned consumer group, with Spark
>>>>>>> Structured Streaming?
>>>>>>>
>>>>>>> Best regards,
>>>>>>> Ali Gouta.
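
[Editor's note: a minimal sketch of the behaviour Gabor describes above, not
code from the thread. On restart a query resumes from its checkpointLocation
without any user code, while a StreamingQueryListener can surface the offsets
each micro-batch covered, e.g. for lag monitoring or for copying them to an
external store (but not for restart). The broker, topic and checkpoint path
are illustrative assumptions.]

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

val spark = SparkSession.builder.appName("sss-offset-listener-sketch").getOrCreate()

// Report the offsets each completed micro-batch covered. These could be
// pushed to an external store, but on restart the query still resumes from
// the checkpoint, not from anything written here.
spark.streams.addListener(new StreamingQueryListener {
  override def onQueryStarted(event: QueryStartedEvent): Unit = ()
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = ()
  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    event.progress.sources.foreach { s =>
      println(s"source=${s.description} start=${s.startOffset} end=${s.endOffset}")
    }
  }
})

val events = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")  // assumed broker
  .option("subscribe", "events")                     // assumed topic
  .load()

events.writeStream
  .format("console")
  .option("checkpointLocation", "/tmp/checkpoints/events")  // restart resumes from here
  .start()
  .awaitTermination()
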