xkrogen commented on a change in pull request #34025:
URL: https://github.com/apache/spark/pull/34025#discussion_r710439691
##########
File path:
sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala
##########
@@ -559,6 +559,42 @@ object StructType extends AbstractDataType {
case _ => dt
}
+ /**
+ * This works a little similarly to `merge`, but it does not actually merge two DataTypes.
+ * This method just merges nullability.
+ */
+ private[sql] def mergeNullability(left: DataType, right: DataType): DataType =
+ (left, right) match {
+ case (ArrayType(leftElementType, leftContainsNull),
+ ArrayType(rightElementType, rightContainsNull)) =>
+ ArrayType(
+ merge(leftElementType, rightElementType),
Review comment:
doesn't this need to be `mergeNullability`?
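Something like this is what I had in mind (just a sketch of the suggested change, not tested):
```scala
case (ArrayType(leftElementType, leftContainsNull),
    ArrayType(rightElementType, rightContainsNull)) =>
  ArrayType(
    // recurse with mergeNullability so the nested element types only have
    // their nullability widened, rather than being structurally merged
    mergeNullability(leftElementType, rightElementType),
    leftContainsNull || rightContainsNull)
```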
##########
File path:
sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala
##########
@@ -559,6 +559,42 @@ object StructType extends AbstractDataType {
case _ => dt
}
+ /**
+ * This works a little similarly to `merge`, but it does not actually merge two DataTypes.
+ * This method just merges nullability.
+ */
+ private[sql] def mergeNullability(left: DataType, right: DataType): DataType =
+ (left, right) match {
+ case (ArrayType(leftElementType, leftContainsNull),
+ ArrayType(rightElementType, rightContainsNull)) =>
+ ArrayType(
+ merge(leftElementType, rightElementType),
+ leftContainsNull || rightContainsNull)
+
+ case (MapType(leftKeyType, leftValueType, leftContainsNull),
+ MapType(rightKeyType, rightValueType, rightContainsNull)) =>
+ MapType(
+ merge(leftKeyType, rightKeyType),
+ merge(leftValueType, rightValueType),
+ leftContainsNull || rightContainsNull)
+
+ case (StructType(leftFields), StructType(rightFields)) =>
+ require(leftFields.size == rightFields.size, "To merge nullability," +
Review comment:
minor typo nit: missing a space after the comma in the message
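In other words, something like this (hypothetical rewording, just to illustrate):
```scala
require(leftFields.size == rightFields.size,
  "To merge nullability, two structs must have the same number of fields.")
```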
##########
File path:
sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala
##########
@@ -559,6 +559,42 @@ object StructType extends AbstractDataType {
case _ => dt
}
+ /**
+ * This works a little similarly to `merge`, but it does not actually merge two DataTypes.
+ * This method just merges nullability.
+ */
+ private[sql] def mergeNullability(left: DataType, right: DataType): DataType =
+ (left, right) match {
+ case (ArrayType(leftElementType, leftContainsNull),
+ ArrayType(rightElementType, rightContainsNull)) =>
+ ArrayType(
+ merge(leftElementType, rightElementType),
+ leftContainsNull || rightContainsNull)
+
+ case (MapType(leftKeyType, leftValueType, leftContainsNull),
+ MapType(rightKeyType, rightValueType, rightContainsNull)) =>
+ MapType(
+ merge(leftKeyType, rightKeyType),
+ merge(leftValueType, rightValueType),
+ leftContainsNull || rightContainsNull)
+
+ case (StructType(leftFields), StructType(rightFields)) =>
+ require(leftFields.size == rightFields.size, "To merge nullability," +
+ "two structs must have same number of fields.")
+
+ val newFields = leftFields.zip(rightFields).map {
+ case (leftField@StructField(_, leftType, leftNullable, _),
+ _@StructField(_, rightType, rightNullable, _)) =>
+ leftField.copy(
+ dataType = mergeNullability(leftType, rightType),
+ nullable = leftNullable || rightNullable)
+ }.toSeq
+ StructType(newFields)
+
+ case (leftType, _) =>
+ leftType
Review comment:
Previously `merge` would do type compatibility checking:
https://github.com/apache/spark/blob/924b4130244ec73df200dc6f3b9509dd8cafba71/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala#L663-L667
Do we need to restore this logic here, or does `union` handle type compatibility checking elsewhere?
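If we do want the check here rather than relying on `union`'s analysis-time checks, a rough sketch of what the fallback case could look like (paraphrasing the old behavior from memory, so the exact exception and message may differ):
```scala
case (leftType, rightType) =>
  // Nullability can only be merged between structurally identical types;
  // anything else indicates the caller passed incompatible schemas.
  if (leftType != rightType) {
    throw new SparkException(
      s"Failed to merge incompatible data types $leftType and $rightType")
  }
  leftType
```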
##########
File path:
sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala
##########
@@ -559,6 +559,42 @@ object StructType extends AbstractDataType {
case _ => dt
}
+ /**
+ * This works a little similarly to `merge`, but it does not actually merge two DataTypes.
+ * This method just merges nullability.
+ */
+ private[sql] def mergeNullability(left: DataType, right: DataType): DataType =
+ (left, right) match {
+ case (ArrayType(leftElementType, leftContainsNull),
+ ArrayType(rightElementType, rightContainsNull)) =>
+ ArrayType(
+ merge(leftElementType, rightElementType),
+ leftContainsNull || rightContainsNull)
+
+ case (MapType(leftKeyType, leftValueType, leftContainsNull),
+ MapType(rightKeyType, rightValueType, rightContainsNull)) =>
+ MapType(
+ merge(leftKeyType, rightKeyType),
+ merge(leftValueType, rightValueType),
Review comment:
doesn't this need to be `mergeNullability`?
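i.e. something along these lines (sketch only):
```scala
case (MapType(leftKeyType, leftValueType, leftContainsNull),
    MapType(rightKeyType, rightValueType, rightContainsNull)) =>
  MapType(
    // recurse on nullability for both the key and the value types
    mergeNullability(leftKeyType, rightKeyType),
    mergeNullability(leftValueType, rightValueType),
    leftContainsNull || rightContainsNull)
```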
##########
File path:
sql/core/src/test/scala/org/apache/spark/sql/DataFrameSetOperationsSuite.scala
##########
@@ -1018,6 +1018,23 @@ class DataFrameSetOperationsSuite extends QueryTest with SharedSparkSession {
unionDF = df1.unionByName(df2)
checkAnswer(unionDF, expected)
}
+
+ test("SPARK-36673: Incorrect Unions of struct") {
+ val df1 = spark.range(2).withColumn("nested", struct(expr("id * 5 AS INNER")))
+ val df2 = spark.range(2).withColumn("nested", struct(expr("id * 5 AS inner")))
+
+ val df = df1.union(df2)
Review comment:
does `unionByName` work properly in this situation with your new changes?
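e.g. something like the below (just a sketch reusing the same DataFrames; the expected rows are my assumption of the desired output, and it assumes the suite's existing imports for `Row`, `struct`, and `expr`):
```scala
val df1 = spark.range(2).withColumn("nested", struct(expr("id * 5 AS INNER")))
val df2 = spark.range(2).withColumn("nested", struct(expr("id * 5 AS inner")))
// By-name resolution should line up the case-differing nested field names
// (under the default case-insensitive resolution) and produce all four rows.
checkAnswer(
  df1.unionByName(df2),
  Row(0L, Row(0L)) :: Row(1L, Row(5L)) :: Row(0L, Row(0L)) :: Row(1L, Row(5L)) :: Nil)
```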
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]