Thanks Mich !

Ali Gouta.

On Sun, Apr 4, 2021 at 6:44 PM Mich Talebzadeh <mich.talebza...@gmail.com>
wrote:

> Hi Ali,
>
> The old saying of one experiment is worth a hundred hypotheses, still
> stands.
>
> As per Test driven approach have a go at it and see what comes out. Forum
> members including myself have reported on SSS in Spark user group, so you
> are at home on this.
>
> HTH,
>
>
>
>
>    view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Sun, 4 Apr 2021 at 17:28, Ali Gouta <ali.go...@gmail.com> wrote:
>
>> Great, so SSS provides also an api that allows handling RDDs through
>> dataFrames using foreachBatch. Still that I am not sure this is a
>> good practice in general right ? Well, it depends on the use case in any
>> way.
>>
>> Thank you so much for the hints !
>>
>> Best regards,
>> Ali Gouta.
>>
>> On Sun, Apr 4, 2021 at 6:11 PM Mich Talebzadeh <mich.talebza...@gmail.com>
>> wrote:
>>
>>> Hi Ali,
>>>
>>>
>>> On a practical side, I have used both the old DStreams and the newer
>>> Spark structured streaming (SSS).
>>>
>>>
>>> SSS does a good job at micro-batch level in the form of
>>>
>>>
>>> foreachBatch(SendToSink)
>>>
>>>
>>>  "foreach" performs custom write logic on each row and "foreachBatch" 
>>> *performs
>>> custom write logic *on each micro-batch through SendToSink function.
>>> foreachBatch(SendToSink) expects 2 parameters, first: micro-batch as
>>> DataFrame or Dataset and second: unique id for each batch. Using
>>> foreachBatch, we write each micro batch eventually to storage defined in
>>> our custom logic. In this case, we store the output of our streaming
>>> application to Redis or Google BigQuery table or any other sink
>>>
>>>
>>>
>>> In Dstream world you would have done something like below
>>>
>>>
>>>     // Work on every Stream
>>>
>>>     dstream.foreachRDD
>>>
>>>     { pricesRDD =>
>>>
>>>       if (!pricesRDD.isEmpty)  // data exists in RDD
>>>
>>>       {
>>>
>>> and after some work from that RDD you would have created a DF (df)
>>>
>>> With regard to SSS, it allows you to use the passed DataFrame for your
>>> work. However, say in my case if you were interested in individual rows of
>>> micro-batch (say different collection of prices for different tickers
>>> (securities), you could create RDD from the dataframe
>>>
>>> for row in df.rdd.collect():
>>>     ticker = row.ticker
>>>     price = row.price
>>>
>>>
>>> With regard to foreach(process_row), I have not really tried it as we
>>> don't have a use case for it, so I assume your mileage varies as usual.
>>>
>>>
>>> HTH
>>>
>>>
>>>
>>>    view my Linkedin profile
>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>
>>>
>>>
>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>> any loss, damage or destruction of data or any other property which may
>>> arise from relying on this email's technical content is explicitly
>>> disclaimed. The author will in no case be liable for any monetary damages
>>> arising from such loss, damage or destruction.
>>>
>>>
>>>
>>>
>>> On Sun, 4 Apr 2021 at 16:27, Ali Gouta <ali.go...@gmail.com> wrote:
>>>
>>>> Thank you guys for your answers, I will dig more this new way of doing
>>>> things and why not consider leaving the old Dstreams and use instead
>>>> structured streaming. Hope that strucrured streaming + spark on Kubernetes
>>>> works well and the combination is production ready.
>>>>
>>>> Best regards,
>>>> Ali Gouta.
>>>>
>>>> Le dim. 4 avr. 2021 à 12:52, Jacek Laskowski <ja...@japila.pl> a
>>>> écrit :
>>>>
>>>>> Hi,
>>>>>
>>>>> Just to add it to Gabor's excellent answer that checkpointing and
>>>>> offsets are infrastructure-related and should not really be in the hands 
>>>>> of
>>>>> Spark devs who should instead focus on the business purpose of the code
>>>>> (not offsets that are very low-level and not really important).
>>>>>
>>>>> BTW That's what happens in Kafka Streams too
>>>>>
>>>>> Pozdrawiam,
>>>>> Jacek Laskowski
>>>>> ----
>>>>> https://about.me/JacekLaskowski
>>>>> "The Internals Of" Online Books <https://books.japila.pl/>
>>>>> Follow me on https://twitter.com/jaceklaskowski
>>>>>
>>>>> <https://twitter.com/jaceklaskowski>
>>>>>
>>>>>
>>>>> On Sun, Apr 4, 2021 at 12:28 PM Gabor Somogyi <
>>>>> gabor.g.somo...@gmail.com> wrote:
>>>>>
>>>>>> There is no way to store offsets in Kafka and restart from the stored
>>>>>> offset. Structured Streaming stores offset in checkpoint and it restart
>>>>>> from there without any user code.
>>>>>>
>>>>>> Offsets can be stored with a listener but it can be only used for lag
>>>>>> calculation.
>>>>>>
>>>>>> BR,
>>>>>> G
>>>>>>
>>>>>>
>>>>>> On Sat, 3 Apr 2021, 21:09 Ali Gouta, <ali.go...@gmail.com> wrote:
>>>>>>
>>>>>>> Hello,
>>>>>>>
>>>>>>> I was reading the spark docs about spark structured streaming, since
>>>>>>> we are thinking about updating our code base that today uses Dstreams,
>>>>>>> hence spark streaming. Also, one main reason for this change that we 
>>>>>>> want
>>>>>>> to realize is that reading headers in kafka messages is only supported 
>>>>>>> in
>>>>>>> spark structured streaming and not in Dstreams.
>>>>>>>
>>>>>>> I was surprised to not see an obvious way to handle manually the
>>>>>>> offsets by committing the offsets to kafka. In spark streaming we used 
>>>>>>> to
>>>>>>> do it with something similar to these lines of code:
>>>>>>>
>>>>>>> stream.foreachRDD { rdd =>
>>>>>>>   val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>>>>>>>
>>>>>>>   // some time later, after outputs have completed
>>>>>>>   stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)}
>>>>>>>
>>>>>>>
>>>>>>> And this works perfectly ! Especially, this works very nice in case
>>>>>>> of job failure/restart... I am wondering how this can be achieved in 
>>>>>>> spark
>>>>>>> structured streaming ?
>>>>>>>
>>>>>>> I read about checkpoints, and this reminds me the old way of doing
>>>>>>> things in spark 1.5/kafka0.8 and is not perfect since we are not 
>>>>>>> deciding
>>>>>>> when to commit offsets by ourselves.
>>>>>>>
>>>>>>> Did I miss anything ? What would be the best way of committing
>>>>>>> offsets to kafka with spark structured streaming to the concerned 
>>>>>>> consumer
>>>>>>> group ?
>>>>>>>
>>>>>>> Best regards,
>>>>>>> Ali Gouta.
>>>>>>>
>>>>>>

Reply via email to