Re: Spark structured streaming - Fallback to earliest offset

2020-04-19 Thread Jungtaek Lim
You may want to check "where" the job is stuck by taking a thread dump - it
could be in the Kafka consumer, in the Spark codebase, etc. Without that
information it's hard to say.
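
The Spark UI's Executors tab also exposes a "Thread Dump" link per executor.
Below is a minimal sketch of taking one programmatically, using only standard
JVM APIs (nothing Spark-specific); run it in the JVM you suspect is stuck,
e.g. from the driver:

import scala.collection.JavaConverters._

// Print every live thread's name, state, and stack - the same view
// `jstack <pid>` gives - to find where the query is blocked.
Thread.getAllStackTraces.asScala.foreach { case (thread, frames) =>
  println(s"Thread '${thread.getName}' state=${thread.getState}")
  frames.foreach(frame => println(s"    at $frame"))
}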


Re: Spark structured streaming - Fallback to earliest offset

2020-04-16 Thread Ruijing Li
Thanks Jungtaek, that makes sense.

I tried Burak’s solution of turning failOnDataLoss off, but instead of
failing, the job is stuck. I’m guessing the offsets are being deleted
faster than the job can process them, so it will stay stuck unless I
increase resources? Or will Spark hang once the exception happens?

--
Cheers,
Ruijing Li


Re: Spark structured streaming - Fallback to earliest offset

2020-04-14 Thread Jungtaek Lim
I think Spark is trying to ensure that it reads the input "continuously",
without missing anything. Technically it is valid to call this situation a
kind of "data loss", since the query couldn't process the offsets that were
thrown out, and the owner of the query needs to be aware that this affects
the result.

If your streaming query keeps up with the input rate, it's pretty rare for
the query to fall behind retention. Even if it lags a bit, it'd be safe as
long as the retention period is long enough. The ideal state is to ensure
your query processes all offsets before retention throws them out (don't
leave the query lagging behind - either increase processing power or
increase the retention duration, though most probably you'll need to do the
former). But if you can't guarantee that and you understand the risk, then
yes, you can turn the option off and take the risk.
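
One way to watch for this from inside the application is the standard
StreamingQueryListener API. A minimal sketch, assuming `spark` is your
SparkSession (the println is illustrative only): if inputRowsPerSecond
consistently exceeds processedRowsPerSecond, the query is falling behind and
may eventually run past the topic's retention.

import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

// Warn whenever a micro-batch's input rate outpaces its processing rate.
spark.streams.addListener(new StreamingQueryListener {
  override def onQueryStarted(event: QueryStartedEvent): Unit = ()
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = ()
  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    val p = event.progress
    if (p.inputRowsPerSecond > p.processedRowsPerSecond) {
      println(s"Query ${p.id} is lagging: input ${p.inputRowsPerSecond} rows/s, " +
        s"processed ${p.processedRowsPerSecond} rows/s")
    }
  }
})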


Re: Spark structured streaming - Fallback to earliest offset

2020-04-14 Thread Ruijing Li
I see, I wasn’t sure if that would work as expected. The docs seem to
suggest being careful before turning off that option, and I’m not sure why
failOnDataLoss is true by default.

--
Cheers,
Ruijing Li


Re: Spark structured streaming - Fallback to earliest offset

2020-04-14 Thread Burak Yavuz
Just set `failOnDataLoss=false` as an option in readStream?
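
For reference, a minimal sketch of that option in context, assuming `spark`
is your SparkSession; the broker and topic names are placeholders:

// failOnDataLoss=false makes the Kafka source log a warning and continue
// from the earliest offset still available, instead of failing the query,
// when the expected offsets have already been deleted by retention.
val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker1:9092")  // placeholder
  .option("subscribe", "my-topic")                    // placeholder
  .option("failOnDataLoss", "false")
  .load()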


Spark structured streaming - Fallback to earliest offset

2020-04-14 Thread Ruijing Li
Hi all,

I have a Spark Structured Streaming app that is consuming from a Kafka
topic with retention set up. Sometimes I face an issue where my query has
not finished processing a message before retention kicks in and deletes the
offset; since I use the default setting of “failOnDataLoss=true”, this
causes my query to fail. The workaround I currently have is manual:
deleting the offsets directory and rerunning.

I would instead like to have Spark automatically fall back to the earliest
offset available. The solutions I saw recommend setting auto.offset.reset =
earliest, but you cannot set that for Structured Streaming. How do I do
this for Structured Streaming?

Thanks!
-- 
Cheers,
Ruijing Li
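
For reference: the Structured Streaming counterpart of Kafka's
auto.offset.reset is the startingOffsets source option, which only applies
when a query starts without an existing checkpoint; on restart, Spark
resumes from the checkpointed offsets, and recovering past deleted data is
governed by failOnDataLoss as discussed above. A minimal sketch, with
placeholder broker and topic names and `spark` assumed to be a SparkSession:

// startingOffsets only takes effect on a fresh start (no checkpoint);
// together with failOnDataLoss=false, this is the closest Structured
// Streaming gets to auto.offset.reset=earliest.
val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker1:9092")  // placeholder
  .option("subscribe", "my-topic")                    // placeholder
  .option("startingOffsets", "earliest")
  .option("failOnDataLoss", "false")
  .load()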