You can take a look at the smvPivot function in the SMV library. Look for the 
method "smvPivot" in SmvDFHelper.
You can also perform the pivot on a group-by-group basis. See smvPivot and 
smvPivotSum in SmvGroupedDataFunc.

Docs from smvPivotSum are copied below.  Note that you don't have to specify 
the baseOutput columns, but if you don't, it will force an additional action on 
the input data frame to build the cross products of all possible values in your 
input pivot columns. 

Perform a normal SmvPivot operation followed by a sum on all the output pivot 
columns. For example:

    df.smvGroupBy("id").smvPivotSum(Seq("month", "product"))("count")(
      "5_14_A", "5_14_B", "6_14_A", "6_14_B")

and the following input:
| id  | month | product | count |
| --- | ----- | ------- | ----- |
| 1   | 5/14  |   A     |   100 |
| 1   | 6/14  |   B     |   200 |
| 1   | 5/14  |   B     |   300 |
will produce the following output:
| id  | count_5_14_A | count_5_14_B | count_6_14_A | count_6_14_B |
| --- | ------------ | ------------ | ------------ | ------------ |
| 1   | 100          | 300          | NULL         | 200          |

The three argument lists are, in order:
Pivot columns: the sequence of column names whose values will be used as the 
output pivot column names.
Value columns: the columns whose values will be copied to the pivoted output 
columns.
Base output: the expected base output column names (without the value column 
prefix). The user should supply the list of expected pivot column output names 
to avoid an extra action on the input DataFrame just to extract the possible 
pivot columns. If an empty sequence is provided, the base output columns will 
be extracted from the values in the pivot columns (this will cause an action on 
the entire DataFrame!)
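
To make the semantics above concrete, here is a rough sketch in plain Scala collections. It is not SMV code and does not call the SMV API; the names PivotSumSketch, baseName and pivotSum are made up for illustration, and the rule for building names like "5_14_A" (replace non-alphanumeric characters with "_") is only an assumption read off the example. It also shows why an empty base output list costs an extra pass over the data: the names have to be derived from the distinct values first.

```scala
object PivotSumSketch {
  // One input row: (id, month, product, count), as in the example table.
  type Row = (Int, String, String, Long)

  // Hypothetical naming rule inferred from the example: "5/14" and "A" -> "5_14_A".
  def baseName(month: String, product: String): String =
    Seq(month, product).map(_.replaceAll("[^A-Za-z0-9]", "_")).mkString("_")

  // Returns, per id, the pivoted columns in output order; None stands for NULL.
  def pivotSum(rows: Seq[Row],
               valueCol: String,
               baseOutput: Seq[String]): Map[Int, Seq[(String, Option[Long])]] = {
    // With no names supplied, scan the whole input and cross the distinct values.
    val names: Seq[String] =
      if (baseOutput.nonEmpty) baseOutput
      else for {
        m <- rows.map(_._2).distinct.sorted
        p <- rows.map(_._3).distinct.sorted
      } yield baseName(m, p)

    rows.groupBy(_._1).map { case (id, group) =>
      // Sum the value column for every pivot name that occurs in this group.
      val sums: Map[String, Long] = group
        .groupBy(r => baseName(r._2, r._3))
        .map { case (name, rs) => name -> rs.map(_._4).sum }
      id -> names.map(n => s"${valueCol}_$n" -> sums.get(n))
    }
  }
}
```

Calling PivotSumSketch.pivotSum on the three example rows with "count" as the value column gives, for id 1, the same four columns as the output table, with None in place of NULL for count_6_14_A.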

PS: shoot me an email if you run into any issues using SMV.

> Hey,
> The question is tricky. Here is a possible answer: define years as keys of 
> a hashmap per client, then merge those maps:
> import org.apache.spark.SparkContext
> import scalaz._
> import Scalaz._
> val sc = new SparkContext("local[*]", "sandbox")
> // Create RDD of your objects
> val rdd = sc.parallelize(Seq(
>   ("A", 2015, 4),
>   ("A", 2014, 12),
>   ("A", 2013, 1),
>   ("B", 2015, 24),
>   ("B", 2013, 4)
> ))
> // Search for all the years in the RDD
> val minYear = rdd.map(_._2).min()   // look for minimum year
> val maxYear = rdd.map(_._2).max()   // look for maximum year
> // create sequence of years from max to min
> val sequenceOfYears = maxYear to minYear by -1
> // Define functions to build, for each client, a Map of year -> value for
> // year, and how those maps will be merged
> def createCombiner(obj: (Int, Int)): Map[Int, String] =
>   Map(obj._1 -> obj._2.toString)
> def mergeValue(accum: Map[Int, String], obj: (Int, Int)) =
>   accum + (obj._1 -> obj._2.toString)
> // I’m lazy so I use Scalaz to merge two maps of year -> value, I assume we
> // don’t have two lines with same client and year…
> def mergeCombiners(accum1: Map[Int, String], accum2: Map[Int, String]) =
>   accum1 |+| accum2
> // For each client, check for each year from maxYear to minYear if it exists
> // in the computed map. If not, put in a blank.
> // Here we assume that the sequence of all years isn’t too big to fit in
> // memory. If you had to compute for each day, it may break and you would
> // definitely need to use a specialized timeseries library…
> val result = rdd
>   .map { case obj => (obj._1, (obj._2, obj._3)) }
>   .combineByKey(createCombiner, mergeValue, mergeCombiners)
>   .map { case (name, mapOfYearsToValues) =>
>     (Seq(name) ++ sequenceOfYears.map(year =>
>       mapOfYearsToValues.getOrElse(year, " "))).mkString(",") }
> result.foreach(println)
> sc.stop()
> On Thu, Oct 29, 2015 at 7:29 PM, Ascot Moss wrote:
> Hi,
> I have data as follows:
> A, 2015, 4
> A, 2014, 12
> A, 2013, 1
> B, 2015, 24
> B, 2013, 4
> I need to convert the data to a new format:
> A,    4,    12,    1
> B,   24,      ,    4
> Any idea how to make it in Spark Scala?
> Thanks
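
For anyone who wants to check the expected output from the original question without a cluster, the same year-by-year pivot can be sketched with plain Scala collections. This is only an illustration of the logic, not Spark or SMV code; YearPivotSketch and pivot are made-up names, and like the quoted answer it assumes at most one row per client and year.

```scala
object YearPivotSketch {
  // Each record is (client, year, value), as in the original question.
  def pivot(records: Seq[(String, Int, Int)]): Seq[String] = {
    // All years present in the data, newest first.
    val years = records.map(_._2).distinct.sorted.reverse

    records.groupBy(_._1).toSeq.sortBy(_._1).map { case (client, rs) =>
      // year -> value for this client; assumes no duplicate (client, year) rows.
      val byYear = rs.map(r => r._2 -> r._3.toString).toMap
      // Emit one comma-separated line, with a blank for every missing year.
      (client +: years.map(y => byYear.getOrElse(y, " "))).mkString(",")
    }
  }
}
```

With the five rows from the question, pivot returns the two lines "A,4,12,1" and "B,24, ,4".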
