Yep that's exactly what we want. Thanks for all the info Cody.
Dave.
On 13 Jan 2016 18:29, "Cody Koeninger" <c...@koeninger.org> wrote:

> The idea here is that the custom partitioner shouldn't actually get used
> for repartitioning the kafka stream (because that would involve a shuffle,
> which is what you're trying to avoid).  You're just assigning a partitioner
> because you know how it already is partitioned.
>
>
> On Wed, Jan 13, 2016 at 11:22 AM, Dave <dave.davo...@gmail.com> wrote:
>
>> So for case 1 below:
>> - subclass or modify the direct stream and kafkardd.  They're private, so
>> you'd need to rebuild just the external kafka project, not all of spark.
>> When the data is read from Kafka it will be partitioned correctly, with
>> the custom partitioner passed in to the new direct stream and KafkaRDD
>> implementations.
>>
>> For case 2
>> - write a wrapper subclass of rdd that takes a given custom partitioner
>> and rdd in the constructor, overrides partitioner, and delegates every
>> other method to the wrapped rdd.  This should be possible without
>> modification to any existing spark code.  You'd use it something like ....
>> Am I correct in saying that the data from Kafka will not be read into
>> memory in the cluster (the Kafka server is not located on the Spark
>> cluster in my use case) until the following code is executed?
>> stream.transform { rdd =>
>>   val wrapped = YourWrapper(cp, rdd)
>>   wrapped.join(reference)
>> }
>> In which case it will run through the partitioner of the wrapped RDD when
>> it arrives in the cluster for the first time, i.e. no shuffle.
>>
>> Thanks,
>> Dave.
>>
>>
>>
>> On 13/01/16 17:00, Cody Koeninger wrote:
>>
>> In the case here of a KafkaRDD, the data doesn't reside on the cluster;
>> it's not cached by default.  If you're running kafka on the same nodes as
>> spark, then data locality would play a factor, but that should be handled
>> by the existing getPreferredLocations method.
>>
>> On Wed, Jan 13, 2016 at 10:46 AM, Dave <dave.davo...@gmail.com> wrote:
>>
>>> Thanks Cody, appreciate the response.
>>>
>>> With this pattern the partitioners will now match when the join is
>>> executed.
>>> However, does the wrapper RDD not need to set the partition metadata on
>>> the wrapped RDD in order to allow Spark to know where the data for each
>>> partition resides in the cluster?
>>>
>>> Thanks,
>>> Dave.
>>>
>>>
>>> On 13/01/16 16:21, Cody Koeninger wrote:
>>>
>>> If two rdds have an identical partitioner, joining should not involve a
>>> shuffle.
>>>
>>> You should be able to override the partitioner without calling
>>> partitionBy.
>>>
>>> Two ways I can think of to do this:
>>> - subclass or modify the direct stream and kafkardd.  They're private,
>>> so you'd need to rebuild just the external kafka project, not all of spark
>>>
>>> - write a wrapper subclass of rdd that takes a given custom partitioner
>>> and rdd in the constructor, overrides partitioner, and delegates every
>>> other method to the wrapped rdd.  This should be possible without
>>> modification to any existing spark code.  You'd use it something like
>>>
>>> val cp = YourCustomPartitioner(...)
>>> val reference = YourReferenceRDD(cp, ...)
>>> val stream = KafkaUtils....
>>>
>>> stream.transform { rdd =>
>>>   val wrapped = YourWrapper(cp, rdd)
>>>   wrapped.join(reference)
>>> }
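>>> As a rough sketch (untested, names hypothetical, assuming Spark's core
>>> RDD API), such a wrapper could look like this: it delegates partitions
>>> and compute to the wrapped RDD and only overrides partitioner:

```scala
import org.apache.spark.{Partition, Partitioner, TaskContext}
import org.apache.spark.rdd.RDD

// Sketch only: delegates storage and computation to the wrapped RDD,
// and merely asserts that the data is already partitioned by `cp`.
// If that assertion is wrong, joins will silently produce bad results.
class YourWrapper[K, V](cp: Partitioner, prev: RDD[(K, V)])
  extends RDD[(K, V)](prev) {

  // Tell Spark the data is already partitioned, so a join with an RDD
  // that has an equal partitioner needs no shuffle.
  override val partitioner: Option[Partitioner] = Some(cp)

  // Everything else is a pass-through to the wrapped RDD.
  override protected def getPartitions: Array[Partition] = prev.partitions

  override def compute(split: Partition, context: TaskContext): Iterator[(K, V)] =
    prev.iterator(split, context)
}

object YourWrapper {
  def apply[K, V](cp: Partitioner, prev: RDD[(K, V)]): YourWrapper[K, V] =
    new YourWrapper(cp, prev)
}
```

This needs a Spark runtime to exercise, so treat it as a starting point
rather than working code.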
>>>
>>>
>>> I haven't had reason to do either one of those approaches, so YMMV, but
>>> I believe others have.
>>>
>>>
>>>
>>>
>>> On Wed, Jan 13, 2016 at 3:40 AM, ddav <dave.davo...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> I have the following use case:
>>>>
>>>> 1. Reference data stored in an RDD that is persisted and partitioned
>>>> using a
>>>> simple custom partitioner.
>>>> 2. Input stream from kafka that uses the same partitioner algorithm as
>>>> the
>>>> ref data RDD - this partitioning is done in kafka.
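>>>> For reference, a simple custom partitioner of the kind described in (1)
>>>> might look like this (sketch only; assumes Int keys, which the thread
>>>> does not specify). Note that equals/hashCode matter, because Spark
>>>> compares partitioners for equality when deciding whether a join can
>>>> skip the shuffle:

```scala
import org.apache.spark.Partitioner

// Hypothetical sketch: route each Int key to partition (key mod n).
class ModuloPartitioner(n: Int) extends Partitioner {
  override def numPartitions: Int = n

  override def getPartition(key: Any): Int = key match {
    case k: Int => ((k % n) + n) % n  // non-negative modulo
    case _      => 0
  }

  // Spark uses equality of partitioners to detect co-partitioned RDDs,
  // so two instances with the same n must compare equal.
  override def equals(other: Any): Boolean = other match {
    case m: ModuloPartitioner => m.numPartitions == n
    case _                    => false
  }
  override def hashCode: Int = n
}
```

The Kafka producer side would have to apply the same `key mod n` rule for
the co-partitioning assumption to hold.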
>>>>
>>>> I am using kafka direct streams, so the number of kafka partitions maps
>>>> to the number of partitions in the spark RDD. From testing and the
>>>> documentation I see that Spark does not know anything about how the
>>>> data has been partitioned in kafka.
>>>>
>>>> In my use case I need to join the reference data RDD and the input
>>>> stream RDD.  Because I have manually ensured the incoming data from
>>>> kafka uses the same partitioning algorithm, I know the data has been
>>>> grouped correctly in the input stream RDD in Spark, but I cannot do a
>>>> join without a shuffle step because Spark has no knowledge of how the
>>>> data has been partitioned.
>>>>
>>>> I have two ways to do this:
>>>> 1. Explicitly call partitionBy(CustomPartitioner) on the input stream
>>>> RDD, followed by a join. This results in a shuffle of the input stream
>>>> RDD, and then the co-partitioned join takes place.
>>>> 2. Call join on the reference data RDD passing in the input stream RDD.
>>>> Spark will do a shuffle under the hood in this case and the join will
>>>> take
>>>> place. The join will do its best to run on a node that has local access
>>>> to
>>>> the reference data RDD.
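>>>> In code, the two options described above would look roughly like this
>>>> (sketch; streamRdd, referenceRdd and customPartitioner are placeholder
>>>> names, not from the thread):

```scala
// Option 1: explicit partitionBy, then join.
// partitionBy shuffles streamRdd once; the join itself is then
// co-partitioned and shuffle-free.
val joined1 = streamRdd.partitionBy(customPartitioner).join(referenceRdd)

// Option 2: join directly. Spark shuffles streamRdd under the hood,
// repartitioning it by referenceRdd's partitioner, then joins.
val joined2 = referenceRdd.join(streamRdd)
```

In both cases the stream side is shuffled exactly once, which is why the
question below asks whether the two are equivalent.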
>>>>
>>>> Is there any difference between the 2 methods above or will both cause
>>>> the
>>>> same sequence of events to take place in Spark?
>>>> Is all I have stated above correct?
>>>>
>>>> Finally, is there any roadmap feature looking at allowing the user to
>>>> push a partitioner into an already-created RDD without doing a shuffle?
>>>> Spark in this case trusts that the data is set up correctly (as in the
>>>> use case above) and simply fills in the necessary metadata on the RDD
>>>> partitions, i.e. checks the first entry in each partition to determine
>>>> the partition number of the data.
>>>>
>>>> Thank you in advance for any help on this issue.
>>>> Dave.
>>>>
>>>>
>>>>
>>>> --
>>>> View this message in context:
>>>> http://apache-spark-user-list.1001560.n3.nabble.com/Kafka-Streaming-and-partitioning-tp25955.html
>>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>>
>>>> ---------------------------------------------------------------------
>>>> To unsubscribe, e-mail: <user-unsubscr...@spark.apache.org>
>>>> user-unsubscr...@spark.apache.org
>>>> For additional commands, e-mail: <user-h...@spark.apache.org>
>>>> user-h...@spark.apache.org
>>>>
>>>>
>>>
>>>
>>
>>
>
