Ordering pushdown for Spark Datasources

2021-04-04 Thread Kohki Nishio
Hello,

I'm trying to use Spark SQL as a log analytics solution. As you might
guess, for most use-cases, data is ordered by timestamp and the amount of
data is large.

If I want to show the first 100 entries (ordered by timestamp) for a given
condition, the Spark executors have to scan all entries to select the
top 100 by timestamp.

I understand this behavior; however, some data sources such as JDBC
or Lucene support ordering natively, and in this case the target data is
large (a couple of million entries). I believe it is possible to push the
ordering down to the data source and let the executors return early.

Here's my ask: I know Spark doesn't do such a thing today, but I'm looking for
any pointers or references that might be relevant to this, and any ideas
would be appreciated. So far I have found that some folks are working on
aggregate pushdown (SPARK-22390), but I don't see any current activity
around ordering pushdown.
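
To make the ask concrete, here is a rough sketch of the kind of mix-in I have
in mind, modeled on the existing DataSource V2 pushdown interfaces such as
SupportsPushDownFilters. The trait and its methods below are purely
hypothetical; nothing like this exists in Spark today:

// Purely hypothetical -- nothing like this exists in Spark today.
// Modeled loosely on org.apache.spark.sql.connector.read.SupportsPushDownFilters.
trait SupportsPushDownOrdering {

  // Offer the required sort order as (columnName, ascending) pairs.
  // Returns true if the source will emit rows already sorted this way,
  // so a query like ORDER BY ts DESC LIMIT 100 could stop early.
  def pushOrdering(ordering: Seq[(String, Boolean)]): Boolean

  // Optionally push the limit too, so e.g. a JDBC or Lucene source
  // can translate it into ORDER BY ... LIMIT 100 / a top-N search.
  def pushLimit(limit: Int): Unit
}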

Thanks


-- 
Kohki Nishio


Re: Spark structured streaming + offset management in kafka + kafka headers

2021-04-04 Thread Gabor Somogyi
Just to be crystal clear: DStreams will be deprecated sooner or later and
there will be no support, so it is highly advised to migrate...

G


On Sun, 4 Apr 2021, 19:23 Ali Gouta,  wrote:

> Thanks Mich !
>
> Ali Gouta.
>
> On Sun, Apr 4, 2021 at 6:44 PM Mich Talebzadeh 
> wrote:
>
>> Hi Ali,
>>
>> The old saying of one experiment is worth a hundred hypotheses, still
>> stands.
>>
>> As per Test driven approach have a go at it and see what comes out. Forum
>> members including myself have reported on SSS in Spark user group, so you
>> are at home on this.
>>
>> HTH,
>>
>>
>>
>>
>>view my Linkedin profile
>> 
>>
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>>
>> On Sun, 4 Apr 2021 at 17:28, Ali Gouta  wrote:
>>
>>> Great, so SSS provides also an api that allows handling RDDs through
>>> dataFrames using foreachBatch. Still that I am not sure this is a
>>> good practice in general right ? Well, it depends on the use case in any
>>> way.
>>>
>>> Thank you so much for the hints !
>>>
>>> Best regards,
>>> Ali Gouta.
>>>
>>> On Sun, Apr 4, 2021 at 6:11 PM Mich Talebzadeh <
>>> mich.talebza...@gmail.com> wrote:
>>>
 Hi Ali,


 On a practical side, I have used both the old DStreams and the newer
 Spark structured streaming (SSS).


 SSS does a good job at micro-batch level in the form of


 foreachBatch(SendToSink)


 "foreach" performs custom write logic on each row, while "foreachBatch" performs
 custom write logic on each micro-batch through the SendToSink function.
 foreachBatch(SendToSink) expects two parameters: first, the micro-batch as a
 DataFrame or Dataset, and second, a unique id for each batch. Using
 foreachBatch, we eventually write each micro-batch to storage defined in
 our custom logic. In this case, we store the output of our streaming
 application in Redis, a Google BigQuery table, or any other sink.



 In Dstream world you would have done something like below


 // Work on every Stream

 dstream.foreachRDD

 { pricesRDD =>

   if (!pricesRDD.isEmpty)  // data exists in RDD

   {

 and after some work from that RDD you would have created a DF (df)

 With regard to SSS, it allows you to use the passed DataFrame for your
 work. However, say in my case if you were interested in individual rows of
 micro-batch (say different collection of prices for different tickers
 (securities), you could create RDD from the dataframe

 for row in df.rdd.collect():
     ticker = row.ticker
     price = row.price


 With regard to foreach(process_row), I have not really tried it as we
 don't have a use case for it, so I assume your mileage varies as usual.


 HTH



view my Linkedin profile
 



 *Disclaimer:* Use it at your own risk. Any and all responsibility for
 any loss, damage or destruction of data or any other property which may
 arise from relying on this email's technical content is explicitly
 disclaimed. The author will in no case be liable for any monetary damages
 arising from such loss, damage or destruction.




 On Sun, 4 Apr 2021 at 16:27, Ali Gouta  wrote:

> Thank you guys for your answers, I will dig more this new way of doing
> things and why not consider leaving the old Dstreams and use instead
> structured streaming. Hope that structured streaming + Spark on Kubernetes
> works well and the combination is production ready.
>
> Best regards,
> Ali Gouta.
>
> On Sun, Apr 4, 2021 at 12:52 PM Jacek Laskowski  wrote:
>
>> Hi,
>>
>> Just to add it to Gabor's excellent answer that checkpointing and
>> offsets are infrastructure-related and should not really be in the hands 
>> of
>> Spark devs who should instead focus on the business purpose of the code
>> (not offsets that are very low-level and not really important).
>>
>> BTW That's what happens in Kafka Streams too
>>
>> Pozdrawiam,
>> Jacek Laskowski
>> 
>> https://about.me/JacekLaskowski
>> "The Internals Of" Online Books 
>> Follow me on https://twitter.com/jaceklaskowski
>>
>> 
>>
>>
>> On Sun, Apr 4, 2021 at 12:28 PM Gabor Somogyi <
>> gabor.g.somo...@gmail.com> wrote:
>>
>>> There is no way to store offsets in Kafka 

Re: Spark structured streaming + offset management in kafka + kafka headers

2021-04-04 Thread Ali Gouta
Thanks Mich !

Ali Gouta.

On Sun, Apr 4, 2021 at 6:44 PM Mich Talebzadeh 
wrote:

> Hi Ali,
>
> The old saying of one experiment is worth a hundred hypotheses, still
> stands.
>
> As per Test driven approach have a go at it and see what comes out. Forum
> members including myself have reported on SSS in Spark user group, so you
> are at home on this.
>
> HTH,
>
>
>
>
>view my Linkedin profile
> 
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Sun, 4 Apr 2021 at 17:28, Ali Gouta  wrote:
>
>> Great, so SSS provides also an api that allows handling RDDs through
>> dataFrames using foreachBatch. Still that I am not sure this is a
>> good practice in general right ? Well, it depends on the use case in any
>> way.
>>
>> Thank you so much for the hints !
>>
>> Best regards,
>> Ali Gouta.
>>
>> On Sun, Apr 4, 2021 at 6:11 PM Mich Talebzadeh 
>> wrote:
>>
>>> Hi Ali,
>>>
>>>
>>> On a practical side, I have used both the old DStreams and the newer
>>> Spark structured streaming (SSS).
>>>
>>>
>>> SSS does a good job at micro-batch level in the form of
>>>
>>>
>>> foreachBatch(SendToSink)
>>>
>>>
>>> "foreach" performs custom write logic on each row, while "foreachBatch" performs
>>> custom write logic on each micro-batch through the SendToSink function.
>>> foreachBatch(SendToSink) expects two parameters: first, the micro-batch as a
>>> DataFrame or Dataset, and second, a unique id for each batch. Using
>>> foreachBatch, we eventually write each micro-batch to storage defined in
>>> our custom logic. In this case, we store the output of our streaming
>>> application in Redis, a Google BigQuery table, or any other sink.
>>>
>>>
>>>
>>> In Dstream world you would have done something like below
>>>
>>>
>>> // Work on every Stream
>>>
>>> dstream.foreachRDD
>>>
>>> { pricesRDD =>
>>>
>>>   if (!pricesRDD.isEmpty)  // data exists in RDD
>>>
>>>   {
>>>
>>> and after some work from that RDD you would have created a DF (df)
>>>
>>> With regard to SSS, it allows you to use the passed DataFrame for your
>>> work. However, say in my case if you were interested in individual rows of
>>> micro-batch (say different collection of prices for different tickers
>>> (securities), you could create RDD from the dataframe
>>>
>>> for row in df.rdd.collect():
>>>     ticker = row.ticker
>>>     price = row.price
>>>
>>>
>>> With regard to foreach(process_row), I have not really tried it as we
>>> don't have a use case for it, so I assume your mileage varies as usual.
>>>
>>>
>>> HTH
>>>
>>>
>>>
>>>view my Linkedin profile
>>> 
>>>
>>>
>>>
>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>> any loss, damage or destruction of data or any other property which may
>>> arise from relying on this email's technical content is explicitly
>>> disclaimed. The author will in no case be liable for any monetary damages
>>> arising from such loss, damage or destruction.
>>>
>>>
>>>
>>>
>>> On Sun, 4 Apr 2021 at 16:27, Ali Gouta  wrote:
>>>
 Thank you guys for your answers, I will dig more this new way of doing
 things and why not consider leaving the old Dstreams and use instead
 structured streaming. Hope that structured streaming + Spark on Kubernetes
 works well and the combination is production ready.

 Best regards,
 Ali Gouta.

 On Sun, Apr 4, 2021 at 12:52 PM Jacek Laskowski  wrote:

> Hi,
>
> Just to add it to Gabor's excellent answer that checkpointing and
> offsets are infrastructure-related and should not really be in the hands 
> of
> Spark devs who should instead focus on the business purpose of the code
> (not offsets that are very low-level and not really important).
>
> BTW That's what happens in Kafka Streams too
>
> Pozdrawiam,
> Jacek Laskowski
> 
> https://about.me/JacekLaskowski
> "The Internals Of" Online Books 
> Follow me on https://twitter.com/jaceklaskowski
>
> 
>
>
> On Sun, Apr 4, 2021 at 12:28 PM Gabor Somogyi <
> gabor.g.somo...@gmail.com> wrote:
>
>> There is no way to store offsets in Kafka and restart from the stored
>> offset. Structured Streaming stores offset in checkpoint and it restart
>> from there without any user code.
>>
>> Offsets can be stored with a listener but it can be only used for lag
>> calculation.
>>
>> BR,
>> G
>>
>>
>> On Sat, 3 Apr 2021, 21:09 Ali Gouta,  wrote:
>>
>>> Hello,

Re: Spark structured streaming + offset management in kafka + kafka headers

2021-04-04 Thread Mich Talebzadeh
Hi Ali,

The old saying that one experiment is worth a hundred hypotheses still
stands.

As per the test-driven approach, have a go at it and see what comes out. Forum
members, including myself, have reported on SSS in the Spark user group, so you
are at home on this.

HTH,




   view my Linkedin profile




*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Sun, 4 Apr 2021 at 17:28, Ali Gouta  wrote:

> Great, so SSS provides also an api that allows handling RDDs through
> dataFrames using foreachBatch. Still that I am not sure this is a
> good practice in general right ? Well, it depends on the use case in any
> way.
>
> Thank you so much for the hints !
>
> Best regards,
> Ali Gouta.
>
> On Sun, Apr 4, 2021 at 6:11 PM Mich Talebzadeh 
> wrote:
>
>> Hi Ali,
>>
>>
>> On a practical side, I have used both the old DStreams and the newer
>> Spark structured streaming (SSS).
>>
>>
>> SSS does a good job at micro-batch level in the form of
>>
>>
>> foreachBatch(SendToSink)
>>
>>
>> "foreach" performs custom write logic on each row, while "foreachBatch" performs
>> custom write logic on each micro-batch through the SendToSink function.
>> foreachBatch(SendToSink) expects two parameters: first, the micro-batch as a
>> DataFrame or Dataset, and second, a unique id for each batch. Using
>> foreachBatch, we eventually write each micro-batch to storage defined in
>> our custom logic. In this case, we store the output of our streaming
>> application in Redis, a Google BigQuery table, or any other sink.
>>
>>
>>
>> In Dstream world you would have done something like below
>>
>>
>> // Work on every Stream
>>
>> dstream.foreachRDD
>>
>> { pricesRDD =>
>>
>>   if (!pricesRDD.isEmpty)  // data exists in RDD
>>
>>   {
>>
>> and after some work from that RDD you would have created a DF (df)
>>
>> With regard to SSS, it allows you to use the passed DataFrame for your
>> work. However, say in my case if you were interested in individual rows of
>> micro-batch (say different collection of prices for different tickers
>> (securities), you could create RDD from the dataframe
>>
>> for row in df.rdd.collect():
>>     ticker = row.ticker
>>     price = row.price
>>
>>
>> With regard to foreach(process_row), I have not really tried it as we
>> don't have a use case for it, so I assume your mileage varies as usual.
>>
>>
>> HTH
>>
>>
>>
>>view my Linkedin profile
>> 
>>
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>>
>> On Sun, 4 Apr 2021 at 16:27, Ali Gouta  wrote:
>>
>>> Thank you guys for your answers, I will dig more this new way of doing
>>> things and why not consider leaving the old Dstreams and use instead
>>> structured streaming. Hope that structured streaming + Spark on Kubernetes
>>> works well and the combination is production ready.
>>>
>>> Best regards,
>>> Ali Gouta.
>>>
>>> On Sun, Apr 4, 2021 at 12:52 PM Jacek Laskowski  wrote:
>>>
 Hi,

 Just to add it to Gabor's excellent answer that checkpointing and
 offsets are infrastructure-related and should not really be in the hands of
 Spark devs who should instead focus on the business purpose of the code
 (not offsets that are very low-level and not really important).

 BTW That's what happens in Kafka Streams too

 Pozdrawiam,
 Jacek Laskowski
 
 https://about.me/JacekLaskowski
 "The Internals Of" Online Books 
 Follow me on https://twitter.com/jaceklaskowski

 


 On Sun, Apr 4, 2021 at 12:28 PM Gabor Somogyi <
 gabor.g.somo...@gmail.com> wrote:

> There is no way to store offsets in Kafka and restart from the stored
> offset. Structured Streaming stores offset in checkpoint and it restart
> from there without any user code.
>
> Offsets can be stored with a listener but it can be only used for lag
> calculation.
>
> BR,
> G
>
>
> On Sat, 3 Apr 2021, 21:09 Ali Gouta,  wrote:
>
>> Hello,
>>
>> I was reading the spark docs about spark structured streaming, since
>> we are thinking about updating our code base that today uses Dstreams,
>> hence spark streaming. Also, one main reason for this change that we want
>> to realize is that reading 

Re: Spark structured streaming + offset management in kafka + kafka headers

2021-04-04 Thread Ali Gouta
Great, so SSS also provides an API that allows handling RDDs through
DataFrames using foreachBatch. Still, I am not sure this is good
practice in general, right? Well, it depends on the use case anyway.

Thank you so much for the hints !

Best regards,
Ali Gouta.

On Sun, Apr 4, 2021 at 6:11 PM Mich Talebzadeh 
wrote:

> Hi Ali,
>
>
> On a practical side, I have used both the old DStreams and the newer Spark
> structured streaming (SSS).
>
>
> SSS does a good job at micro-batch level in the form of
>
>
> foreachBatch(SendToSink)
>
>
> "foreach" performs custom write logic on each row, while "foreachBatch" performs
> custom write logic on each micro-batch through the SendToSink function.
> foreachBatch(SendToSink) expects two parameters: first, the micro-batch as a
> DataFrame or Dataset, and second, a unique id for each batch. Using
> foreachBatch, we eventually write each micro-batch to storage defined in
> our custom logic. In this case, we store the output of our streaming
> application in Redis, a Google BigQuery table, or any other sink.
>
>
>
> In Dstream world you would have done something like below
>
>
> // Work on every Stream
>
> dstream.foreachRDD
>
> { pricesRDD =>
>
>   if (!pricesRDD.isEmpty)  // data exists in RDD
>
>   {
>
> and after some work from that RDD you would have created a DF (df)
>
> With regard to SSS, it allows you to use the passed DataFrame for your
> work. However, say in my case if you were interested in individual rows of
> micro-batch (say different collection of prices for different tickers
> (securities), you could create RDD from the dataframe
>
> for row in df.rdd.collect():
>     ticker = row.ticker
>     price = row.price
>
>
> With regard to foreach(process_row), I have not really tried it as we
> don't have a use case for it, so I assume your mileage varies as usual.
>
>
> HTH
>
>
>
>view my Linkedin profile
> 
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Sun, 4 Apr 2021 at 16:27, Ali Gouta  wrote:
>
>> Thank you guys for your answers, I will dig more this new way of doing
>> things and why not consider leaving the old Dstreams and use instead
>> structured streaming. Hope that structured streaming + Spark on Kubernetes
>> works well and the combination is production ready.
>>
>> Best regards,
>> Ali Gouta.
>>
>> On Sun, Apr 4, 2021 at 12:52 PM Jacek Laskowski  wrote:
>>
>>> Hi,
>>>
>>> Just to add it to Gabor's excellent answer that checkpointing and
>>> offsets are infrastructure-related and should not really be in the hands of
>>> Spark devs who should instead focus on the business purpose of the code
>>> (not offsets that are very low-level and not really important).
>>>
>>> BTW That's what happens in Kafka Streams too
>>>
>>> Pozdrawiam,
>>> Jacek Laskowski
>>> 
>>> https://about.me/JacekLaskowski
>>> "The Internals Of" Online Books 
>>> Follow me on https://twitter.com/jaceklaskowski
>>>
>>> 
>>>
>>>
>>> On Sun, Apr 4, 2021 at 12:28 PM Gabor Somogyi 
>>> wrote:
>>>
 There is no way to store offsets in Kafka and restart from the stored
 offset. Structured Streaming stores offset in checkpoint and it restart
 from there without any user code.

 Offsets can be stored with a listener but it can be only used for lag
 calculation.

 BR,
 G


 On Sat, 3 Apr 2021, 21:09 Ali Gouta,  wrote:

> Hello,
>
> I was reading the spark docs about spark structured streaming, since
> we are thinking about updating our code base that today uses Dstreams,
> hence spark streaming. Also, one main reason for this change that we want
> to realize is that reading headers in kafka messages is only supported in
> spark structured streaming and not in Dstreams.
>
> I was surprised to not see an obvious way to handle manually the
> offsets by committing the offsets to kafka. In spark streaming we used to
> do it with something similar to these lines of code:
>
> stream.foreachRDD { rdd =>
>   val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>
>   // some time later, after outputs have completed
>   stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)}
>
>
> And this works perfectly ! Especially, this works very nice in case of
> job failure/restart... I am wondering how this can be achieved in spark
> structured streaming ?
>
> I read about checkpoints, and this reminds me the old way of doing
> things in spark 1.5/kafka0.8 and is not perfect since we are not 

Re: Spark structured streaming + offset management in kafka + kafka headers

2021-04-04 Thread Mich Talebzadeh
Hi Ali,


On a practical side, I have used both the old DStreams and the newer Spark
structured streaming (SSS).


SSS does a good job at micro-batch level in the form of


foreachBatch(SendToSink)


"foreach" performs custom write logic on each row, while "foreachBatch" performs
custom write logic on each micro-batch through the SendToSink function.
foreachBatch(SendToSink) expects two parameters: first, the micro-batch as a
DataFrame or Dataset, and second, a unique id for each batch. Using
foreachBatch, we eventually write each micro-batch to storage defined in
our custom logic. In this case, we store the output of our streaming
application in Redis, a Google BigQuery table, or any other sink.
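
A minimal sketch of that pattern in Scala (the broker, topic, paths and the
sendToSink name are illustrative placeholders standing in for SendToSink above):

import org.apache.spark.sql.{DataFrame, SparkSession}

val spark = SparkSession.builder().appName("sss-foreachBatch").getOrCreate()

// Illustrative streaming source; broker and topic are placeholders.
val prices = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")
  .option("subscribe", "prices")
  .load()

// Receives each micro-batch as a DataFrame plus a unique batch id;
// the custom write logic (Redis, BigQuery, JDBC, ...) goes in here.
def sendToSink(df: DataFrame, batchId: Long): Unit = {
  df.write.mode("append").parquet("/tmp/prices_sink")
}

val query = prices.writeStream
  .foreachBatch(sendToSink _)
  .option("checkpointLocation", "/tmp/checkpoints/prices")
  .start()

query.awaitTermination()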



In the DStream world you would have done something like the below:


// Work on every Stream

dstream.foreachRDD

{ pricesRDD =>

  if (!pricesRDD.isEmpty)  // data exists in RDD

  {

and after some work from that RDD you would have created a DF (df)

With regard to SSS, it allows you to use the passed DataFrame for your
work. However, if (as in my case) you were interested in individual rows of
the micro-batch (say, different collections of prices for different tickers
(securities)), you could go through the DataFrame's underlying RDD:

for row in df.rdd.collect():
    ticker = row.ticker
    price = row.price


With regard to foreach(process_row), I have not really tried it as we don't
have a use case for it, so I assume your mileage may vary, as usual.
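
For completeness, a rough sketch of that per-row variant (purely illustrative;
it assumes a streaming DataFrame called prices that already has ticker and
price columns, and as said we have not used this in anger):

import org.apache.spark.sql.{ForeachWriter, Row}

val rowQuery = prices.writeStream
  .foreach(new ForeachWriter[Row] {
    // Called once per partition per epoch; return false to skip the partition.
    def open(partitionId: Long, epochId: Long): Boolean = true
    // Called for every row -- the per-row counterpart of foreachBatch.
    def process(row: Row): Unit = {
      val ticker = row.getAs[String]("ticker")
      val price  = row.getAs[Double]("price")
      println(s"$ticker -> $price")  // placeholder for the real sink write
    }
    def close(errorOrNull: Throwable): Unit = ()
  })
  .option("checkpointLocation", "/tmp/checkpoints/prices_rows")
  .start()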


HTH



   view my Linkedin profile




*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Sun, 4 Apr 2021 at 16:27, Ali Gouta  wrote:

> Thank you guys for your answers, I will dig more this new way of doing
> things and why not consider leaving the old Dstreams and use instead
> structured streaming. Hope that structured streaming + Spark on Kubernetes
> works well and the combination is production ready.
>
> Best regards,
> Ali Gouta.
>
> On Sun, Apr 4, 2021 at 12:52 PM Jacek Laskowski  wrote:
>
>> Hi,
>>
>> Just to add it to Gabor's excellent answer that checkpointing and offsets
>> are infrastructure-related and should not really be in the hands of Spark
>> devs who should instead focus on the business purpose of the code (not
>> offsets that are very low-level and not really important).
>>
>> BTW That's what happens in Kafka Streams too
>>
>> Pozdrawiam,
>> Jacek Laskowski
>> 
>> https://about.me/JacekLaskowski
>> "The Internals Of" Online Books 
>> Follow me on https://twitter.com/jaceklaskowski
>>
>> 
>>
>>
>> On Sun, Apr 4, 2021 at 12:28 PM Gabor Somogyi 
>> wrote:
>>
>>> There is no way to store offsets in Kafka and restart from the stored
>>> offset. Structured Streaming stores offset in checkpoint and it restart
>>> from there without any user code.
>>>
>>> Offsets can be stored with a listener but it can be only used for lag
>>> calculation.
>>>
>>> BR,
>>> G
>>>
>>>
>>> On Sat, 3 Apr 2021, 21:09 Ali Gouta,  wrote:
>>>
 Hello,

 I was reading the spark docs about spark structured streaming, since we
 are thinking about updating our code base that today uses Dstreams, hence
 spark streaming. Also, one main reason for this change that we want to
 realize is that reading headers in kafka messages is only supported in
 spark structured streaming and not in Dstreams.

 I was surprised to not see an obvious way to handle manually the
 offsets by committing the offsets to kafka. In spark streaming we used to
 do it with something similar to these lines of code:

 stream.foreachRDD { rdd =>
   val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

   // some time later, after outputs have completed
   stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)}


 And this works perfectly ! Especially, this works very nice in case of
 job failure/restart... I am wondering how this can be achieved in spark
 structured streaming ?

 I read about checkpoints, and this reminds me the old way of doing
 things in spark 1.5/kafka0.8 and is not perfect since we are not deciding
 when to commit offsets by ourselves.

 Did I miss anything ? What would be the best way of committing offsets
 to kafka with spark structured streaming to the concerned consumer group ?

 Best regards,
 Ali Gouta.

>>>


Re: Spark structured streaming + offset management in kafka + kafka headers

2021-04-04 Thread Ali Gouta
Thank you guys for your answers. I will dig more into this new way of doing
things and consider leaving the old DStreams for structured streaming
instead. I hope that structured streaming + Spark on Kubernetes works well
and that the combination is production ready.

Best regards,
Ali Gouta.

On Sun, Apr 4, 2021 at 12:52 PM Jacek Laskowski  wrote:

> Hi,
>
> Just to add it to Gabor's excellent answer that checkpointing and offsets
> are infrastructure-related and should not really be in the hands of Spark
> devs who should instead focus on the business purpose of the code (not
> offsets that are very low-level and not really important).
>
> BTW That's what happens in Kafka Streams too
>
> Pozdrawiam,
> Jacek Laskowski
> 
> https://about.me/JacekLaskowski
> "The Internals Of" Online Books 
> Follow me on https://twitter.com/jaceklaskowski
>
> 
>
>
> On Sun, Apr 4, 2021 at 12:28 PM Gabor Somogyi 
> wrote:
>
>> There is no way to store offsets in Kafka and restart from the stored
>> offset. Structured Streaming stores offset in checkpoint and it restart
>> from there without any user code.
>>
>> Offsets can be stored with a listener but it can be only used for lag
>> calculation.
>>
>> BR,
>> G
>>
>>
>> On Sat, 3 Apr 2021, 21:09 Ali Gouta,  wrote:
>>
>>> Hello,
>>>
>>> I was reading the spark docs about spark structured streaming, since we
>>> are thinking about updating our code base that today uses Dstreams, hence
>>> spark streaming. Also, one main reason for this change that we want to
>>> realize is that reading headers in kafka messages is only supported in
>>> spark structured streaming and not in Dstreams.
>>>
>>> I was surprised to not see an obvious way to handle manually the offsets
>>> by committing the offsets to kafka. In spark streaming we used to do it
>>> with something similar to these lines of code:
>>>
>>> stream.foreachRDD { rdd =>
>>>   val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>>>
>>>   // some time later, after outputs have completed
>>>   stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)}
>>>
>>>
>>> And this works perfectly ! Especially, this works very nice in case of
>>> job failure/restart... I am wondering how this can be achieved in spark
>>> structured streaming ?
>>>
>>> I read about checkpoints, and this reminds me the old way of doing
>>> things in spark 1.5/kafka0.8 and is not perfect since we are not deciding
>>> when to commit offsets by ourselves.
>>>
>>> Did I miss anything ? What would be the best way of committing offsets
>>> to kafka with spark structured streaming to the concerned consumer group ?
>>>
>>> Best regards,
>>> Ali Gouta.
>>>
>>


Re: Spark structured streaming + offset management in kafka + kafka headers

2021-04-04 Thread Jacek Laskowski
Hi,

Just to add to Gabor's excellent answer: checkpointing and offsets
are infrastructure-related and should not really be in the hands of Spark
devs, who should instead focus on the business purpose of the code (not
offsets, which are very low-level and not really important).

BTW That's what happens in Kafka Streams too
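
In practice the only piece of that infrastructure the code names is the
checkpoint location; a minimal sketch (broker, topic and paths are placeholders):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("checkpoint-demo").getOrCreate()

val events = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")
  .option("subscribe", "events")
  .load()

events.writeStream
  .format("parquet")
  .option("path", "/data/events")
  // Spark tracks the Kafka offsets in here itself; on restart the query
  // resumes from the last committed batch -- no manual commit in user code.
  .option("checkpointLocation", "/checkpoints/events")
  .start()
  .awaitTermination()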

Pozdrawiam,
Jacek Laskowski

https://about.me/JacekLaskowski
"The Internals Of" Online Books 
Follow me on https://twitter.com/jaceklaskowski




On Sun, Apr 4, 2021 at 12:28 PM Gabor Somogyi 
wrote:

> There is no way to store offsets in Kafka and restart from the stored
> offset. Structured Streaming stores offset in checkpoint and it restart
> from there without any user code.
>
> Offsets can be stored with a listener but it can be only used for lag
> calculation.
>
> BR,
> G
>
>
> On Sat, 3 Apr 2021, 21:09 Ali Gouta,  wrote:
>
>> Hello,
>>
>> I was reading the spark docs about spark structured streaming, since we
>> are thinking about updating our code base that today uses Dstreams, hence
>> spark streaming. Also, one main reason for this change that we want to
>> realize is that reading headers in kafka messages is only supported in
>> spark structured streaming and not in Dstreams.
>>
>> I was surprised to not see an obvious way to handle manually the offsets
>> by committing the offsets to kafka. In spark streaming we used to do it
>> with something similar to these lines of code:
>>
>> stream.foreachRDD { rdd =>
>>   val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>>
>>   // some time later, after outputs have completed
>>   stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)}
>>
>>
>> And this works perfectly ! Especially, this works very nice in case of
>> job failure/restart... I am wondering how this can be achieved in spark
>> structured streaming ?
>>
>> I read about checkpoints, and this reminds me the old way of doing things
>> in spark 1.5/kafka0.8 and is not perfect since we are not deciding when to
>> commit offsets by ourselves.
>>
>> Did I miss anything ? What would be the best way of committing offsets to
>> kafka with spark structured streaming to the concerned consumer group ?
>>
>> Best regards,
>> Ali Gouta.
>>
>


Re: Writing to Google Cloud Storage with v2 algorithm safe?

2021-04-04 Thread Jacek Laskowski
Hi Vaquar,

Thanks a lot! Accepted as the answer (yet there was the other answer that
was very helpful too). Tons of reading ahead to understand it more.

That once again makes me feel that Hadoop MapReduce experience would help a
great deal (and I've got none).
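
For anyone landing here later: the "v2" in question is the Hadoop
FileOutputCommitter algorithm version, which as far as I understand is selected
like this (the bucket path is just an example, and the GCS connector needs to
be on the classpath):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("gcs-v2-commit")
  // v2 moves each task's output into its final place at task commit time,
  // instead of renaming everything in one go at job commit (v1).
  .config("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2")
  .getOrCreate()

spark.range(10).write.mode("overwrite").parquet("gs://some-bucket/tmp/v2-test")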

Pozdrawiam,
Jacek Laskowski

https://about.me/JacekLaskowski
"The Internals Of" Online Books 
Follow me on https://twitter.com/jaceklaskowski




On Sun, Apr 4, 2021 at 7:28 AM vaquar khan  wrote:

> Hi Jacek,
>
> I have answered; hope you find it useful.
>
> Regards,
> Viquar khan
>
> On Sat, Apr 3, 2021 at 11:19 AM Jacek Laskowski  wrote:
>
>> Hi,
>>
>> I've just posted a question on StackOverflow [1] about the safety of the
>> v2 algorithm while writing out to Google Cloud Storage. I think I'm missing
>> some fundamentals on how cloud object stores work (GCS in particular) and
>> hence the question.
>>
>> Is this all about File.rename and how many HTTP calls are there under the
>> covers? How to know it for GCS?
>>
>> Thank you for any help you can provide. Merci beaucoup mes amis :)
>>
>> [1] https://stackoverflow.com/q/66933229/1305344
>>
>> Pozdrawiam,
>> Jacek Laskowski
>> 
>> https://about.me/JacekLaskowski
>> "The Internals Of" Online Books 
>> Follow me on https://twitter.com/jaceklaskowski
>>
>> 
>>
>
>
> --
> Regards,
> Vaquar Khan
> +1 -224-436-0783
> Greater Chicago
>


Re: Spark structured streaming + offset management in kafka + kafka headers

2021-04-04 Thread Gabor Somogyi
There is no way to store offsets in Kafka and restart from the stored
offset. Structured Streaming stores offsets in its checkpoint and restarts
from there without any user code.

Offsets can be captured with a listener, but that can only be used for lag
calculation.
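
A rough sketch of that listener approach (it assumes an existing SparkSession
named spark and just prints the offsets instead of exporting them to a metrics
system):

import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

// Assumes an existing SparkSession named spark.
spark.streams.addListener(new StreamingQueryListener {
  override def onQueryStarted(event: QueryStartedEvent): Unit = ()
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = ()
  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    // Each source reports its processed offsets as a JSON string, e.g.
    // {"topic":{"0":12345}}; compare these with the broker's latest offsets
    // to compute lag -- nothing is committed back to Kafka.
    event.progress.sources.foreach { s =>
      println(s"${s.description} endOffset=${s.endOffset}")
    }
  }
})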

BR,
G


On Sat, 3 Apr 2021, 21:09 Ali Gouta,  wrote:

> Hello,
>
> I was reading the Spark docs about Spark structured streaming, since we
> are thinking about updating our code base, which today uses DStreams and
> hence Spark Streaming. One main reason for this change is that reading
> headers in Kafka messages is only supported in Spark structured streaming
> and not in DStreams.
>
> I was surprised not to see an obvious way to handle the offsets manually
> by committing them to Kafka. In Spark Streaming we used to do it with
> something similar to these lines of code:
>
> stream.foreachRDD { rdd =>
>   val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>
>   // some time later, after outputs have completed
>   stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)}
>
>
> And this works perfectly! In particular, it works very nicely in case of job
> failure/restart... I am wondering how this can be achieved in Spark
> structured streaming?
>
> I read about checkpoints, and this reminds me of the old way of doing things
> in Spark 1.5/Kafka 0.8, which is not perfect since we are not deciding when
> to commit offsets ourselves.
>
> Did I miss anything? What would be the best way of committing offsets to
> Kafka with Spark structured streaming for the concerned consumer group?
>
> Best regards,
> Ali Gouta.
>