Hey,

The question is tricky. Here is a possible answer: define years as keys in a hash map per client, then merge those maps:
import org.apache.spark.SparkContext
import scalaz._
import Scalaz._

val sc = new SparkContext("local[*]", "sandbox")

// Create an RDD of your (client, year, value) rows
val rdd = sc.parallelize(Seq(
  ("A", 2015, 4),
  ("A", 2014, 12),
  ("A", 2013, 1),
  ("B", 2015, 24),
  ("B", 2013, 4)
))

// Find the range of years present in the RDD
val minYear = rdd.map(_._2).reduce(Math.min)   // minimum year
val maxYear = rdd.map(_._2).reduce(Math.max)   // maximum year
val sequenceOfYears = maxYear to minYear by -1 // sequence of years from max down to min

// Define functions to build, for each client, a Map of year -> value,
// and how those maps will be merged.
// I'm lazy so I use Scalaz's |+| to merge two maps of year -> value;
// I assume we don't have two lines with the same client and year.
def createCombiner(obj: (Int, Int)): Map[Int, String] =
  Map(obj._1 -> obj._2.toString)
def mergeValue(accum: Map[Int, String], obj: (Int, Int)): Map[Int, String] =
  accum + (obj._1 -> obj._2.toString)
def mergeCombiners(accum1: Map[Int, String], accum2: Map[Int, String]): Map[Int, String] =
  accum1 |+| accum2

// For each client, look up every year from maxYear down to minYear in the
// computed map; if a year is missing, emit a blank.
val result = rdd
  .map { case (client, year, value) => (client, (year, value)) }
  .combineByKey(createCombiner, mergeValue, mergeCombiners)
  .map { case (name, mapOfYearsToValues) =>
    (Seq(name) ++ sequenceOfYears.map(year => mapOfYearsToValues.getOrElse(year, " "))).mkString(",")
  }

// Here we assume the sequence of all years is small enough to fit in memory.
// If you had to compute this per day it might break, and you would definitely
// need a specialized time-series library.
result.foreach(println)
sc.stop()

Best regards,
Fanilo

From: Adrian Tanase [mailto:atan...@adobe.com]
Sent: Friday, 30 October 2015 11:50
To: Deng Ching-Mallete; Ascot Moss
Cc: User
Subject: Re: Pivot Data in Spark and Scala

It's actually a bit tougher, as you'll first need all the years. Also, I'm not sure how you would represent your "columns", given that they are dynamic based on the input data.
Depending on your downstream processing, I'd probably try to emulate it with a hash map with years as keys instead of columns. There is probably a nicer solution using the DataFrames API, but I'm not familiar with it. If you actually need vectors, I think this article I saw recently on the Databricks blog will highlight some options (look for "gather encoder"): https://databricks.com/blog/2015/10/20/audience-modeling-with-spark-ml-pipelines.html

-adrian

From: Deng Ching-Mallete
Date: Friday, October 30, 2015 at 4:35 AM
To: Ascot Moss
Cc: User
Subject: Re: Pivot Data in Spark and Scala

Hi,

You could transform it into a pair RDD, then use the combineByKey function.

HTH,
Deng

On Thu, Oct 29, 2015 at 7:29 PM, Ascot Moss <ascot.m...@gmail.com> wrote:

Hi,

I have data as follows:

A, 2015, 4
A, 2014, 12
A, 2013, 1
B, 2015, 24
B, 2013, 4

I need to convert the data to a new format:

A, 4, 12, 1
B, 24, , 4

Any idea how to do this in Spark and Scala?

Thanks
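For intuition, the same reshape can be sketched with plain Scala collections, with no Spark or Scalaz dependency. This is only a local illustration of the algorithm above (group rows by client, build a year -> value map, fill missing years with a blank); the names `PivotSketch` and `pivot` are mine, not from the thread.

object PivotSketch {
  // Group rows by client, build a year -> value map per client,
  // then emit one CSV line per client, with a blank for missing years.
  def pivot(rows: Seq[(String, Int, Int)]): Seq[String] = {
    val years = rows.map(_._2)
    val yearRange = years.max to years.min by -1 // years from max down to min
    rows
      .groupBy(_._1)                                                  // client -> its rows
      .mapValues(_.map { case (_, year, v) => year -> v.toString }.toMap)
      .toSeq
      .sortBy(_._1)
      .map { case (client, byYear) =>
        (client +: yearRange.map(byYear.getOrElse(_, " "))).mkString(",")
      }
  }

  def main(args: Array[String]): Unit =
    pivot(Seq(("A", 2015, 4), ("A", 2014, 12), ("A", 2013, 1),
              ("B", 2015, 24), ("B", 2013, 4))).foreach(println)
}

Running this prints "A,4,12,1" and "B,24, ,4", matching the desired output; the combineByKey version distributes the same per-client map-building across partitions.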