fqaiser94 commented on a change in pull request #29587:
URL: https://github.com/apache/spark/pull/29587#discussion_r489466321
##########
File path:
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveUnion.scala
##########
@@ -50,18 +189,29 @@ object ResolveUnion extends Rule[LogicalPlan] {
}
}
+ (rightProjectList, aliased)
+ }
Review comment:
I'm not sure this will scale well. Here's a currently failing test for
me:
```
/**
 * Builds a single-row DataFrame whose only column, `nested0Col0`, is a chain of
 * structs nested down to level `depth`.
 *
 * Every struct level has `numColsAtEachDepth + 1` non-nullable fields:
 *  - at the innermost level (fields named `nested${depth}Col0 ..`), all fields are
 *    integers holding the values `0 .. numColsAtEachDepth`;
 *  - at each outer level `k` (from `depth - 1` down to `1`), field `nested${k}Col0`
 *    holds the next-deeper struct and fields `nested${k}Col1 ..` are integers.
 *
 * @param depth              level index of the innermost struct. Values of 1 or less
 *                           produce a single struct level; the count-down below
 *                           always terminates, unlike a `!= 0` loop condition, which
 *                           never did for depth <= 0.
 * @param numColsAtEachDepth highest column index at each level; each level has one
 *                           more field than this value
 * @return a one-row DataFrame with schema `nested0Col0: struct<...>`
 */
def nestedDf(depth: Int, numColsAtEachDepth: Int): DataFrame = {
  // Innermost level: integer fields nested${depth}Col0 .. nested${depth}Col$numColsAtEachDepth.
  val innermostType = StructType(
    (0 to numColsAtEachDepth).map { i =>
      StructField(s"nested${depth}Col$i", IntegerType, nullable = false)
    })
  val innermostRow = Row(0 to numColsAtEachDepth: _*)

  // Wrap outward from level depth - 1 to level 1. `by -1` yields an empty range
  // when depth <= 1, so no wrapping happens and the fold cannot run away.
  val (structType, struct) =
    (depth - 1 to 1 by -1).foldLeft((innermostType, innermostRow)) {
      case ((childType, childRow), level) =>
        val intFields = (1 to numColsAtEachDepth).map { i =>
          StructField(s"nested${level}Col$i", IntegerType, nullable = false)
        }
        val wrappedType = StructType(
          StructField(s"nested${level}Col0", childType, nullable = false) +: intFields)
        val wrappedRow = Row((childRow +: (1 to numColsAtEachDepth)): _*)
        (wrappedType, wrappedRow)
    }

  spark.createDataFrame(
    sparkContext.parallelize(Row(struct) :: Nil),
    StructType(Seq(StructField("nested0Col0", structType))))
}
// Scalability repro: unionByName over two 10-level-deep struct columns whose
// per-level field counts differ (2 vs. 21), with missing columns allowed.
// This was reported to abort with an OutOfMemoryError (GC overhead limit).
test("check performance with lots of nested columns at different depths") {
  val structUnionEnabled = SQLConf.UNION_BYNAME_STRUCT_SUPPORT_ENABLED.key -> "true"
  withSQLConf(structUnionEnabled) {
    val narrow = nestedDf(depth = 10, numColsAtEachDepth = 1)
    val wide = nestedDf(depth = 10, numColsAtEachDepth = 20)
    narrow
      .unionByName(wide, allowMissingColumns = true)
      .explain(true)
  }
}
```
fails after a long time (about three and a half minutes) with this exception:
```
[info] org.apache.spark.sql.DataFrameSetOperationsSuite *** ABORTED *** (3
minutes, 35 seconds)
[info] java.lang.OutOfMemoryError: GC overhead limit exceeded
```
I'm not sure, but I suspect the generated plans are too big. Even for
simpler scenarios, the generated physical plans don't look particularly well
optimized to me:
```
// Small variant (depth 2; 2 vs. 3 fields per level) for inspecting the plans
// produced when one side's struct column lacks some nested fields.
test("check physical plan for simple scenario") {
  withSQLConf(SQLConf.UNION_BYNAME_STRUCT_SUPPORT_ENABLED.key -> "true") {
    val (narrow, wide) = (
      nestedDf(depth = 2, numColsAtEachDepth = 1),
      nestedDf(depth = 2, numColsAtEachDepth = 2))
    narrow
      .unionByName(wide, allowMissingColumns = true)
      .explain(true)
  }
}
```
gives me this plan:
```
== Parsed Logical Plan ==
'Union true, true
:- LogicalRDD [nested0Col0#1], false
+- LogicalRDD [nested0Col0#4], false
== Analyzed Logical Plan ==
nested0Col0:
struct<nested1Col0:struct<nested2Col0:int,nested2Col1:int,nested2Col2:int>,nested1Col1:int,nested1Col2:int>
Union false, false
:- Project [if (isnull(if (isnull(nested0Col0#1)) null else
named_struct(nested1Col0, if (isnull(nested0Col0#1.nested1Col0)) null else
named_struct(nested2Col0, knownnotnull(nested0Col0#1.nested1Col0).nested2Col0,
nested2Col1, knownnotnull(nested0Col0#1.nested1Col0).nested2Col1, nested2Col2,
null), nested1Col1, knownnotnull(nested0Col0#1).nested1Col1))) null else
named_struct(nested1Col0, knownnotnull(if (isnull(nested0Col0#1)) null else
named_struct(nested1Col0, if (isnull(nested0Col0#1.nested1Col0)) null else
named_struct(nested2Col0, knownnotnull(nested0Col0#1.nested1Col0).nested2Col0,
nested2Col1, knownnotnull(nested0Col0#1.nested1Col0).nested2Col1, nested2Col2,
null), nested1Col1, knownnotnull(nested0Col0#1).nested1Col1)).nested1Col0,
nested1Col1, knownnotnull(if (isnull(nested0Col0#1)) null else
named_struct(nested1Col0, if (isnull(nested0Col0#1.nested1Col0)) null else
named_struct(nested2Col0, knownnotnull(nested0Col0#1.nested1Col0).nested2Col0,
nested2Col1, knownnotnull(nested0Col0#1.nested1Col0).nested2Col1,
nested2Col2, null), nested1Col1,
knownnotnull(nested0Col0#1).nested1Col1)).nested1Col1, nested1Col2, null) AS
nested0Col0#7]
: +- LogicalRDD [nested0Col0#1], false
+- Project [if (isnull(nested0Col0#4)) null else named_struct(nested1Col0,
knownnotnull(nested0Col0#4).nested1Col0, nested1Col1,
knownnotnull(nested0Col0#4).nested1Col1, nested1Col2,
knownnotnull(nested0Col0#4).nested1Col2) AS nested0Col0#6]
+- LogicalRDD [nested0Col0#4], false
== Optimized Logical Plan ==
Union false, false
:- Project [if (isnull(if (isnull(nested0Col0#1)) null else
named_struct(nested1Col0, if (isnull(nested0Col0#1.nested1Col0)) null else
named_struct(nested2Col0, knownnotnull(nested0Col0#1.nested1Col0).nested2Col0,
nested2Col1, knownnotnull(nested0Col0#1.nested1Col0).nested2Col1, nested2Col2,
null), nested1Col1, knownnotnull(nested0Col0#1).nested1Col1))) null else
named_struct(nested1Col0, knownnotnull(if (isnull(nested0Col0#1)) null else
named_struct(nested1Col0, if (isnull(nested0Col0#1.nested1Col0)) null else
named_struct(nested2Col0, knownnotnull(nested0Col0#1.nested1Col0).nested2Col0,
nested2Col1, knownnotnull(nested0Col0#1.nested1Col0).nested2Col1, nested2Col2,
null), nested1Col1, knownnotnull(nested0Col0#1).nested1Col1)).nested1Col0,
nested1Col1, knownnotnull(if (isnull(nested0Col0#1)) null else
named_struct(nested1Col0, if (isnull(nested0Col0#1.nested1Col0)) null else
named_struct(nested2Col0, knownnotnull(nested0Col0#1.nested1Col0).nested2Col0,
nested2Col1, knownnotnull(nested0Col0#1.nested1Col0).nested2Col1,
nested2Col2, null), nested1Col1,
knownnotnull(nested0Col0#1).nested1Col1)).nested1Col1, nested1Col2, null) AS
nested0Col0#7]
: +- LogicalRDD [nested0Col0#1], false
+- Project [if (isnull(nested0Col0#4)) null else named_struct(nested1Col0,
knownnotnull(nested0Col0#4).nested1Col0, nested1Col1,
knownnotnull(nested0Col0#4).nested1Col1, nested1Col2,
knownnotnull(nested0Col0#4).nested1Col2) AS nested0Col0#6]
+- LogicalRDD [nested0Col0#4], false
== Physical Plan ==
Union
:- *(1) Project [if (isnull(if (isnull(nested0Col0#1)) null else
named_struct(nested1Col0, if (isnull(nested0Col0#1.nested1Col0)) null else
named_struct(nested2Col0, knownnotnull(nested0Col0#1.nested1Col0).nested2Col0,
nested2Col1, knownnotnull(nested0Col0#1.nested1Col0).nested2Col1, nested2Col2,
null), nested1Col1, knownnotnull(nested0Col0#1).nested1Col1))) null else
named_struct(nested1Col0, knownnotnull(if (isnull(nested0Col0#1)) null else
named_struct(nested1Col0, if (isnull(nested0Col0#1.nested1Col0)) null else
named_struct(nested2Col0, knownnotnull(nested0Col0#1.nested1Col0).nested2Col0,
nested2Col1, knownnotnull(nested0Col0#1.nested1Col0).nested2Col1, nested2Col2,
null), nested1Col1, knownnotnull(nested0Col0#1).nested1Col1)).nested1Col0,
nested1Col1, knownnotnull(if (isnull(nested0Col0#1)) null else
named_struct(nested1Col0, if (isnull(nested0Col0#1.nested1Col0)) null else
named_struct(nested2Col0, knownnotnull(nested0Col0#1.nested1Col0).nested2Col0,
nested2Col1, knownnotnull(nested0Col0#1.nested1Col0).nested2Col1,
nested2Col2, null), nested1Col1,
knownnotnull(nested0Col0#1).nested1Col1)).nested1Col1, nested1Col2, null) AS
nested0Col0#7]
: +- *(1) Scan ExistingRDD[nested0Col0#1]
+- *(2) Project [if (isnull(nested0Col0#4)) null else
named_struct(nested1Col0, knownnotnull(nested0Col0#4).nested1Col0, nested1Col1,
knownnotnull(nested0Col0#4).nested1Col1, nested1Col2,
knownnotnull(nested0Col0#4).nested1Col2) AS nested0Col0#6]
+- *(2) Scan ExistingRDD[nested0Col0#4]
```
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]