Right, I understand why the exceptions happen.
However, it seems less useful to have checkpointing that only works in
the case of a plain application restart. IMO, code changes happen quite
often, and not being able to pick up where the previous job left off is
quite a hindrance.

The solutions you mention would partially solve the problem, while bringing
new problems along (increased resource utilization, difficulty in managing
multiple jobs consuming the same data, etc.).

The solution we currently employ is to commit the offsets to durable
storage and have the job read them from there upon restart, forgoing
checkpointing altogether.
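
A rough sketch of that pattern with the direct stream API (loadOffsets and
saveOffsets are placeholders for whatever durable store is used):

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}

object OffsetManagedJob {
  // Placeholders for the durable store (ZK, a database, HBase, ...).
  def loadOffsets(): Map[TopicAndPartition, Long] = ???
  def saveOffsets(ranges: Array[OffsetRange]): Unit = ???

  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(
      new SparkConf().setAppName("offset-managed"), Seconds(10))
    val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")
    val messageHandler =
      (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message)

    // Start from the externally stored offsets instead of a checkpoint.
    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder,
      StringDecoder, (String, String)](ssc, kafkaParams, loadOffsets(), messageHandler)

    stream.foreachRDD { rdd =>
      // Grab the offset ranges before any transformation hides the Kafka RDD.
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

      // ... process rdd ...

      // Persist offsets only after the batch's output succeeds.
      saveOffsets(offsetRanges)
    }

    ssc.start()
    ssc.awaitTermination()
  }
}

Since the offsets are written only after the output of each batch succeeds,
this keeps at-least-once semantics without depending on the checkpoint
directory surviving an upgrade.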

This scenario doesn't seem to be an edge case, which is why I was asking
whether it could be handled by the Spark Kafka API instead of having
everyone come up with their own, sub-optimal solutions.

Radu

On Fri, Sep 25, 2015 at 5:06 AM, Adrian Tanase <atan...@adobe.com> wrote:

> Hi Radu,
>
> The problem itself is not checkpointing the data – you are right that if
> your operations are stateless, you are only checkpointing the Kafka offsets.
> The problem is that you are also checkpointing metadata – including the
> actual code and serialized Java classes – which is why you’ll see ser/deser
> exceptions on a restart that includes an upgrade.
>
> If you’re not using stateful operations, you might get away with using the
> old Kafka receiver w/o WAL – but then you accept “at least once” semantics.
> As soon as you add in the WAL you are forced to checkpoint, and you’re better
> off with the DirectReceiver approach.
>
> I believe the simplest way to get around this is to run 2 versions in
> parallel – with some app-level control of a barrier (e.g. v1 reads events
> up to 3:00am, v2 after that). Manual state management is also supported by
> the framework, but it’s harder to control because:
>
>    - you’re not guaranteed to shut down gracefully
>    - you may have a bug that prevents the state from being saved, and then
>    you can’t restart the app w/o an upgrade
>
> Less than ideal, yes :)
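>
> Roughly, the app-level barrier could just be a timestamp filter applied in
> each version (eventTime and the cut-over instant below are placeholders):
>
> import java.time.Instant
> import org.apache.spark.streaming.dstream.DStream
>
> object UpgradeBarrier {
>   // Placeholder: extract the event time from the payload.
>   def eventTime(payload: String): Instant = ???
>
>   // Cut-over instant agreed between the two deployments (illustrative value).
>   val cutOver = Instant.parse("2015-09-26T03:00:00Z")
>
>   // v1 keeps only events strictly before the barrier, v2 the rest, so
>   // exactly one version emits output for any given event.
>   def v1Only(stream: DStream[(String, String)]): DStream[(String, String)] =
>     stream.filter { case (_, payload) => eventTime(payload).isBefore(cutOver) }
>
>   def v2Only(stream: DStream[(String, String)]): DStream[(String, String)] =
>     stream.filter { case (_, payload) => !eventTime(payload).isBefore(cutOver) }
> }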
>
> -adrian
>
> From: Radu Brumariu
> Date: Friday, September 25, 2015 at 1:31 AM
> To: Cody Koeninger
> Cc: "user@spark.apache.org"
> Subject: Re: kafka direct streaming with checkpointing
>
> Would changing the direct stream API to support committing the offsets to
> Kafka's ZK (like a regular consumer) as a fallback mechanism, in case
> recovering from the checkpoint fails, be an acceptable solution?
>
> On Thursday, September 24, 2015, Cody Koeninger <c...@koeninger.org>
> wrote:
>
>> This has been discussed numerous times; TD's response has consistently
>> been that it's unlikely to be possible.
>>
>> On Thu, Sep 24, 2015 at 12:26 PM, Radu Brumariu <bru...@gmail.com> wrote:
>>
>>> It seems to me that this scenario I'm facing is quite common for
>>> Spark jobs using Kafka.
>>> Is there a ticket to add this sort of semantics to checkpointing? Does
>>> it even make sense to add it there?
>>>
>>> Thanks,
>>> Radu
>>>
>>>
>>> On Thursday, September 24, 2015, Cody Koeninger <c...@koeninger.org>
>>> wrote:
>>>
>>>> No, you can't use checkpointing across code changes. Either store the
>>>> offsets yourself, or start up your new app code and let it catch up before
>>>> killing the old one.
>>>>
>>>> On Thu, Sep 24, 2015 at 8:40 AM, Radu Brumariu <bru...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi,
>>>>> in my application I use Kafka direct streaming and I have also enabled
>>>>> checkpointing.
>>>>> This seems to work fine if the application is restarted. However, if I
>>>>> change the code and resubmit the application, it cannot start because the
>>>>> checkpointed data is of a different class version.
>>>>> Is there any way I can use checkpointing so that it survives across
>>>>> application version changes?
>>>>>
>>>>> Thanks,
>>>>> Radu
>>>>>
>>>>>
>>>>
>>
