viirya opened a new pull request #24637: [SPARK-27707][SQL] Prune unnecessary nested fields from Generate URL: https://github.com/apache/spark/pull/24637 ## What changes were proposed in this pull request? Performance issue using explode was found when a complex field contains huge array is to get duplicated as the number of exploded array elements. Given example: ```scala val df = spark.sparkContext.parallelize(Seq(("1", Array.fill(M)({ val i = math.random (i.toString, (i + 1).toString, (i + 2).toString, (i + 3).toString) })))).toDF("col", "arr") .selectExpr("col", "struct(col, arr) as st") .selectExpr("col", "st.col as col1", "explode(st.arr) as arr_col") ``` The explode causes `st` to be duplicated as many as the exploded elements. Benchmarks it: ``` [info] Java HotSpot(TM) 64-Bit Server VM 1.8.0_202-b08 on Mac OS X 10.14.4 [info] Intel(R) Core(TM) i7-8750H CPU @ 2.20GHz [info] generate big nested struct array: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative [info] ------------------------------------------------------------------------------------------------------------------------ [info] generate big nested struct array wholestage off 52668 53162 699 0.0 877803.4 1.0X [info] generate big nested struct array wholestage on 47261 49093 1125 0.0 787690.2 1.1X [info] ``` The query plan: ``` == Physical Plan == Project [col#508, st#512.col AS col1#515, arr_col#519] +- Generate explode(st#512.arr), [col#508, st#512], false, [arr_col#519] +- Project [_1#503 AS col#508, named_struct(col, _1#503, arr, _2#504) AS st#512] +- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, scala.Tuple2, true]))._1, true, false) AS _1#503, mapobjects(MapObjects_loopValue84, MapObjects_loopIsNull84, ObjectType(class scala.Tuple4), if (isnull(lambdavariable(MapObjects_loopValue84, MapObjects_loopIsNull84, ObjectType(class scala.Tuple4), true))) null else named_struct(_1, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(lambdavariable(MapObjects_loopValue84, MapObjects_loopIsNull84, ObjectType(class scala.Tuple4), true))._1, true, false), _2, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(lambdavariable(MapObjects_loopValue84, MapObjects_loopIsNull84, ObjectType(class scala.Tuple4), true))._2, true, false), _3, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(lambdavariable(MapObjects_loopValue84, MapObjects_loopIsNull84, ObjectType(class scala.Tuple4), true))._3, true, false), _4, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(lambdavariable(MapObjects_loopValue84, MapObjects_loopIsNull84, ObjectType(class scala.Tuple4), true))._4, true, false)), knownnotnull(assertnotnull(input[0, scala.Tuple2, true]))._2, None) AS _2#504] +- Scan[obj#534] ``` This patch takes nested column pruning approach to prune unnecessary nested fields. It adds a projection of the needed nested fields as aliases on the child of `Generate`, and substitutes them by alias attributes on the projection on top of `Generate`. Benchmarks it after the change: ``` [info] Java HotSpot(TM) 64-Bit Server VM 1.8.0_202-b08 on Mac OS X 10.14.4 [info] Intel(R) Core(TM) i7-8750H CPU @ 2.20GHz [info] generate big nested struct array: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative [info] ------------------------------------------------------------------------------------------------------------------------ [info] generate big nested struct array wholestage off 311 331 28 0.2 5188.6 1.0X [info] generate big nested struct array wholestage on 297 312 15 0.2 4947.3 1.0X [info] ``` The query plan: ``` == Physical Plan == Project [col#592, _gen_alias_608#608 AS col1#599, arr_col#603] +- Generate explode(st#596.arr), [col#592, _gen_alias_608#608], false, [arr_col#603] +- Project [_1#587 AS col#592, named_struct(col, _1#587, arr, _2#588) AS st#596, _1#587 AS _gen_alias_608#608] +- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(in put[0, scala.Tuple2, true]))._1, true, false) AS _1#587, mapobjects(MapObjects_loopValue102, MapObjects_loopIsNull102, ObjectType(class scala.Tuple4), if (isnull(lambdavariable(MapObjects_loopValue102, MapObjects_loopIsNull102, ObjectType(class scala.Tuple4), true))) null else named_struct(_1, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(lambdavariable(MapObjects_loopValue102, MapObjects_loopIsNull102, ObjectType(class scala.Tuple4), true))._1, true, false), _2, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(lambdavariable(MapObjects_loopValue102, MapObjects_loopIsNull102, ObjectType(class scala.Tuple4), true))._2, true, false), _3, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(lambdavariable(MapObjects_loopValue102, MapObjects_loopIsNull102, ObjectType(class scala.Tuple4), true))._3, true, false), _4, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(lambdavariable(MapObjects_loopValue102, MapObjects_loopIsNull102, ObjectType(class scala.Tuple4), true))._4, true, false)), knownnotnull(assertnotnull(input[0, scala.Tuple2, true]))._2, None) AS _2#588] +- Scan[obj#586] ``` ## How was this patch tested? Added benchmark.
---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected] With regards, Apache Git Services --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
