Thanks for the fast answer! I just feel annoyed and frustrated at not being able to use Spark checkpointing, because I believe its mechanism has been properly tested. I'm afraid that reinventing the wheel can lead to side effects that I don't see now...
Anyway, thanks again, I know what I have to do :)

On Fri, Oct 21, 2016 at 5:05 PM, Cody Koeninger <c...@koeninger.org> wrote:
> 0. If your processing time is regularly greater than your batch
> interval you're going to have problems anyway. Investigate this more,
> set maxRatePerPartition, something.
> 1. That's personally what I tend to do.
> 2. Why are you relying on checkpoints if you're storing offset state
> in the database? Just restart from the offsets in the database. I
> think your solution of a map of batch time to offset ranges would work
> fine in that case, no? (Make sure to expire items from the map.)
>
> On Fri, Oct 21, 2016 at 3:32 AM, Erwan ALLAIN <eallain.po...@gmail.com> wrote:
> > Hi,
> >
> > I'm currently implementing an exactly-once mechanism based on the
> > following example:
> >
> > https://github.com/koeninger/kafka-exactly-once/blob/master/src/main/scala/example/TransactionalPerBatch.scala
> >
> > The pseudocode is as follows:
> >
> > dstream.transform (store offsets in a variable on the driver side)
> > dstream.map
> > dstream.foreachRDD (action + save offsets in db)
> >
> > This code doesn't work if the processing time is greater than the batch
> > interval (the same problem as with the windowed example:
> > https://github.com/koeninger/kafka-exactly-once/blob/master/src/main/scala/example/Windowed.scala)
> >
> > Indeed, at each batch interval a new RDD is created and stacked, so the
> > transform method is called several times and updates the global variable;
> > by the time we save the offsets, the offset range no longer corresponds
> > to the one being processed.
> >
> > 1) Do I need to work at the RDD level (inside a big foreachRDD, as in the
> > first example) instead of at the DStream level?
> >
> > 2) I can use a Map[BatchTime, OffsetRange] as a global variable, but in
> > case of a crash this map will no longer reflect the generated RDDs
> > (restored from checkpoint: RDDs prepared but not yet executed).
> > 2.1) Do I need to store this map elsewhere (Cassandra)?
> > 2.2) Is there a way to retrieve the restored offset ranges? (The
> > transform method is not called anymore for a checkpointed RDD.)
> > 2.3) Is it possible to store some context along with the RDD to be
> > serialized?
> >
> > Lots of questions, let me know if it's not clear!
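For what it's worth, Cody's suggestion in point 2 (a driver-side map of batch time to offset ranges, with expiry once a batch's offsets have been committed to the database) might look roughly like the sketch below. All names here (`OffsetBook`, the `OffsetRange` case class) are illustrative stand-ins, not part of any Spark API; in a real job you would use `org.apache.spark.streaming.kafka.OffsetRange` obtained from the stream, and call `record` in `transform` and `expire` in `foreachRDD` after the transactional save succeeds.

```scala
// Illustrative stand-in for Spark's OffsetRange
// (topic, partition, fromOffset, untilOffset).
case class OffsetRange(topic: String, partition: Int,
                       fromOffset: Long, untilOffset: Long)

// Driver-side bookkeeping: batch time (ms) -> offset ranges for that batch.
// Kept per-batch so a stacked-up batch cannot overwrite the ranges of the
// batch currently being saved, which is the bug with a single variable.
object OffsetBook {
  import scala.collection.mutable
  private val byBatch = mutable.LinkedHashMap.empty[Long, Seq[OffsetRange]]

  // Called from transform: remember this batch's ranges under its batch time.
  def record(batchTimeMs: Long, ranges: Seq[OffsetRange]): Unit =
    byBatch.synchronized { byBatch(batchTimeMs) = ranges }

  // Called from foreachRDD: look up the ranges for the batch being saved.
  def get(batchTimeMs: Long): Option[Seq[OffsetRange]] =
    byBatch.synchronized { byBatch.get(batchTimeMs) }

  // Called after the offsets are committed to the database:
  // drop everything up to and including that batch so the map cannot grow.
  def expire(upToBatchTimeMs: Long): Unit =
    byBatch.synchronized {
      byBatch.keys.filter(_ <= upToBatchTimeMs).toList.foreach(byBatch.remove)
    }
}
```

Usage would be `OffsetBook.record(batchTime.milliseconds, ranges)` in `transform`, then `OffsetBook.get(...)` plus `OffsetBook.expire(...)` inside `foreachRDD` once the save transaction commits. It doesn't solve question 2 by itself: after a restore from checkpoint the map is empty, which is exactly why restarting from the offsets stored in the database, rather than from the checkpoint, is the simpler path.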