Repartition almost always involves a shuffle.

Let me see if I can explain the recovery stuff...

Say you start with two Kafka partitions, topic-0 and topic-1.

You shuffle those across 3 Spark partitions; we'll label them A, B, and C.

Your job has written:

fileA: results for A, offset ranges for topic-0 offsets 10-60 and
topic-1 offsets 23-66
fileB: results for B, offset ranges for topic-0 offsets 10-60 and
topic-1 offsets 23-66

It's in the middle of writing fileC when it dies.

On startup, you need to start the job from
topic-0 offsets 10-60 and topic-1 offsets 23-66
and do all the work in all the partitions (because of the shuffle, you
don't know which Kafka partition the data came from).

You just don't overwrite fileA and fileB, because they already have
correct data and offsets; you only write fileC.

Then once you've recovered, you go on about your job as normal, starting
at topic-0 offset 60 and topic-1 offset 66.

Clear as mud?
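
Maybe code says it better.  Here's a very rough sketch of that write /
recover cycle.  It's plain Scala, untested; the directory layout and names
are made up, and OffsetRange is just a stand-in, in a real job you'd grab
the ranges from the original KafkaRDD before the shuffle.

  import java.nio.charset.StandardCharsets
  import java.nio.file.{Files, Path, StandardCopyOption}

  // stand-in for spark-streaming-kafka's OffsetRange, to keep this self-contained
  case class OffsetRange(topic: String, partition: Int, from: Long, until: Long)

  // called from inside foreachPartition: one output file per spark partition,
  // holding that partition's results plus the offsets for the WHOLE batch
  def writeSparkPartition(batchDir: Path, sparkPartition: Int,
                          results: Iterator[String],
                          batchOffsets: Seq[OffsetRange]): Unit = {
    val tmp    = batchDir.resolve(s"file$sparkPartition.tmp")
    val done   = batchDir.resolve(s"file$sparkPartition")   // fileA, fileB, fileC above
    val header = batchOffsets.map(o => s"# ${o.topic}-${o.partition}: ${o.from}-${o.until}")
    Files.write(tmp, (header ++ results).mkString("\n").getBytes(StandardCharsets.UTF_8))
    Files.move(tmp, done, StandardCopyOption.ATOMIC_MOVE)   // the commit point
  }

  // on startup: which spark partitions already committed a file for this batch?
  def alreadyWritten(batchDir: Path, numSparkPartitions: Int): Set[Int] =
    (0 until numSparkPartitions)
      .filter(i => Files.exists(batchDir.resolve(s"file$i")))
      .toSet

In the example, alreadyWritten comes back with the ids for A and B; you
re-run the batch for topic-0 10-60 / topic-1 23-66, only call
writeSparkPartition for C, and then carry on from 60 / 66.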

On Mon, Oct 10, 2016 at 5:36 PM, Samy Dindane <s...@dindane.com> wrote:
>
>
> On 10/10/2016 8:14 PM, Cody Koeninger wrote:
>>
>> Glad it was helpful :)
>>
>> As far as executors, my expectation is that if you have multiple
>> executors running, and one of them crashes, the failed task will be
>> submitted on a different executor.  That is typically what I observe
>> in Spark apps; if that's not what you're seeing, I'd try to get help on
>> that specific issue.
>>
>> As far as Kafka offsets per-partition, you need to atomically write
>> your offsets and results in the same place.  If that place is a
>> filesystem, you need to be using atomic operations (I try to stay away
>> from HDFS, but I believe renames are atomic, for instance).
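
(Inline aside: to make "atomic operations" concrete, roughly this, using
the Hadoop FileSystem API.  Untested sketch; the helper name and paths are
mine, fs.create and fs.rename are the normal Hadoop calls.)

  import java.nio.charset.StandardCharsets
  import org.apache.hadoop.fs.{FileSystem, Path}

  // write a temp file, then rename it into place; the rename is the commit,
  // so readers only ever see a missing file or a complete one
  def commitAtomically(fs: FileSystem, finalPath: Path, payload: String): Unit = {
    val tmp = new Path(finalPath.getParent, finalPath.getName + ".tmp")
    val out = fs.create(tmp, true)   // overwrite a stale temp if present
    try out.write(payload.getBytes(StandardCharsets.UTF_8)) finally out.close()
    if (!fs.rename(tmp, finalPath))
      throw new java.io.IOException(s"failed to commit $finalPath")
  }
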
>
> That's what I am trying to do.
>> If you're doing that in normal code, i.e. with an iterator instead of an
>> RDD or DataFrame, you may have to do some of that work yourself.
>
> How would you do that without an iterator?
> As far as I've seen, to have granular control over partitions (and each
> partition's checkpoint), one needs to use `foreachPartition` or
> `mapPartitions`, thus dealing with an iterator instead of an RDD.
>>
>>
>> As far as using partitionBy, even after repartitioning you can still
>> use the general idea of writing results and offsets in the same place.
>
> I don't understand what you mean. What do you mean by "the same place"? Why
> would I want to do that?
>>
>> The major difference is that you need to write all offsets in each
>> partition (because things have been shuffled),
>
> Oh I didn't think of shuffling after partitioning.
> But does it matter if I use partitionBy()?
>
> I will make a code example once at the office so you can take a look.
>>
>> and need to be more
>> careful on startup after a failure.  On startup, you'd see which
>> partitions were incomplete, start the job from the offsets in the
>> incomplete partitions, do the work for all partitions,
>
> It's clear until here.
>>
>> but ignore the
>> writes when they get to the complete partitions.  I realize that's
>> kind of a complicated description; if it doesn't make sense, ask, or I
>> may be able to put up some publicly visible code at some point in the
>> future.
>
> Yes I don't see what you mean. :)
>
> I really appreciate your help. Thanks a lot.
>
>>
>>
>> On Mon, Oct 10, 2016 at 12:12 PM, Samy Dindane <s...@dindane.com> wrote:
>>>
>>> I just noticed that you're the author of the code I linked in my previous
>>> email. :) It's helpful.
>>>
>>> When using `foreachPartition` or `mapPartitions`, I noticed I can't ask
>>> Spark to write the data to disk using `df.write()`; I need to use the
>>> iterator to do so, which means losing the ability to use partitionBy().
>>> Do you know a workaround? Or will I be forced to partition the data manually?
>>>
>>> I think I understand why the job crashes when a single executor does:
>>> `df.write()....save()` writes all the partitions at the same time, which
>>> fails if one of them has died.
>>> Is that right?
>>>
>>> Thank you.
>>>
>>> Samy
>>>
>>>
>>> On 10/10/2016 04:58 PM, Samy Dindane wrote:
>>>>
>>>>
>>>> Hi Cody,
>>>>
>>>> I am writing a Spark job that reads records from a Kafka topic and
>>>> writes them to the file system.
>>>> This would be straightforward if it weren't for the custom checkpointing
>>>> logic I want to have; Spark's checkpointing doesn't suit us as it
>>>> doesn't
>>>> permit code updates.
>>>>
>>>> The custom checkpointing would be similar to this:
>>>>
>>>> https://github.com/koeninger/kafka-exactly-once/blob/master/src/main/scala/example/TransactionalPerPartition.scala
>>>> I am trying to understand how this would work if an executor crashes, so
>>>> I
>>>> tried making one crash manually, but I noticed it kills the whole job
>>>> instead of creating another executor to resume the task.
>>>> Is that expected? Is there anything wrong with my approach?
>>>>
>>>> Thank you for your time.
>>>>
>>>>
>>>> On 10/10/2016 04:29 PM, Cody Koeninger wrote:
>>>>>
>>>>>
>>>>> What is it you're actually trying to accomplish?
>>>>>
>>>>> On Mon, Oct 10, 2016 at 5:26 AM, Samy Dindane <s...@dindane.com> wrote:
>>>>>>
>>>>>>
>>>>>> I managed to make a specific executor crash by using
>>>>>> TaskContext.get.partitionId and throwing an exception for a specific
>>>>>> executor.
>>>>>>
>>>>>> The issue I have now is that the whole job stops when a single
>>>>>> executor
>>>>>> crashes.
>>>>>> Do I need to explicitly tell Spark to start a new executor and keep
>>>>>> the
>>>>>> other ones running?
>>>>>>
>>>>>>
>>>>>> On 10/10/2016 11:19 AM, Samy Dindane wrote:
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> I am writing a streaming job that reads a Kafka topic.
>>>>>>> As far as I understand, Spark does a 1:1 mapping between its
>>>>>>> executors
>>>>>>> and
>>>>>>> Kafka partitions.
>>>>>>>
>>>>>>> In order to correctly implement my checkpoint logic, I'd like to know
>>>>>>> what
>>>>>>> exactly happens when an executors crashes.
>>>>>>> Also, is it possible to kill an executor manually for testing
>>>>>>> purposes?
>>>>>>>
>>>>>>> Thank you.
>>>>>>>
>>>>>>> Samy
>>>>>>>
>>>
>

---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscr...@spark.apache.org
