Yes, partitionBy will shuffle unless it happens to be partitioning with the exact same partitioner the parent RDD had.
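For readers of the archive, here is a minimal sketch of that behavior. The keys, partitioner and partition counts are made up for illustration; the point is only that partitionBy skips the shuffle when the parent RDD already has an equal partitioner:

    import org.apache.spark.{HashPartitioner, SparkContext}

    object PartitionBySketch {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext("local[*]", "partitionBy-sketch")
        val pairs = sc.parallelize(1 to 100).map(n => (n % 10, n))   // no partitioner yet

        // Parent has no partitioner, so this partitionBy shuffles.
        val byKey = pairs.partitionBy(new HashPartitioner(4))

        // Equal partitioner to the parent's: Spark returns the parent as-is, no shuffle.
        val same = byKey.partitionBy(new HashPartitioner(4))

        // Different partitioner (different partition count): shuffles again.
        val different = byKey.partitionBy(new HashPartitioner(8))

        println(same eq byKey)        // true -- no new RDD was created
        println(different eq byKey)   // false
        sc.stop()
      }
    }

Only the matching-partitioner case avoids the shuffle; a plain repartition() always shuffles.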
On Wed, Oct 12, 2016 at 8:34 AM, Samy Dindane <s...@dindane.com> wrote:
> Hey Cody,
>
> I ended up choosing a different way to do things, which is using Kafka to
> commit my offsets.
> It works fine, except it stores the offsets in ZK instead of a Kafka topic
> (investigating this right now).
>
> I understand your explanations, thank you, but I have one question: when you
> say "Repartition almost always involves a shuffle", are you talking about
> repartitioning using partitionBy()?
>
> On 10/11/2016 01:23 AM, Cody Koeninger wrote:
>>
>> Repartition almost always involves a shuffle.
>>
>> Let me see if I can explain the recovery stuff...
>>
>> Say you start with two Kafka partitions, topic-0 and topic-1.
>>
>> You shuffle those across 3 Spark partitions; we'll label them A, B and C.
>>
>> Your job has written
>>
>> fileA: results for A, offset ranges for topic-0 offsets 10-60 and
>> topic-1 offsets 23-66
>> fileB: results for B, offset ranges for topic-0 offsets 10-60 and
>> topic-1 offsets 23-66
>>
>> It's in the middle of writing fileC when it dies.
>>
>> On startup, you need to start the job from
>> topic-0 offsets 10-60 and topic-1 offsets 23-66
>> and do all the work in all the partitions (because of the shuffle, you
>> don't know which Kafka partition the data came from).
>>
>> You just don't overwrite fileA and fileB, because they already have
>> correct data and offsets. You just write fileC.
>>
>> Then once you've recovered, you go on about your job as normal, starting
>> at topic-0 offset 60 and topic-1 offset 66.
>>
>> Clear as mud?
>>
>> On Mon, Oct 10, 2016 at 5:36 PM, Samy Dindane <s...@dindane.com> wrote:
>>>
>>> On 10/10/2016 8:14 PM, Cody Koeninger wrote:
>>>>
>>>> Glad it was helpful :)
>>>>
>>>> As far as executors, my expectation is that if you have multiple
>>>> executors running and one of them crashes, the failed task will be
>>>> submitted on a different executor. That is typically what I observe
>>>> in Spark apps; if that's not what you're seeing, I'd try to get help on
>>>> that specific issue.
>>>>
>>>> As far as Kafka offsets per partition, you need to atomically write
>>>> your offsets and results in the same place. If that place is a
>>>> filesystem, you need to be using atomic operations (I try to stay away
>>>> from HDFS, but I believe renames are atomic, for instance).
>>>
>>> That's what I am trying to do.
>>>
>>>> If you're doing that in normal code, i.e. with an iterator instead of
>>>> an RDD or dataframe, you may have to do some of that work yourself.
>>>
>>> How would you do that without an iterator?
>>> As far as I've seen, to have granular control over partitions (and each
>>> partition's checkpoint), one needs to use `foreachPartition` or
>>> `mapPartitions`, thus dealing with an iterator instead of an RDD.
>>>
>>>> As far as using partitionBy, even after repartitioning you can still
>>>> use the general idea of writing results and offsets in the same place.
>>>
>>> I don't understand what you mean. What do you mean by "the same place"?
>>> Why would I want to do that?
>>>
>>>> The major difference is that you need to write all offsets in each
>>>> partition (because things have been shuffled),
>>>
>>> Oh, I didn't think of shuffling after partitioning.
>>> But does it matter if I use partitionBy()?
>>>
>>> I will make a code example once at the office so you can take a look.
>>>
>>>> and need to be more careful on startup after a failure.
>>>> On startup, you'd see which partitions were incomplete, start the job
>>>> from the offsets in the incomplete partitions, do the work for all
>>>> partitions,
>>>
>>> It's clear until here.
>>>
>>>> but ignore the writes when they get to the complete partitions. I realize
>>>> that's kind of a complicated description; if it doesn't make sense, ask,
>>>> or I may be able to put up some publicly visible code at some point in
>>>> the future.
>>>
>>> Yes, I don't see what you mean. :)
>>>
>>> I really appreciate your help. Thanks a lot.
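For readers of the archive, here is a rough sketch of the "results and offsets in the same place" pattern being discussed, loosely modeled on the TransactionalPerPartition example linked further down the thread. This is an assumption about one way to implement it, not Cody's or Samy's actual code: the helper name, paths, repartition(3) and the kafka-0-10 direct stream usage are all made up for illustration.

    import java.nio.file.{Files, Paths, StandardCopyOption}

    import org.apache.kafka.clients.consumer.ConsumerRecord
    import org.apache.spark.TaskContext
    import org.apache.spark.streaming.dstream.InputDStream
    import org.apache.spark.streaming.kafka010.{HasOffsetRanges, OffsetRange}

    object PerPartitionCheckpointSketch {

      // Hypothetical atomic writer: the partition's results and the batch's full set
      // of Kafka offset ranges go into the same file, written to a temp path and then
      // renamed. The rename is the "commit"; a file that exists is complete by definition.
      def writePartitionAtomically(batchDir: String,
                                   partitionId: Int,
                                   offsetsHeader: String,
                                   records: Iterator[String]): Unit = {
        val done = Paths.get(s"$batchDir/part-$partitionId")
        if (Files.exists(done)) {
          records.foreach(_ => ())   // already written by a previous attempt: don't overwrite
          return
        }
        val tmp = Paths.get(s"$batchDir/.part-$partitionId.tmp")
        Files.createDirectories(tmp.getParent)
        Files.write(tmp, (offsetsHeader + "\n" + records.mkString("\n")).getBytes("UTF-8"))
        Files.move(tmp, done, StandardCopyOption.ATOMIC_MOVE)
      }

      // `stream` is assumed to be a direct Kafka stream (spark-streaming-kafka-0-10).
      def run(stream: InputDStream[ConsumerRecord[String, String]], outputRoot: String): Unit = {
        stream.foreachRDD { rdd =>
          // Read the offset ranges *before* shuffling; afterwards you can't tell which
          // Kafka partition a record came from, so every output partition stores the
          // whole set of ranges, exactly as described above.
          val ranges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
          val header = ranges
            .map(r => s"${r.topic}-${r.partition}:${r.fromOffset}-${r.untilOffset}")
            .mkString(",")
          val batchDir = s"$outputRoot/batch-${ranges.map(_.untilOffset).mkString("_")}"

          rdd.map(_.value).repartition(3).foreachPartition { iter =>
            writePartitionAtomically(batchDir, TaskContext.get.partitionId, header, iter)
          }
        }
      }
    }

On restart, the driver would read the offset header from whichever partition files already exist, start the batch from those offsets, rerun every partition, and the existence check above turns the writes to already-complete partitions into no-ops.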
>>>> On Mon, Oct 10, 2016 at 12:12 PM, Samy Dindane <s...@dindane.com> wrote:
>>>>>
>>>>> I just noticed that you're the author of the code I linked in my previous
>>>>> email. :) It's helpful.
>>>>>
>>>>> When using `foreachPartition` or `mapPartitions`, I noticed I can't ask
>>>>> Spark to write the data to disk using `df.write()`; I need to use the
>>>>> iterator to do so, which means losing the ability to use partitionBy().
>>>>> Do you know a workaround? Or will I be forced to partition the data manually?
>>>>>
>>>>> I think I understand why the job crashes when a single executor does:
>>>>> `df.write()....save()` writes all the partitions at the same time, which
>>>>> fails if one of them has died.
>>>>> Is that right?
>>>>>
>>>>> Thank you.
>>>>>
>>>>> Samy
>>>>>
>>>>> On 10/10/2016 04:58 PM, Samy Dindane wrote:
>>>>>>
>>>>>> Hi Cody,
>>>>>>
>>>>>> I am writing a Spark job that reads records from a Kafka topic and
>>>>>> writes them to the file system.
>>>>>> This would be straightforward if it weren't for the custom checkpointing
>>>>>> logic I want to have; Spark's checkpointing doesn't suit us as it doesn't
>>>>>> permit code updates.
>>>>>>
>>>>>> The custom checkpointing would be similar to this:
>>>>>> https://github.com/koeninger/kafka-exactly-once/blob/master/src/main/scala/example/TransactionalPerPartition.scala
>>>>>>
>>>>>> I am trying to understand how this would work if an executor crashes, so
>>>>>> I tried making one crash manually, but I noticed it kills the whole job
>>>>>> instead of creating another executor to resume the task.
>>>>>> Is that expected? Is there anything wrong with my approach?
>>>>>>
>>>>>> Thank you for your time.
>>>>>>
>>>>>> On 10/10/2016 04:29 PM, Cody Koeninger wrote:
>>>>>>>
>>>>>>> What is it you're actually trying to accomplish?
>>>>>>>
>>>>>>> On Mon, Oct 10, 2016 at 5:26 AM, Samy Dindane <s...@dindane.com> wrote:
>>>>>>>>
>>>>>>>> I managed to make a specific executor crash by using
>>>>>>>> TaskContext.get.partitionId and throwing an exception for a specific
>>>>>>>> executor.
>>>>>>>>
>>>>>>>> The issue I have now is that the whole job stops when a single executor
>>>>>>>> crashes.
>>>>>>>> Do I need to explicitly tell Spark to start a new executor and keep the
>>>>>>>> other ones running?
>>>>>>>>
>>>>>>>> On 10/10/2016 11:19 AM, Samy Dindane wrote:
>>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> I am writing a streaming job that reads a Kafka topic.
>>>>>>>>> As far as I understand, Spark does a 1:1 mapping between its executors
>>>>>>>>> and Kafka partitions.
>>>>>>>>>
>>>>>>>>> In order to correctly implement my checkpoint logic, I'd like to know
>>>>>>>>> what exactly happens when an executor crashes.
>>>>>>>>> Also, is it possible to kill an executor manually for testing
>>>>>>>>> purposes?
>>>>>>>>>
>>>>>>>>> Thank you.
>>>>>>>>>
>>>>>>>>> Samy
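On that last question about killing an executor manually for testing: a hedged sketch of two ways to simulate a failure, building on the TaskContext.get.partitionId trick mentioned earlier in the thread. The app name, partition counts and targeted partition are made up, and this is only one possible approach, not an official recipe.

    import org.apache.spark.{SparkConf, SparkContext, TaskContext}

    // Sketch for deliberately breaking things while testing recovery logic.
    object CrashTestSketch {
      def main(args: Array[String]): Unit = {
        // Master is expected to come from spark-submit; the "kill an executor" case
        // only means something on a real cluster with several executors.
        val sc = new SparkContext(new SparkConf().setAppName("crash-test"))
        val data = sc.parallelize(1 to 1000, numSlices = 8)

        data.foreachPartition { iter =>
          val ctx = TaskContext.get
          val pid = ctx.partitionId

          // Option 1: fail one *task* once. Spark retries it (spark.task.maxFailures,
          // 4 by default) and the retry succeeds, so the job survives. Throwing
          // unconditionally would fail every retry and abort the whole job.
          if (pid == 3 && ctx.attemptNumber == 0) {
            throw new RuntimeException(s"simulated failure in partition $pid")
          }

          // Option 2 (commented out): kill the executor JVM itself. The driver marks
          // the executor as lost and reschedules its tasks on the remaining executors.
          // if (pid == 3) sys.exit(1)

          iter.foreach(_ => ())   // pretend to do some work
        }

        sc.stop()
      }
    }

The difference matters for the behavior described above: a deterministic exception fails every retry of the task, so after spark.task.maxFailures attempts the whole job aborts, whereas an executor that dies simply gets its tasks rescheduled on the remaining executors.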