RangePartitioner in Spark 1.2.1

2015-02-17 Thread java8964
Hi, Sparkers:
I just happened to be searching Google for something related to Spark's RangePartitioner and found an old thread on this mailing list:
http://apache-spark-user-list.1001560.n3.nabble.com/RDD-and-Partition-td991.html
I followed the code example from that thread, as below:
scala> import org.apache.spark.RangePartitioner
import org.apache.spark.RangePartitioner

scala> val rdd = sc.parallelize(List("apple", "Ball", "cat", "dog", "Elephant", "fox", "gas", "horse", "index", "jet", "kitsch", "long", "moon", "Neptune", "ooze", "Pen", "quiet", "rose", "sun", "talk", "umbrella", "voice", "Walrus", "xeon", "Yam", "zebra"))
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[0] at parallelize at <console>:13

scala> rdd.keyBy(s => s(0).toUpper)
res0: org.apache.spark.rdd.RDD[(Char, String)] = MappedRDD[1] at keyBy at <console>:16

scala> res0.partitionBy(new RangePartitioner[Char, String](26, res0)).values
res1: org.apache.spark.rdd.RDD[String] = MappedRDD[5] at values at <console>:18

scala> res1.mapPartitionsWithIndex((idx, itr) => itr.map(s => (idx, s))).collect.foreach(println)
The above example makes the meaning of RangePartitioner clear to me, but to my surprise, I got the following result:
(0,apple)
(0,Ball)
(1,cat)
(2,dog)
(3,Elephant)
(4,fox)
(5,gas)
(6,horse)
(7,index)
(8,jet)
(9,kitsch)
(10,long)
(11,moon)
(12,Neptune)
(13,ooze)
(14,Pen)
(15,quiet)
(16,rose)
(17,sun)
(18,talk)
(19,umbrella)
(20,voice)
(21,Walrus)
(22,xeon)
(23,Yam)
(24,zebra)
instead of a perfect range of indexes from 0 to 25, as in the old email thread. Why is that?
Is this a bug, or some new feature I don't understand?
BTW, the environment I tested in is Spark 1.2.1 with the Hadoop 2.4 binary release.
Thanks
Yong  

Re: RangePartitioner in Spark 1.2.1

2015-02-17 Thread Aaron Davidson
RangePartitioner does not actually guarantee that all partitions will be
equally sized (that is hard); instead it uses sampling to approximate equal
buckets. Thus, it is possible that a bucket is left empty.
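
For instance, a quick way to see which of the 26 buckets ended up empty (untested sketch, reusing res1 from your transcript):

// Count the elements in each partition of res1; with RangePartitioner's
// sampling, one or more of the requested buckets can come back with 0 elements.
val sizes = res1
  .mapPartitionsWithIndex((idx, itr) => Iterator((idx, itr.size)))
  .collect()
sizes.foreach { case (idx, n) => println(s"partition $idx has $n element(s)") }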

If you want the specified behavior, you should define your own partitioner.
It would look something like this (untested):
import org.apache.spark.Partitioner

class AlphabetPartitioner extends Partitioner {
  def numPartitions = 26
  def getPartition(key: Any): Int = key match {
    // Handle both the Char keys from keyBy(s => s(0).toUpper) and plain String keys.
    case c: Char   => c.toUpper - 'A'
    case s: String => s(0).toUpper - 'A'
  }
  override def equals(other: Any): Boolean =
    other.isInstanceOf[AlphabetPartitioner]
  override def hashCode: Int = 0
}
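
For example, wired into the pipeline from your mail it would look roughly like this (also untested; assumes the same rdd from your session):

// One fixed partition per letter: "zebra" lands in partition 25 ('Z' - 'A'),
// and a letter with no words simply yields an empty partition.
val byLetter = rdd
  .keyBy(s => s(0).toUpper)
  .partitionBy(new AlphabetPartitioner)
  .values
byLetter.mapPartitionsWithIndex((idx, itr) => itr.map(s => (idx, s)))
  .collect()
  .foreach(println)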
