From: Nathan 
Date: Wednesday, 7 January 2015 2:53 pm
To: "<>" 
Subject: SparkSQL schemaRDD & MapPartitions calls - performance issues - columnar formats?


I'm trying to use a combination of SparkSQL and 'normal' Spark/Scala via
rdd.mapPartitions(…). I'm using the latest release, 1.2.0.

A simple example: load some sample data from Parquet on HDFS (about 380M rows,
10 columns) on a 7-node cluster.

  val t = sqlC.parquetFile("/user/n/sales-tran12m.parquet")
  t.registerTempTable("test1")

Now let's do some operations on it. I want the total sales and quantities sold
for each hour of the day, so I choose 3 of the 10 columns...

  sqlC.sql("select Hour, sum(ItemQty), sum(Sales) from test1 group by Hour").collect()

After the table has been 100% cached in memory, this takes around 11 seconds.
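For reference, the result that query should produce can be sketched on a plain
Scala collection. This is only the semantics of the GROUP BY, not how SparkSQL
executes it, and the Sale case class and sample values are hypothetical.

```scala
// Hypothetical record type holding the three selected columns.
case class Sale(hour: Int, qty: Double, sales: Double)

// Plain-collection semantics of:
//   select Hour, sum(ItemQty), sum(Sales) from test1 group by Hour
// Each hour maps to (sum of qty, sum of sales).
def sumByHour(rows: Seq[Sale]): Map[Int, (Double, Double)] =
  rows.groupBy(_.hour).map { case (hour, rs) =>
    (hour, (rs.map(_.qty).sum, rs.map(_.sales).sum))
  }

val sample = Seq(Sale(9, 2.0, 10.0), Sale(9, 1.0, 5.5), Sale(17, 4.0, 20.0))
sumByHour(sample).toSeq.sortBy(_._1).foreach(println)
```

Any faster variant below should agree with this on the same rows.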

Let's do the same thing via a mapPartitions call (this isn't production-ready
code, but it gets the job done).

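  import org.apache.spark.SparkContext._  // pair-RDD implicits for reduceByKey (Spark 1.2)

  val try2 = sqlC.sql("select Hour, ItemQty, Sales from test1")
  try2.mapPartitions { hrs =>
    // per-partition accumulators, one slot per hour of the day (0-23)
    val qtySum = new Array[Double](24)
    val salesSum = new Array[Double](24)

    for (r <- hrs) {
      val hr = r.getInt(0)
      qtySum(hr) += r.getDouble(1)
      salesSum(hr) += r.getDouble(2)
    }
    // emit one (hour, (sales, qty)) pair per hour for this partition
    (0 until 24).iterator.map(h => (h, (salesSum(h), qtySum(h))))
  }.reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2)).collect().foreach(println)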

This takes around 49 seconds, even though the test1 table is 100% cached. The
number of partitions stays the same.
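The pattern in that snippet (fold each partition into 24 fixed slots, then merge
the per-partition results by hour) can be sketched without Spark. Here a
partition is modelled as a plain Seq of (hour, qty, sales) tuples, which is an
assumption standing in for an RDD partition of Rows; partialSums plays the role
of the mapPartitions body and mergeByHour the role of reduceByKey.

```scala
// A record is (hour, qty, sales); a "partition" is just a Seq of records here.
type Rec = (Int, Double, Double)

// Per-partition step: accumulate into 24 hour slots, then emit
// one (hour, (sales, qty)) pair per hour.
def partialSums(part: Iterator[Rec]): Iterator[(Int, (Double, Double))] = {
  val qtySum = new Array[Double](24)
  val salesSum = new Array[Double](24)
  for ((hr, qty, sales) <- part) {
    qtySum(hr) += qty
    salesSum(hr) += sales
  }
  (0 until 24).iterator.map(h => (h, (salesSum(h), qtySum(h))))
}

// Merge step: combine the per-partition pairs by hour, like reduceByKey.
def mergeByHour(pairs: Iterator[(Int, (Double, Double))]): Map[Int, (Double, Double)] =
  pairs.foldLeft(Map.empty[Int, (Double, Double)]) { case (acc, (h, (s, q))) =>
    val (s0, q0) = acc.getOrElse(h, (0.0, 0.0))
    acc.updated(h, (s0 + s, q0 + q))
  }

val partitions: Seq[Seq[Rec]] = Seq(
  Seq((9, 2.0, 10.0), (17, 4.0, 20.0)),
  Seq((9, 1.0, 5.5))
)
val totals = mergeByHour(partitions.iterator.flatMap(p => partialSums(p.iterator)))
println(totals(9))
```

Each partition ships at most 24 small pairs to the reducer, so the shuffle is
tiny; whatever makes the SchemaRDD version slow is presumably in reading the rows,
not in this aggregation pattern.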

Now suppose I create a simple RDD of a case class, HourSum(hour: Int, qty: Double,
sales: Double), by converting the SchemaRDD:
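  case class HourSum(hour: Int, qty: Double, sales: Double)

  val rdd = sqlC.sql("select * from test1").map { r =>
    HourSum(r.getInt(1), r.getDouble(7), r.getDouble(8))
  }.cache()
  rdd.count()  // run an action so all the data is actually cached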

Then run essentially the same mapPartitions query:

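  rdd.mapPartitions { hrs =>
    // per-partition accumulators, one slot per hour of the day (0-23)
    val qtySum = new Array[Double](24)
    val salesSum = new Array[Double](24)

    for (r <- hrs) {
      val hr = r.hour
      qtySum(hr) += r.qty
      salesSum(hr) += r.sales
    }
    // emit one (hour, (sales, qty)) pair per hour for this partition
    (0 until 24).iterator.map(h => (h, (salesSum(h), qtySum(h))))
  }.reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2)).collect().foreach(println)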

This takes around 1.5 seconds! The memory footprint is much larger, though.

My thinking is that because SparkSQL stores cached tables in a columnar format,
some unwrapping out of the column array buffers has to happen, which takes time,
and for some reason it takes much longer when I switch to mapPartitions. Maybe
it is unwrapping the entire row even though I'm using only a subset of columns,
or maybe there is some object creation/autoboxing going on when calling getInt
or getDouble.
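To make the hypothesis concrete, here is a toy model in plain Scala. It assumes
a hypothetical ColumnarTable that keeps each column in a primitive array; it is
not how SparkSQL's in-memory cache is actually implemented, only an
illustration of the two access paths I suspect differ. Path A rebuilds a generic
row for every record (one allocation per row, every value boxed) before reading
a field; path B reads just the needed column straight from its array.

```scala
// Toy columnar table (an assumption, not Spark's implementation):
// one primitive array per column.
final case class ColumnarTable(hour: Array[Int], qty: Array[Double], sales: Array[Double])

// Path A: materialise a generic row (Seq[Any]) per record, then read a field.
// Each Int/Double placed in the Seq is boxed, and a new Seq is built per row.
def sumSalesViaRows(t: ColumnarTable): Double = {
  val rows: Iterator[Seq[Any]] =
    t.hour.indices.iterator.map(i => Seq[Any](t.hour(i), t.qty(i), t.sales(i)))
  rows.foldLeft(0.0)((acc, r) => acc + r(2).asInstanceOf[Double])
}

// Path B: scan only the sales column, no per-row objects and no boxing.
def sumSalesViaColumn(t: ColumnarTable): Double = {
  var acc = 0.0
  var i = 0
  while (i < t.sales.length) { acc += t.sales(i); i += 1 }
  acc
}

val table = ColumnarTable(Array(9, 9, 17), Array(2.0, 1.0, 4.0), Array(10.0, 5.5, 20.0))
println(sumSalesViaRows(table))
println(sumSalesViaColumn(table))
```

Both paths return the same total; the question is only how much per-row work
path A adds when there are hundreds of millions of rows.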

I've tried simpler cases too, like just summing sales. Running the sum via SQL
is fast (4.7 seconds), and a mapPartitions sum on an RDD[Double] is even faster
(2.6 seconds). But mapPartitions on the SchemaRDD:

  sqlC.sql("select SalesInclGST from test1").mapPartitions { iter =>
    Iterator(iter.foldLeft(0.0)((acc, r) => acc + r.getDouble(0)))
  }.sum

takes a long time (33 seconds). In all these examples everything is fully
cached in memory. And yes, for these kinds of operations I can use SQL, but for
more complex queries I'd much rather use a combination of SparkSQL to select the
data (so I get nice things like Parquet pushdowns) and functional Scala!

I suspect I'm doing something dumb. Is there something I should be doing to get
faster mapPartitions performance on SchemaRDDs? Is there some unwrapping going
on in the background that Catalyst does in a smart way and that I'm missing?


