viirya opened a new pull request #24637: [SPARK-27707][SQL] Prune unnecessary nested fields from Generate
URL: https://github.com/apache/spark/pull/24637
 
 
   ## What changes were proposed in this pull request?
   
   A performance issue was found when using `explode`: if a complex field contains a huge array, that field is duplicated once per exploded array element. For example:
   
   ```scala
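   // Needed for toDF outside spark-shell.
   import spark.implicits._

   // M is the number of elements in the array; its value is not given here.
   val df = spark.sparkContext.parallelize(Seq(("1",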
     Array.fill(M)({
       val i = math.random
       (i.toString, (i + 1).toString, (i + 2).toString, (i + 3).toString)
     })))).toDF("col", "arr")
     .selectExpr("col", "struct(col, arr) as st")
     .selectExpr("col", "st.col as col1", "explode(st.arr) as arr_col")
   ```
   
   The explode causes `st` to be duplicated once for every exploded element.
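
   To make the cost concrete, here is a toy model in plain Scala (no Spark involved). The `St` case class and the helper names are hypothetical stand-ins, and the sketch assumes each output row would be materialized with its own copy of whatever it carries. Under that assumption, an array of M elements carried on each of M output rows adds up to M * M elements, while carrying only `col` adds none.

   ```scala
   // Toy cost model: not Spark code, only an illustration of the duplication.
   object ExplodeCost {
     // Hypothetical stand-in for the `st` struct in the example above.
     final case class St(col: String, arr: Vector[String])

     // Unpruned: every output row keeps the whole struct, including `arr`.
     def explodeKeepingStruct(st: St): Vector[(St, String)] =
       st.arr.map(elem => (st, elem))

     // Pruned: every output row keeps only the field that is actually used.
     def explodeKeepingCol(st: St): Vector[(String, String)] =
       st.arr.map(elem => (st.col, elem))

     // Array elements carried across all output rows, assuming each row
     // were materialized with its own copy of the struct.
     def carriedElements(rows: Vector[(St, String)]): Long =
       rows.map(_._1.arr.length.toLong).sum

     def main(args: Array[String]): Unit = {
       val m = 1000 // assumed size, chosen only for illustration
       val st = St("1", Vector.tabulate(m)(_.toString))
       val unpruned = explodeKeepingStruct(st)
       val pruned = explodeKeepingCol(st)
       println(s"output rows (unpruned): ${unpruned.length}")
       println(s"array elements carried (unpruned): ${carriedElements(unpruned)}")
       println(s"output rows (pruned): ${pruned.length}, each carrying only `col`")
     }
   }
   ```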
   
   Benchmark before the change:
   
   ```
   [info] Java HotSpot(TM) 64-Bit Server VM 1.8.0_202-b08 on Mac OS X 10.14.4
   [info] Intel(R) Core(TM) i7-8750H CPU @ 2.20GHz
   [info] generate big nested struct array:                 Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
   [info] ------------------------------------------------------------------------------------------------------------------------
   [info] generate big nested struct array wholestage off          52668          53162         699          0.0      877803.4       1.0X
   [info] generate big nested struct array wholestage on           47261          49093        1125          0.0      787690.2       1.1X
   [info]
   ```
   
   The query plan:
   ```
   == Physical Plan ==
    Project [col#508, st#512.col AS col1#515, arr_col#519]
    +- Generate explode(st#512.arr), [col#508, st#512], false, [arr_col#519]
       +- Project [_1#503 AS col#508, named_struct(col, _1#503, arr, _2#504) AS st#512]
          +- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, scala.Tuple2, true]))._1, true, false) AS _1#503, mapobjects(MapObjects_loopValue84, MapObjects_loopIsNull84, ObjectType(class scala.Tuple4), if (isnull(lambdavariable(MapObjects_loopValue84, MapObjects_loopIsNull84, ObjectType(class scala.Tuple4), true))) null else named_struct(_1, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(lambdavariable(MapObjects_loopValue84, MapObjects_loopIsNull84, ObjectType(class scala.Tuple4), true))._1, true, false), _2, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(lambdavariable(MapObjects_loopValue84, MapObjects_loopIsNull84, ObjectType(class scala.Tuple4), true))._2, true, false), _3, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(lambdavariable(MapObjects_loopValue84, MapObjects_loopIsNull84, ObjectType(class scala.Tuple4), true))._3, true, false), _4, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(lambdavariable(MapObjects_loopValue84, MapObjects_loopIsNull84, ObjectType(class scala.Tuple4), true))._4, true, false)), knownnotnull(assertnotnull(input[0, scala.Tuple2, true]))._2, None) AS _2#504]
             +- Scan[obj#534]
   ```
   
   This patch takes the nested column pruning approach to prune unnecessary nested fields. It adds a projection of the needed nested fields, as aliases, on the child of `Generate`, and replaces references to those fields with the alias attributes in the projection on top of `Generate`.
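
   For comparison, the same idea can be applied by hand at the DataFrame level: pull the needed nested fields out of `st` before the explode, so that `st` itself is no longer referenced above it. The sketch below is only an illustration of that idea, not the optimizer rule added by this patch. The object name `ManualNestedPruning`, the helper `explodePruned`, and the three-element sample array are assumptions, and the resulting plan is not guaranteed to match the one produced by the patch.

   ```scala
   import org.apache.spark.sql.{DataFrame, SparkSession}

   // Hand-written analogue of the pruning idea; all names here are hypothetical.
   object ManualNestedPruning {
     // Extract only the needed nested fields first, then explode the plain
     // array column, so the full `st` struct is not carried past this point.
     def explodePruned(df: DataFrame): DataFrame =
       df.selectExpr("col", "st.col as col1", "st.arr as arr")
         .selectExpr("col", "col1", "explode(arr) as arr_col")

     def main(args: Array[String]): Unit = {
       val spark = SparkSession.builder()
         .master("local[1]")
         .appName("manual-nested-pruning")
         .getOrCreate()
       import spark.implicits._

       // Small sample input with the same shape as the example: (col, st{col, arr}).
       val df = Seq(("1", Array("a", "b", "c"))).toDF("col", "arr")
         .selectExpr("col", "struct(col, arr) as st")

       val result = explodePruned(df)
       result.explain() // inspect which attributes Generate carries
       result.show()
       spark.stop()
     }
   }
   ```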
   
   Benchmark after the change:
   ```
    [info] Java HotSpot(TM) 64-Bit Server VM 1.8.0_202-b08 on Mac OS X 10.14.4
    [info] Intel(R) Core(TM) i7-8750H CPU @ 2.20GHz
    [info] generate big nested struct array:                 Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
    [info] ------------------------------------------------------------------------------------------------------------------------
    [info] generate big nested struct array wholestage off            311            331          28          0.2        5188.6       1.0X
    [info] generate big nested struct array wholestage on             297            312          15          0.2        4947.3       1.0X
    [info]
   ```
   
   The query plan:
   ```
   == Physical Plan ==
    Project [col#592, _gen_alias_608#608 AS col1#599, arr_col#603]
    +- Generate explode(st#596.arr), [col#592, _gen_alias_608#608], false, [arr_col#603]
       +- Project [_1#587 AS col#592, named_struct(col, _1#587, arr, _2#588) AS st#596, _1#587 AS _gen_alias_608#608]
          +- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, scala.Tuple2, true]))._1, true, false) AS _1#587, mapobjects(MapObjects_loopValue102, MapObjects_loopIsNull102, ObjectType(class scala.Tuple4), if (isnull(lambdavariable(MapObjects_loopValue102, MapObjects_loopIsNull102, ObjectType(class scala.Tuple4), true))) null else named_struct(_1, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(lambdavariable(MapObjects_loopValue102, MapObjects_loopIsNull102, ObjectType(class scala.Tuple4), true))._1, true, false), _2, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(lambdavariable(MapObjects_loopValue102, MapObjects_loopIsNull102, ObjectType(class scala.Tuple4), true))._2, true, false), _3, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(lambdavariable(MapObjects_loopValue102, MapObjects_loopIsNull102, ObjectType(class scala.Tuple4), true))._3, true, false), _4, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(lambdavariable(MapObjects_loopValue102, MapObjects_loopIsNull102, ObjectType(class scala.Tuple4), true))._4, true, false)), knownnotnull(assertnotnull(input[0, scala.Tuple2, true]))._2, None) AS _2#588]
             +- Scan[obj#586]
   ```
   
   ## How was this patch tested?
   
   Added benchmark.

