Many Thanks Yong.

Your solution rocks. If you could paste your answer on stack overflow then
I can mark it as correct answer.

Also, can you tell me how to achieve same using companion object.

Cheers
Pari

On 29 March 2017 at 21:37, Yong Zhang <java8...@hotmail.com> wrote:

> The error message indeed is not very clear.
>
>
> What you did wrong is that the repartitionAndSortWithinPartitions not
> only requires PairRDD, but also OrderedRDD. Your case class as key is NOT
> Ordered.
>
>
> Either you extends it from Ordered, or provide a companion object to do
> the implicit Ordering.
>
>
> scala> spark.versionres1: String = 2.1.0
>
> scala> case class DeviceKey(serialNum: String, eventDate: String, EventTs: 
> Long) extends Ordered[DeviceKey] {     |   import 
> scala.math.Ordered.orderingToOrdered     |   def compare(that: DeviceKey): 
> Int =
>      |      (this.serialNum, this.eventDate, this.EventTs * -1) compare
>      |      (that.serialNum, that.eventDate, that.EventTs * -1)
>      | }defined class DeviceKey
>
> scala>
>
> scala> val t = 
> sc.parallelize(List(((DeviceKey("2","100",1),1)),(DeviceKey("2","100",3),1)), 
> 1)t: org.apache.spark.rdd.RDD[(DeviceKey, Int)] = ParallelCollectionRDD[0] at 
> parallelize at <console>:26
>
> scala>
>
> scala> class DeviceKeyPartitioner(partitions: Int) extends 
> org.apache.spark.Partitioner {
>      |     require(partitions >= 0, s"Number of partitions ($partitions) 
> cannot be negative.")
>      |
>      |     override def numPartitions: Int = partitions
>      |
>      |     override def getPartition(key: Any): Int = {
>      |       val k = key.asInstanceOf[DeviceKey]
>      |       k.serialNum.hashCode() % numPartitions
>      |     }
>      | }defined class DeviceKeyPartitioner
>
> scala>
>
> scala> t.repartitionAndSortWithinPartitions(new DeviceKeyPartitioner(2))res0: 
> org.apache.spark.rdd.RDD[(DeviceKey, Int)] = ShuffledRDD[1] at 
> repartitionAndSortWithinPartitions at <console>:30
>
>
> Yong
>
>
> ------------------------------
> *From:* Pariksheet Barapatre <pbarapa...@gmail.com>
> *Sent:* Wednesday, March 29, 2017 9:02 AM
> *To:* user
> *Subject:* Secondary Sort using Apache Spark 1.6
>
> Hi,
> <http://stackoverflow.com/questions/43038682/secondary-sort-using-apache-spark-1-6#>
>
> I am referring web link http://codingjunkie.net/spark-secondary-sort/ to
> implement secondary sort in my spark job.
>
> I have defined my key case class as
>
> case class DeviceKey(serialNum: String, eventDate: String, EventTs: Long) {
>       implicit def orderingBySerialNum[A <: DeviceKey] : Ordering[A] = {
>        Ordering.by(fk => (fk.serialNum, fk.eventDate, fk.EventTs * -1))
>     }
> }
>
> but when I try to apply function
> t.repartitionAndSortWithinPartitions(partitioner)
>
> #t is a RDD[(DeviceKey, Int)]
>
> I get error
> I am getting error as -
> value repartitionAndSortWithinPartitions is not a member of 
> org.apache.spark.rdd.RDD[(DeviceKey, Int)]
>
>
> Example code available at
> http://stackoverflow.com/questions/43038682/secondary-sort-using-apache-spark-1-6
>
> Could somebody help me to understand error.
>
> Many Thanks
>
> Pari
>
>
> --
> Cheers,
> Pari
>



-- 
Cheers,
Pari

Reply via email to