There seems to be a way around it. I created a ShuffledRDD with the
partitioner in my code and used setKeyOrdering on it. It worked!

    import org.apache.spark.rdd.ShuffledRDD

    new ShuffledRDD[String, String, String](pairRDD, new StraightPartitioner(10))
      .setKeyOrdering(new scala.math.Ordering[String] {
        def compare(x: String, y: String) = x compare y
      })
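For readers without a cluster at hand, here is a plain-Scala model (no Spark dependency) of what the workaround asks the shuffle to do. It routes each "c1,c2" key by c1 alone, then sorts every partition by the full key with the supplied Ordering. The object and method names are hypothetical, and the non-negative modulo is an assumption added so any c1 maps to a valid partition. It sketches the semantics, not Spark's implementation.

```scala
// Plain-Scala model (no Spark) of "partition on c1, sort each partition by
// the full key", which is what ShuffledRDD + setKeyOrdering requests.
// Names here are hypothetical, for illustration only.
object KeyOrderedShuffleModel {
  // Same rule as StraightPartitioner (partition on c1). The non-negative
  // modulo is an added assumption so every c1 gets a valid partition index.
  def partitionOf(key: String, numPartitions: Int): Int = {
    val c1 = key.split(",")(0).toInt
    ((c1 % numPartitions) + numPartitions) % numPartitions
  }

  // Group records by partition, then sort each group with the key ordering.
  // Without that second step, records would stay in arrival order.
  def shuffle[V](records: Seq[(String, V)], numPartitions: Int)(
      implicit keyOrdering: Ordering[String]): Map[Int, Seq[(String, V)]] =
    records
      .groupBy { case (k, _) => partitionOf(k, numPartitions) }
      .map { case (p, recs) => p -> recs.sortBy(_._1)(keyOrdering) }
}
```

Because the keys are compared as strings, "3,10" would sort before "3,2"; numeric keys avoid that. In Spark 1.2 and later, `repartitionAndSortWithinPartitions(partitioner)` on a pair RDD with an implicit key `Ordering` should give the same partition-then-sort behavior without constructing the developer-API `ShuffledRDD` directly.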
On Wed, Feb 4, 2015 at 8:39 AM, Imran Rashid <[email protected]> wrote:
> I think you are interested in secondary sort, which is still being worked
> on:
>
> https://issues.apache.org/jira/browse/SPARK-3655
>
> On Tue, Feb 3, 2015 at 4:41 PM, Nitin kak <[email protected]> wrote:
>
>> I thought that's what sort-based shuffle did: sort the keys going to the
>> same partition.
>>
>> I have also tried (c1, c2) as an (Int, Int) tuple. I don't think the
>> ordering of c2's type is the problem here.
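Switching to (Int, Int) keys changes how keys compare, not whether the shuffle compares them at all, which is consistent with the tuple attempt not helping on its own. Below is a plain-Scala sketch (hypothetical names, no Spark) that partitions a tuple key on c1 and sorts each partition with Scala's built-in tuple Ordering, which compares c1 first and then c2 numerically.

```scala
// Plain-Scala sketch (no Spark): tuple keys partitioned on c1 only, then each
// partition sorted with the standard Ordering[(Int, Int)] (Ordering.Tuple2),
// which compares the first element, then the second, as integers.
object TupleKeySort {
  // Partition on c1; non-negative modulo keeps the index in range
  // (an assumption, mirroring a well-behaved partitioner).
  def partitionOf(key: (Int, Int), numPartitions: Int): Int =
    ((key._1 % numPartitions) + numPartitions) % numPartitions

  def shuffle[V](records: Seq[((Int, Int), V)],
                 numPartitions: Int): Map[Int, Seq[((Int, Int), V)]] =
    records
      .groupBy { case (k, _) => partitionOf(k, numPartitions) }
      // Sorting only happens because we ask for it here; the key type alone
      // does not make anything sorted.
      .map { case (p, recs) => p -> recs.sortBy(_._1) }
}
```

Unlike string keys, the tuple ordering puts (3, 2) before (3, 10). It only takes effect once something actually sorts with it, such as a key ordering set on the shuffle.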
>>
>> On Tue, Feb 3, 2015 at 5:21 PM, Sean Owen <[email protected]> wrote:
>>
>>> Hm, I don't think the sort-based shuffle is going to cause the result to
>>> be ordered by (c1, c2) if you only partitioned on c1. After all, it's not
>>> even guaranteed that the type of c2 has an ordering, right?
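Sean's point can be seen in a small model. A shuffle that only routes records to partitions, with no key ordering, hands each partition its records in arrival order, which matches the 3,5 / 3,1 / 3,2 output reported in the original question. This is a plain-Scala sketch with hypothetical names. It assumes, as we understand Spark 1.x's sort-based shuffle, that without a key ordering the map output is grouped by partition ID only, not by key. Real Spark also does not guarantee the order in which blocks from different map tasks arrive, so the model is, if anything, more orderly than the real thing.

```scala
// Plain-Scala model (no Spark) of a shuffle that only routes records to
// partitions and applies no key ordering. Names are hypothetical.
object PartitionOnlyShuffleModel {
  def partitionOf(key: String, numPartitions: Int): Int = {
    val c1 = key.split(",")(0).toInt
    ((c1 % numPartitions) + numPartitions) % numPartitions
  }

  // Tag each record with its partition and sort by partition id only.
  // Seq.sortBy is stable, so records in the same partition keep their
  // original relative order; nothing compares the keys themselves.
  def shuffle[V](records: Seq[(String, V)], numPartitions: Int): Seq[(Int, (String, V))] =
    records
      .map(r => partitionOf(r._1, numPartitions) -> r)
      .sortBy(_._1)
}
```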
>>>
>>> On Tue, Feb 3, 2015 at 3:38 PM, nitinkak001 <[email protected]>
>>> wrote:
>>> > I am trying to implement secondary sort in Spark as we do in
>>> > MapReduce.
>>> >
>>> > Here is my data (tab-separated; the c1, c2, c3 header row is not in the file).
>>> > c1 c2 c3
>>> > 1 2 4
>>> > 1 3 6
>>> > 2 4 7
>>> > 2 6 8
>>> > 3 5 5
>>> > 3 1 8
>>> > 3 2 0
>>> >
>>> > To do the secondary sort, I create a paired RDD as
>>> >
>>> > ((c1 + "," + c2), row)
>>> >
>>> > and then use a custom partitioner to partition only on c1. I have set
>>> > spark.shuffle.manager = SORT so that the keys in each partition are
>>> > sorted. For key "3" I am expecting to get:
>>> > (3, 1)
>>> > (3, 2)
>>> > (3, 5)
>>> > but I am still getting the original order:
>>> > 3,5
>>> > 3,1
>>> > 3,2
>>> >
>>> > Here is the custom partitioner code:
>>> >
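>>> > class StraightPartitioner(p: Int) extends org.apache.spark.Partitioner {
>>> >   def numPartitions = p
>>> >   // Partition on c1 only. The non-negative modulo keeps the index in
>>> >   // [0, numPartitions) even when c1 >= p or c1 < 0.
>>> >   def getPartition(key: Any): Int = {
>>> >     val c1 = key.asInstanceOf[String].split(",")(0).toInt
>>> >     ((c1 % numPartitions) + numPartitions) % numPartitions
>>> >   }
>>> > }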
>>> >
>>> > and here is the driver code. Please tell me what I am doing wrong:
>>> >
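>>> > import org.apache.commons.logging.LogFactory
>>> > import org.apache.spark.{SparkConf, SparkContext}
>>> > import org.apache.spark.SparkContext._ // pair-RDD implicits such as partitionBy (needed before Spark 1.3)
>>> >
>>> > val conf = new SparkConf().setAppName("MapInheritanceExample")
>>> > conf.set("spark.shuffle.manager", "SORT")
>>> > val sc = new SparkContext(conf)
>>> > val pF = sc.textFile(inputFile)
>>> >
>>> > val log = LogFactory.getLog("MapFunctionTest")
>>> >
>>> > // Key each tab-separated line by "c1,c2"; the value is unused.
>>> > val partitionedRDD = pF.map { x =>
>>> >   val arr = x.split("\t")
>>> >   (arr(0) + "," + arr(1), null)
>>> > }.partitionBy(new StraightPartitioner(10))
>>> >
>>> > // Keep only the keys, in whatever order each partition yields them.
>>> > val outputRDD = partitionedRDD.mapPartitions(p => p.map { case (o, _) => o })
>>> >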
>>> > --
>>> > View this message in context:
>>> > http://apache-spark-user-list.1001560.n3.nabble.com/Sort-based-shuffle-not-working-properly-tp21487.html
>>> > Sent from the Apache Spark User List mailing list archive at Nabble.com.