fqaiser94 commented on pull request #27066:
URL: https://github.com/apache/spark/pull/27066#issuecomment-645047815
@cloud-fan I've made fairly significant changes to the implementation in
order to address the issue where poorly optimized physical plans resulted in
massive amounts of generated Java code.
Firstly, I've changed the `WithFields` expression so that it only supports
adding/replacing top-level fields in any given struct. To add/replace a nested
field using this expression, you now need to nest `WithFields` expressions
(similar to what the `withFieldHelper` function does). This change is
necessary to make optimization straightforward.
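For illustration, here's a minimal sketch of what the nesting looks like at the expression level. The `WithFields(structExpr, names, valExprs)` constructor shape used here is an assumption for the sketch; see the PR for the actual signature:
```scala
import org.apache.spark.sql.functions._

// Assumed shape for this sketch: WithFields(structExpr, fieldNames, fieldValues).
// Top-level: add/replace field "c1" directly on struct column 'a'.
val topLevel = WithFields(col("a").expr, Seq("c1"), Seq(lit("hello").expr))

// Nested: to add "c1" inside 'a.a1', rewrite the inner struct first, then
// replace the "a1" field on the outer struct with the rewritten inner struct.
val nested = WithFields(
  col("a").expr,
  Seq("a1"),
  Seq(WithFields(col("a.a1").expr, Seq("c1"), Seq(lit("hello").expr))))
```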
I've added two new optimization rules:
- In `SimplifyExtractValueOps`, I've added another case to simplify
`GetStructField` expressions that operate on `WithFields` expressions
- I've added a new rule, `CombineWithFields`, to collapse adjacent
`WithFields` expressions into a single `WithFields` expression
These two rules are executed until we reach a fixed point. Only after that do
we execute the third rule, which transforms `WithFields` expressions into
`CreateNamedStruct` expressions. This appears to be enough to remove all of
the unnecessary nested if-null-else statements. A rough sketch of both rules
follows.
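For illustration only, here is a minimal sketch of the idea behind both rules in one place, again assuming a `WithFields(structExpr, names, valExprs)` shape (the real rules in the PR handle more cases, e.g. missing field names):
```scala
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

// Hypothetical combined sketch of CombineWithFields and the extra
// SimplifyExtractValueOps case; not the actual PR code.
object WithFieldsSimplificationSketch extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transformAllExpressions {
    // CombineWithFields: adjacent WithFields collapse into one; on a name
    // clash the later write wins because it appears later in the lists.
    case WithFields(WithFields(struct, names1, vals1), names2, vals2) =>
      WithFields(struct, names1 ++ names2, vals1 ++ vals2)

    // SimplifyExtractValueOps: reading a field straight out of a WithFields
    // either yields the value WithFields wrote for that field, or bypasses
    // the WithFields and reads the untouched field from the original struct.
    case GetStructField(WithFields(struct, names, vals), ordinal, Some(name)) =>
      names.zip(vals).reverse.collectFirst { case (n, v) if n == name => v }
        .getOrElse(GetStructField(struct, ordinal, Some(name)))
  }
}
```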
Below is a deliberately ridiculous example of a Spark SQL query where we're
adding/replacing different fields at different levels of nesting in a struct
column. You can see how these optimizer rules simplify the super long logical
plan into a reasonable physical plan:
```
val structLevel3 = StructType(Seq(
  StructField("a3", IntegerType, nullable = false),
  StructField("b3", IntegerType, nullable = false)))

val structLevel2 = StructType(Seq(
  StructField("a2", structLevel3, nullable = true),
  StructField("b2", structLevel3, nullable = true)))

val structLevel1 = StructType(Seq(
  StructField("a1", structLevel2, nullable = true),
  StructField("b1", structLevel2, nullable = true)))

val df = spark.createDataFrame(
  sparkContext.parallelize(
    Row(Row(Row(Row(1, 2), Row(3, 4)), Row(Row(3, 4), Row(5, 6)))) ::
    Row(Row(Row(null, Row(3, 4)), Row(Row(3, 4), Row(5, 6)))) ::
    Row(Row(null, Row(Row(3, 4), Row(5, 6)))) ::
    Row(Row(null, null)) :: Nil),
  StructType(Seq(StructField("a", structLevel1, nullable = false)))
)

val result = df.withColumn("a", $"a"
  .withField("a1", $"a.a1".withField("c1", lit("hello")))
  .withField("b1.a2", lit(1000))
  .withField("b1.b2.d3", $"a.b1.b2.a3" * 10)
  .withField("b1Original", $"a.b1")
)
result.explain(true)
// == Parsed Logical Plan ==
// 'Project [with_fields(with_fields(with_fields(with_fields('a, a1, with_fields('a.a1, c1, hello)), b1, with_fields(with_fields('a, a1, with_fields('a.a1, c1, hello))[b1], a2, 1000)), b1, with_fields(with_fields(with_fields('a, a1, with_fields('a.a1, c1, hello)), b1, with_fields(with_fields('a, a1, with_fields('a.a1, c1, hello))[b1], a2, 1000))[b1], b2, with_fields(with_fields(with_fields('a, a1, with_fields('a.a1, c1, hello)), b1, with_fields(with_fields('a, a1, with_fields('a.a1, c1, hello))[b1], a2, 1000))[b1][b2], d3, ('a.b1.b2.a3 * 10)))), b1Original, 'a.b1) AS a#14]
// +- LogicalRDD [a#12], false
//
// == Analyzed Logical Plan ==
// a: struct<a1:struct<a2:struct<a3:int,b3:int>,b2:struct<a3:int,b3:int>,c1:string>,b1:struct<a2:int,b2:struct<a3:int,b3:int,d3:int>>,b1Original:struct<a2:struct<a3:int,b3:int>,b2:struct<a3:int,b3:int>>>
// Project [with_fields(with_fields(with_fields(with_fields(a#12, a1, with_fields(a#12.a1, c1, hello)), b1, with_fields(with_fields(a#12, a1, with_fields(a#12.a1, c1, hello)).b1, a2, 1000)), b1, with_fields(with_fields(with_fields(a#12, a1, with_fields(a#12.a1, c1, hello)), b1, with_fields(with_fields(a#12, a1, with_fields(a#12.a1, c1, hello)).b1, a2, 1000)).b1, b2, with_fields(with_fields(with_fields(a#12, a1, with_fields(a#12.a1, c1, hello)), b1, with_fields(with_fields(a#12, a1, with_fields(a#12.a1, c1, hello)).b1, a2, 1000)).b1.b2, d3, (a#12.b1.b2.a3 * 10)))), b1Original, a#12.b1) AS a#14]
// +- LogicalRDD [a#12], false
//
// == Optimized Logical Plan ==
// Project [named_struct(a1, if (isnull(a#12.a1)) null else named_struct(a2, a#12.a1.a2, b2, a#12.a1.b2, c1, hello), b1, if (isnull(a#12.b1)) null else named_struct(a2, 1000, b2, if (isnull(a#12.b1.b2)) null else named_struct(a3, a#12.b1.b2.a3, b3, a#12.b1.b2.b3, d3, (a#12.b1.b2.a3 * 10))), b1Original, a#12.b1) AS a#14]
// +- LogicalRDD [a#12], false
//
// == Physical Plan ==
// *(1) Project [named_struct(a1, if (isnull(a#12.a1)) null else named_struct(a2, a#12.a1.a2, b2, a#12.a1.b2, c1, hello), b1, if (isnull(a#12.b1)) null else named_struct(a2, 1000, b2, if (isnull(a#12.b1.b2)) null else named_struct(a3, a#12.b1.b2.a3, b3, a#12.b1.b2.b3, d3, (a#12.b1.b2.a3 * 10))), b1Original, a#12.b1) AS a#14]
// +- *(1) Scan ExistingRDD[a#12]
result.show(false)
// +---------------------------------------------------------------+
// |a |
// +---------------------------------------------------------------+
// |[[[1, 2], [3, 4], hello], [1000, [5, 6, 50]], [[3, 4], [5, 6]]]|
// |[[, [3, 4], hello], [1000, [5, 6, 50]], [[3, 4], [5, 6]]] |
// |[, [1000, [5, 6, 50]], [[3, 4], [5, 6]]] |
// |[,,] |
// +---------------------------------------------------------------+
```
So I think with this approach we're able to meet all of the requirements
that have been suggested so far:
- Allow changing nested fields using the same `Column.withField(fieldName:
String, col: Column)` API signature
- Return null when given a null struct argument
- Internally reuse `CreateNamedStruct` so that existing optimizations (like
predicate pushdown and schema pruning) continue to work
- Return optimized physical plans similar to what an experienced Spark SQL
user would normally "hand-write" using `CreateNamedStruct` (a sketch of such
a hand-written version follows)
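For comparison, here is a hypothetical hand-written version of just the `a1` update from the example above, using only the existing public API (`when`/`struct`); note how it mirrors the if-isnull-else shape in the optimized plan:
```scala
import org.apache.spark.sql.functions._
// assumes `import spark.implicits._` is in scope for the $ syntax

// Hand-written equivalent of .withField("a1", $"a.a1".withField("c1", lit("hello"))):
// rebuild the outer struct, copying untouched fields and null-guarding 'a.a1'.
val handWritten = df.withColumn("a", struct(
  when($"a.a1".isNull, lit(null))
    .otherwise(struct(
      $"a.a1.a2".as("a2"),
      $"a.a1.b2".as("b2"),
      lit("hello").as("c1")))
    .as("a1"),
  $"a.b1".as("b1")))
```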