Thanks Petr, Cody. This is a reasonable place to start for me. What I'm trying
to achieve:

    stream.foreachRDD { rdd =>
      rdd.foreachPartition { p =>
        Try(myFunc(...)) match {
          case Success(s) =>
            // update the watermark for this partition
            // (of course, the expectation is that this works only if there is
            // a 1-1 mapping between Kafka and RDD partitions at this point)
          case Failure(e) =>
            // tell the driver not to generate a partition for this Kafka
            // topic+partition for a while, by updating some shared state (ZK)
        }
      }
    }

I was looking for that mapping between a Kafka partition and the task it is
bound to, inside the task execution code, for cases where the intermediate
operations do not change partitioning (no shuffles, etc.).

-neelesh

On Fri, Sep 25, 2015 at 11:14 AM, Cody Koeninger <c...@koeninger.org> wrote:

>
> http://spark.apache.org/docs/latest/streaming-kafka-integration.html#approach-2-direct-approach-no-receivers
>
> also has an example of how to close over the offset ranges so they are
> available on executors.
>
> On Fri, Sep 25, 2015 at 12:50 PM, Neelesh <neele...@gmail.com> wrote:
>
>> Hi,
>>   We are using DirectKafkaInputDStream and store completed consumer
>> offsets in Kafka (0.8.2). However, some of our use cases require that
>> offsets not be written if processing of a partition fails with certain
>> exceptions. This allows us to build various backoff strategies for that
>> partition, instead of either blindly committing consumer offsets
>> regardless of errors (because KafkaRDD as HasOffsetRanges is available
>> only on the driver) or relying on Spark's retry logic and continuing
>> without remedial action.
>>
>> I was playing with SparkListener and found that while one can listen on
>> taskCompletedEvent on the driver and even figure out that there was an
>> error, there is no way of mapping this task back to the partition and
>> retrieving the offset range, topic, Kafka partition #, etc.
>>
>> Any pointers appreciated!
>>
>> Thanks!
>> -neelesh
>>
>
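For reference, here is a minimal sketch of the pattern the linked docs describe: capture the offset ranges on the driver in foreachRDD, before any shuffle, and index into them on the executor via TaskContext.partitionId (valid only while RDD partitions map 1-1 to Kafka partitions). This assumes Spark 1.x with the spark-streaming-kafka (0.8 direct) API; `myFunc`, `commitOffsets`, and `recordFailure` are hypothetical application helpers, not Spark API.

    import scala.util.{Try, Success, Failure}
    import org.apache.spark.TaskContext
    import org.apache.spark.streaming.kafka.HasOffsetRanges

    stream.foreachRDD { rdd =>
      // Runs on the driver: the direct-stream RDD still implements
      // HasOffsetRanges here, so grab the per-partition offsets now.
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

      rdd.foreachPartition { iter =>
        // Runs on the executor: with no intervening shuffle, the task's
        // partition id indexes the captured offsetRanges array.
        val osr = offsetRanges(TaskContext.get.partitionId)
        Try(myFunc(iter)) match {
          case Success(_) =>
            // hypothetical helper: commit up to untilOffset for this partition
            commitOffsets(osr.topic, osr.partition, osr.untilOffset)
          case Failure(e) =>
            // hypothetical helper: record the failure so the driver can
            // back off this topic+partition via shared state (e.g. ZK)
            recordFailure(osr.topic, osr.partition, e)
        }
      }
    }

Note that this sidesteps the SparkListener approach entirely: the offset range travels into the task closure itself, so no task-to-partition mapping is needed on the driver.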