Hey Cody,

I ended up taking a different approach: using Kafka to commit my offsets.
It works fine, except that it stores the offsets in ZooKeeper instead of a
Kafka topic (I'm investigating this right now).

I understand your explanations, thank you, but I have one question: when you
say "Repartition almost always involves a shuffle", are you talking about
repartitioning with partitionBy()?

On 10/11/2016 01:23 AM, Cody Koeninger wrote:
Repartition almost always involves a shuffle.

Let me see if I can explain the recovery stuff...

Say you start with two kafka partitions, topic-0 and topic-1.

You shuffle those across 3 Spark partitions; we'll label them A, B, and C.

Your job has written:

fileA: results for A, offset ranges for topic-0 offsets 10-60 and
topic-1 offsets 23-66
fileB: results for B, offset ranges for topic-0 offsets 10-60 and
topic-1 offsets 23-66

It's in the middle of writing fileC when it dies.

On startup, you need to start the job from
topic-0 offsets 10-60 and topic-1 offsets 23-66
and do all the work in all the partitions (because of the shuffle, you
don't know which Kafka partition the data came from).

You just don't overwrite fileA and fileB, because they already have
the correct data and offsets. You only write fileC.

Then, once you've recovered, you go about your job as normal, starting
at topic-0 offset 60 and topic-1 offset 66.

Clear as mud?
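The recovery steps above can be sketched as a plain-Python simulation (not Spark code). The `OutputFile` and `RecoveryPlan` types, the `plan_recovery` helper, and the `(topic, partition) -> (from, until)` layout are hypothetical. It also assumes at least one output from the failed batch was written completely, so its offset ranges, which after a shuffle every output repeats in full, can be read back.

```python
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple

# Hypothetical layout: (topic, kafka_partition) -> (from_offset, until_offset)
OffsetRanges = Dict[Tuple[str, int], Tuple[int, int]]


@dataclass
class OutputFile:
    name: str                              # one output per Spark partition
    offset_ranges: Optional[OffsetRanges]  # None: the write never completed


@dataclass
class RecoveryPlan:
    reprocess: OffsetRanges                  # re-read these for ALL partitions
    write: List[str]                         # outputs that still need writing
    skip: List[str]                          # complete outputs; never overwrite
    resume_from: Dict[Tuple[str, int], int]  # where normal processing resumes


def plan_recovery(outputs: List[OutputFile]) -> RecoveryPlan:
    done = [o for o in outputs if o.offset_ranges is not None]
    if not done:
        # Assumption: at least one output of the batch completed. Otherwise
        # the starting offsets must come from the previous batch instead.
        raise ValueError("no completed output to read offset ranges from")
    ranges = done[0].offset_ranges
    # After a shuffle, every output stores the same full set of ranges.
    if any(o.offset_ranges != ranges for o in done):
        raise ValueError("completed outputs disagree on offset ranges")
    missing = [o.name for o in outputs if o.offset_ranges is None]
    return RecoveryPlan(
        # Shuffled data can't be traced back to one Kafka partition, so if
        # anything is missing the whole batch is re-read.
        reprocess=dict(ranges) if missing else {},
        write=missing,
        skip=[o.name for o in done],
        resume_from={tp: until for tp, (_, until) in ranges.items()},
    )


# The scenario above: fileA and fileB are complete, fileC died mid-write.
batch = {("topic", 0): (10, 60), ("topic", 1): (23, 66)}
plan = plan_recovery([
    OutputFile("fileA", batch),
    OutputFile("fileB", batch),
    OutputFile("fileC", None),
])
print(plan.write)        # ['fileC']
print(plan.resume_from)  # {('topic', 0): 60, ('topic', 1): 66}
```

The rerun then pushes `plan.reprocess` through every partition but only writes the outputs in `plan.write`.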

On Mon, Oct 10, 2016 at 5:36 PM, Samy Dindane <s...@dindane.com> wrote:

On 10/10/2016 8:14 PM, Cody Koeninger wrote:

Glad it was helpful :)

As far as executors, my expectation is that if you have multiple
executors running, and one of them crashes, the failed task will be
resubmitted on a different executor. That is typically what I observe
in Spark apps; if that's not what you're seeing, I'd try to get help on
that specific issue.

As far as Kafka offsets per partition, you need to atomically write
your offsets and results in the same place. If that place is a
filesystem, you need to be using atomic operations (I try to stay away
from HDFS, but I believe renames are atomic, for instance).
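A minimal sketch of "results and offsets in the same place, written atomically" on a local POSIX filesystem: write everything to a temp file, then rename it into place with Python's `os.replace`, which is atomic when both names are on the same filesystem. The function name and JSON layout are hypothetical, and the HDFS case is not covered here.

```python
import json
import os
import tempfile


def write_results_with_offsets(path, results, offset_ranges):
    """Write results and Kafka offsets to ONE file, all or nothing.

    The payload goes to a temp file in the same directory, is fsynced, and is
    then renamed over `path` with os.replace, which is atomic on POSIX when
    both names are on the same filesystem. `offset_ranges` maps
    (topic, partition) -> (from_offset, until_offset).
    """
    directory = os.path.dirname(os.path.abspath(path))
    payload = {
        "results": results,
        # JSON objects can't have tuple keys, so store ranges as records.
        "offsets": [
            {"topic": t, "partition": p, "from": f, "until": u}
            for (t, p), (f, u) in sorted(offset_ranges.items())
        ],
    }
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".tmp")
    try:
        with os.fdopen(fd, "w") as tmp:
            json.dump(payload, tmp)
            tmp.flush()
            os.fsync(tmp.fileno())
        # Readers see either the old file or the complete new one.
        os.replace(tmp_path, path)
    except BaseException:
        # Don't leave a half-written temp file behind.
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
        raise


out_dir = tempfile.mkdtemp()
target = os.path.join(out_dir, "fileA.json")
write_results_with_offsets(target, ["r1", "r2"], {("topic", 0): (10, 60)})
with open(target) as f:
    print(json.load(f)["offsets"])
```

Because the offsets live in the same file as the results, "is this partition complete?" becomes "does the file exist?".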

That's what I am trying to do.

If you're doing that in normal code, i.e. with an iterator instead of an RDD or
DataFrame, you may have to do some of that work yourself.

How would you do that without an iterator?
As far as I've seen, to have granular control over partitions (and each
partition's checkpoint), one needs to use `foreachPartition` or
`mapPartitions`, thus dealing with an iterator instead of an RDD.
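In the iterator style, the function passed to `foreachPartition` receives one partition's records as an iterator, and inside the task `TaskContext.get().partitionId()` (PySpark) or `TaskContext.get.partitionId` (Scala) gives the partition index. The Spark Kafka integration guide uses that index to pick the partition's own range out of `offsetRanges`, which only works while the 1:1 Kafka-to-Spark partition mapping holds, i.e. before any shuffle or repartition. The plain-Python sketch below fakes that context: `OffsetRange`, `process_partition`, and the uppercase transformation are hypothetical stand-ins, and the partition id is passed in explicitly.

```python
from typing import Iterator, List, NamedTuple


class OffsetRange(NamedTuple):
    # Stand-in with the same fields as Spark's Kafka OffsetRange
    # (topic, partition, fromOffset, untilOffset); not the real class.
    topic: str
    partition: int
    from_offset: int
    until_offset: int


def process_partition(partition_id: int, records: Iterator[str],
                      offset_ranges: List[OffsetRange]) -> dict:
    """Body of a foreachPartition-style function, valid only BEFORE a shuffle.

    With no shuffle, Spark partition i holds exactly offset_ranges[i], so the
    partition id alone tells this task which offsets its output covers.
    """
    own_range = offset_ranges[partition_id]
    results = [r.upper() for r in records]  # placeholder transformation
    # Results and the one range they came from travel together, ready to be
    # written in a single atomic operation.
    return {"results": results, "offsets": own_range._asdict()}


ranges = [OffsetRange("topic", 0, 10, 60), OffsetRange("topic", 1, 23, 66)]
out = process_partition(1, iter(["a", "b"]), ranges)
print(out["offsets"])
```

After a shuffle this lookup no longer means anything, which is why each output then has to carry all of the batch's offset ranges instead.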

As far as using partitionBy, even after repartitioning you can still
use the general idea of writing results and offsets in the same place.

I don't understand. What do you mean by "the same place"? Why would I
want to do that?

The major difference is that you need to write all offsets in each
partition (because things have been shuffled),

Oh, I didn't think of shuffling after partitioning.
But does it matter if I use partitionBy()?

I'll put together a code example once I'm at the office so you can take a look.

and need to be more
careful on startup after a failure.  On startup, you'd see which
partitions were incomplete, start the job from the offsets in the
incomplete partitions, do the work for all partitions,

It's clear up to here.

but ignore the
writes when they get to the complete partitions. I realize that's
kind of a complicated description; if it doesn't make sense, ask, or I
may be able to put up some publicly visible code at some point in the
future.

Yes, I don't see what you mean. :)

I really appreciate your help. Thanks a lot.

On Mon, Oct 10, 2016 at 12:12 PM, Samy Dindane <s...@dindane.com> wrote:

I just noticed that you're the author of the code I linked in my previous
email. :) It's helpful.

When using `foreachPartition` or `mapPartitions`, I noticed I can't ask
Spark to write the data to disk using `df.write()`; I need to use the
iterator instead, which means losing the ability to use partitionBy().
Do you know a workaround, or will I be forced to partition the data manually?
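One way to partition data manually inside a partition function is to mimic the `column=value/` directory layout that `DataFrameWriter.partitionBy` produces. This plain-Python sketch only groups rows into relative paths; the `partition_rows` name and the `part-NNNNN.jsonl` file naming are hypothetical, and values containing `/` are not escaped.

```python
from collections import defaultdict
from typing import Dict, Iterator, List


def partition_rows(rows: Iterator[dict], column: str,
                   partition_id: int) -> Dict[str, List[dict]]:
    """Group one Spark partition's rows into partitionBy-style paths.

    Each distinct value of `column` gets its own `column=value/` directory,
    and the column itself is dropped from the rows, since the path already
    encodes it. The part-file name includes the Spark partition id so two
    tasks never write to the same file.
    """
    groups: Dict[str, List[dict]] = defaultdict(list)
    for row in rows:
        path = f"{column}={row[column]}/part-{partition_id:05d}.jsonl"
        groups[path].append({k: v for k, v in row.items() if k != column})
    return dict(groups)


rows = iter([
    {"date": "2016-10-10", "value": 1},
    {"date": "2016-10-11", "value": 2},
    {"date": "2016-10-10", "value": 3},
])
for path, group in sorted(partition_rows(rows, "date", 3).items()):
    print(path, group)
```

Each group can then be written with the same temp-file-and-rename approach used for the offsets, so a partially written partition never looks complete.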

I think I understand why the job crashes when a single executor does:
`df.write()....save()` writes all the partitions at the same time, which
fails if one of them has died.
Is that right?

Thank you.


On 10/10/2016 04:58 PM, Samy Dindane wrote:

Hi Cody,

I am writing a Spark job that reads records from a Kafka topic and writes
them to the file system.
This would be straightforward if it weren't for the custom checkpointing
logic I want to have; Spark's checkpointing doesn't suit us as it doesn't
permit code updates.

The custom checkpointing would be similar to this:

I am trying to understand how this would work if an executor crashes, so I
tried making one crash manually, but I noticed it kills the whole job
instead of creating another executor to resume the task.
Is that expected? Is there anything wrong with my approach?

Thank you for your time.

On 10/10/2016 04:29 PM, Cody Koeninger wrote:

What is it you're actually trying to accomplish?

On Mon, Oct 10, 2016 at 5:26 AM, Samy Dindane <s...@dindane.com> wrote:

I managed to make a specific executor crash by using
TaskContext.get.partitionId and throwing an exception for a specific
partition.
The issue I have now is that the whole job stops when a single executor
crashes.
Do I need to explicitly tell Spark to start a new executor and keep the
other ones running?
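Throwing an exception inside a task fails that task rather than the executor process, and Spark retries a failed task a bounded number of times (`spark.task.maxFailures`, default 4) before aborting the job. An exception thrown for a fixed partition id therefore fails every retry and takes the whole job down. With a `local[N]` master, failed tasks are by default not retried at all unless the `local[N,F]` form sets the failure count. The plain-Python simulation below contrasts a deterministic injected failure with one that fires only on the first attempt (in a real task, `TaskContext.get().attemptNumber()` would supply the attempt number); `run_task`, `JobAborted`, and both fault functions are hypothetical.

```python
from typing import Callable


class JobAborted(Exception):
    """Stands in for Spark giving up on the whole job."""


def run_task(task: Callable[[int, int], str], partition_id: int,
             max_failures: int = 4) -> str:
    """Run one task, allowing up to max_failures attempts in total.

    Mirrors the meaning of spark.task.maxFailures (default 4): the task may
    fail max_failures - 1 times and still succeed on a later attempt.
    `task` receives (partition_id, attempt_number), the values a real task
    would read from TaskContext.
    """
    if max_failures < 1:
        raise ValueError("max_failures must be at least 1")
    last_error = None
    for attempt in range(max_failures):
        try:
            return task(partition_id, attempt)
        except Exception as err:  # the task fails; the "executor" lives on
            last_error = err
    raise JobAborted(
        f"gave up on partition {partition_id} "
        f"after {max_failures} failed attempt(s)"
    ) from last_error


def always_fails(partition_id: int, attempt: int) -> str:
    # Throwing for a fixed partition id: every retry hits the same error.
    if partition_id == 2:
        raise RuntimeError("injected failure")
    return f"ok p{partition_id}"


def fails_once(partition_id: int, attempt: int) -> str:
    # Throwing only on the first attempt lets the retry succeed.
    if partition_id == 2 and attempt == 0:
        raise RuntimeError("injected failure")
    return f"ok p{partition_id} attempt {attempt}"


print(run_task(fails_once, 2))    # ok p2 attempt 1
print(run_task(always_fails, 0))  # ok p0
for task, limit in [(always_fails, 4), (fails_once, 1)]:  # 1 ~ local[N]
    try:
        run_task(task, 2, max_failures=limit)
    except JobAborted as e:
        print(e)
```

Failing only the first attempt is a cheap way to check that recovery actually works, since the retry then has to redo the partition's work and find its offsets again.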

On 10/10/2016 11:19 AM, Samy Dindane wrote:


I am writing a streaming job that reads a Kafka topic.
As far as I understand, Spark does a 1:1 mapping between its partitions
and Kafka partitions.

In order to correctly implement my checkpoint logic, I'd like to know
exactly what happens when an executor crashes.
Also, is it possible to kill an executor manually for testing?

Thank you.


To unsubscribe e-mail: user-unsubscr...@spark.apache.org
