Spark SQL always uses a custom configuration of Kryo under the hood to
improve shuffle performance:
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer.scala
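
For the plain-RDD version of the job, a rough sketch of turning on Kryo
globally (so shuffle records are serialized more compactly, in the same
spirit as what Spark SQL does internally) could look like the following.
This is only an illustration against the DomainObj case class from the
example below; registerKryoClasses is a convenience available in newer
Spark releases, and older versions would use a custom
spark.kryo.registrator instead:

import org.apache.spark.{SparkConf, SparkContext}

// Use Kryo instead of the default Java serialization for shuffle data.
val conf = new SparkConf()
  .setAppName("kryo-shuffle-sketch")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  // Registering classes lets Kryo write a short numeric id rather than
  // the full class name with every record.
  .registerKryoClasses(Array(classOf[DomainObj]))

val sc = new SparkContext(conf)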

Michael

On Sun, Sep 21, 2014 at 9:04 AM, Grega Kešpret <gr...@celtra.com> wrote:

> Hi,
>
> I am seeing different shuffle write sizes when using SchemaRDD (versus
> normal RDD). I'm doing the following:
>
> case class DomainObj(a: String, b: String, c: String, d: String)
>
> val logs: RDD[String] = sc.textFile(...)
> val filtered: RDD[String] = logs.filter(...)
> val myDomainObjects: RDD[DomainObj] = filtered.flatMap(...)
>
> ------------------------------------------------------------
> 1. Operations on RDD:
> ------------------------------------------------------------
> val results = myDomainObjects
>     .filter(obj => obj.a == "SomeValue" || obj.a == "SomeOtherValue")
>     .mapPartitions(objs => objs.map(obj => (obj, 1)))
>     .reduceByKey(_ + _, 200)
>     .collect()
>
> ------------------------------------------------------------
> 2. Operations on SchemaRDD:
> ------------------------------------------------------------
> myDomainObjects.registerTempTable("myDomainObjects")
>
> val results = sqlContext.sql("""
>     SELECT
>         a, b, c, d, COUNT(*) total
>     FROM
>         myDomainObjects
>     WHERE
>         a IN ('SomeValue', 'SomeOtherValue')
>     GROUP BY
>         a, b, c, d
> """).collect()
>
> In the first case (RDD), the query returns in 2 minutes and 30 seconds
> with an input size of 28.4 GB, a shuffle write of 525.3 MB, and a shuffle
> read of 472.5 MB.
>
> In the second case (SchemaRDD), the query returns in 2 minutes and 9
> seconds with an input size of 28.4 GB, a shuffle write of 258.9 MB, and a
> shuffle read of 233.0 MB.
>
> Since the shuffle size in the second case is roughly half that of the
> first, I'd like to understand why.
>
> Thanks,
> Grega
>
