Yep, that's exactly what we want. Thanks for all the info, Cody.

Dave.

On 13 Jan 2016 18:29, "Cody Koeninger" <c...@koeninger.org> wrote:
> The idea here is that the custom partitioner shouldn't actually get used
> for repartitioning the kafka stream (because that would involve a shuffle,
> which is what you're trying to avoid). You're just assigning a partitioner
> because you know how it already is partitioned.
>
> On Wed, Jan 13, 2016 at 11:22 AM, Dave <dave.davo...@gmail.com> wrote:
>
>> So for case 1 below
>> - subclass or modify the direct stream and kafkardd. They're private, so
>> you'd need to rebuild just the external kafka project, not all of spark
>> When the data is read from Kafka it will be partitioned correctly with
>> the Custom Partitioner passed in to the new direct stream and kafka RDD
>> implementations.
>>
>> For case 2
>> - write a wrapper subclass of rdd that takes a given custom partitioner
>> and rdd in the constructor, overrides partitioner, and delegates every
>> other method to the wrapped rdd. This should be possible without
>> modification to any existing spark code. You'd use it something like ....
>> Am I correct in saying that the data from Kafka will not be read into
>> memory in the cluster (kafka server is not located on the Spark Cluster in
>> my use case) until the following code is executed
>> stream.transform { rdd =>
>>   val wrapped = YourWrapper(cp, rdd)
>>   wrapped.join(reference)
>> }
>> In which case it will run through the partitioner of the wrapped RDD when
>> it arrives in the cluster for the first time i.e. no shuffle.
>>
>> Thanks,
>> Dave.
>>
>> On 13/01/16 17:00, Cody Koeninger wrote:
>>
>> In the case here of a kafkaRDD, the data doesn't reside on the cluster,
>> it's not cached by default. If you're running kafka on the same nodes as
>> spark, then data locality would play a factor, but that should be handled
>> by the existing getPreferredLocations method.
>>
>> On Wed, Jan 13, 2016 at 10:46 AM, Dave <dave.davo...@gmail.com> wrote:
>>
>>> Thanks Cody, appreciate the response.
>>>
>>> With this pattern the partitioners will now match when the join is
>>> executed.
>>> However, does the wrapper RDD not need to set the partition meta data on
>>> the wrapped RDD in order to allow Spark to know where the data for each
>>> partition resides in the cluster?
>>>
>>> Thanks,
>>> Dave.
>>>
>>> On 13/01/16 16:21, Cody Koeninger wrote:
>>>
>>> If two rdds have an identical partitioner, joining should not involve a
>>> shuffle.
>>>
>>> You should be able to override the partitioner without calling
>>> partitionBy.
>>>
>>> Two ways I can think of to do this:
>>> - subclass or modify the direct stream and kafkardd. They're private,
>>> so you'd need to rebuild just the external kafka project, not all of spark
>>>
>>> - write a wrapper subclass of rdd that takes a given custom partitioner
>>> and rdd in the constructor, overrides partitioner, and delegates every
>>> other method to the wrapped rdd. This should be possible without
>>> modification to any existing spark code. You'd use it something like
>>>
>>> val cp = YourCustomPartitioner(...)
>>> val reference = YourReferenceRDD(cp, ...)
>>> val stream = KafkaUtils....
>>>
>>> stream.transform { rdd =>
>>>   val wrapped = YourWrapper(cp, rdd)
>>>   wrapped.join(reference)
>>> }
>>>
>>> I haven't had reason to do either one of those approaches, so YMMV, but
>>> I believe others have
>>>
>>> On Wed, Jan 13, 2016 at 3:40 AM, ddav <dave.davo...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> I have the following use case:
>>>>
>>>> 1. Reference data stored in an RDD that is persisted and partitioned
>>>> using a simple custom partitioner.
>>>> 2. Input stream from kafka that uses the same partitioner algorithm as
>>>> the ref data RDD - this partitioning is done in kafka.
>>>>
>>>> I am using kafka direct streams so the number of kafka partitions maps
>>>> to the number of partitions in the spark RDD.
>>>> From testing and the documentation I see Spark does not know anything
>>>> about how the data has been partitioned in kafka.
>>>>
>>>> In my use case I need to join the reference data RDD and the input
>>>> stream RDD. Because I have manually ensured the incoming data from kafka
>>>> uses the same partitioning algorithm, I know the data has been grouped
>>>> correctly in the input stream RDD in Spark, but I cannot do a join
>>>> without a shuffle step because Spark has no knowledge of how the data
>>>> has been partitioned.
>>>>
>>>> I have two ways to do this.
>>>> 1. Explicitly call partitionBy(CustomPartitioner) on the input stream RDD
>>>> followed by a join. This results in a shuffle of the input stream RDD,
>>>> and then the co-partitioned join takes place.
>>>> 2. Call join on the reference data RDD passing in the input stream RDD.
>>>> Spark will do a shuffle under the hood in this case and the join will
>>>> take place. The join will do its best to run on a node that has local
>>>> access to the reference data RDD.
>>>>
>>>> Is there any difference between the 2 methods above or will both cause
>>>> the same sequence of events to take place in Spark?
>>>> Is all I have stated above correct?
>>>>
>>>> Finally, is there any road map feature for looking at allowing the user
>>>> to push a partitioner into an already created RDD and not to do a shuffle?
>>>> Spark in this case trusts that the data is set up correctly (as in the
>>>> use case above) and simply fills in the necessary meta data on the RDD
>>>> partitions i.e. check the first entry in each partition to determine the
>>>> partition number of the data.
>>>>
>>>> Thank you in advance for any help on this issue.
>>>> Dave.
>>>>
>>>> --
>>>> View this message in context:
>>>> http://apache-spark-user-list.1001560.n3.nabble.com/Kafka-Streaming-and-partitioning-tp25955.html
>>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
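
For reference, the wrapper approach discussed in this thread (case 2) might look roughly like the sketch below. It is untested and assumes the public RDD developer API (getPartitions, compute, getPreferredLocations, partitioner). YourCustomPartitioner and YourWrapper are the placeholder names used in the thread, and the hash-mod logic in getPartition is only an illustrative stand-in: it has to be replaced with whatever key-to-partition mapping the Kafka producer really uses, otherwise the join will silently drop matches.

```scala
import org.apache.spark.{Partition, Partitioner, TaskContext}
import org.apache.spark.rdd.RDD

// Hypothetical partitioner. The mapping below is an assumption for
// illustration; it must mirror the producer-side Kafka partitioning exactly.
class YourCustomPartitioner(override val numPartitions: Int) extends Partitioner {
  override def getPartition(key: Any): Int = {
    val mod = key.hashCode % numPartitions
    if (mod < 0) mod + numPartitions else mod
  }

  // Spark compares partitioners with equals when deciding whether a join
  // needs a shuffle, so two equivalent instances should compare equal.
  override def equals(other: Any): Boolean = other match {
    case p: YourCustomPartitioner => p.numPartitions == numPartitions
    case _ => false
  }

  override def hashCode: Int = numPartitions
}

// Wrapper that only attaches a partitioner to an RDD that is already laid
// out that way. It moves no data: partitions, locality and iteration are
// all delegated to the wrapped RDD.
class YourWrapper[K, V](cp: Partitioner, prev: RDD[(K, V)])
    extends RDD[(K, V)](prev) {

  require(cp.numPartitions == prev.partitions.length,
    "partitioner and wrapped RDD must have the same number of partitions")

  // The only real change: tell Spark how the data is partitioned.
  override val partitioner: Option[Partitioner] = Some(cp)

  override protected def getPartitions: Array[Partition] = prev.partitions

  override protected def getPreferredLocations(split: Partition): Seq[String] =
    prev.preferredLocations(split)

  override def compute(split: Partition, context: TaskContext): Iterator[(K, V)] =
    prev.iterator(split, context)
}
```

Inside stream.transform this would be used as `new YourWrapper(cp, rdd).join(reference)`, with reference partitioned by the same cp. Nothing here checks that the records really sit in the partition the partitioner would assign them to; Spark simply trusts the declaration, which is the point made earlier in the thread.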