Wouldn't the same case be made for checkpointing in general? What I am trying to say is that this particular situation is part of the general checkpointing use case, not an edge case. I would like to understand why the checkpointing mechanism that already exists in Spark shouldn't handle this situation too.
On Fri, Sep 25, 2015 at 12:20 PM, Cody Koeninger <c...@koeninger.org> wrote:

> Storing passbacks transactionally with results in your own data store,
> with a schema that makes sense for you, is the optimal solution.
>
> On Fri, Sep 25, 2015 at 11:05 AM, Radu Brumariu <bru...@gmail.com> wrote:
>
>> Right, I understand why the exceptions happen.
>> However, it seems less useful to have checkpointing that only works in
>> the case of an application restart. IMO, code changes happen quite
>> often, and not being able to pick up where the previous job left off is
>> quite a hindrance.
>>
>> The solutions you mention would partially solve the problem, while
>> bringing new problems along (increased resource utilization, difficulty
>> in managing multiple jobs consuming the same data, etc.).
>>
>> The solution that we currently employ is committing the offsets to
>> durable storage and making sure that the job reads the offsets from
>> there upon restart, while forsaking checkpointing.
>>
>> This scenario does not seem to be an edge case, which is why I was
>> asking whether it could be handled by the Spark Kafka API instead of
>> having everyone come up with their own, sub-optimal solutions.
>>
>> Radu
>>
>> On Fri, Sep 25, 2015 at 5:06 AM, Adrian Tanase <atan...@adobe.com> wrote:
>>
>>> Hi Radu,
>>>
>>> The problem itself is not checkpointing the data – if your operations
>>> are stateless then you are only checkpointing the Kafka offsets, you
>>> are right.
>>> The problem is that you are also checkpointing metadata – including
>>> the actual code and serialized Java classes – that's why you'll see
>>> ser/deser exceptions on restart with an upgrade.
>>>
>>> If you're not using stateful operations, you might get away with using
>>> the old Kafka receiver w/o WAL – but you accept "at least once"
>>> semantics. As soon as you add in the WAL you are forced to checkpoint,
>>> and you're better off with the DirectReceiver approach.
>>>
>>> I believe the simplest way to get around it is to support running two
>>> versions in parallel – with some app-level control of a barrier (e.g.
>>> v1 reads events up to 3:00am, v2 after that). Manual state management
>>> is also supported by the framework, but it's harder to control because:
>>>
>>> - you're not guaranteed to shut down gracefully
>>> - you may have a bug that prevents the state from being saved, and you
>>>   can't restart the app w/o an upgrade
>>>
>>> Less than ideal, yes :)
>>>
>>> -adrian
>>>
>>> From: Radu Brumariu
>>> Date: Friday, September 25, 2015 at 1:31 AM
>>> To: Cody Koeninger
>>> Cc: "user@spark.apache.org"
>>> Subject: Re: kafka direct streaming with checkpointing
>>>
>>> Would changing the direct stream API to support committing the offsets
>>> to Kafka's ZK (like a regular consumer) as a fallback mechanism, in
>>> case recovering from the checkpoint fails, be an accepted solution?
>>>
>>> On Thursday, September 24, 2015, Cody Koeninger <c...@koeninger.org> wrote:
>>>
>>>> This has been discussed numerous times; TD's response has
>>>> consistently been that it's unlikely to be possible.
>>>>
>>>> On Thu, Sep 24, 2015 at 12:26 PM, Radu Brumariu <bru...@gmail.com> wrote:
>>>>
>>>>> It seems to me that this scenario I'm facing is quite common for
>>>>> Spark jobs using Kafka.
>>>>> Is there a ticket to add this sort of semantics to checkpointing?
>>>>> Does it even make sense to add it there?
>>>>>
>>>>> Thanks,
>>>>> Radu
>>>>>
>>>>> On Thursday, September 24, 2015, Cody Koeninger <c...@koeninger.org> wrote:
>>>>>
>>>>>> No, you can't use checkpointing across code changes. Either store
>>>>>> offsets yourself, or start up your new app code and let it catch up
>>>>>> before killing the old one.
>>>>>>
>>>>>> On Thu, Sep 24, 2015 at 8:40 AM, Radu Brumariu <bru...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>> in my application I use Kafka direct streaming and I have also
>>>>>>> enabled checkpointing.
>>>>>>> This seems to work fine if the application is restarted. However,
>>>>>>> if I change the code and resubmit the application, it cannot start
>>>>>>> because the checkpointed data is of a different class version.
>>>>>>> Is there any way I can use checkpointing that survives across
>>>>>>> application version changes?
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Radu
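[For readers finding this thread later: the workaround Radu and Cody converge on — commit the offsets yourself to durable storage, ideally transactionally with your results, and seed the direct stream from them on restart — can be sketched as below. This is a minimal illustration, not the Spark API itself: the JSON file stands in for whatever durable store you use, and `save_offsets`/`load_offsets` are hypothetical helpers that a real job would call from inside `foreachRDD` (using `rdd.offsetRanges()` as the source) and at startup (to build the `fromOffsets` argument of `KafkaUtils.createDirectStream`).]

```python
import json
import os
import tempfile


def save_offsets(path, offsets):
    """Persist {(topic, partition): next_offset} to durable storage.

    In a real Spark job this runs inside foreachRDD, ideally in the same
    transaction as the batch's output, so offsets and results commit
    together (the "passbacks stored transactionally" Cody describes).
    """
    serializable = {"%s:%d" % (t, p): o for (t, p), o in offsets.items()}
    tmp = path + ".tmp"
    with open(tmp, "w") as f:
        json.dump(serializable, f)
    os.replace(tmp, path)  # atomic rename: readers never see a torn write


def load_offsets(path):
    """Read the last committed offsets on (re)start.

    An empty dict means "no prior state"; a real job would then fall back
    to the consumer's auto.offset.reset behaviour. Because this store is
    plain data (no serialized classes), it survives code upgrades, unlike
    checkpoint recovery.
    """
    if not os.path.exists(path):
        return {}
    with open(path) as f:
        raw = json.load(f)
    restored = {}
    for key, offset in raw.items():
        topic, partition = key.rsplit(":", 1)
        restored[(topic, int(partition))] = offset
    return restored


if __name__ == "__main__":
    path = os.path.join(tempfile.mkdtemp(), "offsets.json")
    batch = {("events", 0): 1042, ("events", 1): 998}
    save_offsets(path, batch)
    # Simulated restart after a code change: offsets round-trip intact.
    print(load_offsets(path) == batch)
```

The key property is that the store holds only topic/partition/offset data, so resubmitting a modified jar reads it back without the ser/deser failures Adrian describes for checkpoint metadata.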