Great, so SSS also provides an API that allows handling RDDs through DataFrames using foreachBatch. Still, I am not sure this is a good practice in general, right? Well, it depends on the use case anyway.
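For illustration, here is a minimal sketch of the foreachBatch pattern Mich describes below, with the per-row work done on the micro-batch. The broker address, topic name, checkpoint path, and the sendToSink function (named after the SendToSink example in the thread) are placeholders, and the collect() to the driver is only reasonable for small batches:

    import org.apache.spark.sql.{DataFrame, SparkSession}

    val spark = SparkSession.builder.appName("prices-stream").getOrCreate()

    // placeholder Kafka source; broker address and topic name are made up
    val prices = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker:9092")
      .option("subscribe", "prices")
      .load()
      .selectExpr("CAST(value AS STRING) AS price")

    // the function handed to foreachBatch receives the micro-batch as a plain
    // (non-streaming) DataFrame plus a unique batch id
    def sendToSink(batchDF: DataFrame, batchId: Long): Unit = {
      batchDF.persist()
      // row-level work on the micro-batch, here via the underlying RDD;
      // the println is a stand-in for the real sink (Redis, BigQuery, ...)
      batchDF.rdd.collect().foreach { row =>
        println(s"batch $batchId -> ${row.mkString(",")}")
      }
      batchDF.unpersist()
    }

    val query = prices.writeStream
      .foreachBatch(sendToSink _)
      .option("checkpointLocation", "/tmp/checkpoints/prices") // placeholder path
      .start()

    query.awaitTermination()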
Thank you so much for the hints! Best regards, Ali Gouta.

On Sun, Apr 4, 2021 at 6:11 PM Mich Talebzadeh <mich.talebza...@gmail.com> wrote:

> Hi Ali,
>
> On a practical side, I have used both the old DStreams and the newer Spark
> Structured Streaming (SSS).
>
> SSS does a good job at the micro-batch level in the form of
>
> foreachBatch(SendToSink)
>
> "foreach" performs custom write logic on each row and "foreachBatch" *performs
> custom write logic* on each micro-batch through the SendToSink function.
> foreachBatch(SendToSink) passes two parameters to that function: first, the
> micro-batch as a DataFrame or Dataset, and second, a unique id for each batch.
> Using foreachBatch, we eventually write each micro-batch to the storage defined
> in our custom logic. In our case, we store the output of our streaming
> application in Redis, a Google BigQuery table, or any other sink.
>
> In the DStream world you would have done something like the below:
>
> // Work on every stream
> dstream.foreachRDD { pricesRDD =>
>   if (!pricesRDD.isEmpty) { // data exists in RDD
>     // ...
>   }
> }
>
> and after some work on that RDD you would have created a DF (df).
>
> With regard to SSS, it allows you to use the passed DataFrame for your work.
> However, if, as in my case, you were interested in individual rows of the
> micro-batch (say, different collections of prices for different tickers
> (securities)), you could create an RDD from the DataFrame:
>
> for row in df.rdd.collect():
>     ticker = row.ticker
>     price = row.price
>
> With regard to foreach(process_row), I have not really tried it as we don't
> have a use case for it, so I assume your mileage may vary as usual.
>
> HTH
>
> view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
> On Sun, 4 Apr 2021 at 16:27, Ali Gouta <ali.go...@gmail.com> wrote:
>
>> Thank you guys for your answers. I will dig more into this new way of doing
>> things, and why not consider leaving the old DStreams and using Structured
>> Streaming instead. I hope that Structured Streaming + Spark on Kubernetes
>> works well and that the combination is production ready.
>>
>> Best regards,
>> Ali Gouta.
>>
>> On Sun, Apr 4, 2021 at 12:52 PM, Jacek Laskowski <ja...@japila.pl> wrote:
>>
>>> Hi,
>>>
>>> Just to add to Gabor's excellent answer: checkpointing and offsets are
>>> infrastructure-related and should not really be in the hands of Spark devs,
>>> who should instead focus on the business purpose of the code (not offsets,
>>> which are very low-level and not really important).
>>>
>>> BTW, that's what happens in Kafka Streams too.
>>>
>>> Regards,
>>> Jacek Laskowski
>>> ----
>>> https://about.me/JacekLaskowski
>>> "The Internals Of" Online Books <https://books.japila.pl/>
>>> Follow me on https://twitter.com/jaceklaskowski
>>>
>>> On Sun, Apr 4, 2021 at 12:28 PM Gabor Somogyi <gabor.g.somo...@gmail.com>
>>> wrote:
>>>
>>>> There is no way to store offsets in Kafka and restart from the stored
>>>> offsets. Structured Streaming stores offsets in its checkpoint and restarts
>>>> from there without any user code.
>>>>
>>>> Offsets can be stored with a listener, but that can only be used for lag
>>>> calculation.
>>>>
>>>> BR,
>>>> G
>>>>
>>>> On Sat, 3 Apr 2021 at 21:09, Ali Gouta <ali.go...@gmail.com> wrote:
>>>>
>>>>> Hello,
>>>>>
>>>>> I was reading the Spark docs about Spark Structured Streaming, since
>>>>> we are thinking about updating our code base that today uses DStreams,
>>>>> hence Spark Streaming. Also, one main reason for this change is that
>>>>> reading headers in Kafka messages is only supported in Spark Structured
>>>>> Streaming and not in DStreams.
>>>>>
>>>>> I was surprised not to see an obvious way to handle the offsets manually
>>>>> by committing them to Kafka. In Spark Streaming we used to do it with
>>>>> something similar to these lines of code:
>>>>>
>>>>> stream.foreachRDD { rdd =>
>>>>>   val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>>>>>
>>>>>   // some time later, after outputs have completed
>>>>>   stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
>>>>> }
>>>>>
>>>>> And this works perfectly! In particular, it works very nicely in case of
>>>>> job failure/restart... I am wondering how this can be achieved in Spark
>>>>> Structured Streaming?
>>>>>
>>>>> I read about checkpoints, and this reminds me of the old way of doing
>>>>> things in Spark 1.5/Kafka 0.8, and it is not perfect since we are not
>>>>> deciding ourselves when to commit offsets.
>>>>>
>>>>> Did I miss anything? What would be the best way of committing offsets to
>>>>> Kafka, for the concerned consumer group, with Spark Structured Streaming?
>>>>>
>>>>> Best regards,
>>>>> Ali Gouta.
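To make Gabor's point above concrete, here is a rough sketch of the Structured Streaming equivalent of the commitAsync pattern in the original question, with no manual offset handling at all. The broker, topic, checkpoint path, and the writeBatch function are placeholders:

    import org.apache.spark.sql.{DataFrame, SparkSession}

    val spark = SparkSession.builder.appName("checkpointed-offsets").getOrCreate()

    val stream = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker:9092") // placeholder
      .option("subscribe", "prices")                     // placeholder topic
      .option("startingOffsets", "latest")               // honoured only when no checkpoint exists yet
      .load()

    def writeBatch(batchDF: DataFrame, batchId: Long): Unit = {
      // sink logic; batchId can be used to make re-processed batches idempotent
    }

    val query = stream.writeStream
      .foreachBatch(writeBatch _)
      .option("checkpointLocation", "/path/to/checkpoint") // offsets and progress live here
      .start()

    query.awaitTermination()

    // On restart with the same checkpointLocation, the query resumes from the
    // offsets recorded in the checkpoint; there is no commitAsync to call, and
    // no offsets are committed back to the Kafka consumer group by default.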
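And if the processed offsets should also be visible outside the checkpoint, Gabor's listener suggestion looks roughly like the sketch below: a StreamingQueryListener receives the offsets of each micro-batch in the progress events as JSON strings, which can then be logged or pushed to a monitoring system for lag calculation (the println is a stand-in for that). It assumes the SparkSession `spark` from the previous sketch:

    import org.apache.spark.sql.streaming.StreamingQueryListener
    import org.apache.spark.sql.streaming.StreamingQueryListener.{QueryProgressEvent, QueryStartedEvent, QueryTerminatedEvent}

    spark.streams.addListener(new StreamingQueryListener {
      override def onQueryStarted(event: QueryStartedEvent): Unit = {}

      override def onQueryProgress(event: QueryProgressEvent): Unit = {
        // each Kafka source reports its offsets as JSON, e.g. {"prices":{"0":1234}}
        event.progress.sources.foreach { source =>
          println(s"source=${source.description} start=${source.startOffset} end=${source.endOffset}")
        }
      }

      override def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
    })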