A clearer explanation:

`parallelize` does not apply a partitioner. We can see this with a quick
code example:

scala> val rdd1 = sc.parallelize(Seq(("aa", 1), ("aa", 2), ("aa", 3)))
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[0] at
parallelize at <console>:24

scala> rdd1.partitioner
res0: Option[org.apache.spark.Partitioner] = None

It has no partitioner because `parallelize` just packs the collection into
partitions by position, without looking at the actual contents of the
collection.

scala> rdd1.foreachPartition(it => println(it.length))
1
0
1
1
0
0
0
0
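
Under the hood, `parallelize` slices the collection purely by position. A
minimal sketch of that slicing logic (paraphrased from the slice helper in
ParallelCollectionRDD; the exact Spark source may differ slightly):

    // [start, end) element ranges assigned to each slice: slice i gets the
    // elements whose indices fall in its range, so a short collection spread
    // over many slices leaves most slices empty.
    def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] =
      (0 until numSlices).iterator.map { i =>
        val start = ((i * length) / numSlices).toInt
        val end = (((i + 1) * length) / numSlices).toInt
        (start, end)
      }

    positions(3, 8).toList
    // List((0,0), (0,0), (0,1), (1,1), (1,1), (1,2), (2,2), (2,3))
    // => three non-empty slices of one element each, matching the three 1s
    //    printed above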

If we actually shuffle the data (here using the `repartition` command), we
get the expected result:

scala> rdd1.repartition(8).foreachPartition(it => println(it.length))
0
0
0
0
0
0
0
3
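
Note that `repartition` shuffles with synthetic keys (round-robin from a
random starting position) rather than by the records' own keys. To partition
by the keys' hash codes explicitly, you can use `partitionBy` with a
`HashPartitioner`. A quick sketch against the same rdd1 as above:

    import org.apache.spark.HashPartitioner

    val byKey = rdd1.partitionBy(new HashPartitioner(8))
    byKey.partitioner                     // now Some(HashPartitioner)
    // glom() materializes each partition as an array so we can see the sizes
    byKey.glom().map(_.length).collect()  // Array(3, 0, 0, 0, 0, 0, 0, 0)
    // all three records share the key "aa", and "aa".hashCode % 8 == 0,
    // so they all land in partition 0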


On Sat, Jun 24, 2017 at 12:22 PM Russell Spitzer <russell.spit...@gmail.com>
wrote:

> Neither of your code examples invokes a repartitioning. Add in a
> repartition command.
>
> On Sat, Jun 24, 2017, 11:53 AM Vikash Pareek <
> vikash.par...@infoobjects.com> wrote:
>
>> Hi Vadim,
>>
>> Thank you for your response.
>>
>> I would like to know how the partitioner chooses the key. If we look at my
>> example, the following questions arise:
>> 1. In case of rdd1, hash partitioning should calculate the hashcode of the
>> key (i.e. *"aa"* in this case), so shouldn't *all records go to a single
>> partition* instead of being uniformly distributed?
>> 2. In case of rdd2, there is no key-value pair, so how is hash partitioning
>> going to work, i.e. *what is the key* used to calculate the hashcode?
>>
>>
>>
>> Best Regards,
>>
>>
>> Vikash Pareek
>> Team Lead  *InfoObjects Inc.*
>> Big Data Analytics
>>
>> m: +91 8800206898 a: E5, Jhalana Institutional
>> Area, Jaipur, Rajasthan 302004
>> w: www.linkedin.com/in/pvikash e: vikash.par...@infoobjects.com
>>
>>
>>
>>
>> On Fri, Jun 23, 2017 at 10:38 PM, Vadim Semenov <
>> vadim.seme...@datadoghq.com> wrote:
>>
>>> This is the code that chooses the partition for a key:
>>> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/Partitioner.scala#L85-L88
>>>
>>> it's basically `math.abs(key.hashCode % numberOfPartitions)`
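>>>
>>> A paraphrased, self-contained sketch of that logic (the real code uses a
>>> non-negative modulo rather than math.abs, so negative hash codes still
>>> map to a valid partition, and null keys go to partition 0):
>>>
>>>     def nonNegativeMod(x: Int, mod: Int): Int = {
>>>       val rawMod = x % mod
>>>       rawMod + (if (rawMod < 0) mod else 0)
>>>     }
>>>
>>>     def getPartition(key: Any, numPartitions: Int): Int = key match {
>>>       case null => 0
>>>       case _    => nonNegativeMod(key.hashCode, numPartitions)
>>>     }
>>>
>>>     getPartition("aa", 8)  // 0: every "aa" maps to the same partition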
>>>
>>> On Fri, Jun 23, 2017 at 3:42 AM, Vikash Pareek <
>>> vikash.par...@infoobjects.com> wrote:
>>>
>>>> I am trying to understand how Spark partitioning works.
>>>>
>>>> To understand this, I have the following piece of code on Spark 1.6:
>>>>
>>>>     def countByPartition1(rdd: RDD[(String, Int)]) = {
>>>>         rdd.mapPartitions(iter => Iterator(iter.length))
>>>>     }
>>>>     def countByPartition2(rdd: RDD[String]) = {
>>>>         rdd.mapPartitions(iter => Iterator(iter.length))
>>>>     }
>>>>
>>>> // RDD creation
>>>>     val rdd1 = sc.parallelize(Array(("aa", 1), ("aa", 1), ("aa", 1),
>>>>                                     ("aa", 1)), 8)
>>>>     countByPartition1(rdd1).collect()
>>>> >> Array[Int] = Array(0, 1, 0, 1, 0, 1, 0, 1)
>>>>
>>>>     val rdd2 = sc.parallelize(Array("aa", "aa", "aa", "aa"), 8)
>>>>     countByPartition2(rdd2).collect()
>>>> >> Array[Int] = Array(0, 1, 0, 1, 0, 1, 0, 1)
>>>>
>>>> In both cases the data is distributed uniformly.
>>>> I have the following questions on the basis of the above observation:
>>>>
>>>>  1. In case of rdd1, hash partitioning should calculate the hashcode of
>>>> the key (i.e. "aa" in this case), so shouldn't all records go to a single
>>>> partition instead of being uniformly distributed?
>>>>  2. In case of rdd2, there is no key-value pair, so how is hash
>>>> partitioning going to work, i.e. what is the key to calculate the hashcode?
>>>>
>>>> I have followed @zero323's answer but am not getting the answers to these:
>>>>
>>>> https://stackoverflow.com/questions/31424396/how-does-hashpartitioner-work
>>>>
>>>>
>>>>
>>>>
>>>> -----
>>>>
>>>> Vikash Pareek
>>>> --
>>>> View this message in context:
>>>> http://apache-spark-user-list.1001560.n3.nabble.com/How-does-HashPartitioner-distribute-data-in-Spark-tp28785.html
>>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>>
>>>> ---------------------------------------------------------------------
>>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>>>
>>>>
>>>
>>
