0.  If your processing time is regularly greater than your batch
interval, you're going to have problems anyway.  Investigate this more,
set spark.streaming.kafka.maxRatePerPartition, something.
1. That's personally what I tend to do.
2. Why are you relying on checkpoints if you're storing offset state
in the database?  Just restart from the offsets in the database.  I
think your solution of a map of batch time to offset ranges would work
fine in that case, no?  (Make sure to expire items from the map.)
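
To make point 2 concrete, here is a minimal sketch of a driver-side map keyed by batch time, with expiry. It is plain Python with no Spark dependency, so it only illustrates the bookkeeping. OffsetRange and BatchOffsetTracker are hypothetical names made up for this example, and it assumes both the transform step and the output step are given the batch time, so the output step can look up exactly the ranges that belong to the batch it is saving.

```python
import threading
from dataclasses import dataclass
from typing import Dict, List, Optional


@dataclass(frozen=True)
class OffsetRange:
    # Hypothetical stand-in for the Kafka integration's offset range type.
    topic: str
    partition: int
    from_offset: int
    until_offset: int


class BatchOffsetTracker:
    """Driver-side map of batch time (ms) -> offset ranges, with expiry."""

    def __init__(self, max_age_ms: int) -> None:
        self._max_age_ms = max_age_ms
        self._by_batch: Dict[int, List[OffsetRange]] = {}
        self._lock = threading.Lock()

    def record(self, batch_time_ms: int, ranges: List[OffsetRange]) -> None:
        # Call where the batch is generated (the transform step).
        with self._lock:
            self._by_batch[batch_time_ms] = list(ranges)
            self._expire(batch_time_ms)

    def take(self, batch_time_ms: int) -> Optional[List[OffsetRange]]:
        # Call in the output step, in the same transaction that saves results.
        # Returns None when the batch is unknown (expired, or lost on restart).
        with self._lock:
            return self._by_batch.pop(batch_time_ms, None)

    def _expire(self, now_ms: int) -> None:
        # Drop entries older than max_age_ms relative to the newest batch.
        cutoff = now_ms - self._max_age_ms
        for t in [t for t in self._by_batch if t < cutoff]:
            del self._by_batch[t]
```

Because entries are keyed by batch time, a later batch being generated while an earlier one is still processing no longer overwrites the earlier batch's ranges. After a restart the map is empty, which is fine if you restart from the offsets stored in the database rather than from a checkpoint.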



On Fri, Oct 21, 2016 at 3:32 AM, Erwan ALLAIN <eallain.po...@gmail.com> wrote:
> Hi,
>
> I'm currently implementing an exactly once mechanism based on the following
> example:
>
> https://github.com/koeninger/kafka-exactly-once/blob/master/src/main/scala/example/TransactionalPerBatch.scala
>
> the pseudocode is as follows:
>
> dstream.transform (store offset in a variable on driver side )
> dstream.map
> dstream.foreachRDD( action + save offset in db)
>
> this code doesn't work if the processing time is greater than the batch
> interval (same problem as the windowed example:
> https://github.com/koeninger/kafka-exactly-once/blob/master/src/main/scala/example/Windowed.scala).
>
> Indeed, at each batch interval a new RDD is created and queued, so the
> transform method is called several times and updates the global variable.
> When we finally save, the offset range no longer corresponds to the one
> that was processed.
>
> 1) Do I need to work at the RDD level (inside a big foreachRDD like in the
> first example) instead of the dstream?
>
> 2) I can use a map[BatchTime, OffsetRange] as a global variable, but in case
> of a crash this map will no longer reflect the generatedRdds (restored from
> checkpoint, RDD prepared but not executed)
>   2.1) Do I need to store this map elsewhere (Cassandra)?
>   2.2) Is there a way to retrieve the restored offset ranges? (the transform
> method is not called again for the checkpointed RDDs)
>   2.3) Is it possible to store some context along with the RDD to be serialized?
>
> Lots of questions, let me know if it's not clear!
>
