Thanks Cody for the very useful information. It's much clearer to me now; I had a lot of wrong assumptions.
On Nov 23, 2015 10:19 PM, "Cody Koeninger" <[email protected]> wrote:
> Partitioner is an optional field when defining an RDD. KafkaRDD doesn't
> define one, so you can't really assume anything about the way it's
> partitioned, because Spark doesn't know anything about the way it's
> partitioned. If you want to rely on some property of how things were
> partitioned as they were being produced into Kafka, you need to do
> foreachPartition or mapPartitions yourself. Otherwise, Spark will do a
> shuffle for any operation that would ordinarily require a shuffle, even if
> keys are already in the "right" place.
>
> Regarding the assignment of cores to partitions, that's not really
> accurate. Each Kafka partition will correspond to a Spark partition. If
> you do an operation that shuffles, that relationship no longer holds true.
> Even if you're doing a straight map operation without a shuffle, you will
> probably get one executor core working on one partition, but there's no
> guarantee the scheduler will do that, and no guarantee it'll be the same
> core/partition relationship for the next batch.
>
> On Mon, Nov 23, 2015 at 9:01 AM, Thúy Hằng Lê <[email protected]> wrote:
>
>> Thanks Cody,
>>
>> I still have concerns about this.
>> What do you mean by saying the Spark direct stream doesn't have a default
>> partitioner? Could you please explain more?
>>
>> When I assign 20 cores to 20 Kafka partitions, I am expecting each core
>> to work on one partition. Is that correct?
>>
>> I still couldn't figure out how the RDD will be partitioned after the
>> mapToPair function. It would be great if you could briefly explain (or
>> send me some documentation; I couldn't find any) how shuffling works in
>> the mapToPair function.
>>
>> Thank you very much.
>> On Nov 23, 2015 12:26 AM, "Cody Koeninger" <[email protected]> wrote:
>>
>>> The Spark direct stream doesn't have a default partitioner.
>>>
>>> If you know that you want to do an operation on keys that are already
>>> partitioned by Kafka, just use mapPartitions or foreachPartition to
>>> avoid a shuffle.
>>>
>>> On Sat, Nov 21, 2015 at 11:46 AM, trung kien <[email protected]> wrote:
>>>
>>>> Hi all,
>>>>
>>>> I am having trouble understanding how an RDD will be partitioned after
>>>> calling the mapToPair function.
>>>> Could anyone give me more information about partitioning in this
>>>> function?
>>>>
>>>> I have a simple application doing the following job:
>>>>
>>>> JavaPairInputDStream<String, String> messages =
>>>>     KafkaUtils.createDirectStream(...)
>>>>
>>>> JavaPairDStream<String, Double> stats = messages.mapToPair(JSON_DECODE)
>>>>     .reduceByKey(SUM);
>>>>
>>>> saveToDB(stats)
>>>>
>>>> I set up 2 workers (each with 20 dedicated cores) for this job.
>>>> My Kafka topic has 40 partitions (I want each core to handle one
>>>> partition), and the messages sent to the queue are partitioned by the
>>>> same key as in the mapToPair function.
>>>> I'm using the default partitioner for both Kafka and Spark.
>>>>
>>>> Ideally, I shouldn't see data shuffled between cores in the mapToPair
>>>> stage, right?
>>>> However, in my Spark UI, I see that the "Locality Level" for this stage
>>>> is "ANY", which means data needs to be transferred.
>>>> Any comments on this?
>>>>
>>>> --
>>>> Thanks
>>>> Kien
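[Editor's note: the pattern Cody describes, aggregating inside each partition so no data crosses partition boundaries, can be sketched in plain Java. This is an illustrative sketch, not the Spark API: the `aggregatePartition` helper and the example keys are hypothetical, and in a real job the per-partition loop would live inside `mapPartitions`/`foreachPartition` on the KafkaRDD.]

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class PerPartitionAgg {
    // Reduce one partition's (key, value) records locally. Because the
    // producer already partitioned by key, every record for a given key
    // lives in the same partition, so no shuffle is needed.
    static Map<String, Double> aggregatePartition(List<Map.Entry<String, Double>> partition) {
        Map<String, Double> sums = new HashMap<>();
        for (Map.Entry<String, Double> record : partition) {
            sums.merge(record.getKey(), record.getValue(), Double::sum);
        }
        return sums;
    }

    public static void main(String[] args) {
        // Two simulated Kafka/Spark partitions with co-located keys.
        List<Map.Entry<String, Double>> p0 = List.of(
                Map.entry("EURUSD", 1.0), Map.entry("EURUSD", 2.0));
        List<Map.Entry<String, Double>> p1 = List.of(
                Map.entry("USDJPY", 3.0));
        // Each partition is reduced independently, mimicking
        // stream.foreachPartition(iter -> { aggregate, then saveToDB })
        System.out.println(aggregatePartition(p0)); // {EURUSD=3.0}
        System.out.println(aggregatePartition(p1)); // {USDJPY=3.0}
    }
}
```

By contrast, `reduceByKey` has no way to know the keys are already co-located (KafkaRDD defines no partitioner), so it inserts a shuffle regardless.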
