Spark SQL always uses a custom configuration of Kryo under the hood to improve shuffle performance: https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer.scala
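
For a like-for-like comparison you could switch the plain-RDD job to Kryo as well and see whether its shuffle write shrinks toward the SchemaRDD numbers. A rough sketch (untested; DomainObjRegistrator is just a placeholder name, and spark.kryo.registrator expects the fully qualified class name of the registrator):

import com.esotericsoftware.kryo.Kryo
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.serializer.KryoRegistrator

case class DomainObj(a: String, b: String, c: String, d: String)

// Register the case class used as the shuffle key, so Kryo writes a small
// class tag instead of the full class name with every record.
class DomainObjRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    kryo.register(classOf[DomainObj])
  }
}

val conf = new SparkConf()
  .setAppName("rdd-vs-schemardd-shuffle")
  // Use Kryo instead of Java serialization for shuffle data.
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrator", "DomainObjRegistrator")

val sc = new SparkContext(conf)

Kryo's binary encoding is considerably more compact than Java serialization, which is usually where most of a shuffle-size difference like this comes from.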
Michael

On Sun, Sep 21, 2014 at 9:04 AM, Grega Kešpret <gr...@celtra.com> wrote:
> Hi,
>
> I am seeing different shuffle write sizes when using SchemaRDD (versus
> normal RDD). I'm doing the following:
>
> case class DomainObj(a: String, b: String, c: String, d: String)
>
> val logs: RDD[String] = sc.textFile(...)
> val filtered: RDD[String] = logs.filter(...)
> val myDomainObjects: RDD[DomainObj] = filtered.flatMap(...)
>
> ------------------------------------------------------------
> 1. Operations on RDD:
> ------------------------------------------------------------
> val results = myDomainObjects
>   .filter(obj => obj.a == "SomeValue" || obj.a == "SomeOtherValue")
>   .mapPartitions(objs => objs.map(obj => (obj, 1)))
>   .reduceByKey(_ + _, 200)
>   .collect()
>
> ------------------------------------------------------------
> 2. Operations on SchemaRDD:
> ------------------------------------------------------------
> myDomainObjects.registerTempTable("myDomainObjects")
>
> val results = sqlContext.sql("""
>   SELECT
>     a, b, c, d, COUNT(*) total
>   FROM
>     myDomainObjects
>   WHERE
>     a IN ('SomeValue', 'SomeOtherValue')
>   GROUP BY
>     a, b, c, d
> """).collect()
>
> In the first case (RDD), the query returns in 2 minutes and 30 seconds,
> with an input size of 28.4GB, a shuffle write size of 525.3MB, and a
> shuffle read size of 472.5MB.
>
> In the second case (SchemaRDD), the query returns in 2 minutes and 9
> seconds, with an input size of 28.4GB, a shuffle write size of 258.9MB,
> and a shuffle read size of 233.0MB.
>
> Since the shuffle size in the second case is half that of the first,
> I'd like to understand why.
>
> Thanks,
> Grega
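
To see what the SchemaRDD version actually executes, you can print its query execution; the output includes the physical plan with the Exchange operator that performs the shuffle. A minimal sketch, reusing the sqlContext and temp table from the original mail:

// Build the same aggregation without collecting it, then inspect the plans.
val aggregated = sqlContext.sql("""
  SELECT a, b, c, d, COUNT(*) total
  FROM myDomainObjects
  WHERE a IN ('SomeValue', 'SomeOtherValue')
  GROUP BY a, b, c, d
""")

// Prints the logical and physical plans, including the Exchange that
// shuffles the grouped rows.
println(aggregated.queryExecution)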