Thanks Cody for the very useful information.

It's much clearer to me now. I had a lot of wrong assumptions.
On Nov 23, 2015 10:19 PM, "Cody Koeninger" <[email protected]> wrote:

> The partitioner is an optional field when defining an RDD.  KafkaRDD doesn't
> define one, so you can't really assume anything about the way it's
> partitioned, because Spark doesn't know anything about the way it's
> partitioned.  If you want to rely on some property of how things were
> partitioned as they were being produced into Kafka, you need to do
> foreachPartition or mapPartitions yourself.  Otherwise, Spark will do a
> shuffle for any operation that would ordinarily require a shuffle, even if
> the keys are already in the "right" place.
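
To make the within-partition option concrete, here is a minimal sketch (plain Java, no Spark dependency) of the kind of local aggregation you would run inside mapPartitions / foreachPartition. The record shape (String key, Double value) and the sum semantics are assumptions for illustration, not part of the original thread:

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

// Illustrates what a mapPartitions-style local aggregation does: all records
// for a key that were produced into the same Kafka partition are summed
// locally, with no cross-partition shuffle.
public class PartitionLocalSum {

    // Called once per partition: fold that partition's records into local totals.
    public static Map<String, Double> sumPartition(
            Iterator<Map.Entry<String, Double>> records) {
        Map<String, Double> totals = new HashMap<>();
        while (records.hasNext()) {
            Map.Entry<String, Double> r = records.next();
            totals.merge(r.getKey(), r.getValue(), Double::sum);
        }
        // The result is only globally correct if every occurrence of a key
        // lives in exactly one partition -- which is the property you must
        // know about your Kafka producer, since Spark cannot verify it.
        return totals;
    }
}
```

The point of the sketch: because KafkaRDD declares no partitioner, only you know whether this per-partition result is already the global answer.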
>
> Regarding the assignment of cores to partitions, that's not really
> accurate.  Each Kafka partition will correspond to a Spark partition.  If
> you do an operation that shuffles, that relationship no longer holds true.
> Even if you're doing a straight map operation without a shuffle, you will
> probably get 1 executor core working on 1 partition, but there's no
> guarantee the scheduler will do that, and no guarantee it'll be the same
> core / partition relationship for the next batch.
>
>
> On Mon, Nov 23, 2015 at 9:01 AM, Thúy Hằng Lê <[email protected]>
> wrote:
>
>> Thanks Cody,
>>
>> I still have concerns about this.
>> What do you mean by saying Spark direct stream doesn't have a default
>> partitioner? Could you please explain that in more detail?
>>
>> When I assign 20 cores to 20 Kafka partitions, I am expecting each core
>> to work on one partition. Is that correct?
>>
>> I still couldn't figure out how the RDD will be partitioned after the mapToPair
>> function. It would be great if you could briefly explain (or send me some
>> documentation, I couldn't find any) how the shuffle works in the mapToPair function.
>>
>> Thank you very much.
>> On Nov 23, 2015 12:26 AM, "Cody Koeninger" <[email protected]> wrote:
>>
>>> Spark direct stream doesn't have a default partitioner.
>>>
>>> If you know that you want to do an operation on keys that are already
>>> partitioned by Kafka, just use mapPartitions or foreachPartition to avoid a
>>> shuffle.
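
A sketch of why the shuffle happens anyway: Spark's default HashPartitioner places a key at hashCode() modulo numPartitions, while the Kafka producer decides placement by its own scheme, and since KafkaRDD declares no partitioner, Spark cannot assume the two agree. The placement rule, mirrored in plain Java purely as an illustration (this is not Spark's actual class):

```java
// Plain-Java mirror of the placement rule used by Spark's default
// HashPartitioner: non-negative (key.hashCode() mod numPartitions).
// Illustration only -- because KafkaRDD declares no partitioner, Spark
// cannot assume the Kafka producer placed keys by the same rule, so
// reduceByKey must shuffle to be safe.
public class HashPlacement {
    public static int partitionFor(Object key, int numPartitions) {
        int mod = key.hashCode() % numPartitions;
        return mod < 0 ? mod + numPartitions : mod; // keep the index non-negative
    }
}
```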
>>>
>>> On Sat, Nov 21, 2015 at 11:46 AM, trung kien <[email protected]> wrote:
>>>
>>>> Hi all,
>>>>
>>>> I am having trouble understanding how an RDD will be partitioned after
>>>> calling the mapToPair function.
>>>> Could anyone give me more information about partitioning in this
>>>> function?
>>>>
>>>> I have a simple application doing following job:
>>>>
>>>> JavaPairInputDStream<String, String> messages =
>>>>     KafkaUtils.createDirectStream(...);
>>>>
>>>> JavaPairDStream<String, Double> stats = messages
>>>>     .mapToPair(JSON_DECODE)
>>>>     .reduceByKey(SUM);
>>>>
>>>> saveToDB(stats);
>>>>
>>>> I set up 2 workers (each dedicating 20 cores) for this job.
>>>> My Kafka topic has 40 partitions (I want each core to handle a partition),
>>>> and the messages sent to the queue are partitioned by the same key as the
>>>> mapToPair function.
>>>> I'm using the default partitioner for both Kafka and Spark.
>>>>
>>>> Ideally, I shouldn't see data shuffled between cores in the mapToPair
>>>> stage, right?
>>>> However, in my Spark UI, I see that the "Locality Level" for this stage
>>>> is "ANY", which means data needs to be transferred.
>>>> Any comments on this?
>>>>
>>>> --
>>>> Thanks
>>>> Kien
>>>>
>>>
>>>
>
