Thanks for the fast answer!

I just feel annoyed and frustrated not to be able to use Spark
checkpointing, because I believe its mechanism has been thoroughly
tested.
I'm afraid that reinventing the wheel may lead to side effects that I
can't foresee ...

Anyway, thanks again. I know what I have to do :)
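For anyone reading this thread later: Cody's point 2 (commit the results and the offsets together, then restart from the offsets stored in the database rather than from a checkpoint) can be sketched without Spark at all. Everything below is hypothetical illustration — the in-memory "database", the `OffsetRange` type, and all names are stand-ins, not the actual code from the linked example:

```scala
// Minimal sketch of "save offsets with results, restart from the DB".
// The "database" here is an in-memory stand-in; a real transactional DB
// would give the same atomicity guarantee.
object OffsetStoreSketch {
  case class OffsetRange(topic: String, partition: Int, from: Long, until: Long)

  // Results and the committed offset are always updated together,
  // under the same lock (i.e. in the same "transaction").
  private val lock = new Object
  private var results = Vector.empty[String]
  private var committedUntil = 0L

  def processBatch(range: OffsetRange, messages: Seq[String]): Unit =
    lock.synchronized {
      // Skip batches at or below the committed offset, e.g. a batch
      // replayed after a restart -- this makes the save idempotent.
      if (range.until > committedUntil) {
        results = results ++ messages
        committedUntil = range.until // saved in the same "transaction"
      }
    }

  // On restart, read the starting offset from the DB, not a checkpoint.
  def resumeFrom: Long = lock.synchronized(committedUntil)
  def resultCount: Int = lock.synchronized(results.size)
}
```

On a real restart you would feed `resumeFrom` into the direct stream's starting offsets; replaying the last batch is then harmless because the save is idempotent.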

On Fri, Oct 21, 2016 at 5:05 PM, Cody Koeninger <c...@koeninger.org> wrote:

> 0.  If your processing time is regularly greater than your batch
> interval, you're going to have problems anyway.  Investigate this more;
> set maxRatePerPartition, for example.
> 1. That's personally what I tend to do.
> 2. Why are you relying on checkpoints if you're storing offset state
> in the database?  Just restart from the offsets in the database.  I
> think your solution of a map from batch time to offset ranges would
> work fine in that case, no?  (Make sure to expire items from the map.)
>
>
>
> On Fri, Oct 21, 2016 at 3:32 AM, Erwan ALLAIN <eallain.po...@gmail.com>
> wrote:
> > Hi,
> >
> > I'm currently implementing an exactly-once mechanism based on the
> > following example:
> >
> > https://github.com/koeninger/kafka-exactly-once/blob/master/src/main/scala/example/TransactionalPerBatch.scala
> >
> > The pseudo code is as follows:
> >
> > dstream.transform( store offsets in a driver-side variable )
> > dstream.map( ... )
> > dstream.foreachRDD( action + save offsets in the DB )
> >
> > This code doesn't work if the processing time is greater than the
> > batch interval (the same problem as the windowed example:
> > https://github.com/koeninger/kafka-exactly-once/blob/master/src/main/scala/example/Windowed.scala).
> >
> > Indeed, at each batch interval a new RDD is created and queued, so the
> > transform method is called several times and keeps updating the global
> > variable; by the time we save, the offset range no longer corresponds
> > to the batch that was actually processed.
> >
> > 1) Do I need to work at the RDD level (inside one big foreachRDD,
> > like in the first example) instead of at the DStream level?
> >
> > 2) I can use a Map[BatchTime, OffsetRange] as a global variable, but
> > in case of a crash this map will no longer reflect the generated RDDs
> > (restored from the checkpoint: RDDs prepared but not yet executed).
> >   2.1) Do I need to store this map elsewhere (e.g. in Cassandra)?
> >   2.2) Is there a way to retrieve the restored offset ranges? (The
> > transform method is no longer called for checkpointed RDDs.)
> >   2.3) Is it possible to store some context along with the RDD to be
> > serialized?
> >
> > Lots of questions, let me know if anything is unclear!
> >
>
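To make the Map[BatchTime, OffsetRange] idea above concrete (including Cody's note about expiring entries so the map doesn't grow forever), here is a rough driver-side sketch in plain Scala. It is only an illustration: batch times are plain Longs, `OffsetRange` is a hand-rolled stand-in, and the keep-the-last-N eviction policy is an arbitrary choice:

```scala
import scala.collection.mutable

case class OffsetRange(topic: String, partition: Int, from: Long, until: Long)

// Sketch of a driver-side map from batch time to that batch's offset
// ranges, with old entries expired to bound the map's size.
class BatchOffsetTracker(maxBatchesKept: Int) {
  // LinkedHashMap preserves insertion order, so the oldest batch is
  // always at the head and cheap to evict.
  private val byBatchTime = mutable.LinkedHashMap.empty[Long, Seq[OffsetRange]]

  // Called from transform(): remember this batch's ranges, evict old ones.
  def record(batchTime: Long, ranges: Seq[OffsetRange]): Unit = {
    byBatchTime(batchTime) = ranges
    while (byBatchTime.size > maxBatchesKept)
      byBatchTime -= byBatchTime.head._1
  }

  // Called from foreachRDD(): look up the ranges for the batch being
  // saved, instead of trusting a single global variable that later
  // batches may already have overwritten.
  def rangesFor(batchTime: Long): Option[Seq[OffsetRange]] =
    byBatchTime.get(batchTime)
}
```

This only addresses the "transform overwrites the global variable" problem; as discussed above, it does not survive a checkpoint restore, which is why restarting from offsets stored in the database is the simpler route.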
