Many thanks, Yong. Your solution rocks. If you paste your answer on Stack Overflow, I can mark it as the accepted answer.

Also, can you tell me how to achieve the same using a companion object? I've sketched my guess below; please correct it if I've misread you.
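Just to check my understanding, here is a minimal sketch of what I think you mean. This is my own untested guess, not your code, and the name of the implicit val is arbitrary:

case class DeviceKey(serialNum: String, eventDate: String, EventTs: Long)

// My guess at the companion-object approach: because the implicit Ordering
// lives in DeviceKey's companion object, it is found through the implicit
// scope of Ordering[DeviceKey], so the case class no longer needs to
// extend Ordered.
object DeviceKey {
  // Ascending on serialNum and eventDate, descending on EventTs
  // (negating the timestamp, the same trick as EventTs * -1 below).
  implicit val ordering: Ordering[DeviceKey] =
    Ordering.by((k: DeviceKey) => (k.serialNum, k.eventDate, -k.EventTs))
}

With that in scope, t.repartitionAndSortWithinPartitions(new DeviceKeyPartitioner(2)) should work the same way as in your session below, right?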
Cheers,
Pari

On 29 March 2017 at 21:37, Yong Zhang <java8...@hotmail.com> wrote:
> The error message indeed is not very clear.
>
> What you did wrong is that repartitionAndSortWithinPartitions requires
> not only a PairRDD but also an OrderedRDD. Your case class used as the
> key is NOT Ordered.
>
> Either extend it from Ordered, or provide a companion object that
> supplies the implicit Ordering.
>
> scala> spark.version
> res1: String = 2.1.0
>
> scala> case class DeviceKey(serialNum: String, eventDate: String, EventTs: Long)
>      |     extends Ordered[DeviceKey] {
>      |   import scala.math.Ordered.orderingToOrdered
>      |   def compare(that: DeviceKey): Int =
>      |     (this.serialNum, this.eventDate, this.EventTs * -1) compare
>      |       (that.serialNum, that.eventDate, that.EventTs * -1)
>      | }
> defined class DeviceKey
>
> scala> val t = sc.parallelize(List((DeviceKey("2","100",1), 1), (DeviceKey("2","100",3), 1)), 1)
> t: org.apache.spark.rdd.RDD[(DeviceKey, Int)] = ParallelCollectionRDD[0] at parallelize at <console>:26
>
> scala> class DeviceKeyPartitioner(partitions: Int) extends org.apache.spark.Partitioner {
>      |   require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.")
>      |
>      |   override def numPartitions: Int = partitions
>      |
>      |   override def getPartition(key: Any): Int = {
>      |     val k = key.asInstanceOf[DeviceKey]
>      |     k.serialNum.hashCode() % numPartitions
>      |   }
>      | }
> defined class DeviceKeyPartitioner
>
> scala> t.repartitionAndSortWithinPartitions(new DeviceKeyPartitioner(2))
> res0: org.apache.spark.rdd.RDD[(DeviceKey, Int)] = ShuffledRDD[1] at repartitionAndSortWithinPartitions at <console>:30
>
> Yong
>
> ------------------------------
> From: Pariksheet Barapatre <pbarapa...@gmail.com>
> Sent: Wednesday, March 29, 2017 9:02 AM
> To: user
> Subject: Secondary Sort using Apache Spark 1.6
>
> Hi,
>
> I am referring to http://codingjunkie.net/spark-secondary-sort/ to
> implement a secondary sort in my Spark job.
>
> I have defined my key case class as:
>
> case class DeviceKey(serialNum: String, eventDate: String, EventTs: Long) {
>   implicit def orderingBySerialNum[A <: DeviceKey]: Ordering[A] = {
>     Ordering.by(fk => (fk.serialNum, fk.eventDate, fk.EventTs * -1))
>   }
> }
>
> but when I try to apply the function
>
> t.repartitionAndSortWithinPartitions(partitioner)
>
> (t is an RDD[(DeviceKey, Int)]), I get the error:
>
> value repartitionAndSortWithinPartitions is not a member of
> org.apache.spark.rdd.RDD[(DeviceKey, Int)]
>
> Example code is available at
> http://stackoverflow.com/questions/43038682/secondary-sort-using-apache-spark-1-6
>
> Could somebody help me understand the error?
>
> Many Thanks,
> Pari
>
> --
> Cheers,
> Pari

--
Cheers,
Pari