Your success case will work fine, it is a 1-1 mapping as you said.

To handle failures in exactly the way you describe, you'd need to subclass
or modify DirectKafkaInputDStream and change the way compute() works.

Unless you really are going to have very fine-grained failures (why would
only a given partition be failing while the rest are fine?) it's going to
be easier to just fail the whole task and retry, or eventually kill the job.

On Fri, Sep 25, 2015 at 1:55 PM, Neelesh <neele...@gmail.com> wrote:

> Thanks Petr, Cody. This is a reasonable place to start for me. What I'm
> trying to achieve
>
> stream.foreachRDD {rdd=>
>    rdd.foreachPartition { p=>
>
>        Try(myFunc(...))  match {
>          case Sucess(s) => updatewatermark for this partition //of course,
> expectation is that it will work only if there is a 1-1 mapping at this
> point in time
>          case Failure()  => Tell the driver not to generate a partition
> for this kafka topic+partition for a while, by updating some shared state
> (zk)
>
>        }
>
>  }
> }
>
> I was looking for that mapping b/w kafka partition thats bound to a task
> inside the task execution code, in cases where the intermediate operations
> do not change partitions, shuffle etc.
>
> -neelesh
>
> On Fri, Sep 25, 2015 at 11:14 AM, Cody Koeninger <c...@koeninger.org>
> wrote:
>
>>
>> http://spark.apache.org/docs/latest/streaming-kafka-integration.html#approach-2-direct-approach-no-receivers
>>
>> also has an example of how to close over the offset ranges so they are
>> available on executors.
>>
>> On Fri, Sep 25, 2015 at 12:50 PM, Neelesh <neele...@gmail.com> wrote:
>>
>>> Hi,
>>>    We are using DirectKafkaInputDStream and store completed consumer
>>> offsets in Kafka (0.8.2). However, some of our use case require that
>>> offsets be not written if processing of a partition fails with certain
>>> exceptions. This allows us to build various backoff strategies for that
>>> partition, instead of either blindly committing consumer offsets regardless
>>> of errors (because KafkaRDD as HasOffsetRanges is available only on the
>>> driver)  or relying on Spark's retry logic and continuing without remedial
>>> action.
>>>
>>> I was playing with SparkListener and found that while one can listen on
>>> taskCompletedEvent on the driver and even figure out that there was an
>>> error, there is no way of mapping this task back to the partition and
>>> retrieving offset range, topic & kafka partition # etc.
>>>
>>> Any pointers appreciated!
>>>
>>> Thanks!
>>> -neelesh
>>>
>>
>>
>

Reply via email to