[GitHub] [spark] viirya commented on a change in pull request #29587: [SPARK-32376][SQL] Make unionByName null-filling behavior work with struct columns
viirya commented on a change in pull request #29587: URL: https://github.com/apache/spark/pull/29587#discussion_r506578657 ## File path: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala ## @@ -2067,6 +2067,13 @@ class Dataset[T] private[sql]( * // +++++ * }}} * + * Note that `allowMissingColumns` supports nested column in struct types, if the config + * `spark.sql.unionByName.structSupport.enabled` is enabled. Missing nested columns Review comment: good catch. updated. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] viirya commented on a change in pull request #29587: [SPARK-32376][SQL] Make unionByName null-filling behavior work with struct columns
viirya commented on a change in pull request #29587: URL: https://github.com/apache/spark/pull/29587#discussion_r506578485 ## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveUnion.scala ## @@ -17,29 +17,188 @@ package org.apache.spark.sql.catalyst.analysis +import scala.collection.mutable + import org.apache.spark.sql.AnalysisException -import org.apache.spark.sql.catalyst.expressions.{Alias, Literal} +import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.optimizer.CombineUnions import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, Union} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types._ import org.apache.spark.sql.util.SchemaUtils +import org.apache.spark.unsafe.types.UTF8String /** * Resolves different children of Union to a common set of columns. */ object ResolveUnion extends Rule[LogicalPlan] { - private def unionTwoSides( + /** + * This method sorts columns recursively in a struct expression based on column names. + */ + private def sortStructFields(expr: Expression): Expression = { +val existingExprs = expr.dataType.asInstanceOf[StructType].fieldNames.zipWithIndex.map { + case (name, i) => +val fieldExpr = GetStructField(KnownNotNull(expr), i) +if (fieldExpr.dataType.isInstanceOf[StructType]) { + (name, sortStructFields(fieldExpr)) +} else { + (name, fieldExpr) +} +}.sortBy(_._1).flatMap(pair => Seq(Literal(pair._1), pair._2)) + +val newExpr = CreateNamedStruct(existingExprs) +if (expr.nullable) { + If(IsNull(expr), Literal(null, newExpr.dataType), newExpr) +} else { + newExpr +} + } + + /** + * Assumes input expressions are field expression of `CreateNamedStruct`. This method + * sorts the expressions based on field names. 
+ */ + private def sortFieldExprs(fieldExprs: Seq[Expression]): Seq[Expression] = { +fieldExprs.grouped(2).map { e => + Seq(e.head, e.last) +}.toSeq.sortBy { pair => + assert(pair.head.isInstanceOf[Literal]) + pair.head.eval().asInstanceOf[UTF8String].toString +}.flatten + } + + /** + * This helper method sorts fields in a `UpdateFields` expression by field name. + */ + private def sortStructFieldsInWithFields(expr: Expression): Expression = expr transformUp { +case u: UpdateFields if u.resolved => + u.evalExpr match { +case i @ If(IsNull(_), _, CreateNamedStruct(fieldExprs)) => + val sorted = sortFieldExprs(fieldExprs) + val newStruct = CreateNamedStruct(sorted) + i.copy(trueValue = Literal(null, newStruct.dataType), falseValue = newStruct) +case CreateNamedStruct(fieldExprs) => + val sorted = sortFieldExprs(fieldExprs) + val newStruct = CreateNamedStruct(sorted) + newStruct +case other => + throw new AnalysisException(s"`UpdateFields` has incorrect eval expression: $other. " + Review comment: got it. fixed. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] viirya commented on a change in pull request #29587: [SPARK-32376][SQL] Make unionByName null-filling behavior work with struct columns
viirya commented on a change in pull request #29587: URL: https://github.com/apache/spark/pull/29587#discussion_r505677527 ## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveUnion.scala ## @@ -17,29 +17,190 @@ package org.apache.spark.sql.catalyst.analysis +import scala.collection.mutable + import org.apache.spark.sql.AnalysisException -import org.apache.spark.sql.catalyst.expressions.{Alias, Literal} +import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.optimizer.CombineUnions import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, Union} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types._ import org.apache.spark.sql.util.SchemaUtils +import org.apache.spark.unsafe.types.UTF8String /** * Resolves different children of Union to a common set of columns. */ object ResolveUnion extends Rule[LogicalPlan] { - private def unionTwoSides( + /** + * This method sorts recursively columns in a struct expression based on column names. + */ + private def sortStructFields(expr: Expression): Expression = { +val existingExprs = expr.dataType.asInstanceOf[StructType].fieldNames.zipWithIndex.map { + case (name, i) => +val fieldExpr = GetStructField(KnownNotNull(expr), i) +if (fieldExpr.dataType.isInstanceOf[StructType]) { + (name, sortStructFields(fieldExpr)) +} else { + (name, fieldExpr) +} +}.sortBy(_._1).flatMap(pair => Seq(Literal(pair._1), pair._2)) + +val newExpr = CreateNamedStruct(existingExprs) +if (expr.nullable) { + If(IsNull(expr), Literal(null, newExpr.dataType), newExpr) +} else { + newExpr +} + } + + /** + * Assumes input expressions are field expression of `CreateNamedStruct`. This method + * sorts the expressions based on field names. 
+ */ + private def sortFieldExprs(fieldExprs: Seq[Expression]): Seq[Expression] = { +fieldExprs.grouped(2).map { e => + Seq(e.head, e.last) +}.toSeq.sortBy { pair => + assert(pair.head.isInstanceOf[Literal]) + pair.head.eval().asInstanceOf[UTF8String].toString +}.flatten + } + + /** + * This helper method sorts fields in a `UpdateFields` expression by field name. + */ + private def sortStructFieldsInWithFields(expr: Expression): Expression = expr transformUp { +case u: UpdateFields if u.resolved => + u.evalExpr match { +case i @ If(IsNull(_), _, CreateNamedStruct(fieldExprs)) => + val sorted = sortFieldExprs(fieldExprs) + val newStruct = CreateNamedStruct(sorted) + i.copy(trueValue = Literal(null, newStruct.dataType), falseValue = newStruct) +case CreateNamedStruct(fieldExprs) => + val sorted = sortFieldExprs(fieldExprs) + val newStruct = CreateNamedStruct(sorted) + newStruct +case other => + throw new AnalysisException(s"`UpdateFields` has incorrect eval expression: $other. " + +"Please file a bug report with this error message, stack trace, and the query.") + } + } + + def simplifyWithFields(expr: Expression): Expression = { +expr.transformUp { + case UpdateFields(UpdateFields(struct, fieldOps1), fieldOps2) => +UpdateFields(struct, fieldOps1 ++ fieldOps2) +} + } + + /** + * Adds missing fields recursively into given `col` expression, based on the target `StructType`. + * This is called by `compareAndAddFields` when we find two struct columns with same name but + * different nested fields. This method will find out the missing nested fields from `col` to + * `target` struct and add these missing nested fields. Currently we don't support finding out + * missing nested fields of struct nested in array or struct nested in map. 
+ */ + private def addFields(col: NamedExpression, target: StructType): Expression = { +assert(col.dataType.isInstanceOf[StructType], "Only support StructType.") + +val resolver = SQLConf.get.resolver +val missingFields = + StructType.findMissingFields(col.dataType.asInstanceOf[StructType], target, resolver) + +// We need to sort columns in result, because we might add another column in other side. +// E.g., we want to union two structs "a int, b long" and "a int, c string". +// If we don't sort, we will have "a int, b long, c string" and +// "a int, c string, b long", which are not compatible. +if (missingFields.isEmpty) { + sortStructFields(col) +} else { + missingFields.map { s => +val struct = addFieldsInto(col, s.fields) +// Combines `WithFields`s to reduce expression tree. +val reducedStruct = simplifyWithFields(struct) +val sorted = sortStructFieldsInWithFields(reducedStruct) +sorted + }.get +} + } + + /** + * Adds missing fields
[GitHub] [spark] viirya commented on a change in pull request #29587: [SPARK-32376][SQL] Make unionByName null-filling behavior work with struct columns
viirya commented on a change in pull request #29587: URL: https://github.com/apache/spark/pull/29587#discussion_r504936830 ## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveUnion.scala ## @@ -17,29 +17,190 @@ package org.apache.spark.sql.catalyst.analysis +import scala.collection.mutable + import org.apache.spark.sql.AnalysisException -import org.apache.spark.sql.catalyst.expressions.{Alias, Literal} +import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.optimizer.CombineUnions import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, Union} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types._ import org.apache.spark.sql.util.SchemaUtils +import org.apache.spark.unsafe.types.UTF8String /** * Resolves different children of Union to a common set of columns. */ object ResolveUnion extends Rule[LogicalPlan] { - private def unionTwoSides( + /** + * This method sorts recursively columns in a struct expression based on column names. + */ + private def sortStructFields(expr: Expression): Expression = { +val existingExprs = expr.dataType.asInstanceOf[StructType].fieldNames.zipWithIndex.map { + case (name, i) => +val fieldExpr = GetStructField(KnownNotNull(expr), i) +if (fieldExpr.dataType.isInstanceOf[StructType]) { + (name, sortStructFields(fieldExpr)) +} else { + (name, fieldExpr) +} +}.sortBy(_._1).flatMap(pair => Seq(Literal(pair._1), pair._2)) + +val newExpr = CreateNamedStruct(existingExprs) +if (expr.nullable) { + If(IsNull(expr), Literal(null, newExpr.dataType), newExpr) +} else { + newExpr +} + } + + /** + * Assumes input expressions are field expression of `CreateNamedStruct`. This method + * sorts the expressions based on field names. 
+ */ + private def sortFieldExprs(fieldExprs: Seq[Expression]): Seq[Expression] = { +fieldExprs.grouped(2).map { e => + Seq(e.head, e.last) +}.toSeq.sortBy { pair => + assert(pair.head.isInstanceOf[Literal]) + pair.head.eval().asInstanceOf[UTF8String].toString +}.flatten + } + + /** + * This helper method sorts fields in a `UpdateFields` expression by field name. + */ + private def sortStructFieldsInWithFields(expr: Expression): Expression = expr transformUp { +case u: UpdateFields if u.resolved => + u.evalExpr match { +case i @ If(IsNull(_), _, CreateNamedStruct(fieldExprs)) => + val sorted = sortFieldExprs(fieldExprs) + val newStruct = CreateNamedStruct(sorted) + i.copy(trueValue = Literal(null, newStruct.dataType), falseValue = newStruct) +case CreateNamedStruct(fieldExprs) => + val sorted = sortFieldExprs(fieldExprs) + val newStruct = CreateNamedStruct(sorted) + newStruct +case other => + throw new AnalysisException(s"`UpdateFields` has incorrect eval expression: $other. " + +"Please file a bug report with this error message, stack trace, and the query.") + } + } + + def simplifyWithFields(expr: Expression): Expression = { +expr.transformUp { + case UpdateFields(UpdateFields(struct, fieldOps1), fieldOps2) => Review comment: Actually, I plan to move this optimization out of `ResolveUnion` into a separate analyzer rule in #29812. For complex, deeply nested schemas, it is easy to end up with an inefficient expression tree that is very slow to analyze. For the test case in this PR, the query cannot be evaluated at all without this optimization; with it, the query evaluates normally. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] viirya commented on a change in pull request #29587: [SPARK-32376][SQL] Make unionByName null-filling behavior work with struct columns
viirya commented on a change in pull request #29587: URL: https://github.com/apache/spark/pull/29587#discussion_r504799643 ## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveUnion.scala ## @@ -17,29 +17,190 @@ package org.apache.spark.sql.catalyst.analysis +import scala.collection.mutable + import org.apache.spark.sql.AnalysisException -import org.apache.spark.sql.catalyst.expressions.{Alias, Literal} +import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.optimizer.CombineUnions import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, Union} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types._ import org.apache.spark.sql.util.SchemaUtils +import org.apache.spark.unsafe.types.UTF8String /** * Resolves different children of Union to a common set of columns. */ object ResolveUnion extends Rule[LogicalPlan] { - private def unionTwoSides( + /** + * This method sorts recursively columns in a struct expression based on column names. + */ + private def sortStructFields(expr: Expression): Expression = { +val existingExprs = expr.dataType.asInstanceOf[StructType].fieldNames.zipWithIndex.map { + case (name, i) => +val fieldExpr = GetStructField(KnownNotNull(expr), i) +if (fieldExpr.dataType.isInstanceOf[StructType]) { + (name, sortStructFields(fieldExpr)) +} else { + (name, fieldExpr) +} +}.sortBy(_._1).flatMap(pair => Seq(Literal(pair._1), pair._2)) + +val newExpr = CreateNamedStruct(existingExprs) +if (expr.nullable) { + If(IsNull(expr), Literal(null, newExpr.dataType), newExpr) +} else { + newExpr +} + } + + /** + * Assumes input expressions are field expression of `CreateNamedStruct`. This method + * sorts the expressions based on field names. 
+ */ + private def sortFieldExprs(fieldExprs: Seq[Expression]): Seq[Expression] = { +fieldExprs.grouped(2).map { e => + Seq(e.head, e.last) +}.toSeq.sortBy { pair => + assert(pair.head.isInstanceOf[Literal]) + pair.head.eval().asInstanceOf[UTF8String].toString +}.flatten + } + + /** + * This helper method sorts fields in a `UpdateFields` expression by field name. + */ + private def sortStructFieldsInWithFields(expr: Expression): Expression = expr transformUp { +case u: UpdateFields if u.resolved => + u.evalExpr match { +case i @ If(IsNull(_), _, CreateNamedStruct(fieldExprs)) => + val sorted = sortFieldExprs(fieldExprs) + val newStruct = CreateNamedStruct(sorted) + i.copy(trueValue = Literal(null, newStruct.dataType), falseValue = newStruct) +case CreateNamedStruct(fieldExprs) => + val sorted = sortFieldExprs(fieldExprs) + val newStruct = CreateNamedStruct(sorted) + newStruct +case other => + throw new AnalysisException(s"`UpdateFields` has incorrect eval expression: $other. " + +"Please file a bug report with this error message, stack trace, and the query.") + } + } + + def simplifyWithFields(expr: Expression): Expression = { +expr.transformUp { + case UpdateFields(UpdateFields(struct, fieldOps1), fieldOps2) => +UpdateFields(struct, fieldOps1 ++ fieldOps2) +} + } + + /** + * Adds missing fields recursively into given `col` expression, based on the target `StructType`. + * This is called by `compareAndAddFields` when we find two struct columns with same name but + * different nested fields. This method will find out the missing nested fields from `col` to + * `target` struct and add these missing nested fields. Currently we don't support finding out + * missing nested fields of struct nested in array or struct nested in map. 
+ */ + private def addFields(col: NamedExpression, target: StructType): Expression = { +assert(col.dataType.isInstanceOf[StructType], "Only support StructType.") + +val resolver = SQLConf.get.resolver +val missingFields = + StructType.findMissingFields(col.dataType.asInstanceOf[StructType], target, resolver) + +// We need to sort columns in result, because we might add another column in other side. +// E.g., we want to union two structs "a int, b long" and "a int, c string". +// If we don't sort, we will have "a int, b long, c string" and +// "a int, c string, b long", which are not compatible. +if (missingFields.isEmpty) { + sortStructFields(col) +} else { + missingFields.map { s => +val struct = addFieldsInto(col, s.fields) +// Combines `WithFields`s to reduce expression tree. +val reducedStruct = simplifyWithFields(struct) +val sorted = sortStructFieldsInWithFields(reducedStruct) +sorted + }.get +} + } + + /** + * Adds missing fields
[GitHub] [spark] viirya commented on a change in pull request #29587: [SPARK-32376][SQL] Make unionByName null-filling behavior work with struct columns
viirya commented on a change in pull request #29587: URL: https://github.com/apache/spark/pull/29587#discussion_r504798080 ## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveUnion.scala ## @@ -17,29 +17,190 @@ package org.apache.spark.sql.catalyst.analysis +import scala.collection.mutable + import org.apache.spark.sql.AnalysisException -import org.apache.spark.sql.catalyst.expressions.{Alias, Literal} +import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.optimizer.CombineUnions import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, Union} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types._ import org.apache.spark.sql.util.SchemaUtils +import org.apache.spark.unsafe.types.UTF8String /** * Resolves different children of Union to a common set of columns. */ object ResolveUnion extends Rule[LogicalPlan] { - private def unionTwoSides( + /** + * This method sorts recursively columns in a struct expression based on column names. + */ + private def sortStructFields(expr: Expression): Expression = { +val existingExprs = expr.dataType.asInstanceOf[StructType].fieldNames.zipWithIndex.map { + case (name, i) => +val fieldExpr = GetStructField(KnownNotNull(expr), i) +if (fieldExpr.dataType.isInstanceOf[StructType]) { + (name, sortStructFields(fieldExpr)) +} else { + (name, fieldExpr) +} +}.sortBy(_._1).flatMap(pair => Seq(Literal(pair._1), pair._2)) + +val newExpr = CreateNamedStruct(existingExprs) +if (expr.nullable) { + If(IsNull(expr), Literal(null, newExpr.dataType), newExpr) +} else { + newExpr +} + } + + /** + * Assumes input expressions are field expression of `CreateNamedStruct`. This method + * sorts the expressions based on field names. 
+ */ + private def sortFieldExprs(fieldExprs: Seq[Expression]): Seq[Expression] = { +fieldExprs.grouped(2).map { e => + Seq(e.head, e.last) +}.toSeq.sortBy { pair => + assert(pair.head.isInstanceOf[Literal]) + pair.head.eval().asInstanceOf[UTF8String].toString +}.flatten + } + + /** + * This helper method sorts fields in a `UpdateFields` expression by field name. + */ + private def sortStructFieldsInWithFields(expr: Expression): Expression = expr transformUp { +case u: UpdateFields if u.resolved => + u.evalExpr match { +case i @ If(IsNull(_), _, CreateNamedStruct(fieldExprs)) => + val sorted = sortFieldExprs(fieldExprs) + val newStruct = CreateNamedStruct(sorted) + i.copy(trueValue = Literal(null, newStruct.dataType), falseValue = newStruct) +case CreateNamedStruct(fieldExprs) => + val sorted = sortFieldExprs(fieldExprs) + val newStruct = CreateNamedStruct(sorted) + newStruct +case other => + throw new AnalysisException(s"`UpdateFields` has incorrect eval expression: $other. " + +"Please file a bug report with this error message, stack trace, and the query.") + } + } + + def simplifyWithFields(expr: Expression): Expression = { +expr.transformUp { + case UpdateFields(UpdateFields(struct, fieldOps1), fieldOps2) => +UpdateFields(struct, fieldOps1 ++ fieldOps2) +} + } + + /** + * Adds missing fields recursively into given `col` expression, based on the target `StructType`. + * This is called by `compareAndAddFields` when we find two struct columns with same name but + * different nested fields. This method will find out the missing nested fields from `col` to + * `target` struct and add these missing nested fields. Currently we don't support finding out + * missing nested fields of struct nested in array or struct nested in map. + */ + private def addFields(col: NamedExpression, target: StructType): Expression = { +assert(col.dataType.isInstanceOf[StructType], "Only support StructType.") + +val resolver = SQLConf.get.resolver +val missingFields = Review comment: Good. 
Fixed. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] viirya commented on a change in pull request #29587: [SPARK-32376][SQL] Make unionByName null-filling behavior work with struct columns
viirya commented on a change in pull request #29587: URL: https://github.com/apache/spark/pull/29587#discussion_r504796446 ## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveUnion.scala ## @@ -17,29 +17,190 @@ package org.apache.spark.sql.catalyst.analysis +import scala.collection.mutable + import org.apache.spark.sql.AnalysisException -import org.apache.spark.sql.catalyst.expressions.{Alias, Literal} +import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.optimizer.CombineUnions import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, Union} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types._ import org.apache.spark.sql.util.SchemaUtils +import org.apache.spark.unsafe.types.UTF8String /** * Resolves different children of Union to a common set of columns. */ object ResolveUnion extends Rule[LogicalPlan] { - private def unionTwoSides( + /** + * This method sorts recursively columns in a struct expression based on column names. + */ + private def sortStructFields(expr: Expression): Expression = { +val existingExprs = expr.dataType.asInstanceOf[StructType].fieldNames.zipWithIndex.map { + case (name, i) => +val fieldExpr = GetStructField(KnownNotNull(expr), i) +if (fieldExpr.dataType.isInstanceOf[StructType]) { + (name, sortStructFields(fieldExpr)) +} else { + (name, fieldExpr) +} +}.sortBy(_._1).flatMap(pair => Seq(Literal(pair._1), pair._2)) + +val newExpr = CreateNamedStruct(existingExprs) +if (expr.nullable) { + If(IsNull(expr), Literal(null, newExpr.dataType), newExpr) +} else { + newExpr +} + } + + /** + * Assumes input expressions are field expression of `CreateNamedStruct`. This method + * sorts the expressions based on field names. 
+ */ + private def sortFieldExprs(fieldExprs: Seq[Expression]): Seq[Expression] = { +fieldExprs.grouped(2).map { e => + Seq(e.head, e.last) +}.toSeq.sortBy { pair => + assert(pair.head.isInstanceOf[Literal]) + pair.head.eval().asInstanceOf[UTF8String].toString +}.flatten + } + + /** + * This helper method sorts fields in a `UpdateFields` expression by field name. + */ + private def sortStructFieldsInWithFields(expr: Expression): Expression = expr transformUp { +case u: UpdateFields if u.resolved => + u.evalExpr match { +case i @ If(IsNull(_), _, CreateNamedStruct(fieldExprs)) => + val sorted = sortFieldExprs(fieldExprs) + val newStruct = CreateNamedStruct(sorted) + i.copy(trueValue = Literal(null, newStruct.dataType), falseValue = newStruct) +case CreateNamedStruct(fieldExprs) => + val sorted = sortFieldExprs(fieldExprs) + val newStruct = CreateNamedStruct(sorted) + newStruct +case other => + throw new AnalysisException(s"`UpdateFields` has incorrect eval expression: $other. " + +"Please file a bug report with this error message, stack trace, and the query.") + } + } + + def simplifyWithFields(expr: Expression): Expression = { +expr.transformUp { + case UpdateFields(UpdateFields(struct, fieldOps1), fieldOps2) => Review comment: Yeah. Without optimizing the expressions, we cannot scale well to deeply nested schemas, e.g. the added test `SPARK-32376: Make unionByName null-filling behavior work with struct columns - deep expr` in `DataFrameSetOperationsSuite`. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] viirya commented on a change in pull request #29587: [SPARK-32376][SQL] Make unionByName null-filling behavior work with struct columns
viirya commented on a change in pull request #29587: URL: https://github.com/apache/spark/pull/29587#discussion_r504794611 ## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ## @@ -2740,6 +2740,19 @@ object SQLConf { .booleanConf .createWithDefault(false) + val UNION_BYNAME_STRUCT_SUPPORT_ENABLED = Review comment: Ok, sounds more correct. I will remove this config. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] viirya commented on a change in pull request #29587: [SPARK-32376][SQL] Make unionByName null-filling behavior work with struct columns
viirya commented on a change in pull request #29587: URL: https://github.com/apache/spark/pull/29587#discussion_r502837273 ## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ## @@ -2721,6 +2721,16 @@ object SQLConf { .booleanConf .createWithDefault(false) + val UNION_BYNAME_STRUCT_SUPPORT_ENABLED = +buildConf("spark.sql.unionByName.structSupport.enabled") + .doc("When true, the `allowMissingColumns` feature of `Dataset.unionByName` supports " + +"nested column in struct types. Missing nested columns of struct columns with same " + +"name will also be filled with null values. This currently does not support nested " + +"columns in array and map types.") Review comment: Getting a consistent schema without just sorting the fields is much easier in the top-level case. We iterate over each top-level column of the left side, then append the fields that exist only in the right side at the end of the schema, and that is it. In the nested column case, missing nested columns are filled by using `WithFields`, which basically appends new columns at the end of the original schema. To get the same consistent schema as in the top-level case, we either adjust all nested columns afterwards or simply sort the nested columns. I believe it is possible to adjust the nested columns to get a more natural schema. However, this PR is growing too big and I would like to wrap it up here. If you are okay with it, I can address this issue in a follow-up PR. WDYT? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] viirya commented on a change in pull request #29587: [SPARK-32376][SQL] Make unionByName null-filling behavior work with struct columns
viirya commented on a change in pull request #29587: URL: https://github.com/apache/spark/pull/29587#discussion_r502837273 ## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ## @@ -2721,6 +2721,16 @@ object SQLConf { .booleanConf .createWithDefault(false) + val UNION_BYNAME_STRUCT_SUPPORT_ENABLED = +buildConf("spark.sql.unionByName.structSupport.enabled") + .doc("When true, the `allowMissingColumns` feature of `Dataset.unionByName` supports " + +"nested column in struct types. Missing nested columns of struct columns with same " + +"name will also be filled with null values. This currently does not support nested " + +"columns in array and map types.") Review comment: Getting a consistent schema without just sorting the fields is much easier in the top-level case. We iterate over each top-level column of the left side, then append the fields that exist only in the right side at the end of the schema, and that is it. In the nested column case, missing nested columns are filled by using `WithFields`, which basically appends new columns at the end of the original schema. To get the same consistent schema as in the top-level case, we either adjust all nested columns afterwards or simply sort the nested columns. I believe it is possible to adjust the nested columns to get a more natural schema. However, this PR is growing too big and I would like to wrap it up here. If you are okay with it, I can address this issue in a follow-up PR. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] viirya commented on a change in pull request #29587: [SPARK-32376][SQL] Make unionByName null-filling behavior work with struct columns
viirya commented on a change in pull request #29587: URL: https://github.com/apache/spark/pull/29587#discussion_r500034164 ## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ## @@ -2721,6 +2721,16 @@ object SQLConf { .booleanConf .createWithDefault(false) + val UNION_BYNAME_STRUCT_SUPPORT_ENABLED = +buildConf("spark.sql.unionByName.structSupport.enabled") + .doc("When true, the `allowMissingColumns` feature of `Dataset.unionByName` supports " + +"nested column in struct types. Missing nested columns of struct columns with same " + +"name will also be filled with null values. This currently does not support nested " + +"columns in array and map types.") Review comment: Updated both the config doc and API doc in Dataset. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] viirya commented on a change in pull request #29587: [SPARK-32376][SQL] Make unionByName null-filling behavior work with struct columns
viirya commented on a change in pull request #29587: URL: https://github.com/apache/spark/pull/29587#discussion_r500033312 ## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveUnion.scala ## @@ -17,29 +17,202 @@ package org.apache.spark.sql.catalyst.analysis +import scala.collection.mutable + import org.apache.spark.sql.AnalysisException -import org.apache.spark.sql.catalyst.expressions.{Alias, Literal} +import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.optimizer.CombineUnions import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, Union} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types._ import org.apache.spark.sql.util.SchemaUtils +import org.apache.spark.unsafe.types.UTF8String /** * Resolves different children of Union to a common set of columns. */ object ResolveUnion extends Rule[LogicalPlan] { - private def unionTwoSides( + /** + * This method sorts recursively columns in a struct expression based on column names. + */ + private def sortStructFields(expr: Expression): Expression = { +val existingExprs = expr.dataType.asInstanceOf[StructType].fieldNames.zipWithIndex.map { + case (name, i) => +val fieldExpr = GetStructField(KnownNotNull(expr), i) +if (fieldExpr.dataType.isInstanceOf[StructType]) { + (name, sortStructFields(fieldExpr)) +} else { + (name, fieldExpr) +} +}.sortBy(_._1).flatMap(pair => Seq(Literal(pair._1), pair._2)) + +val newExpr = CreateNamedStruct(existingExprs) +if (expr.nullable) { + If(IsNull(expr), Literal(null, newExpr.dataType), newExpr) +} else { + newExpr +} + } + + /** + * Assumes input expressions are field expression of `CreateNamedStruct`. This method + * sorts the expressions based on field names. 
+ */ + private def sortFieldExprs(fieldExprs: Seq[Expression]): Seq[Expression] = { +fieldExprs.grouped(2).map { e => + Seq(e.head, e.last) +}.toSeq.sortBy { pair => + assert(pair.head.isInstanceOf[Literal]) + pair.head.eval().asInstanceOf[UTF8String].toString +}.flatten + } + + /** + * This helper method sorts fields in a `WithFields` expression by field name. + */ + private def sortStructFieldsInWithFields(expr: Expression): Expression = expr transformUp { +case w: WithFields if w.resolved => + w.evalExpr match { +case i @ If(IsNull(_), _, CreateNamedStruct(fieldExprs)) => + val sorted = sortFieldExprs(fieldExprs) + val newStruct = CreateNamedStruct(sorted) + i.copy(trueValue = Literal(null, newStruct.dataType), falseValue = newStruct) +case CreateNamedStruct(fieldExprs) => + val sorted = sortFieldExprs(fieldExprs) + val newStruct = CreateNamedStruct(sorted) + newStruct +case other => Review comment: Revised. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] viirya commented on a change in pull request #29587: [SPARK-32376][SQL] Make unionByName null-filling behavior work with struct columns
viirya commented on a change in pull request #29587: URL: https://github.com/apache/spark/pull/29587#discussion_r499980671 ## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveUnion.scala ## @@ -17,29 +17,202 @@ package org.apache.spark.sql.catalyst.analysis +import scala.collection.mutable + import org.apache.spark.sql.AnalysisException -import org.apache.spark.sql.catalyst.expressions.{Alias, Literal} +import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.optimizer.CombineUnions import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, Union} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types._ import org.apache.spark.sql.util.SchemaUtils +import org.apache.spark.unsafe.types.UTF8String /** * Resolves different children of Union to a common set of columns. */ object ResolveUnion extends Rule[LogicalPlan] { - private def unionTwoSides( + /** + * This method sorts recursively columns in a struct expression based on column names. + */ + private def sortStructFields(expr: Expression): Expression = { +val existingExprs = expr.dataType.asInstanceOf[StructType].fieldNames.zipWithIndex.map { + case (name, i) => +val fieldExpr = GetStructField(KnownNotNull(expr), i) +if (fieldExpr.dataType.isInstanceOf[StructType]) { + (name, sortStructFields(fieldExpr)) +} else { + (name, fieldExpr) +} +}.sortBy(_._1).flatMap(pair => Seq(Literal(pair._1), pair._2)) + +val newExpr = CreateNamedStruct(existingExprs) +if (expr.nullable) { + If(IsNull(expr), Literal(null, newExpr.dataType), newExpr) +} else { + newExpr +} + } + + /** + * Assumes input expressions are field expression of `CreateNamedStruct`. This method + * sorts the expressions based on field names. 
+ */ + private def sortFieldExprs(fieldExprs: Seq[Expression]): Seq[Expression] = { +fieldExprs.grouped(2).map { e => + Seq(e.head, e.last) +}.toSeq.sortBy { pair => + assert(pair.head.isInstanceOf[Literal]) + pair.head.eval().asInstanceOf[UTF8String].toString +}.flatten + } + + /** + * This helper method sorts fields in a `WithFields` expression by field name. + */ + private def sortStructFieldsInWithFields(expr: Expression): Expression = expr transformUp { +case w: WithFields if w.resolved => + w.evalExpr match { +case i @ If(IsNull(_), _, CreateNamedStruct(fieldExprs)) => + val sorted = sortFieldExprs(fieldExprs) + val newStruct = CreateNamedStruct(sorted) + i.copy(trueValue = Literal(null, newStruct.dataType), falseValue = newStruct) +case CreateNamedStruct(fieldExprs) => + val sorted = sortFieldExprs(fieldExprs) + val newStruct = CreateNamedStruct(sorted) + newStruct +case other => Review comment: Okay, I will revise the error message here. Thanks. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] viirya commented on a change in pull request #29587: [SPARK-32376][SQL] Make unionByName null-filling behavior work with struct columns
viirya commented on a change in pull request #29587: URL: https://github.com/apache/spark/pull/29587#discussion_r499980569 ## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveUnion.scala ## @@ -17,29 +17,202 @@ package org.apache.spark.sql.catalyst.analysis +import scala.collection.mutable + import org.apache.spark.sql.AnalysisException -import org.apache.spark.sql.catalyst.expressions.{Alias, Literal} +import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.optimizer.CombineUnions import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, Union} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types._ import org.apache.spark.sql.util.SchemaUtils +import org.apache.spark.unsafe.types.UTF8String /** * Resolves different children of Union to a common set of columns. */ object ResolveUnion extends Rule[LogicalPlan] { - private def unionTwoSides( + /** + * This method sorts recursively columns in a struct expression based on column names. + */ + private def sortStructFields(expr: Expression): Expression = { +val existingExprs = expr.dataType.asInstanceOf[StructType].fieldNames.zipWithIndex.map { + case (name, i) => +val fieldExpr = GetStructField(KnownNotNull(expr), i) +if (fieldExpr.dataType.isInstanceOf[StructType]) { + (name, sortStructFields(fieldExpr)) +} else { + (name, fieldExpr) +} +}.sortBy(_._1).flatMap(pair => Seq(Literal(pair._1), pair._2)) + +val newExpr = CreateNamedStruct(existingExprs) +if (expr.nullable) { + If(IsNull(expr), Literal(null, newExpr.dataType), newExpr) +} else { + newExpr +} + } + + /** + * Assumes input expressions are field expression of `CreateNamedStruct`. This method + * sorts the expressions based on field names. 
+ */ + private def sortFieldExprs(fieldExprs: Seq[Expression]): Seq[Expression] = { +fieldExprs.grouped(2).map { e => + Seq(e.head, e.last) +}.toSeq.sortBy { pair => + assert(pair.head.isInstanceOf[Literal]) + pair.head.eval().asInstanceOf[UTF8String].toString +}.flatten + } + + /** + * This helper method sorts fields in a `WithFields` expression by field name. + */ + private def sortStructFieldsInWithFields(expr: Expression): Expression = expr transformUp { +case w: WithFields if w.resolved => + w.evalExpr match { +case i @ If(IsNull(_), _, CreateNamedStruct(fieldExprs)) => + val sorted = sortFieldExprs(fieldExprs) + val newStruct = CreateNamedStruct(sorted) + i.copy(trueValue = Literal(null, newStruct.dataType), falseValue = newStruct) +case CreateNamedStruct(fieldExprs) => + val sorted = sortFieldExprs(fieldExprs) + val newStruct = CreateNamedStruct(sorted) + newStruct +case other => + throw new AnalysisException(s"`WithFields` has incorrect eval expression: $other") + } + } + + def simplifyWithFields(expr: Expression): Expression = { Review comment: Yeah, actually there is #29812 for that, but is stuck by other PR that is refactoring `WithFields`. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] viirya commented on a change in pull request #29587: [SPARK-32376][SQL] Make unionByName null-filling behavior work with struct columns
viirya commented on a change in pull request #29587: URL: https://github.com/apache/spark/pull/29587#discussion_r499980265 ## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveUnion.scala ## @@ -17,29 +17,202 @@ package org.apache.spark.sql.catalyst.analysis +import scala.collection.mutable + import org.apache.spark.sql.AnalysisException -import org.apache.spark.sql.catalyst.expressions.{Alias, Literal} +import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.optimizer.CombineUnions import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, Union} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types._ import org.apache.spark.sql.util.SchemaUtils +import org.apache.spark.unsafe.types.UTF8String /** * Resolves different children of Union to a common set of columns. */ object ResolveUnion extends Rule[LogicalPlan] { - private def unionTwoSides( + /** + * This method sorts recursively columns in a struct expression based on column names. + */ + private def sortStructFields(expr: Expression): Expression = { +val existingExprs = expr.dataType.asInstanceOf[StructType].fieldNames.zipWithIndex.map { + case (name, i) => +val fieldExpr = GetStructField(KnownNotNull(expr), i) +if (fieldExpr.dataType.isInstanceOf[StructType]) { + (name, sortStructFields(fieldExpr)) +} else { + (name, fieldExpr) +} +}.sortBy(_._1).flatMap(pair => Seq(Literal(pair._1), pair._2)) + +val newExpr = CreateNamedStruct(existingExprs) +if (expr.nullable) { + If(IsNull(expr), Literal(null, newExpr.dataType), newExpr) +} else { + newExpr +} + } + + /** + * Assumes input expressions are field expression of `CreateNamedStruct`. This method + * sorts the expressions based on field names. 
+ */ + private def sortFieldExprs(fieldExprs: Seq[Expression]): Seq[Expression] = { +fieldExprs.grouped(2).map { e => + Seq(e.head, e.last) +}.toSeq.sortBy { pair => + assert(pair.head.isInstanceOf[Literal]) + pair.head.eval().asInstanceOf[UTF8String].toString +}.flatten + } + + /** + * This helper method sorts fields in a `WithFields` expression by field name. + */ + private def sortStructFieldsInWithFields(expr: Expression): Expression = expr transformUp { +case w: WithFields if w.resolved => + w.evalExpr match { +case i @ If(IsNull(_), _, CreateNamedStruct(fieldExprs)) => + val sorted = sortFieldExprs(fieldExprs) + val newStruct = CreateNamedStruct(sorted) + i.copy(trueValue = Literal(null, newStruct.dataType), falseValue = newStruct) +case CreateNamedStruct(fieldExprs) => + val sorted = sortFieldExprs(fieldExprs) + val newStruct = CreateNamedStruct(sorted) + newStruct +case other => + throw new AnalysisException(s"`WithFields` has incorrect eval expression: $other") + } + } + + def simplifyWithFields(expr: Expression): Expression = { +expr.transformUp { + case WithFields(structExpr, names, values) if names.distinct.length != names.length => +val newNames = mutable.ArrayBuffer.empty[String] +val newValues = mutable.ArrayBuffer.empty[Expression] +names.zip(values).reverse.foreach { case (name, value) => + if (!newNames.contains(name)) { +newNames += name +newValues += value + } +} +WithFields(structExpr, names = newNames.reverse, valExprs = newValues.reverse) + case WithFields(WithFields(struct, names1, valExprs1), names2, valExprs2) => +WithFields(struct, names1 ++ names2, valExprs1 ++ valExprs2) Review comment: This `simplifyWithFields` will be removed out to separate rule: #29812. But currently I leave them here to pass all added tests. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] viirya commented on a change in pull request #29587: [SPARK-32376][SQL] Make unionByName null-filling behavior work with struct columns
viirya commented on a change in pull request #29587: URL: https://github.com/apache/spark/pull/29587#discussion_r499980265 ## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveUnion.scala ## @@ -17,29 +17,202 @@ package org.apache.spark.sql.catalyst.analysis +import scala.collection.mutable + import org.apache.spark.sql.AnalysisException -import org.apache.spark.sql.catalyst.expressions.{Alias, Literal} +import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.optimizer.CombineUnions import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, Union} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types._ import org.apache.spark.sql.util.SchemaUtils +import org.apache.spark.unsafe.types.UTF8String /** * Resolves different children of Union to a common set of columns. */ object ResolveUnion extends Rule[LogicalPlan] { - private def unionTwoSides( + /** + * This method sorts recursively columns in a struct expression based on column names. + */ + private def sortStructFields(expr: Expression): Expression = { +val existingExprs = expr.dataType.asInstanceOf[StructType].fieldNames.zipWithIndex.map { + case (name, i) => +val fieldExpr = GetStructField(KnownNotNull(expr), i) +if (fieldExpr.dataType.isInstanceOf[StructType]) { + (name, sortStructFields(fieldExpr)) +} else { + (name, fieldExpr) +} +}.sortBy(_._1).flatMap(pair => Seq(Literal(pair._1), pair._2)) + +val newExpr = CreateNamedStruct(existingExprs) +if (expr.nullable) { + If(IsNull(expr), Literal(null, newExpr.dataType), newExpr) +} else { + newExpr +} + } + + /** + * Assumes input expressions are field expression of `CreateNamedStruct`. This method + * sorts the expressions based on field names. 
+ */ + private def sortFieldExprs(fieldExprs: Seq[Expression]): Seq[Expression] = { +fieldExprs.grouped(2).map { e => + Seq(e.head, e.last) +}.toSeq.sortBy { pair => + assert(pair.head.isInstanceOf[Literal]) + pair.head.eval().asInstanceOf[UTF8String].toString +}.flatten + } + + /** + * This helper method sorts fields in a `WithFields` expression by field name. + */ + private def sortStructFieldsInWithFields(expr: Expression): Expression = expr transformUp { +case w: WithFields if w.resolved => + w.evalExpr match { +case i @ If(IsNull(_), _, CreateNamedStruct(fieldExprs)) => + val sorted = sortFieldExprs(fieldExprs) + val newStruct = CreateNamedStruct(sorted) + i.copy(trueValue = Literal(null, newStruct.dataType), falseValue = newStruct) +case CreateNamedStruct(fieldExprs) => + val sorted = sortFieldExprs(fieldExprs) + val newStruct = CreateNamedStruct(sorted) + newStruct +case other => + throw new AnalysisException(s"`WithFields` has incorrect eval expression: $other") + } + } + + def simplifyWithFields(expr: Expression): Expression = { +expr.transformUp { + case WithFields(structExpr, names, values) if names.distinct.length != names.length => +val newNames = mutable.ArrayBuffer.empty[String] +val newValues = mutable.ArrayBuffer.empty[Expression] +names.zip(values).reverse.foreach { case (name, value) => + if (!newNames.contains(name)) { +newNames += name +newValues += value + } +} +WithFields(structExpr, names = newNames.reverse, valExprs = newValues.reverse) + case WithFields(WithFields(struct, names1, valExprs1), names2, valExprs2) => +WithFields(struct, names1 ++ names2, valExprs1 ++ valExprs2) Review comment: This part will be removed out to separate rule: #29812. But currently I leave them here to pass all added tests. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] viirya commented on a change in pull request #29587: [SPARK-32376][SQL] Make unionByName null-filling behavior work with struct columns
viirya commented on a change in pull request #29587: URL: https://github.com/apache/spark/pull/29587#discussion_r499979944 ## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ## @@ -2721,6 +2721,16 @@ object SQLConf { .booleanConf .createWithDefault(false) + val UNION_BYNAME_STRUCT_SUPPORT_ENABLED = +buildConf("spark.sql.unionByName.structSupport.enabled") + .doc("When true, the `allowMissingColumns` feature of `Dataset.unionByName` supports " + +"nested column in struct types. Missing nested columns of struct columns with same " + +"name will also be filled with null values. This currently does not support nested " + +"columns in array and map types.") Review comment: Oh, right, I will update the doc. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] viirya commented on a change in pull request #29587: [SPARK-32376][SQL] Make unionByName null-filling behavior work with struct columns
viirya commented on a change in pull request #29587: URL: https://github.com/apache/spark/pull/29587#discussion_r493101256 ## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveUnion.scala ## @@ -17,29 +17,202 @@ package org.apache.spark.sql.catalyst.analysis +import scala.collection.mutable + import org.apache.spark.sql.AnalysisException -import org.apache.spark.sql.catalyst.expressions.{Alias, Literal} +import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.optimizer.CombineUnions import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, Union} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types._ import org.apache.spark.sql.util.SchemaUtils +import org.apache.spark.unsafe.types.UTF8String /** * Resolves different children of Union to a common set of columns. */ object ResolveUnion extends Rule[LogicalPlan] { - private def unionTwoSides( + /** + * This method sorts recursively columns in a struct expression based on column names. + */ + private def sortStructFields(expr: Expression): Expression = { +val existingExprs = expr.dataType.asInstanceOf[StructType].fieldNames.zipWithIndex.map { + case (name, i) => +val fieldExpr = GetStructField(KnownNotNull(expr), i) +if (fieldExpr.dataType.isInstanceOf[StructType]) { + (name, sortStructFields(fieldExpr)) +} else { + (name, fieldExpr) +} +}.sortBy(_._1).flatMap(pair => Seq(Literal(pair._1), pair._2)) + +val newExpr = CreateNamedStruct(existingExprs) +if (expr.nullable) { + If(IsNull(expr), Literal(null, newExpr.dataType), newExpr) +} else { + newExpr +} + } + + /** + * Assumes input expressions are field expression of `CreateNamedStruct`. This method + * sorts the expressions based on field names. 
+ */ + private def sortFieldExprs(fieldExprs: Seq[Expression]): Seq[Expression] = { +fieldExprs.grouped(2).map { e => + Seq(e.head, e.last) +}.toSeq.sortBy { pair => + assert(pair.head.isInstanceOf[Literal]) + pair.head.eval().asInstanceOf[UTF8String].toString +}.flatten + } + + /** + * This helper method sorts fields in a `WithFields` expression by field name. + */ + private def sortStructFieldsInWithFields(expr: Expression): Expression = expr transformUp { +case w: WithFields if w.resolved => + w.evalExpr match { +case i @ If(IsNull(_), _, CreateNamedStruct(fieldExprs)) => + val sorted = sortFieldExprs(fieldExprs) + val newStruct = CreateNamedStruct(sorted) + i.copy(trueValue = Literal(null, newStruct.dataType), falseValue = newStruct) +case CreateNamedStruct(fieldExprs) => + val sorted = sortFieldExprs(fieldExprs) + val newStruct = CreateNamedStruct(sorted) + newStruct +case other => + throw new AnalysisException(s"`WithFields` has incorrect eval expression: $other") + } + } + + def simplifyWithFields(expr: Expression): Expression = { +expr.transformUp { + case WithFields(structExpr, names, values) if names.distinct.length != names.length => Review comment: When I was working on simplify expression tree for scale issue, I was running into this case. I changed other part of code later, so maybe this isn't hit now. Let me check. 
## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveUnion.scala ## @@ -17,29 +17,202 @@ package org.apache.spark.sql.catalyst.analysis +import scala.collection.mutable + import org.apache.spark.sql.AnalysisException -import org.apache.spark.sql.catalyst.expressions.{Alias, Literal} +import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.optimizer.CombineUnions import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, Union} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types._ import org.apache.spark.sql.util.SchemaUtils +import org.apache.spark.unsafe.types.UTF8String /** * Resolves different children of Union to a common set of columns. */ object ResolveUnion extends Rule[LogicalPlan] { - private def unionTwoSides( + /** + * This method sorts recursively columns in a struct expression based on column names. + */ + private def sortStructFields(expr: Expression): Expression = { +val existingExprs = expr.dataType.asInstanceOf[StructType].fieldNames.zipWithIndex.map { + case (name, i) => +val fieldExpr = GetStructField(KnownNotNull(expr), i) +if (fieldExpr.dataType.isInstanceOf[StructType]) { + (name, sortStructFields(fieldExpr)) +} else { + (name, fieldExpr) +} +}.sortBy(_._1).flatMap(pair => Seq(Literal(pair._1),
[GitHub] [spark] viirya commented on a change in pull request #29587: [SPARK-32376][SQL] Make unionByName null-filling behavior work with struct columns
viirya commented on a change in pull request #29587: URL: https://github.com/apache/spark/pull/29587#discussion_r493103615 ## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveUnion.scala ## @@ -17,29 +17,202 @@ package org.apache.spark.sql.catalyst.analysis +import scala.collection.mutable + import org.apache.spark.sql.AnalysisException -import org.apache.spark.sql.catalyst.expressions.{Alias, Literal} +import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.optimizer.CombineUnions import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, Union} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types._ import org.apache.spark.sql.util.SchemaUtils +import org.apache.spark.unsafe.types.UTF8String /** * Resolves different children of Union to a common set of columns. */ object ResolveUnion extends Rule[LogicalPlan] { - private def unionTwoSides( + /** + * This method sorts recursively columns in a struct expression based on column names. + */ + private def sortStructFields(expr: Expression): Expression = { +val existingExprs = expr.dataType.asInstanceOf[StructType].fieldNames.zipWithIndex.map { + case (name, i) => +val fieldExpr = GetStructField(KnownNotNull(expr), i) +if (fieldExpr.dataType.isInstanceOf[StructType]) { + (name, sortStructFields(fieldExpr)) +} else { + (name, fieldExpr) +} +}.sortBy(_._1).flatMap(pair => Seq(Literal(pair._1), pair._2)) + +val newExpr = CreateNamedStruct(existingExprs) +if (expr.nullable) { + If(IsNull(expr), Literal(null, newExpr.dataType), newExpr) +} else { + newExpr +} + } + + /** + * Assumes input expressions are field expression of `CreateNamedStruct`. This method + * sorts the expressions based on field names. 
+ */ + private def sortFieldExprs(fieldExprs: Seq[Expression]): Seq[Expression] = { +fieldExprs.grouped(2).map { e => + Seq(e.head, e.last) +}.toSeq.sortBy { pair => + assert(pair.head.isInstanceOf[Literal]) + pair.head.eval().asInstanceOf[UTF8String].toString +}.flatten + } + + /** + * This helper method sorts fields in a `WithFields` expression by field name. + */ + private def sortStructFieldsInWithFields(expr: Expression): Expression = expr transformUp { +case w: WithFields if w.resolved => + w.evalExpr match { +case i @ If(IsNull(_), _, CreateNamedStruct(fieldExprs)) => + val sorted = sortFieldExprs(fieldExprs) + val newStruct = CreateNamedStruct(sorted) + i.copy(trueValue = Literal(null, newStruct.dataType), falseValue = newStruct) +case CreateNamedStruct(fieldExprs) => + val sorted = sortFieldExprs(fieldExprs) + val newStruct = CreateNamedStruct(sorted) + newStruct +case other => + throw new AnalysisException(s"`WithFields` has incorrect eval expression: $other") + } + } + + def simplifyWithFields(expr: Expression): Expression = { +expr.transformUp { + case WithFields(structExpr, names, values) if names.distinct.length != names.length => Review comment: Ok, seems not. However, this is still a useful to optimize inefficient `WithFields` expressions. I relied on these rules to simplify expression tree and debugging. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] viirya commented on a change in pull request #29587: [SPARK-32376][SQL] Make unionByName null-filling behavior work with struct columns
viirya commented on a change in pull request #29587: URL: https://github.com/apache/spark/pull/29587#discussion_r493101256 ## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveUnion.scala ## @@ -17,29 +17,202 @@ package org.apache.spark.sql.catalyst.analysis +import scala.collection.mutable + import org.apache.spark.sql.AnalysisException -import org.apache.spark.sql.catalyst.expressions.{Alias, Literal} +import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.optimizer.CombineUnions import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, Union} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types._ import org.apache.spark.sql.util.SchemaUtils +import org.apache.spark.unsafe.types.UTF8String /** * Resolves different children of Union to a common set of columns. */ object ResolveUnion extends Rule[LogicalPlan] { - private def unionTwoSides( + /** + * This method sorts recursively columns in a struct expression based on column names. + */ + private def sortStructFields(expr: Expression): Expression = { +val existingExprs = expr.dataType.asInstanceOf[StructType].fieldNames.zipWithIndex.map { + case (name, i) => +val fieldExpr = GetStructField(KnownNotNull(expr), i) +if (fieldExpr.dataType.isInstanceOf[StructType]) { + (name, sortStructFields(fieldExpr)) +} else { + (name, fieldExpr) +} +}.sortBy(_._1).flatMap(pair => Seq(Literal(pair._1), pair._2)) + +val newExpr = CreateNamedStruct(existingExprs) +if (expr.nullable) { + If(IsNull(expr), Literal(null, newExpr.dataType), newExpr) +} else { + newExpr +} + } + + /** + * Assumes input expressions are field expression of `CreateNamedStruct`. This method + * sorts the expressions based on field names. 
+ */ + private def sortFieldExprs(fieldExprs: Seq[Expression]): Seq[Expression] = { +fieldExprs.grouped(2).map { e => + Seq(e.head, e.last) +}.toSeq.sortBy { pair => + assert(pair.head.isInstanceOf[Literal]) + pair.head.eval().asInstanceOf[UTF8String].toString +}.flatten + } + + /** + * This helper method sorts fields in a `WithFields` expression by field name. + */ + private def sortStructFieldsInWithFields(expr: Expression): Expression = expr transformUp { +case w: WithFields if w.resolved => + w.evalExpr match { +case i @ If(IsNull(_), _, CreateNamedStruct(fieldExprs)) => + val sorted = sortFieldExprs(fieldExprs) + val newStruct = CreateNamedStruct(sorted) + i.copy(trueValue = Literal(null, newStruct.dataType), falseValue = newStruct) +case CreateNamedStruct(fieldExprs) => + val sorted = sortFieldExprs(fieldExprs) + val newStruct = CreateNamedStruct(sorted) + newStruct +case other => + throw new AnalysisException(s"`WithFields` has incorrect eval expression: $other") + } + } + + def simplifyWithFields(expr: Expression): Expression = { +expr.transformUp { + case WithFields(structExpr, names, values) if names.distinct.length != names.length => Review comment: When I was working on simplify expression tree for scale issue, I was running into this case. I changed other part of code later, so maybe this isn't hit now. Let me check. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] viirya commented on a change in pull request #29587: [SPARK-32376][SQL] Make unionByName null-filling behavior work with struct columns
viirya commented on a change in pull request #29587: URL: https://github.com/apache/spark/pull/29587#discussion_r491711425 ## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveUnion.scala ## @@ -17,29 +17,202 @@ package org.apache.spark.sql.catalyst.analysis +import scala.collection.mutable + import org.apache.spark.sql.AnalysisException -import org.apache.spark.sql.catalyst.expressions.{Alias, Literal} +import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.optimizer.CombineUnions import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, Union} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types._ import org.apache.spark.sql.util.SchemaUtils +import org.apache.spark.unsafe.types.UTF8String /** * Resolves different children of Union to a common set of columns. */ object ResolveUnion extends Rule[LogicalPlan] { - private def unionTwoSides( + /** + * This method sorts recursively columns in a struct expression based on column names. + */ + private def sortStructFields(expr: Expression): Expression = { +val existingExprs = expr.dataType.asInstanceOf[StructType].fieldNames.zipWithIndex.map { + case (name, i) => +val fieldExpr = GetStructField(KnownNotNull(expr), i) +if (fieldExpr.dataType.isInstanceOf[StructType]) { + (name, sortStructFields(fieldExpr)) +} else { + (name, fieldExpr) +} +}.sortBy(_._1).flatMap(pair => Seq(Literal(pair._1), pair._2)) + +val newExpr = CreateNamedStruct(existingExprs) +if (expr.nullable) { + If(IsNull(expr), Literal(null, newExpr.dataType), newExpr) +} else { + newExpr +} + } + + /** + * Assumes input expressions are field expression of `CreateNamedStruct`. This method + * sorts the expressions based on field names. 
+ */ + private def sortFieldExprs(fieldExprs: Seq[Expression]): Seq[Expression] = { +fieldExprs.grouped(2).map { e => + Seq(e.head, e.last) +}.toSeq.sortBy { pair => + assert(pair.head.isInstanceOf[Literal]) + pair.head.eval().asInstanceOf[UTF8String].toString +}.flatten + } + + /** + * This helper method sorts fields in a `WithFields` expression by field name. + */ + private def sortStructFieldsInWithFields(expr: Expression): Expression = expr transformUp { +case w: WithFields if w.resolved => + w.evalExpr match { +case i @ If(IsNull(_), _, CreateNamedStruct(fieldExprs)) => + val sorted = sortFieldExprs(fieldExprs) + val newStruct = CreateNamedStruct(sorted) + i.copy(trueValue = Literal(null, newStruct.dataType), falseValue = newStruct) +case CreateNamedStruct(fieldExprs) => + val sorted = sortFieldExprs(fieldExprs) + val newStruct = CreateNamedStruct(sorted) + newStruct +case other => + throw new AnalysisException(s"`WithFields` has incorrect eval expression: $other") + } + } + + def simplifyWithFields(expr: Expression): Expression = { +expr.transformUp { + case WithFields(structExpr, names, values) if names.distinct.length != names.length => +val newNames = mutable.ArrayBuffer.empty[String] +val newValues = mutable.ArrayBuffer.empty[Expression] +names.zip(values).reverse.foreach { case (name, value) => + if (!newNames.contains(name)) { +newNames += name +newValues += value + } +} +WithFields(structExpr, names = newNames.reverse, valExprs = newValues.reverse) + case WithFields(WithFields(struct, names1, valExprs1), names2, valExprs2) => +WithFields(struct, names1 ++ names2, valExprs1 ++ valExprs2) + case g @ GetStructField(WithFields(_, names, values), _, _) +if names.contains(g.extractFieldName) => +names.zip(values).reverse.filter(p => p._1 == g.extractFieldName).head._2 +} + } Review comment: #29812 29812 ## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveUnion.scala ## @@ -17,29 +17,202 @@ package 
org.apache.spark.sql.catalyst.analysis +import scala.collection.mutable + import org.apache.spark.sql.AnalysisException -import org.apache.spark.sql.catalyst.expressions.{Alias, Literal} +import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.optimizer.CombineUnions import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, Union} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types._ import org.apache.spark.sql.util.SchemaUtils +import org.apache.spark.unsafe.types.UTF8String /** * Resolves different children of Union to a common set of columns. */ object ResolveUnion extends
[GitHub] [spark] viirya commented on a change in pull request #29587: [SPARK-32376][SQL] Make unionByName null-filling behavior work with struct columns
viirya commented on a change in pull request #29587: URL: https://github.com/apache/spark/pull/29587#discussion_r491711425 ## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveUnion.scala ## @@ -17,29 +17,202 @@ package org.apache.spark.sql.catalyst.analysis +import scala.collection.mutable + import org.apache.spark.sql.AnalysisException -import org.apache.spark.sql.catalyst.expressions.{Alias, Literal} +import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.optimizer.CombineUnions import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, Union} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types._ import org.apache.spark.sql.util.SchemaUtils +import org.apache.spark.unsafe.types.UTF8String /** * Resolves different children of Union to a common set of columns. */ object ResolveUnion extends Rule[LogicalPlan] { - private def unionTwoSides( + /** + * This method sorts recursively columns in a struct expression based on column names. + */ + private def sortStructFields(expr: Expression): Expression = { +val existingExprs = expr.dataType.asInstanceOf[StructType].fieldNames.zipWithIndex.map { + case (name, i) => +val fieldExpr = GetStructField(KnownNotNull(expr), i) +if (fieldExpr.dataType.isInstanceOf[StructType]) { + (name, sortStructFields(fieldExpr)) +} else { + (name, fieldExpr) +} +}.sortBy(_._1).flatMap(pair => Seq(Literal(pair._1), pair._2)) + +val newExpr = CreateNamedStruct(existingExprs) +if (expr.nullable) { + If(IsNull(expr), Literal(null, newExpr.dataType), newExpr) +} else { + newExpr +} + } + + /** + * Assumes input expressions are field expression of `CreateNamedStruct`. This method + * sorts the expressions based on field names. 
+ */ + private def sortFieldExprs(fieldExprs: Seq[Expression]): Seq[Expression] = { +fieldExprs.grouped(2).map { e => + Seq(e.head, e.last) +}.toSeq.sortBy { pair => + assert(pair.head.isInstanceOf[Literal]) + pair.head.eval().asInstanceOf[UTF8String].toString +}.flatten + } + + /** + * This helper method sorts fields in a `WithFields` expression by field name. + */ + private def sortStructFieldsInWithFields(expr: Expression): Expression = expr transformUp { +case w: WithFields if w.resolved => + w.evalExpr match { +case i @ If(IsNull(_), _, CreateNamedStruct(fieldExprs)) => + val sorted = sortFieldExprs(fieldExprs) + val newStruct = CreateNamedStruct(sorted) + i.copy(trueValue = Literal(null, newStruct.dataType), falseValue = newStruct) +case CreateNamedStruct(fieldExprs) => + val sorted = sortFieldExprs(fieldExprs) + val newStruct = CreateNamedStruct(sorted) + newStruct +case other => + throw new AnalysisException(s"`WithFields` has incorrect eval expression: $other") + } + } + + def simplifyWithFields(expr: Expression): Expression = { +expr.transformUp { + case WithFields(structExpr, names, values) if names.distinct.length != names.length => +val newNames = mutable.ArrayBuffer.empty[String] +val newValues = mutable.ArrayBuffer.empty[Expression] +names.zip(values).reverse.foreach { case (name, value) => + if (!newNames.contains(name)) { +newNames += name +newValues += value + } +} +WithFields(structExpr, names = newNames.reverse, valExprs = newValues.reverse) + case WithFields(WithFields(struct, names1, valExprs1), names2, valExprs2) => +WithFields(struct, names1 ++ names2, valExprs1 ++ valExprs2) + case g @ GetStructField(WithFields(_, names, values), _, _) +if names.contains(g.extractFieldName) => +names.zip(values).reverse.filter(p => p._1 == g.extractFieldName).head._2 +} + } Review comment: #29812 29812 ## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveUnion.scala ## @@ -17,29 +17,202 @@ package 
org.apache.spark.sql.catalyst.analysis +import scala.collection.mutable + import org.apache.spark.sql.AnalysisException -import org.apache.spark.sql.catalyst.expressions.{Alias, Literal} +import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.optimizer.CombineUnions import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, Union} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types._ import org.apache.spark.sql.util.SchemaUtils +import org.apache.spark.unsafe.types.UTF8String /** * Resolves different children of Union to a common set of columns. */ object ResolveUnion extends
[GitHub] [spark] viirya commented on a change in pull request #29587: [SPARK-32376][SQL] Make unionByName null-filling behavior work with struct columns
viirya commented on a change in pull request #29587: URL: https://github.com/apache/spark/pull/29587#discussion_r491638577 ## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveUnion.scala ## @@ -17,29 +17,202 @@ package org.apache.spark.sql.catalyst.analysis +import scala.collection.mutable + import org.apache.spark.sql.AnalysisException -import org.apache.spark.sql.catalyst.expressions.{Alias, Literal} +import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.optimizer.CombineUnions import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, Union} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types._ import org.apache.spark.sql.util.SchemaUtils +import org.apache.spark.unsafe.types.UTF8String /** * Resolves different children of Union to a common set of columns. */ object ResolveUnion extends Rule[LogicalPlan] { - private def unionTwoSides( + /** + * This method sorts recursively columns in a struct expression based on column names. + */ + private def sortStructFields(expr: Expression): Expression = { +val existingExprs = expr.dataType.asInstanceOf[StructType].fieldNames.zipWithIndex.map { + case (name, i) => +val fieldExpr = GetStructField(KnownNotNull(expr), i) +if (fieldExpr.dataType.isInstanceOf[StructType]) { + (name, sortStructFields(fieldExpr)) +} else { + (name, fieldExpr) +} +}.sortBy(_._1).flatMap(pair => Seq(Literal(pair._1), pair._2)) + +val newExpr = CreateNamedStruct(existingExprs) +if (expr.nullable) { + If(IsNull(expr), Literal(null, newExpr.dataType), newExpr) +} else { + newExpr +} + } + + /** + * Assumes input expressions are field expression of `CreateNamedStruct`. This method + * sorts the expressions based on field names. 
+ */ + private def sortFieldExprs(fieldExprs: Seq[Expression]): Seq[Expression] = { +fieldExprs.grouped(2).map { e => + Seq(e.head, e.last) +}.toSeq.sortBy { pair => + assert(pair.head.isInstanceOf[Literal]) + pair.head.eval().asInstanceOf[UTF8String].toString +}.flatten + } + + /** + * This helper method sorts fields in a `WithFields` expression by field name. + */ + private def sortStructFieldsInWithFields(expr: Expression): Expression = expr transformUp { +case w: WithFields if w.resolved => + w.evalExpr match { +case i @ If(IsNull(_), _, CreateNamedStruct(fieldExprs)) => + val sorted = sortFieldExprs(fieldExprs) + val newStruct = CreateNamedStruct(sorted) + i.copy(trueValue = Literal(null, newStruct.dataType), falseValue = newStruct) +case CreateNamedStruct(fieldExprs) => + val sorted = sortFieldExprs(fieldExprs) + val newStruct = CreateNamedStruct(sorted) + newStruct +case other => + throw new AnalysisException(s"`WithFields` has incorrect eval expression: $other") + } + } + + def simplifyWithFields(expr: Expression): Expression = { +expr.transformUp { + case WithFields(structExpr, names, values) if names.distinct.length != names.length => +val newNames = mutable.ArrayBuffer.empty[String] +val newValues = mutable.ArrayBuffer.empty[Expression] +names.zip(values).reverse.foreach { case (name, value) => + if (!newNames.contains(name)) { +newNames += name +newValues += value + } +} +WithFields(structExpr, names = newNames.reverse, valExprs = newValues.reverse) + case WithFields(WithFields(struct, names1, valExprs1), names2, valExprs2) => +WithFields(struct, names1 ++ names2, valExprs1 ++ valExprs2) + case g @ GetStructField(WithFields(_, names, values), _, _) +if names.contains(g.extractFieldName) => +names.zip(values).reverse.filter(p => p._1 == g.extractFieldName).head._2 +} + } Review comment: This is to optimize `WithFields` expression chain. I will put this into a separate PR. This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] viirya commented on a change in pull request #29587: [SPARK-32376][SQL] Make unionByName null-filling behavior work with struct columns
viirya commented on a change in pull request #29587: URL: https://github.com/apache/spark/pull/29587#discussion_r491637817 ## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveUnion.scala ## @@ -17,29 +17,168 @@ package org.apache.spark.sql.catalyst.analysis +import scala.collection.mutable + import org.apache.spark.sql.AnalysisException -import org.apache.spark.sql.catalyst.expressions.{Alias, Literal} +import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, CreateNamedStruct, Expression, GetStructField, If, IsNull, KnownNotNull, Literal, NamedExpression, WithFields} import org.apache.spark.sql.catalyst.optimizer.CombineUnions import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, Union} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types._ import org.apache.spark.sql.util.SchemaUtils +import org.apache.spark.unsafe.types.UTF8String /** * Resolves different children of Union to a common set of columns. */ object ResolveUnion extends Rule[LogicalPlan] { - private def unionTwoSides( + /** + * This method sorts columns in a struct expression based on column names. 
+ */ + private def sortStructFields(expr: Expression): Expression = { +assert(expr.dataType.isInstanceOf[StructType]) + +val existingExprs = expr.dataType.asInstanceOf[StructType].fieldNames.zipWithIndex.map { + case (name, i) => (name, GetStructField(KnownNotNull(expr), i).asInstanceOf[Expression]) +}.sortBy(_._1).flatMap(pair => Seq(Literal(pair._1), pair._2)) + +val newExpr = CreateNamedStruct(existingExprs) +if (expr.nullable) { + If(IsNull(expr), Literal(null, newExpr.dataType), newExpr) +} else { + newExpr +} + } + + private def sortStructFields(fieldExprs: Seq[Expression]): Seq[Expression] = { +fieldExprs.grouped(2).map { e => + Seq(e.head, e.last) +}.toSeq.sortBy { pair => + assert(pair(0).isInstanceOf[Literal]) + pair(0).eval().asInstanceOf[UTF8String].toString +}.flatten + } + + /** + * This helper method sorts fields in a `WithFields` expression by field name. + */ + private def sortStructFieldsInWithFields(expr: Expression): Expression = expr transformUp { +case w: WithFields if w.resolved => + w.evalExpr match { +case i @ If(IsNull(_), _, CreateNamedStruct(fieldExprs)) => + val sorted = sortStructFields(fieldExprs) + val newStruct = CreateNamedStruct(sorted) + i.copy(trueValue = Literal(null, newStruct.dataType), falseValue = newStruct) +case CreateNamedStruct(fieldExprs) => + val sorted = sortStructFields(fieldExprs) + val newStruct = CreateNamedStruct(sorted) + newStruct +case other => + throw new AnalysisException(s"`WithFields` has incorrect eval expression: $other") + } + } + + /** + * Adds missing fields recursively into given `col` expression, based on the target `StructType`. + * This is called by `compareAndAddFields` when we find two struct columns with same name but + * different nested fields. This method will find out the missing nested fields from `col` to + * `target` struct and add these missing nested fields. Currently we don't support finding out + * missing nested fields of struct nested in array or struct nested in map. 
+ */ + private def addFields(col: NamedExpression, target: StructType): Expression = { +assert(col.dataType.isInstanceOf[StructType], "Only support StructType.") + +val resolver = SQLConf.get.resolver +val missingFields = + StructType.findMissingFields(col.dataType.asInstanceOf[StructType], target, resolver) + +// We need to sort columns in result, because we might add another column in other side. +// E.g., we want to union two structs "a int, b long" and "a int, c string". +// If we don't sort, we will have "a int, b long, c string" and +// "a int, c string, b long", which are not compatible. +if (missingFields.isEmpty) { + sortStructFields(col) Review comment: @maropu I got you point when I was fixing the performance issue. Yeah, we should. I fixed it in latest commit. Thanks. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] viirya commented on a change in pull request #29587: [SPARK-32376][SQL] Make unionByName null-filling behavior work with struct columns
viirya commented on a change in pull request #29587: URL: https://github.com/apache/spark/pull/29587#discussion_r491637762 ## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveUnion.scala ## @@ -50,18 +189,29 @@ object ResolveUnion extends Rule[LogicalPlan] { } } +(rightProjectList, aliased) + } Review comment: Improved in the latest commit. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] viirya commented on a change in pull request #29587: [SPARK-32376][SQL] Make unionByName null-filling behavior work with struct columns
viirya commented on a change in pull request #29587: URL: https://github.com/apache/spark/pull/29587#discussion_r489799250 ## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveUnion.scala ## @@ -17,29 +17,168 @@ package org.apache.spark.sql.catalyst.analysis +import scala.collection.mutable + import org.apache.spark.sql.AnalysisException -import org.apache.spark.sql.catalyst.expressions.{Alias, Literal} +import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, CreateNamedStruct, Expression, GetStructField, If, IsNull, KnownNotNull, Literal, NamedExpression, WithFields} import org.apache.spark.sql.catalyst.optimizer.CombineUnions import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, Union} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types._ import org.apache.spark.sql.util.SchemaUtils +import org.apache.spark.unsafe.types.UTF8String /** * Resolves different children of Union to a common set of columns. */ object ResolveUnion extends Rule[LogicalPlan] { - private def unionTwoSides( + /** + * This method sorts columns in a struct expression based on column names. + */ + private def sortStructFields(expr: Expression): Expression = { +assert(expr.dataType.isInstanceOf[StructType]) Review comment: yes, let me remove it. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] viirya commented on a change in pull request #29587: [SPARK-32376][SQL] Make unionByName null-filling behavior work with struct columns
viirya commented on a change in pull request #29587: URL: https://github.com/apache/spark/pull/29587#discussion_r489799141 ## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveUnion.scala ## @@ -17,29 +17,168 @@ package org.apache.spark.sql.catalyst.analysis +import scala.collection.mutable + import org.apache.spark.sql.AnalysisException -import org.apache.spark.sql.catalyst.expressions.{Alias, Literal} +import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, CreateNamedStruct, Expression, GetStructField, If, IsNull, KnownNotNull, Literal, NamedExpression, WithFields} import org.apache.spark.sql.catalyst.optimizer.CombineUnions import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, Union} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types._ import org.apache.spark.sql.util.SchemaUtils +import org.apache.spark.unsafe.types.UTF8String /** * Resolves different children of Union to a common set of columns. */ object ResolveUnion extends Rule[LogicalPlan] { - private def unionTwoSides( + /** + * This method sorts columns in a struct expression based on column names. + */ + private def sortStructFields(expr: Expression): Expression = { +assert(expr.dataType.isInstanceOf[StructType]) + +val existingExprs = expr.dataType.asInstanceOf[StructType].fieldNames.zipWithIndex.map { + case (name, i) => (name, GetStructField(KnownNotNull(expr), i).asInstanceOf[Expression]) Review comment: looks like redundant, thanks. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] viirya commented on a change in pull request #29587: [SPARK-32376][SQL] Make unionByName null-filling behavior work with struct columns
viirya commented on a change in pull request #29587: URL: https://github.com/apache/spark/pull/29587#discussion_r489542367 ## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveUnion.scala ## @@ -17,29 +17,168 @@ package org.apache.spark.sql.catalyst.analysis +import scala.collection.mutable + import org.apache.spark.sql.AnalysisException -import org.apache.spark.sql.catalyst.expressions.{Alias, Literal} +import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, CreateNamedStruct, Expression, GetStructField, If, IsNull, KnownNotNull, Literal, NamedExpression, WithFields} import org.apache.spark.sql.catalyst.optimizer.CombineUnions import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, Union} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types._ import org.apache.spark.sql.util.SchemaUtils +import org.apache.spark.unsafe.types.UTF8String /** * Resolves different children of Union to a common set of columns. */ object ResolveUnion extends Rule[LogicalPlan] { - private def unionTwoSides( + /** + * This method sorts columns in a struct expression based on column names. 
+ */ + private def sortStructFields(expr: Expression): Expression = { +assert(expr.dataType.isInstanceOf[StructType]) + +val existingExprs = expr.dataType.asInstanceOf[StructType].fieldNames.zipWithIndex.map { + case (name, i) => (name, GetStructField(KnownNotNull(expr), i).asInstanceOf[Expression]) +}.sortBy(_._1).flatMap(pair => Seq(Literal(pair._1), pair._2)) + +val newExpr = CreateNamedStruct(existingExprs) +if (expr.nullable) { + If(IsNull(expr), Literal(null, newExpr.dataType), newExpr) +} else { + newExpr +} + } + + private def sortStructFields(fieldExprs: Seq[Expression]): Seq[Expression] = { +fieldExprs.grouped(2).map { e => + Seq(e.head, e.last) +}.toSeq.sortBy { pair => + assert(pair(0).isInstanceOf[Literal]) + pair(0).eval().asInstanceOf[UTF8String].toString +}.flatten + } + + /** + * This helper method sorts fields in a `WithFields` expression by field name. + */ + private def sortStructFieldsInWithFields(expr: Expression): Expression = expr transformUp { +case w: WithFields if w.resolved => + w.evalExpr match { +case i @ If(IsNull(_), _, CreateNamedStruct(fieldExprs)) => + val sorted = sortStructFields(fieldExprs) + val newStruct = CreateNamedStruct(sorted) + i.copy(trueValue = Literal(null, newStruct.dataType), falseValue = newStruct) +case CreateNamedStruct(fieldExprs) => + val sorted = sortStructFields(fieldExprs) + val newStruct = CreateNamedStruct(sorted) + newStruct +case other => + throw new AnalysisException(s"`WithFields` has incorrect eval expression: $other") + } + } + + /** + * Adds missing fields recursively into given `col` expression, based on the target `StructType`. + * This is called by `compareAndAddFields` when we find two struct columns with same name but + * different nested fields. This method will find out the missing nested fields from `col` to + * `target` struct and add these missing nested fields. Currently we don't support finding out + * missing nested fields of struct nested in array or struct nested in map. 
+ */ + private def addFields(col: NamedExpression, target: StructType): Expression = { +assert(col.dataType.isInstanceOf[StructType], "Only support StructType.") + +val resolver = SQLConf.get.resolver +val missingFields = + StructType.findMissingFields(col.dataType.asInstanceOf[StructType], target, resolver) + +// We need to sort columns in result, because we might add another column in other side. +// E.g., we want to union two structs "a int, b long" and "a int, c string". +// If we don't sort, we will have "a int, b long, c string" and +// "a int, c string, b long", which are not compatible. +if (missingFields.isEmpty) { + sortStructFields(col) Review comment: Oh, we need this to make sure two sides have consistent schema. For example the test case from @fqaiser94 in https://github.com/apache/spark/pull/29587#discussion_r488100532, when we add field to one side, another side still needs to sort its column, otherwise there is inconsistency. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] viirya commented on a change in pull request #29587: [SPARK-32376][SQL] Make unionByName null-filling behavior work with struct columns
viirya commented on a change in pull request #29587: URL: https://github.com/apache/spark/pull/29587#discussion_r489540174 ## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveUnion.scala ## @@ -17,29 +17,168 @@ package org.apache.spark.sql.catalyst.analysis +import scala.collection.mutable + import org.apache.spark.sql.AnalysisException -import org.apache.spark.sql.catalyst.expressions.{Alias, Literal} +import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, CreateNamedStruct, Expression, GetStructField, If, IsNull, KnownNotNull, Literal, NamedExpression, WithFields} Review comment: OK. :) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] viirya commented on a change in pull request #29587: [SPARK-32376][SQL] Make unionByName null-filling behavior work with struct columns
viirya commented on a change in pull request #29587: URL: https://github.com/apache/spark/pull/29587#discussion_r489540466 ## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveUnion.scala ## @@ -17,29 +17,168 @@ package org.apache.spark.sql.catalyst.analysis +import scala.collection.mutable + import org.apache.spark.sql.AnalysisException -import org.apache.spark.sql.catalyst.expressions.{Alias, Literal} +import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, CreateNamedStruct, Expression, GetStructField, If, IsNull, KnownNotNull, Literal, NamedExpression, WithFields} import org.apache.spark.sql.catalyst.optimizer.CombineUnions import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, Union} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types._ import org.apache.spark.sql.util.SchemaUtils +import org.apache.spark.unsafe.types.UTF8String /** * Resolves different children of Union to a common set of columns. */ object ResolveUnion extends Rule[LogicalPlan] { - private def unionTwoSides( + /** + * This method sorts columns in a struct expression based on column names. + */ + private def sortStructFields(expr: Expression): Expression = { Review comment: sure, let me think about better method names. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] viirya commented on a change in pull request #29587: [SPARK-32376][SQL] Make unionByName null-filling behavior work with struct columns
viirya commented on a change in pull request #29587: URL: https://github.com/apache/spark/pull/29587#discussion_r489539877 ## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveUnion.scala ## @@ -17,29 +17,168 @@ package org.apache.spark.sql.catalyst.analysis +import scala.collection.mutable + import org.apache.spark.sql.AnalysisException -import org.apache.spark.sql.catalyst.expressions.{Alias, Literal} +import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, CreateNamedStruct, Expression, GetStructField, If, IsNull, KnownNotNull, Literal, NamedExpression, WithFields} import org.apache.spark.sql.catalyst.optimizer.CombineUnions import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, Union} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types._ import org.apache.spark.sql.util.SchemaUtils +import org.apache.spark.unsafe.types.UTF8String /** * Resolves different children of Union to a common set of columns. */ object ResolveUnion extends Rule[LogicalPlan] { - private def unionTwoSides( + /** + * This method sorts columns in a struct expression based on column names. 
+ */ + private def sortStructFields(expr: Expression): Expression = { +assert(expr.dataType.isInstanceOf[StructType]) + +val existingExprs = expr.dataType.asInstanceOf[StructType].fieldNames.zipWithIndex.map { + case (name, i) => (name, GetStructField(KnownNotNull(expr), i).asInstanceOf[Expression]) +}.sortBy(_._1).flatMap(pair => Seq(Literal(pair._1), pair._2)) + +val newExpr = CreateNamedStruct(existingExprs) +if (expr.nullable) { + If(IsNull(expr), Literal(null, newExpr.dataType), newExpr) +} else { + newExpr +} + } + + private def sortStructFields(fieldExprs: Seq[Expression]): Seq[Expression] = { +fieldExprs.grouped(2).map { e => + Seq(e.head, e.last) +}.toSeq.sortBy { pair => + assert(pair(0).isInstanceOf[Literal]) + pair(0).eval().asInstanceOf[UTF8String].toString +}.flatten + } + + /** + * This helper method sorts fields in a `WithFields` expression by field name. + */ + private def sortStructFieldsInWithFields(expr: Expression): Expression = expr transformUp { +case w: WithFields if w.resolved => + w.evalExpr match { +case i @ If(IsNull(_), _, CreateNamedStruct(fieldExprs)) => + val sorted = sortStructFields(fieldExprs) + val newStruct = CreateNamedStruct(sorted) + i.copy(trueValue = Literal(null, newStruct.dataType), falseValue = newStruct) +case CreateNamedStruct(fieldExprs) => + val sorted = sortStructFields(fieldExprs) + val newStruct = CreateNamedStruct(sorted) + newStruct +case other => + throw new AnalysisException(s"`WithFields` has incorrect eval expression: $other") Review comment: Yea, I think so. Or we change `WithFields` but forget to update here. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] viirya commented on a change in pull request #29587: [SPARK-32376][SQL] Make unionByName null-filling behavior work with struct columns
viirya commented on a change in pull request #29587: URL: https://github.com/apache/spark/pull/29587#discussion_r489539121 ## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveUnion.scala ## @@ -50,18 +189,29 @@ object ResolveUnion extends Rule[LogicalPlan] { } } +(rightProjectList, aliased) + } Review comment: Let me think about how we can improve it. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] viirya commented on a change in pull request #29587: [SPARK-32376][SQL] Make unionByName null-filling behavior work with struct columns
viirya commented on a change in pull request #29587: URL: https://github.com/apache/spark/pull/29587#discussion_r489537683 ## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveUnion.scala ## @@ -50,18 +189,29 @@ object ResolveUnion extends Rule[LogicalPlan] { } } +(rightProjectList, aliased) + } Review comment: Well, I currently extract the evaluation expression out of `WithFields`; otherwise it would be more complicated. I think if you use `WithFields` to manually change such a deep structure, there might be the same scaling issue. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] viirya commented on a change in pull request #29587: [SPARK-32376][SQL] Make unionByName null-filling behavior work with struct columns
viirya commented on a change in pull request #29587: URL: https://github.com/apache/spark/pull/29587#discussion_r488324577 ## File path: sql/core/src/test/scala/org/apache/spark/sql/DataFrameSetOperationsSuite.scala ## @@ -507,33 +507,156 @@ class DataFrameSetOperationsSuite extends QueryTest with SharedSparkSession { } test("SPARK-29358: Make unionByName optionally fill missing columns with nulls") { -var df1 = Seq(1, 2, 3).toDF("a") -var df2 = Seq(3, 1, 2).toDF("b") -val df3 = Seq(2, 3, 1).toDF("c") -val unionDf = df1.unionByName(df2.unionByName(df3, true), true) -checkAnswer(unionDf, - Row(1, null, null) :: Row(2, null, null) :: Row(3, null, null) :: // df1 -Row(null, 3, null) :: Row(null, 1, null) :: Row(null, 2, null) :: // df2 -Row(null, null, 2) :: Row(null, null, 3) :: Row(null, null, 1) :: Nil // df3 -) +Seq("true", "false").foreach { config => + withSQLConf(SQLConf.UNION_BYNAME_STRUCT_SUPPORT_ENABLED.key -> config) { +var df1 = Seq(1, 2, 3).toDF("a") +var df2 = Seq(3, 1, 2).toDF("b") +val df3 = Seq(2, 3, 1).toDF("c") +val unionDf = df1.unionByName(df2.unionByName(df3, true), true) +checkAnswer(unionDf, + Row(1, null, null) :: Row(2, null, null) :: Row(3, null, null) :: // df1 +Row(null, 3, null) :: Row(null, 1, null) :: Row(null, 2, null) :: // df2 +Row(null, null, 2) :: Row(null, null, 3) :: Row(null, null, 1) :: Nil // df3 +) + +df1 = Seq((1, 2)).toDF("a", "c") +df2 = Seq((3, 4, 5)).toDF("a", "b", "c") +checkAnswer(df1.unionByName(df2, true), + Row(1, 2, null) :: Row(3, 5, 4) :: Nil) +checkAnswer(df2.unionByName(df1, true), + Row(3, 4, 5) :: Row(1, null, 2) :: Nil) + +withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") { + df2 = Seq((3, 4, 5)).toDF("a", "B", "C") + val union1 = df1.unionByName(df2, true) + val union2 = df2.unionByName(df1, true) + + checkAnswer(union1, Row(1, 2, null, null) :: Row(3, null, 4, 5) :: Nil) + checkAnswer(union2, Row(3, 4, 5, null) :: Row(1, null, null, 2) :: Nil) + + assert(union1.schema.fieldNames === Array("a", "c", "B", 
"C")) + assert(union2.schema.fieldNames === Array("a", "B", "C", "c")) +} + } +} + } -df1 = Seq((1, 2)).toDF("a", "c") -df2 = Seq((3, 4, 5)).toDF("a", "b", "c") -checkAnswer(df1.unionByName(df2, true), - Row(1, 2, null) :: Row(3, 5, 4) :: Nil) -checkAnswer(df2.unionByName(df1, true), - Row(3, 4, 5) :: Row(1, null, 2) :: Nil) + test("SPARK-32376: Make unionByName null-filling behavior work with struct columns - simple") { +withSQLConf(SQLConf.UNION_BYNAME_STRUCT_SUPPORT_ENABLED.key -> "true") { + val df1 = Seq(((1, 2, 3), 0), ((2, 3, 4), 1), ((3, 4, 5), 2)).toDF("a", "idx") + val df2 = Seq(((3, 4), 0), ((1, 2), 1), ((2, 3), 2)).toDF("a", "idx") + val df3 = Seq(((100, 101, 102, 103), 0), ((110, 111, 112, 113), 1), ((120, 121, 122, 123), 2)) +.toDF("a", "idx") + + var unionDf = df1.unionByName(df2, true) + + checkAnswer(unionDf, +Row(Row(1, 2, 3), 0) :: Row(Row(2, 3, 4), 1) :: Row(Row(3, 4, 5), 2) :: + Row(Row(3, 4, null), 0) :: Row(Row(1, 2, null), 1) :: Row(Row(2, 3, null), 2) :: Nil + ) + + assert(unionDf.schema.toDDL == "`a` STRUCT<`_1`: INT, `_2`: INT, `_3`: INT>,`idx` INT") + + unionDf = df1.unionByName(df2, true).unionByName(df3, true) + + checkAnswer(unionDf, +Row(Row(1, 2, 3, null), 0) :: + Row(Row(2, 3, 4, null), 1) :: + Row(Row(3, 4, 5, null), 2) :: // df1 + Row(Row(3, 4, null, null), 0) :: + Row(Row(1, 2, null, null), 1) :: + Row(Row(2, 3, null, null), 2) :: // df2 + Row(Row(100, 101, 102, 103), 0) :: + Row(Row(110, 111, 112, 113), 1) :: + Row(Row(120, 121, 122, 123), 2) :: Nil // df3 + ) + assert(unionDf.schema.toDDL == +"`a` STRUCT<`_1`: INT, `_2`: INT, `_3`: INT, `_4`: INT>,`idx` INT") +} + } -withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") { - df2 = Seq((3, 4, 5)).toDF("a", "B", "C") - val union1 = df1.unionByName(df2, true) - val union2 = df2.unionByName(df1, true) + test("SPARK-32376: Make unionByName null-filling behavior work with struct columns - nested") { +withSQLConf(SQLConf.UNION_BYNAME_STRUCT_SUPPORT_ENABLED.key -> "true") { + val df1 = 
Seq((0, UnionClass1a(0, 1L, UnionClass2(1, "2".toDF("id", "a") + val df2 = Seq((1, UnionClass1b(1, 2L, UnionClass3(2, 3L.toDF("id", "a") + + val expectedSchema = "`id` INT,`a` STRUCT<`a`: INT, `b`: BIGINT, " + +"`nested`: STRUCT<`a`: INT, `b`: BIGINT, `c`: STRING>>" + + var unionDf = df1.unionByName(df2, true) + checkAnswer(unionDf, +Row(0, Row(0, 1, Row(1, null, "2"))) :: + Row(1, Row(1, 2,
[GitHub] [spark] viirya commented on a change in pull request #29587: [SPARK-32376][SQL] Make unionByName null-filling behavior work with struct columns
viirya commented on a change in pull request #29587: URL: https://github.com/apache/spark/pull/29587#discussion_r488314435 ## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeCreator.scala ## @@ -546,7 +547,8 @@ case class StringToMap(text: Expression, pairDelim: Expression, keyValueDelim: E case class WithFields( structExpr: Expression, names: Seq[String], -valExprs: Seq[Expression]) extends Unevaluable { +valExprs: Seq[Expression], +sortOutputColumns: Boolean = false) extends Unevaluable { Review comment: In the latest commit, I moved the column sorting out of `WithFields`. I will fix the aforementioned failing case later. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] viirya commented on a change in pull request #29587: [SPARK-32376][SQL] Make unionByName null-filling behavior work with struct columns
viirya commented on a change in pull request #29587: URL: https://github.com/apache/spark/pull/29587#discussion_r488314435 ## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeCreator.scala ## @@ -546,7 +547,8 @@ case class StringToMap(text: Expression, pairDelim: Expression, keyValueDelim: E case class WithFields( structExpr: Expression, names: Seq[String], -valExprs: Seq[Expression]) extends Unevaluable { +valExprs: Seq[Expression], +sortOutputColumns: Boolean = false) extends Unevaluable { Review comment: In the latest commit, I moved the column sorting out of `WithFields`. I will fix the aforementioned failing case later. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] viirya commented on a change in pull request #29587: [SPARK-32376][SQL] Make unionByName null-filling behavior work with struct columns
viirya commented on a change in pull request #29587: URL: https://github.com/apache/spark/pull/29587#discussion_r488314435 ## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeCreator.scala ## @@ -546,7 +547,8 @@ case class StringToMap(text: Expression, pairDelim: Expression, keyValueDelim: E case class WithFields( structExpr: Expression, names: Seq[String], -valExprs: Seq[Expression]) extends Unevaluable { +valExprs: Seq[Expression], +sortOutputColumns: Boolean = false) extends Unevaluable { Review comment: In the latest commit, I moved the column sorting out of `WithFields`. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] viirya commented on a change in pull request #29587: [SPARK-32376][SQL] Make unionByName null-filling behavior work with struct columns
viirya commented on a change in pull request #29587: URL: https://github.com/apache/spark/pull/29587#discussion_r488155092 ## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeCreator.scala ## @@ -546,7 +547,8 @@ case class StringToMap(text: Expression, pairDelim: Expression, keyValueDelim: E case class WithFields( structExpr: Expression, names: Seq[String], -valExprs: Seq[Expression]) extends Unevaluable { +valExprs: Seq[Expression], +sortOutputColumns: Boolean = false) extends Unevaluable { Review comment: Sounds good to me. We probably don't need an individual expression for that. I think a private helper method should be enough. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] viirya commented on a change in pull request #29587: [SPARK-32376][SQL] Make unionByName null-filling behavior work with struct columns
viirya commented on a change in pull request #29587: URL: https://github.com/apache/spark/pull/29587#discussion_r488109612 ## File path: sql/core/src/test/scala/org/apache/spark/sql/DataFrameSetOperationsSuite.scala ## @@ -507,33 +507,156 @@ class DataFrameSetOperationsSuite extends QueryTest with SharedSparkSession { } test("SPARK-29358: Make unionByName optionally fill missing columns with nulls") { -var df1 = Seq(1, 2, 3).toDF("a") -var df2 = Seq(3, 1, 2).toDF("b") -val df3 = Seq(2, 3, 1).toDF("c") -val unionDf = df1.unionByName(df2.unionByName(df3, true), true) -checkAnswer(unionDf, - Row(1, null, null) :: Row(2, null, null) :: Row(3, null, null) :: // df1 -Row(null, 3, null) :: Row(null, 1, null) :: Row(null, 2, null) :: // df2 -Row(null, null, 2) :: Row(null, null, 3) :: Row(null, null, 1) :: Nil // df3 -) +Seq("true", "false").foreach { config => + withSQLConf(SQLConf.UNION_BYNAME_STRUCT_SUPPORT_ENABLED.key -> config) { +var df1 = Seq(1, 2, 3).toDF("a") +var df2 = Seq(3, 1, 2).toDF("b") +val df3 = Seq(2, 3, 1).toDF("c") +val unionDf = df1.unionByName(df2.unionByName(df3, true), true) +checkAnswer(unionDf, + Row(1, null, null) :: Row(2, null, null) :: Row(3, null, null) :: // df1 +Row(null, 3, null) :: Row(null, 1, null) :: Row(null, 2, null) :: // df2 +Row(null, null, 2) :: Row(null, null, 3) :: Row(null, null, 1) :: Nil // df3 +) + +df1 = Seq((1, 2)).toDF("a", "c") +df2 = Seq((3, 4, 5)).toDF("a", "b", "c") +checkAnswer(df1.unionByName(df2, true), + Row(1, 2, null) :: Row(3, 5, 4) :: Nil) +checkAnswer(df2.unionByName(df1, true), + Row(3, 4, 5) :: Row(1, null, 2) :: Nil) + +withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") { + df2 = Seq((3, 4, 5)).toDF("a", "B", "C") + val union1 = df1.unionByName(df2, true) + val union2 = df2.unionByName(df1, true) + + checkAnswer(union1, Row(1, 2, null, null) :: Row(3, null, 4, 5) :: Nil) + checkAnswer(union2, Row(3, 4, 5, null) :: Row(1, null, null, 2) :: Nil) + + assert(union1.schema.fieldNames === Array("a", "c", "B", 
"C")) + assert(union2.schema.fieldNames === Array("a", "B", "C", "c")) +} + } +} + } -df1 = Seq((1, 2)).toDF("a", "c") -df2 = Seq((3, 4, 5)).toDF("a", "b", "c") -checkAnswer(df1.unionByName(df2, true), - Row(1, 2, null) :: Row(3, 5, 4) :: Nil) -checkAnswer(df2.unionByName(df1, true), - Row(3, 4, 5) :: Row(1, null, 2) :: Nil) + test("SPARK-32376: Make unionByName null-filling behavior work with struct columns - simple") { +withSQLConf(SQLConf.UNION_BYNAME_STRUCT_SUPPORT_ENABLED.key -> "true") { + val df1 = Seq(((1, 2, 3), 0), ((2, 3, 4), 1), ((3, 4, 5), 2)).toDF("a", "idx") + val df2 = Seq(((3, 4), 0), ((1, 2), 1), ((2, 3), 2)).toDF("a", "idx") + val df3 = Seq(((100, 101, 102, 103), 0), ((110, 111, 112, 113), 1), ((120, 121, 122, 123), 2)) +.toDF("a", "idx") + + var unionDf = df1.unionByName(df2, true) + + checkAnswer(unionDf, +Row(Row(1, 2, 3), 0) :: Row(Row(2, 3, 4), 1) :: Row(Row(3, 4, 5), 2) :: + Row(Row(3, 4, null), 0) :: Row(Row(1, 2, null), 1) :: Row(Row(2, 3, null), 2) :: Nil + ) + + assert(unionDf.schema.toDDL == "`a` STRUCT<`_1`: INT, `_2`: INT, `_3`: INT>,`idx` INT") + + unionDf = df1.unionByName(df2, true).unionByName(df3, true) + + checkAnswer(unionDf, +Row(Row(1, 2, 3, null), 0) :: + Row(Row(2, 3, 4, null), 1) :: + Row(Row(3, 4, 5, null), 2) :: // df1 + Row(Row(3, 4, null, null), 0) :: + Row(Row(1, 2, null, null), 1) :: + Row(Row(2, 3, null, null), 2) :: // df2 + Row(Row(100, 101, 102, 103), 0) :: + Row(Row(110, 111, 112, 113), 1) :: + Row(Row(120, 121, 122, 123), 2) :: Nil // df3 + ) + assert(unionDf.schema.toDDL == +"`a` STRUCT<`_1`: INT, `_2`: INT, `_3`: INT, `_4`: INT>,`idx` INT") +} + } -withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") { - df2 = Seq((3, 4, 5)).toDF("a", "B", "C") - val union1 = df1.unionByName(df2, true) - val union2 = df2.unionByName(df1, true) + test("SPARK-32376: Make unionByName null-filling behavior work with struct columns - nested") { +withSQLConf(SQLConf.UNION_BYNAME_STRUCT_SUPPORT_ENABLED.key -> "true") { + val df1 = 
Seq((0, UnionClass1a(0, 1L, UnionClass2(1, "2".toDF("id", "a") + val df2 = Seq((1, UnionClass1b(1, 2L, UnionClass3(2, 3L.toDF("id", "a") + + val expectedSchema = "`id` INT,`a` STRUCT<`a`: INT, `b`: BIGINT, " + +"`nested`: STRUCT<`a`: INT, `b`: BIGINT, `c`: STRING>>" + + var unionDf = df1.unionByName(df2, true) + checkAnswer(unionDf, +Row(0, Row(0, 1, Row(1, null, "2"))) :: + Row(1, Row(1, 2,
[GitHub] [spark] viirya commented on a change in pull request #29587: [SPARK-32376][SQL] Make unionByName null-filling behavior work with struct columns
viirya commented on a change in pull request #29587: URL: https://github.com/apache/spark/pull/29587#discussion_r487662093 ## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveUnion.scala ## @@ -17,29 +17,107 @@ package org.apache.spark.sql.catalyst.analysis +import scala.collection.mutable + import org.apache.spark.sql.AnalysisException -import org.apache.spark.sql.catalyst.expressions.{Alias, Literal} +import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, Expression, Literal, NamedExpression, WithFields} import org.apache.spark.sql.catalyst.optimizer.CombineUnions import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, Union} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types._ import org.apache.spark.sql.util.SchemaUtils /** * Resolves different children of Union to a common set of columns. */ object ResolveUnion extends Rule[LogicalPlan] { - private def unionTwoSides( + /** + * Adds missing fields recursively into given `col` expression, based on the target `StructType`. + * For example, given `col` as "a struct, b int" and `target` as + * "a struct, b int, c string", this method should add `a.c` and `c` to + * `col` expression. + */ + private def addFields(col: NamedExpression, target: StructType): Option[Expression] = { +assert(col.dataType.isInstanceOf[StructType], "Only support StructType.") + +val resolver = SQLConf.get.resolver +val missingFields = + StructType.findMissingFields(col.dataType.asInstanceOf[StructType], target, resolver) +if (missingFields.isEmpty) { + None +} else { + missingFields.map(s => addFieldsInto(col, "", s.fields)) +} + } + + /** + * Adds missing fields recursively into given `col` expression. The missing fields are given + * in `fields`. For example, given `col` as "a struct, b int", and `fields` is + * "a struct, c string". 
This method will add a nested `a.c` field and a top-level + * `c` field to `col` and fill null values for them. + */ + private def addFieldsInto(col: Expression, base: String, fields: Seq[StructField]): Expression = { +fields.foldLeft(col) { case (currCol, field) => + field.dataType match { +case st: StructType => + val resolver = SQLConf.get.resolver + val colField = currCol.dataType.asInstanceOf[StructType] +.find(f => resolver(f.name, field.name)) + if (colField.isEmpty) { +// The whole struct is missing. Add a null. +WithFields(currCol, s"$base${field.name}", Literal(null, st), + sortOutputColumns = true) + } else { +addFieldsInto(currCol, s"$base${field.name}.", st.fields) + } +case dt => + // We need to sort columns in result, because we might add another column in other side. + // E.g., we want to union two structs "a int, b long" and "a int, c string". + // If we don't sort, we will have "a int, b long, c string" and "a int, c string, b long", + // which are not compatible. + WithFields(currCol, s"$base${field.name}", Literal(null, dt), +sortOutputColumns = true) + } +} + } + + private def compareAndAddFields( Review comment: Added. ## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveUnion.scala ## @@ -17,29 +17,107 @@ package org.apache.spark.sql.catalyst.analysis +import scala.collection.mutable + import org.apache.spark.sql.AnalysisException -import org.apache.spark.sql.catalyst.expressions.{Alias, Literal} +import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, Expression, Literal, NamedExpression, WithFields} import org.apache.spark.sql.catalyst.optimizer.CombineUnions import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, Union} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types._ import org.apache.spark.sql.util.SchemaUtils /** * Resolves different children of Union to a common set of columns. 
*/ object ResolveUnion extends Rule[LogicalPlan] { - private def unionTwoSides( + /** + * Adds missing fields recursively into given `col` expression, based on the target `StructType`. + * For example, given `col` as "a struct, b int" and `target` as + * "a struct, b int, c string", this method should add `a.c` and `c` to + * `col` expression. Review comment: Revised the comment. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at:
[GitHub] [spark] viirya commented on a change in pull request #29587: [SPARK-32376][SQL] Make unionByName null-filling behavior work with struct columns
viirya commented on a change in pull request #29587: URL: https://github.com/apache/spark/pull/29587#discussion_r487645676 ## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveUnion.scala ## @@ -17,29 +17,107 @@ package org.apache.spark.sql.catalyst.analysis +import scala.collection.mutable + import org.apache.spark.sql.AnalysisException -import org.apache.spark.sql.catalyst.expressions.{Alias, Literal} +import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, Expression, Literal, NamedExpression, WithFields} import org.apache.spark.sql.catalyst.optimizer.CombineUnions import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, Union} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types._ import org.apache.spark.sql.util.SchemaUtils /** * Resolves different children of Union to a common set of columns. */ object ResolveUnion extends Rule[LogicalPlan] { - private def unionTwoSides( + /** + * Adds missing fields recursively into given `col` expression, based on the target `StructType`. + * For example, given `col` as "a struct, b int" and `target` as + * "a struct, b int, c string", this method should add `a.c` and `c` to + * `col` expression. + */ + private def addFields(col: NamedExpression, target: StructType): Option[Expression] = { +assert(col.dataType.isInstanceOf[StructType], "Only support StructType.") + +val resolver = SQLConf.get.resolver +val missingFields = + StructType.findMissingFields(col.dataType.asInstanceOf[StructType], target, resolver) +if (missingFields.isEmpty) { + None +} else { + missingFields.map(s => addFieldsInto(col, "", s.fields)) +} + } + + /** + * Adds missing fields recursively into given `col` expression. The missing fields are given + * in `fields`. For example, given `col` as "a struct, b int", and `fields` is + * "a struct, c string". 
This method will add a nested `a.c` field and a top-level + * `c` field to `col` and fill null values for them. + */ + private def addFieldsInto(col: Expression, base: String, fields: Seq[StructField]): Expression = { +fields.foldLeft(col) { case (currCol, field) => + field.dataType match { +case st: StructType => + val resolver = SQLConf.get.resolver + val colField = currCol.dataType.asInstanceOf[StructType] +.find(f => resolver(f.name, field.name)) + if (colField.isEmpty) { +// The whole struct is missing. Add a null. +WithFields(currCol, s"$base${field.name}", Literal(null, st), + sortOutputColumns = true) + } else { +addFieldsInto(currCol, s"$base${field.name}.", st.fields) + } +case dt => + // We need to sort columns in result, because we might add another column in other side. + // E.g., we want to union two structs "a int, b long" and "a int, c string". + // If we don't sort, we will have "a int, b long, c string" and "a int, c string, b long", + // which are not compatible. + WithFields(currCol, s"$base${field.name}", Literal(null, dt), Review comment: Oh, I think using `WithFields` like that is bad for performance. `WithFields` basically extracts the fields from a struct and reconstructs the struct with the new field, so you can imagine that many missing columns will not perform well. However, I think we currently have no better approach to fill missing (nested) fields in structs. We might add a performance note to the `unionByName` API if you think it is necessary. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] viirya commented on a change in pull request #29587: [SPARK-32376][SQL] Make unionByName null-filling behavior work with struct columns
viirya commented on a change in pull request #29587: URL: https://github.com/apache/spark/pull/29587#discussion_r487647961 ## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveUnion.scala ## @@ -17,29 +17,107 @@ package org.apache.spark.sql.catalyst.analysis +import scala.collection.mutable + import org.apache.spark.sql.AnalysisException -import org.apache.spark.sql.catalyst.expressions.{Alias, Literal} +import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, Expression, Literal, NamedExpression, WithFields} import org.apache.spark.sql.catalyst.optimizer.CombineUnions import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, Union} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types._ import org.apache.spark.sql.util.SchemaUtils /** * Resolves different children of Union to a common set of columns. */ object ResolveUnion extends Rule[LogicalPlan] { - private def unionTwoSides( + /** + * Adds missing fields recursively into given `col` expression, based on the target `StructType`. + * For example, given `col` as "a struct, b int" and `target` as + * "a struct, b int, c string", this method should add `a.c` and `c` to + * `col` expression. + */ + private def addFields(col: NamedExpression, target: StructType): Option[Expression] = { +assert(col.dataType.isInstanceOf[StructType], "Only support StructType.") + +val resolver = SQLConf.get.resolver +val missingFields = + StructType.findMissingFields(col.dataType.asInstanceOf[StructType], target, resolver) +if (missingFields.isEmpty) { + None +} else { + missingFields.map(s => addFieldsInto(col, "", s.fields)) +} + } + + /** + * Adds missing fields recursively into given `col` expression. The missing fields are given + * in `fields`. For example, given `col` as "a struct, b int", and `fields` is + * "a struct, c string". 
This method will add a nested `a.c` field and a top-level Review comment: There are end-to-end tests for that. I will update this comment with the example. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] viirya commented on a change in pull request #29587: [SPARK-32376][SQL] Make unionByName null-filling behavior work with struct columns
viirya commented on a change in pull request #29587: URL: https://github.com/apache/spark/pull/29587#discussion_r487645676 ## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveUnion.scala ## @@ -17,29 +17,107 @@ package org.apache.spark.sql.catalyst.analysis +import scala.collection.mutable + import org.apache.spark.sql.AnalysisException -import org.apache.spark.sql.catalyst.expressions.{Alias, Literal} +import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, Expression, Literal, NamedExpression, WithFields} import org.apache.spark.sql.catalyst.optimizer.CombineUnions import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, Union} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types._ import org.apache.spark.sql.util.SchemaUtils /** * Resolves different children of Union to a common set of columns. */ object ResolveUnion extends Rule[LogicalPlan] { - private def unionTwoSides( + /** + * Adds missing fields recursively into given `col` expression, based on the target `StructType`. + * For example, given `col` as "a struct, b int" and `target` as + * "a struct, b int, c string", this method should add `a.c` and `c` to + * `col` expression. + */ + private def addFields(col: NamedExpression, target: StructType): Option[Expression] = { +assert(col.dataType.isInstanceOf[StructType], "Only support StructType.") + +val resolver = SQLConf.get.resolver +val missingFields = + StructType.findMissingFields(col.dataType.asInstanceOf[StructType], target, resolver) +if (missingFields.isEmpty) { + None +} else { + missingFields.map(s => addFieldsInto(col, "", s.fields)) +} + } + + /** + * Adds missing fields recursively into given `col` expression. The missing fields are given + * in `fields`. For example, given `col` as "a struct, b int", and `fields` is + * "a struct, c string". 
This method will add a nested `a.c` field and a top-level + * `c` field to `col` and fill null values for them. + */ + private def addFieldsInto(col: Expression, base: String, fields: Seq[StructField]): Expression = { +fields.foldLeft(col) { case (currCol, field) => + field.dataType match { +case st: StructType => + val resolver = SQLConf.get.resolver + val colField = currCol.dataType.asInstanceOf[StructType] +.find(f => resolver(f.name, field.name)) + if (colField.isEmpty) { +// The whole struct is missing. Add a null. +WithFields(currCol, s"$base${field.name}", Literal(null, st), + sortOutputColumns = true) + } else { +addFieldsInto(currCol, s"$base${field.name}.", st.fields) + } +case dt => + // We need to sort columns in result, because we might add another column in other side. + // E.g., we want to union two structs "a int, b long" and "a int, c string". + // If we don't sort, we will have "a int, b long, c string" and "a int, c string, b long", + // which are not compatible. + WithFields(currCol, s"$base${field.name}", Literal(null, dt), Review comment: Oh, I think using `WithFields` like that is bad for performance. `WithFields` basically extracts the fields from a struct and reconstructs the struct with the new field, so you can imagine that many missing columns will not perform well. However, I think we currently have no better approach to fill missing (nested) fields in structs. We might add a performance note to the `unionByName` API. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] viirya commented on a change in pull request #29587: [SPARK-32376][SQL] Make unionByName null-filling behavior work with struct columns
viirya commented on a change in pull request #29587: URL: https://github.com/apache/spark/pull/29587#discussion_r487484664 ## File path: sql/catalyst/src/test/scala/org/apache/spark/sql/types/StructTypeSuite.scala ## @@ -103,4 +105,112 @@ class StructTypeSuite extends SparkFunSuite { val interval = "`a` INTERVAL" assert(fromDDL(interval).toDDL === interval) } + + test("find missing (nested) fields") { +val schema = StructType.fromDDL( + "c1 INT, c2 STRUCT>") +val resolver = SQLConf.get.resolver + +val source1 = StructType.fromDDL("c1 INT") +val missing1 = StructType.fromDDL( + "c2 STRUCT>") +assert(StructType.findMissingFields(source1, schema, resolver) + .exists(_.sameType(missing1))) + +val source2 = StructType.fromDDL("c1 INT, c3 STRING") +val missing2 = StructType.fromDDL( + "c2 STRUCT>") +assert(StructType.findMissingFields(source2, schema, resolver) + .exists(_.sameType(missing2))) + +val source3 = StructType.fromDDL("c1 INT, c2 STRUCT") +val missing3 = StructType.fromDDL( + "c2 STRUCT>") +assert(StructType.findMissingFields(source3, schema, resolver) + .exists(_.sameType(missing3))) + +val source4 = StructType.fromDDL("c1 INT, c2 STRUCT>") +val missing4 = StructType.fromDDL( + "c2 STRUCT>") +assert(StructType.findMissingFields(source4, schema, resolver) + .exists(_.sameType(missing4))) + +val schemaWithArray = StructType.fromDDL( + "c1 INT, c2 ARRAY>") +val source5 = StructType.fromDDL( + "c1 INT") +val missing5 = StructType.fromDDL( + "c2 ARRAY>") +assert( + StructType.findMissingFields(source5, schemaWithArray, resolver) +.exists(_.sameType(missing5))) + +val schemaWithMap1 = StructType.fromDDL( + "c1 INT, c2 MAP, STRING>, c3 LONG") +val source6 = StructType.fromDDL( + "c1 INT, c3 LONG") +val missing6 = StructType.fromDDL( + "c2 MAP, STRING>") +assert( + StructType.findMissingFields(source6, schemaWithMap1, resolver) +.exists(_.sameType(missing6))) + +val schemaWithMap2 = StructType.fromDDL( + "c1 INT, c2 MAP>, c3 STRING") +val source7 = 
StructType.fromDDL( + "c1 INT, c3 STRING") +val missing7 = StructType.fromDDL( + "c2 MAP>") +assert( + StructType.findMissingFields(source7, schemaWithMap2, resolver) +.exists(_.sameType(missing7))) + +// Unsupported: nested struct in array, map +val source8 = StructType.fromDDL( + "c1 INT, c2 ARRAY>") +// `findMissingFields` doesn't support looking into nested struct in array type. +assert(StructType.findMissingFields(source8, schemaWithArray, resolver).isEmpty) + +val source9 = StructType.fromDDL( + "c1 INT, c2 MAP, STRING>, c3 LONG") +// `findMissingFields` doesn't support looking into nested struct in map type. +assert(StructType.findMissingFields(source9, schemaWithMap1, resolver).isEmpty) + +val source10 = StructType.fromDDL( + "c1 INT, c2 MAP>, c3 STRING") +// `findMissingFields` doesn't support looking into nested struct in map type. +assert(StructType.findMissingFields(source10, schemaWithMap2, resolver).isEmpty) + } + + test("find missing (nested) fields: case sensitive cases") { +withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") { + val schema = StructType.fromDDL( +"c1 INT, c2 STRUCT>") + val resolver = SQLConf.get.resolver + + val source1 = StructType.fromDDL("c1 INT, C2 LONG") + val missing1 = StructType.fromDDL( +"c2 STRUCT>") + assert(StructType.findMissingFields(source1, schema, resolver) +.exists(_.sameType(missing1))) + + val source2 = StructType.fromDDL("c2 LONG") + val missing2 = StructType.fromDDL( +"c1 INT") Review comment: Oh, sure. I missed those line breaks. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] viirya commented on a change in pull request #29587: [SPARK-32376][SQL] Make unionByName null-filling behavior work with struct columns
viirya commented on a change in pull request #29587: URL: https://github.com/apache/spark/pull/29587#discussion_r487484292 ## File path: sql/catalyst/src/test/scala/org/apache/spark/sql/types/StructTypeSuite.scala ## @@ -103,4 +105,112 @@ class StructTypeSuite extends SparkFunSuite { val interval = "`a` INTERVAL" assert(fromDDL(interval).toDDL === interval) } + + test("find missing (nested) fields") { Review comment: ok This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] viirya commented on a change in pull request #29587: [SPARK-32376][SQL] Make unionByName null-filling behavior work with struct columns
viirya commented on a change in pull request #29587: URL: https://github.com/apache/spark/pull/29587#discussion_r487484192 ## File path: sql/catalyst/src/test/scala/org/apache/spark/sql/types/StructTypeSuite.scala ## @@ -103,4 +105,112 @@ class StructTypeSuite extends SparkFunSuite { val interval = "`a` INTERVAL" assert(fromDDL(interval).toDDL === interval) } + + test("find missing (nested) fields") { +val schema = StructType.fromDDL( + "c1 INT, c2 STRUCT>") +val resolver = SQLConf.get.resolver + +val source1 = StructType.fromDDL("c1 INT") +val missing1 = StructType.fromDDL( + "c2 STRUCT>") +assert(StructType.findMissingFields(source1, schema, resolver) + .exists(_.sameType(missing1))) + +val source2 = StructType.fromDDL("c1 INT, c3 STRING") +val missing2 = StructType.fromDDL( + "c2 STRUCT>") +assert(StructType.findMissingFields(source2, schema, resolver) + .exists(_.sameType(missing2))) + +val source3 = StructType.fromDDL("c1 INT, c2 STRUCT") +val missing3 = StructType.fromDDL( + "c2 STRUCT>") +assert(StructType.findMissingFields(source3, schema, resolver) + .exists(_.sameType(missing3))) + +val source4 = StructType.fromDDL("c1 INT, c2 STRUCT>") +val missing4 = StructType.fromDDL( + "c2 STRUCT>") Review comment: Oops, sure. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] viirya commented on a change in pull request #29587: [SPARK-32376][SQL] Make unionByName null-filling behavior work with struct columns
viirya commented on a change in pull request #29587: URL: https://github.com/apache/spark/pull/29587#discussion_r487463931 ## File path: sql/core/src/test/scala/org/apache/spark/sql/DataFrameSetOperationsSuite.scala ## @@ -536,4 +536,71 @@ class DataFrameSetOperationsSuite extends QueryTest with SharedSparkSession { assert(union2.schema.fieldNames === Array("a", "B", "C", "c")) } } + Review comment: Added. ## File path: sql/catalyst/src/test/scala/org/apache/spark/sql/types/StructTypeSuite.scala ## @@ -103,4 +104,80 @@ class StructTypeSuite extends SparkFunSuite { val interval = "`a` INTERVAL" assert(fromDDL(interval).toDDL === interval) } + + test("find missing (nested) fields") { +val schema = StructType.fromDDL( + "c1 INT, c2 STRUCT>") +val resolver = SQLConf.get.resolver Review comment: yes, added. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] viirya commented on a change in pull request #29587: [SPARK-32376][SQL] Make unionByName null-filling behavior work with struct columns
viirya commented on a change in pull request #29587: URL: https://github.com/apache/spark/pull/29587#discussion_r487457983 ## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveUnion.scala ## @@ -17,29 +17,101 @@ package org.apache.spark.sql.catalyst.analysis +import scala.collection.mutable + import org.apache.spark.sql.AnalysisException -import org.apache.spark.sql.catalyst.expressions.{Alias, Literal} +import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, Expression, Literal, NamedExpression, WithFields} import org.apache.spark.sql.catalyst.optimizer.CombineUnions import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, Union} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types._ import org.apache.spark.sql.util.SchemaUtils /** * Resolves different children of Union to a common set of columns. */ object ResolveUnion extends Rule[LogicalPlan] { - private def unionTwoSides( + /** + * Adds missing fields recursively into given `col` expression, based on the target `StructType`. + * For example, given `col` as "a struct, b int" and `target` as + * "a struct, b int, c string", this method should add `a.c` and `c` to + * `col` expression. + */ + private def addFields(col: NamedExpression, target: StructType): Option[Expression] = { +assert(col.dataType.isInstanceOf[StructType], "Only support StructType.") + +val resolver = SQLConf.get.resolver +val missingFields = + StructType.findMissingFields(col.dataType.asInstanceOf[StructType], target, resolver) +if (missingFields.isEmpty) { + None +} else { + missingFields.map(s => addFieldsInto(col, "", s.fields)) +} + } + + /** + * Adds missing fields recursively into given `col` expression. The missing fields are given + * in `fields`. For example, given `col` as "a struct, b int", and `fields` is + * "a struct, c string". 
This method will add a nested `a.c` field and a top-level + * `c` field to `col` and fill null values for them. + */ + private def addFieldsInto(col: Expression, base: String, fields: Seq[StructField]): Expression = { +fields.foldLeft(col) { case (currCol, field) => + field.dataType match { +case st: StructType => + val resolver = SQLConf.get.resolver + val colField = currCol.dataType.asInstanceOf[StructType] +.find(f => resolver(f.name, field.name)) + if (colField.isEmpty) { +// The whole struct is missing. Add a null. +WithFields(currCol, s"$base${field.name}", Literal(null, st), + sortOutputColumns = true) + } else { +addFieldsInto(currCol, s"$base${field.name}.", st.fields) + } +case dt => + // We need to sort columns in result, because we might add another column in other side. + // E.g., we want to union two structs "a int, b long" and "a int, c string". + // If we don't sort, we will have "a int, b long, c string" and "a int, c string, b long", + // which are not compatible. + WithFields(currCol, s"$base${field.name}", Literal(null, dt), +sortOutputColumns = true) + } +} + } + + private def compareAndAddFields( left: LogicalPlan, right: LogicalPlan, - allowMissingCol: Boolean): LogicalPlan = { + allowMissingCol: Boolean): (Seq[NamedExpression], Seq[NamedExpression]) = { val resolver = SQLConf.get.resolver val leftOutputAttrs = left.output val rightOutputAttrs = right.output -// Builds a project list for `right` based on `left` output names +val aliased = mutable.ArrayBuffer.empty[Attribute] + val rightProjectList = leftOutputAttrs.map { lattr => - rightOutputAttrs.find { rattr => resolver(lattr.name, rattr.name) }.getOrElse { + val found = rightOutputAttrs.find { rattr => resolver(lattr.name, rattr.name) } + if (found.isDefined) { +val foundDt = found.get.dataType +(foundDt, lattr.dataType) match { + case (source: StructType, target: StructType) + if allowMissingCol && !source.sameType(target) => Review comment: I see. 
I will add more comments explaining this. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] viirya commented on a change in pull request #29587: [SPARK-32376][SQL] Make unionByName null-filling behavior work with struct columns
viirya commented on a change in pull request #29587: URL: https://github.com/apache/spark/pull/29587#discussion_r486761719 ## File path: sql/core/src/test/scala/org/apache/spark/sql/DataFrameSetOperationsSuite.scala ## @@ -536,4 +536,71 @@ class DataFrameSetOperationsSuite extends QueryTest with SharedSparkSession { assert(union2.schema.fieldNames === Array("a", "B", "C", "c")) } } + + test("SPARK-32376: Make unionByName null-filling behavior work with struct columns - 1") { Review comment: sure. ## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveUnion.scala ## @@ -17,29 +17,101 @@ package org.apache.spark.sql.catalyst.analysis +import scala.collection.mutable + import org.apache.spark.sql.AnalysisException -import org.apache.spark.sql.catalyst.expressions.{Alias, Literal} +import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, Expression, Literal, NamedExpression, WithFields} import org.apache.spark.sql.catalyst.optimizer.CombineUnions import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, Union} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types._ import org.apache.spark.sql.util.SchemaUtils /** * Resolves different children of Union to a common set of columns. */ object ResolveUnion extends Rule[LogicalPlan] { - private def unionTwoSides( + /** + * Adds missing fields recursively into given `col` expression, based on the target `StructType`. + * For example, given `col` as "a struct, b int" and `target` as + * "a struct, b int, c string", this method should add `a.c` and `c` to + * `col` expression. 
+ */ + private def addFields(col: NamedExpression, target: StructType): Option[Expression] = { +assert(col.dataType.isInstanceOf[StructType], "Only support StructType.") + +val resolver = SQLConf.get.resolver +val missingFields = + StructType.findMissingFields(col.dataType.asInstanceOf[StructType], target, resolver) +if (missingFields.isEmpty) { + None +} else { + missingFields.map(s => addFieldsInto(col, "", s.fields)) +} + } + + /** + * Adds missing fields recursively into given `col` expression. The missing fields are given + * in `fields`. For example, given `col` as "a struct, b int", and `fields` is + * "a struct, c string". This method will add a nested `a.c` field and a top-level + * `c` field to `col` and fill null values for them. + */ + private def addFieldsInto(col: Expression, base: String, fields: Seq[StructField]): Expression = { +fields.foldLeft(col) { case (currCol, field) => + field.dataType match { +case st: StructType => + val resolver = SQLConf.get.resolver + val colField = currCol.dataType.asInstanceOf[StructType] +.find(f => resolver(f.name, field.name)) + if (colField.isEmpty) { +// The whole struct is missing. Add a null. +WithFields(currCol, s"$base${field.name}", Literal(null, st), + sortOutputColumns = true) + } else { +addFieldsInto(currCol, s"$base${field.name}.", st.fields) + } +case dt => + // We need to sort columns in result, because we might add another column in other side. + // E.g., we want to union two structs "a int, b long" and "a int, c string". + // If we don't sort, we will have "a int, b long, c string" and "a int, c string, b long", + // which are not compatible. 
+ WithFields(currCol, s"$base${field.name}", Literal(null, dt), +sortOutputColumns = true) + } +} + } + + private def compareAndAddFields( left: LogicalPlan, right: LogicalPlan, - allowMissingCol: Boolean): LogicalPlan = { + allowMissingCol: Boolean): (Seq[NamedExpression], Seq[NamedExpression]) = { val resolver = SQLConf.get.resolver val leftOutputAttrs = left.output val rightOutputAttrs = right.output -// Builds a project list for `right` based on `left` output names +val aliased = mutable.ArrayBuffer.empty[Attribute] + val rightProjectList = leftOutputAttrs.map { lattr => - rightOutputAttrs.find { rattr => resolver(lattr.name, rattr.name) }.getOrElse { + val found = rightOutputAttrs.find { rattr => resolver(lattr.name, rattr.name) } + if (found.isDefined) { +val foundDt = found.get.dataType +(foundDt, lattr.dataType) match { + case (source: StructType, target: StructType) + if allowMissingCol && !source.sameType(target) => Review comment: Hmm, I'm not sure where we can simplify the logic? By adding `canMergeSchemas`, doesn't it look more complicated? ## File path: sql/core/src/test/scala/org/apache/spark/sql/DataFrameSetOperationsSuite.scala ## @@ -536,4 +536,71 @@
[GitHub] [spark] viirya commented on a change in pull request #29587: [SPARK-32376][SQL] Make unionByName null-filling behavior work with struct columns
viirya commented on a change in pull request #29587: URL: https://github.com/apache/spark/pull/29587#discussion_r486761719 ## File path: sql/core/src/test/scala/org/apache/spark/sql/DataFrameSetOperationsSuite.scala ## @@ -536,4 +536,71 @@ class DataFrameSetOperationsSuite extends QueryTest with SharedSparkSession { assert(union2.schema.fieldNames === Array("a", "B", "C", "c")) } } + + test("SPARK-32376: Make unionByName null-filling behavior work with struct columns - 1") { Review comment: sure. ## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveUnion.scala ## @@ -17,29 +17,101 @@ package org.apache.spark.sql.catalyst.analysis +import scala.collection.mutable + import org.apache.spark.sql.AnalysisException -import org.apache.spark.sql.catalyst.expressions.{Alias, Literal} +import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, Expression, Literal, NamedExpression, WithFields} import org.apache.spark.sql.catalyst.optimizer.CombineUnions import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, Union} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types._ import org.apache.spark.sql.util.SchemaUtils /** * Resolves different children of Union to a common set of columns. */ object ResolveUnion extends Rule[LogicalPlan] { - private def unionTwoSides( + /** + * Adds missing fields recursively into given `col` expression, based on the target `StructType`. + * For example, given `col` as "a struct, b int" and `target` as + * "a struct, b int, c string", this method should add `a.c` and `c` to + * `col` expression. 
+ */ + private def addFields(col: NamedExpression, target: StructType): Option[Expression] = { +assert(col.dataType.isInstanceOf[StructType], "Only support StructType.") + +val resolver = SQLConf.get.resolver +val missingFields = + StructType.findMissingFields(col.dataType.asInstanceOf[StructType], target, resolver) +if (missingFields.isEmpty) { + None +} else { + missingFields.map(s => addFieldsInto(col, "", s.fields)) +} + } + + /** + * Adds missing fields recursively into given `col` expression. The missing fields are given + * in `fields`. For example, given `col` as "a struct, b int", and `fields` is + * "a struct, c string". This method will add a nested `a.c` field and a top-level + * `c` field to `col` and fill null values for them. + */ + private def addFieldsInto(col: Expression, base: String, fields: Seq[StructField]): Expression = { +fields.foldLeft(col) { case (currCol, field) => + field.dataType match { +case st: StructType => + val resolver = SQLConf.get.resolver + val colField = currCol.dataType.asInstanceOf[StructType] +.find(f => resolver(f.name, field.name)) + if (colField.isEmpty) { +// The whole struct is missing. Add a null. +WithFields(currCol, s"$base${field.name}", Literal(null, st), + sortOutputColumns = true) + } else { +addFieldsInto(currCol, s"$base${field.name}.", st.fields) + } +case dt => + // We need to sort columns in result, because we might add another column in other side. + // E.g., we want to union two structs "a int, b long" and "a int, c string". + // If we don't sort, we will have "a int, b long, c string" and "a int, c string, b long", + // which are not compatible. 
+ WithFields(currCol, s"$base${field.name}", Literal(null, dt), +sortOutputColumns = true) + } +} + } + + private def compareAndAddFields( left: LogicalPlan, right: LogicalPlan, - allowMissingCol: Boolean): LogicalPlan = { + allowMissingCol: Boolean): (Seq[NamedExpression], Seq[NamedExpression]) = { val resolver = SQLConf.get.resolver val leftOutputAttrs = left.output val rightOutputAttrs = right.output -// Builds a project list for `right` based on `left` output names +val aliased = mutable.ArrayBuffer.empty[Attribute] + val rightProjectList = leftOutputAttrs.map { lattr => - rightOutputAttrs.find { rattr => resolver(lattr.name, rattr.name) }.getOrElse { + val found = rightOutputAttrs.find { rattr => resolver(lattr.name, rattr.name) } + if (found.isDefined) { +val foundDt = found.get.dataType +(foundDt, lattr.dataType) match { + case (source: StructType, target: StructType) + if allowMissingCol && !source.sameType(target) => Review comment: Hmm, I'm not sure where we can simplify the logic? By adding `canMergeSchemas`, doesn't it look more complicated? ## File path: sql/core/src/test/scala/org/apache/spark/sql/DataFrameSetOperationsSuite.scala ## @@ -536,4 +536,71 @@
[GitHub] [spark] viirya commented on a change in pull request #29587: [SPARK-32376][SQL] Make unionByName null-filling behavior work with struct columns
viirya commented on a change in pull request #29587: URL: https://github.com/apache/spark/pull/29587#discussion_r486761719 ## File path: sql/core/src/test/scala/org/apache/spark/sql/DataFrameSetOperationsSuite.scala ## @@ -536,4 +536,71 @@ class DataFrameSetOperationsSuite extends QueryTest with SharedSparkSession { assert(union2.schema.fieldNames === Array("a", "B", "C", "c")) } } + + test("SPARK-32376: Make unionByName null-filling behavior work with struct columns - 1") { Review comment: sure. ## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveUnion.scala ## @@ -17,29 +17,101 @@ package org.apache.spark.sql.catalyst.analysis +import scala.collection.mutable + import org.apache.spark.sql.AnalysisException -import org.apache.spark.sql.catalyst.expressions.{Alias, Literal} +import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, Expression, Literal, NamedExpression, WithFields} import org.apache.spark.sql.catalyst.optimizer.CombineUnions import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, Union} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types._ import org.apache.spark.sql.util.SchemaUtils /** * Resolves different children of Union to a common set of columns. */ object ResolveUnion extends Rule[LogicalPlan] { - private def unionTwoSides( + /** + * Adds missing fields recursively into given `col` expression, based on the target `StructType`. + * For example, given `col` as "a struct, b int" and `target` as + * "a struct, b int, c string", this method should add `a.c` and `c` to + * `col` expression. 
+ */ + private def addFields(col: NamedExpression, target: StructType): Option[Expression] = { +assert(col.dataType.isInstanceOf[StructType], "Only support StructType.") + +val resolver = SQLConf.get.resolver +val missingFields = + StructType.findMissingFields(col.dataType.asInstanceOf[StructType], target, resolver) +if (missingFields.isEmpty) { + None +} else { + missingFields.map(s => addFieldsInto(col, "", s.fields)) +} + } + + /** + * Adds missing fields recursively into given `col` expression. The missing fields are given + * in `fields`. For example, given `col` as "a struct, b int", and `fields` is + * "a struct, c string". This method will add a nested `a.c` field and a top-level + * `c` field to `col` and fill null values for them. + */ + private def addFieldsInto(col: Expression, base: String, fields: Seq[StructField]): Expression = { +fields.foldLeft(col) { case (currCol, field) => + field.dataType match { +case st: StructType => + val resolver = SQLConf.get.resolver + val colField = currCol.dataType.asInstanceOf[StructType] +.find(f => resolver(f.name, field.name)) + if (colField.isEmpty) { +// The whole struct is missing. Add a null. +WithFields(currCol, s"$base${field.name}", Literal(null, st), + sortOutputColumns = true) + } else { +addFieldsInto(currCol, s"$base${field.name}.", st.fields) + } +case dt => + // We need to sort columns in result, because we might add another column in other side. + // E.g., we want to union two structs "a int, b long" and "a int, c string". + // If we don't sort, we will have "a int, b long, c string" and "a int, c string, b long", + // which are not compatible. 
+ WithFields(currCol, s"$base${field.name}", Literal(null, dt), +sortOutputColumns = true) + } +} + } + + private def compareAndAddFields( left: LogicalPlan, right: LogicalPlan, - allowMissingCol: Boolean): LogicalPlan = { + allowMissingCol: Boolean): (Seq[NamedExpression], Seq[NamedExpression]) = { val resolver = SQLConf.get.resolver val leftOutputAttrs = left.output val rightOutputAttrs = right.output -// Builds a project list for `right` based on `left` output names +val aliased = mutable.ArrayBuffer.empty[Attribute] + val rightProjectList = leftOutputAttrs.map { lattr => - rightOutputAttrs.find { rattr => resolver(lattr.name, rattr.name) }.getOrElse { + val found = rightOutputAttrs.find { rattr => resolver(lattr.name, rattr.name) } + if (found.isDefined) { +val foundDt = found.get.dataType +(foundDt, lattr.dataType) match { + case (source: StructType, target: StructType) + if allowMissingCol && !source.sameType(target) => Review comment: Hmm, I'm not sure where we can simplify the logic? By adding `canMergeSchemas`, doesn't it look more complicated? This is an automated message from the Apache Git Service. To
[GitHub] [spark] viirya commented on a change in pull request #29587: [SPARK-32376][SQL] Make unionByName null-filling behavior work with struct columns
viirya commented on a change in pull request #29587: URL: https://github.com/apache/spark/pull/29587#discussion_r486762671 ## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveUnion.scala ## @@ -17,29 +17,101 @@ package org.apache.spark.sql.catalyst.analysis +import scala.collection.mutable + import org.apache.spark.sql.AnalysisException -import org.apache.spark.sql.catalyst.expressions.{Alias, Literal} +import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, Expression, Literal, NamedExpression, WithFields} import org.apache.spark.sql.catalyst.optimizer.CombineUnions import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, Union} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types._ import org.apache.spark.sql.util.SchemaUtils /** * Resolves different children of Union to a common set of columns. */ object ResolveUnion extends Rule[LogicalPlan] { - private def unionTwoSides( + /** + * Adds missing fields recursively into given `col` expression, based on the target `StructType`. + * For example, given `col` as "a struct, b int" and `target` as + * "a struct, b int, c string", this method should add `a.c` and `c` to + * `col` expression. + */ + private def addFields(col: NamedExpression, target: StructType): Option[Expression] = { +assert(col.dataType.isInstanceOf[StructType], "Only support StructType.") + +val resolver = SQLConf.get.resolver +val missingFields = + StructType.findMissingFields(col.dataType.asInstanceOf[StructType], target, resolver) +if (missingFields.isEmpty) { + None +} else { + missingFields.map(s => addFieldsInto(col, "", s.fields)) +} + } + + /** + * Adds missing fields recursively into given `col` expression. The missing fields are given + * in `fields`. For example, given `col` as "a struct, b int", and `fields` is + * "a struct, c string". 
This method will add a nested `a.c` field and a top-level + * `c` field to `col` and fill null values for them. + */ + private def addFieldsInto(col: Expression, base: String, fields: Seq[StructField]): Expression = { +fields.foldLeft(col) { case (currCol, field) => + field.dataType match { +case st: StructType => + val resolver = SQLConf.get.resolver + val colField = currCol.dataType.asInstanceOf[StructType] +.find(f => resolver(f.name, field.name)) + if (colField.isEmpty) { +// The whole struct is missing. Add a null. +WithFields(currCol, s"$base${field.name}", Literal(null, st), + sortOutputColumns = true) + } else { +addFieldsInto(currCol, s"$base${field.name}.", st.fields) + } +case dt => + // We need to sort columns in result, because we might add another column in other side. + // E.g., we want to union two structs "a int, b long" and "a int, c string". + // If we don't sort, we will have "a int, b long, c string" and "a int, c string, b long", + // which are not compatible. + WithFields(currCol, s"$base${field.name}", Literal(null, dt), +sortOutputColumns = true) + } +} + } + + private def compareAndAddFields( left: LogicalPlan, right: LogicalPlan, - allowMissingCol: Boolean): LogicalPlan = { + allowMissingCol: Boolean): (Seq[NamedExpression], Seq[NamedExpression]) = { val resolver = SQLConf.get.resolver val leftOutputAttrs = left.output val rightOutputAttrs = right.output -// Builds a project list for `right` based on `left` output names +val aliased = mutable.ArrayBuffer.empty[Attribute] + val rightProjectList = leftOutputAttrs.map { lattr => - rightOutputAttrs.find { rattr => resolver(lattr.name, rattr.name) }.getOrElse { + val found = rightOutputAttrs.find { rattr => resolver(lattr.name, rattr.name) } + if (found.isDefined) { +val foundDt = found.get.dataType +(foundDt, lattr.dataType) match { + case (source: StructType, target: StructType) + if allowMissingCol && !source.sameType(target) => Review comment: Hmm, I'm not sure where we can simplify the 
logic? By adding `canMergeSchemas`, doesn't it look more complicated? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] viirya commented on a change in pull request #29587: [SPARK-32376][SQL] Make unionByName null-filling behavior work with struct columns
viirya commented on a change in pull request #29587: URL: https://github.com/apache/spark/pull/29587#discussion_r486761719 ## File path: sql/core/src/test/scala/org/apache/spark/sql/DataFrameSetOperationsSuite.scala ## @@ -536,4 +536,71 @@ class DataFrameSetOperationsSuite extends QueryTest with SharedSparkSession { assert(union2.schema.fieldNames === Array("a", "B", "C", "c")) } } + + test("SPARK-32376: Make unionByName null-filling behavior work with struct columns - 1") { Review comment: sure. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] viirya commented on a change in pull request #29587: [SPARK-32376][SQL] Make unionByName null-filling behavior work with struct columns
viirya commented on a change in pull request #29587: URL: https://github.com/apache/spark/pull/29587#discussion_r482709060 ## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala ## @@ -641,4 +641,30 @@ object StructType extends AbstractDataType { fields.foreach(s => map.put(s.name, s)) map } + + /** + * Returns a `StructType` that contains missing fields recursively from `source` to `target`. + * Note that this doesn't support looking into array type and map type recursively. + */ + def findMissingFields(source: StructType, target: StructType, resolver: Resolver): StructType = { Review comment: OK, it sounds good. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] viirya commented on a change in pull request #29587: [SPARK-32376][SQL] Make unionByName null-filling behavior work with struct columns
viirya commented on a change in pull request #29587: URL: https://github.com/apache/spark/pull/29587#discussion_r482707006 ## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveUnion.scala ## @@ -50,18 +122,29 @@ object ResolveUnion extends Rule[LogicalPlan] { } } +(rightProjectList, aliased) + } + + private def unionTwoSides( + left: LogicalPlan, + right: LogicalPlan, + allowMissingCol: Boolean): LogicalPlan = { +val rightOutputAttrs = right.output + +// Builds a project list for `right` based on `left` output names +val (rightProjectList, aliased) = compareAndAddFields(left, right, allowMissingCol) + // Delegates failure checks to `CheckAnalysis` -val notFoundAttrs = rightOutputAttrs.diff(rightProjectList) +val notFoundAttrs = rightOutputAttrs.diff(rightProjectList ++ aliased) val rightChild = Project(rightProjectList ++ notFoundAttrs, right) // Builds a project for `logicalPlan` based on `right` output names, if allowing // missing columns. val leftChild = if (allowMissingCol) { - val missingAttrs = notFoundAttrs.map { attr => -Alias(Literal(null, attr.dataType), attr.name)() - } - if (missingAttrs.nonEmpty) { -Project(leftOutputAttrs ++ missingAttrs, left) + // Add missing (nested) fields to left plan. + val (leftProjectList, _) = compareAndAddFields(rightChild, left, allowMissingCol) + if (leftProjectList.map(_.toAttribute) != left.output) { Review comment: Doesn't `leftProjectList.map(_.toAttribute) != left.output` already cover `leftProjectList.length != left.output.length`? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] viirya commented on a change in pull request #29587: [SPARK-32376][SQL] Make unionByName null-filling behavior work with struct columns
viirya commented on a change in pull request #29587: URL: https://github.com/apache/spark/pull/29587#discussion_r482707069 ## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveUnion.scala ## @@ -17,29 +17,101 @@ package org.apache.spark.sql.catalyst.analysis +import scala.collection.mutable + import org.apache.spark.sql.AnalysisException -import org.apache.spark.sql.catalyst.expressions.{Alias, Literal} +import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, Expression, Literal, NamedExpression, WithFields} import org.apache.spark.sql.catalyst.optimizer.CombineUnions import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, Union} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types._ import org.apache.spark.sql.util.SchemaUtils /** * Resolves different children of Union to a common set of columns. */ object ResolveUnion extends Rule[LogicalPlan] { - private def unionTwoSides( + /** + * Adds missing fields recursively into given `col` expression, based on the target `StructType`. + * For example, given `col` as "a struct, b int" and `target` as + * "a struct, b int, c string", this method should add `a.c` and `c` to + * `col` expression. + */ + private def addFields(col: NamedExpression, target: StructType): Option[Expression] = { +assert(col.dataType.isInstanceOf[StructType], "Only support StructType.") + +val resolver = SQLConf.get.resolver +val missingFields = + StructType.findMissingFields(col.dataType.asInstanceOf[StructType], target, resolver) +if (missingFields.length == 0) { + None +} else { + Some(addFieldsInto(col, "", missingFields.fields)) +} + } + + /** + * Adds missing fields recursively into given `col` expression. The missing fields are given Review comment: ok. This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] viirya commented on a change in pull request #29587: [SPARK-32376][SQL] Make unionByName null-filling behavior work with struct columns
viirya commented on a change in pull request #29587: URL: https://github.com/apache/spark/pull/29587#discussion_r482706359 ## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeCreator.scala ## @@ -546,7 +547,8 @@ case class StringToMap(text: Expression, pairDelim: Expression, keyValueDelim: E case class WithFields( structExpr: Expression, names: Seq[String], -valExprs: Seq[Expression]) extends Unevaluable { +valExprs: Seq[Expression], +sortColumns: Boolean = false) extends Unevaluable { Review comment: I'm not certain if we want to show it or not. Let's keep it as is and see what others think. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] viirya commented on a change in pull request #29587: [SPARK-32376][SQL] Make unionByName null-filling behavior work with struct columns
viirya commented on a change in pull request #29587: URL: https://github.com/apache/spark/pull/29587#discussion_r480793907 ## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeCreator.scala ## @@ -546,7 +547,8 @@ case class StringToMap(text: Expression, pairDelim: Expression, keyValueDelim: E case class WithFields( structExpr: Expression, names: Seq[String], -valExprs: Seq[Expression]) extends Unevaluable { +valExprs: Seq[Expression], +sortColumns: Boolean = false) extends Unevaluable { Review comment: Changed `sortColumns` to `sortOutputColumns`. Are we sure we want to hide `sortColumns`? I'm not certain. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] viirya commented on a change in pull request #29587: [SPARK-32376][SQL] Make unionByName null-filling behavior work with struct columns
viirya commented on a change in pull request #29587: URL: https://github.com/apache/spark/pull/29587#discussion_r480527411 ## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveUnion.scala ## @@ -17,29 +17,97 @@ package org.apache.spark.sql.catalyst.analysis +import scala.collection.mutable + import org.apache.spark.sql.AnalysisException -import org.apache.spark.sql.catalyst.expressions.{Alias, Literal} +import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, Expression, Literal, NamedExpression, WithFields} import org.apache.spark.sql.catalyst.optimizer.CombineUnions import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, Union} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types._ import org.apache.spark.sql.util.SchemaUtils /** * Resolves different children of Union to a common set of columns. */ object ResolveUnion extends Rule[LogicalPlan] { - private def unionTwoSides( + /** + * Adds missing fields recursively into given `col` expression, based on the target `StructType`. + * For example, given `col` as "a struct, b int" and `target` as + * "a struct, b int, c string", this method should add `a.c` and `c` to + * `col` expression. + */ + private def addFields(col: NamedExpression, target: StructType): Option[Expression] = { +require(col.dataType.isInstanceOf[StructType], "Only support StructType.") + +val resolver = SQLConf.get.resolver +val missingFields = + StructType.findMissingFields(col.dataType.asInstanceOf[StructType], target, resolver) +if (missingFields.length == 0) { + None +} else { + Some(addFieldsInto(col, "", missingFields.fields)) +} + } + + private def addFieldsInto(col: Expression, base: String, fields: Seq[StructField]): Expression = { +var currCol = col Review comment: looks good. rewritten. thanks. This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] viirya commented on a change in pull request #29587: [SPARK-32376][SQL] Make unionByName null-filling behavior work with struct columns
viirya commented on a change in pull request #29587: URL: https://github.com/apache/spark/pull/29587#discussion_r480500164 ## File path: sql/catalyst/src/test/scala/org/apache/spark/sql/types/StructTypeSuite.scala ## @@ -103,4 +104,30 @@ class StructTypeSuite extends SparkFunSuite { val interval = "`a` INTERVAL" assert(fromDDL(interval).toDDL === interval) } + + test("find missing (nested) fields") { Review comment: OK, sure. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] viirya commented on a change in pull request #29587: [SPARK-32376][SQL] Make unionByName null-filling behavior work with struct columns
viirya commented on a change in pull request #29587: URL: https://github.com/apache/spark/pull/29587#discussion_r480239099 ## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveUnion.scala ## @@ -17,29 +17,97 @@ package org.apache.spark.sql.catalyst.analysis +import scala.collection.mutable + import org.apache.spark.sql.AnalysisException -import org.apache.spark.sql.catalyst.expressions.{Alias, Literal} +import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, Expression, Literal, NamedExpression, WithFields} import org.apache.spark.sql.catalyst.optimizer.CombineUnions import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, Union} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types._ import org.apache.spark.sql.util.SchemaUtils /** * Resolves different children of Union to a common set of columns. */ object ResolveUnion extends Rule[LogicalPlan] { - private def unionTwoSides( + /** + * Adds missing fields recursively into given `col` expression, based on the target `StructType`. + * For example, given `col` as "a struct, b int" and `target` as + * "a struct, b int, c string", this method should add `a.c` and `c` to + * `col` expression. 
+ */ + private def addFields(col: NamedExpression, target: StructType): Option[Expression] = { +require(col.dataType.isInstanceOf[StructType], "Only support StructType.") + +val resolver = SQLConf.get.resolver +val missingFields = + StructType.findMissingFields(col.dataType.asInstanceOf[StructType], target, resolver) +if (missingFields.length == 0) { + None +} else { + Some(addFieldsInto(col, "", missingFields.fields)) +} + } + + private def addFieldsInto(col: Expression, base: String, fields: Seq[StructField]): Expression = { +var currCol = col +fields.foreach { field => + field.dataType match { +case dt: AtomicType => + // We need to sort columns in result, because we might add another column in other side. + // E.g., we want to union two structs "a int, b long" and "a int, c string". + // If we don't sort, we will have "a int, b long, c string" and "a int, c string, b long", + // which are not compatible. + currCol = WithFields(currCol, s"$base${field.name}", Literal(null, dt), +sortColumns = true) +case st: StructType => + val resolver = SQLConf.get.resolver + val colField = currCol.dataType.asInstanceOf[StructType] +.find(f => resolver(f.name, field.name)) + if (colField.isEmpty) { +// The whole struct is missing. Add a null. 
+currCol = WithFields(currCol, s"$base${field.name}", Literal(null, st), + sortColumns = true) + } else { +currCol = addFieldsInto(currCol, s"$base${field.name}.", st.fields) + } + } +} +currCol + } + + private def compareAndAddFields( left: LogicalPlan, right: LogicalPlan, - allowMissingCol: Boolean): LogicalPlan = { + allowMissingCol: Boolean): (Seq[NamedExpression], Seq[NamedExpression]) = { val resolver = SQLConf.get.resolver val leftOutputAttrs = left.output val rightOutputAttrs = right.output -// Builds a project list for `right` based on `left` output names +val aliased = mutable.ArrayBuffer.empty[Attribute] + val rightProjectList = leftOutputAttrs.map { lattr => - rightOutputAttrs.find { rattr => resolver(lattr.name, rattr.name) }.getOrElse { + val found = rightOutputAttrs.find { rattr => resolver(lattr.name, rattr.name) } + if (found.isDefined) { +val foundDt = found.get.dataType +(foundDt, lattr.dataType) match { + case (source: StructType, target: StructType) + if allowMissingCol && !source.sameType(target) => +// Having an output with same name, but different struct type. +// We need to add missing fields. +addFields(found.get, target).map { added => + aliased += found.get + Alias(added, found.get.name)() +}.getOrElse(found.get) // Data type doesn't change. We should add fields at other side. + case _ => +// Same struct type, or +// unsupported: different types, array or map types, or Review comment: Array and map types aren't supported by `WithFields`. I think it is still possible to add them to `WithFields`. Once `WithFields` supports these types, we can add them here too. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure
[GitHub] [spark] viirya commented on a change in pull request #29587: [SPARK-32376][SQL] Make unionByName null-filling behavior work with struct columns
viirya commented on a change in pull request #29587: URL: https://github.com/apache/spark/pull/29587#discussion_r480238080 ## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala ## @@ -641,4 +641,30 @@ object StructType extends AbstractDataType { fields.foreach(s => map.put(s.name, s)) map } + + /** + * Returns a `StructType` that contains missing fields recursively from `source` to `target`. + * Note that this doesn't support looking into array type and map type recursively. Review comment: I leverage `WithFields` to add missing nested fields into structs. `WithFields` doesn't support array or map types currently. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] viirya commented on a change in pull request #29587: [SPARK-32376][SQL] Make unionByName null-filling behavior work with struct columns
viirya commented on a change in pull request #29587: URL: https://github.com/apache/spark/pull/29587#discussion_r480237210 ## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala ## @@ -641,4 +641,30 @@ object StructType extends AbstractDataType { fields.foreach(s => map.put(s.name, s)) map } + + /** + * Returns a `StructType` that contains missing fields recursively from `source` to `target`. + * Note that this doesn't support looking into array type and map type recursively. + */ + def findMissingFields(source: StructType, target: StructType, resolver: Resolver): StructType = { Review comment: I feel this is a more general method related to `StructType`, so I put it in `StructType`. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org