[GitHub] [spark] viirya commented on a change in pull request #29587: [SPARK-32376][SQL] Make unionByName null-filling behavior work with struct columns

2020-10-16 Thread GitBox


viirya commented on a change in pull request #29587:
URL: https://github.com/apache/spark/pull/29587#discussion_r506578657



##
File path: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
##
@@ -2067,6 +2067,13 @@ class Dataset[T] private[sql](
*   // +++++
* }}}
*
+   * Note that `allowMissingColumns` supports nested column in struct types, if the config
+   * `spark.sql.unionByName.structSupport.enabled` is enabled. Missing nested columns

Review comment:
   good catch. updated.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] viirya commented on a change in pull request #29587: [SPARK-32376][SQL] Make unionByName null-filling behavior work with struct columns

2020-10-16 Thread GitBox


viirya commented on a change in pull request #29587:
URL: https://github.com/apache/spark/pull/29587#discussion_r506578485



##
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveUnion.scala
##
@@ -17,29 +17,188 @@
 
 package org.apache.spark.sql.catalyst.analysis
 
+import scala.collection.mutable
+
 import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.expressions.{Alias, Literal}
+import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.optimizer.CombineUnions
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, Union}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types._
 import org.apache.spark.sql.util.SchemaUtils
+import org.apache.spark.unsafe.types.UTF8String
 
 /**
  * Resolves different children of Union to a common set of columns.
  */
 object ResolveUnion extends Rule[LogicalPlan] {
-  private def unionTwoSides(
+  /**
+   * This method sorts columns recursively in a struct expression based on column names.
+   */
+  private def sortStructFields(expr: Expression): Expression = {
+    val existingExprs = expr.dataType.asInstanceOf[StructType].fieldNames.zipWithIndex.map {
+      case (name, i) =>
+        val fieldExpr = GetStructField(KnownNotNull(expr), i)
+        if (fieldExpr.dataType.isInstanceOf[StructType]) {
+          (name, sortStructFields(fieldExpr))
+        } else {
+          (name, fieldExpr)
+        }
+    }.sortBy(_._1).flatMap(pair => Seq(Literal(pair._1), pair._2))
+
+    val newExpr = CreateNamedStruct(existingExprs)
+    if (expr.nullable) {
+      If(IsNull(expr), Literal(null, newExpr.dataType), newExpr)
+    } else {
+      newExpr
+    }
+  }
+
+  /**
+   * Assumes input expressions are field expression of `CreateNamedStruct`. This method
+   * sorts the expressions based on field names.
+   */
+  private def sortFieldExprs(fieldExprs: Seq[Expression]): Seq[Expression] = {
+    fieldExprs.grouped(2).map { e =>
+      Seq(e.head, e.last)
+    }.toSeq.sortBy { pair =>
+      assert(pair.head.isInstanceOf[Literal])
+      pair.head.eval().asInstanceOf[UTF8String].toString
+    }.flatten
+  }
+
+  /**
+   * This helper method sorts fields in a `UpdateFields` expression by field name.
+   */
+  private def sortStructFieldsInWithFields(expr: Expression): Expression = expr transformUp {
+    case u: UpdateFields if u.resolved =>
+      u.evalExpr match {
+        case i @ If(IsNull(_), _, CreateNamedStruct(fieldExprs)) =>
+          val sorted = sortFieldExprs(fieldExprs)
+          val newStruct = CreateNamedStruct(sorted)
+          i.copy(trueValue = Literal(null, newStruct.dataType), falseValue = newStruct)
+        case CreateNamedStruct(fieldExprs) =>
+          val sorted = sortFieldExprs(fieldExprs)
+          val newStruct = CreateNamedStruct(sorted)
+          newStruct
+        case other =>
+          throw new AnalysisException(s"`UpdateFields` has incorrect eval expression: $other. " +

Review comment:
   got it. fixed.








[GitHub] [spark] viirya commented on a change in pull request #29587: [SPARK-32376][SQL] Make unionByName null-filling behavior work with struct columns

2020-10-15 Thread GitBox


viirya commented on a change in pull request #29587:
URL: https://github.com/apache/spark/pull/29587#discussion_r505677527



##
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveUnion.scala
##
@@ -17,29 +17,190 @@
 
 package org.apache.spark.sql.catalyst.analysis
 
+import scala.collection.mutable
+
 import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.expressions.{Alias, Literal}
+import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.optimizer.CombineUnions
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, 
Union}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types._
 import org.apache.spark.sql.util.SchemaUtils
+import org.apache.spark.unsafe.types.UTF8String
 
 /**
  * Resolves different children of Union to a common set of columns.
  */
 object ResolveUnion extends Rule[LogicalPlan] {
-  private def unionTwoSides(
+  /**
+   * This method sorts recursively columns in a struct expression based on 
column names.
+   */
+  private def sortStructFields(expr: Expression): Expression = {
+val existingExprs = 
expr.dataType.asInstanceOf[StructType].fieldNames.zipWithIndex.map {
+  case (name, i) =>
+val fieldExpr = GetStructField(KnownNotNull(expr), i)
+if (fieldExpr.dataType.isInstanceOf[StructType]) {
+  (name, sortStructFields(fieldExpr))
+} else {
+  (name, fieldExpr)
+}
+}.sortBy(_._1).flatMap(pair => Seq(Literal(pair._1), pair._2))
+
+val newExpr = CreateNamedStruct(existingExprs)
+if (expr.nullable) {
+  If(IsNull(expr), Literal(null, newExpr.dataType), newExpr)
+} else {
+  newExpr
+}
+  }
+
+  /**
+   * Assumes input expressions are field expression of `CreateNamedStruct`. 
This method
+   * sorts the expressions based on field names.
+   */
+  private def sortFieldExprs(fieldExprs: Seq[Expression]): Seq[Expression] = {
+fieldExprs.grouped(2).map { e =>
+  Seq(e.head, e.last)
+}.toSeq.sortBy { pair =>
+  assert(pair.head.isInstanceOf[Literal])
+  pair.head.eval().asInstanceOf[UTF8String].toString
+}.flatten
+  }
+
+  /**
+   * This helper method sorts fields in a `UpdateFields` expression by field 
name.
+   */
+  private def sortStructFieldsInWithFields(expr: Expression): Expression = 
expr transformUp {
+case u: UpdateFields if u.resolved =>
+  u.evalExpr match {
+case i @ If(IsNull(_), _, CreateNamedStruct(fieldExprs)) =>
+  val sorted = sortFieldExprs(fieldExprs)
+  val newStruct = CreateNamedStruct(sorted)
+  i.copy(trueValue = Literal(null, newStruct.dataType), falseValue = 
newStruct)
+case CreateNamedStruct(fieldExprs) =>
+  val sorted = sortFieldExprs(fieldExprs)
+  val newStruct = CreateNamedStruct(sorted)
+  newStruct
+case other =>
+  throw new AnalysisException(s"`UpdateFields` has incorrect eval 
expression: $other. " +
+"Please file a bug report with this error message, stack trace, 
and the query.")
+  }
+  }
+
+  def simplifyWithFields(expr: Expression): Expression = {
+expr.transformUp {
+  case UpdateFields(UpdateFields(struct, fieldOps1), fieldOps2) =>
+UpdateFields(struct, fieldOps1 ++ fieldOps2)
+}
+  }
+
+  /**
+   * Adds missing fields recursively into given `col` expression, based on the 
target `StructType`.
+   * This is called by `compareAndAddFields` when we find two struct columns 
with same name but
+   * different nested fields. This method will find out the missing nested 
fields from `col` to
+   * `target` struct and add these missing nested fields. Currently we don't 
support finding out
+   * missing nested fields of struct nested in array or struct nested in map.
+   */
+  private def addFields(col: NamedExpression, target: StructType): Expression 
= {
+assert(col.dataType.isInstanceOf[StructType], "Only support StructType.")
+
+val resolver = SQLConf.get.resolver
+val missingFields =
+  StructType.findMissingFields(col.dataType.asInstanceOf[StructType], 
target, resolver)
+
+// We need to sort columns in result, because we might add another column 
in other side.
+// E.g., we want to union two structs "a int, b long" and "a int, c 
string".
+// If we don't sort, we will have "a int, b long, c string" and
+// "a int, c string, b long", which are not compatible.
+if (missingFields.isEmpty) {
+  sortStructFields(col)
+} else {
+  missingFields.map { s =>
+val struct = addFieldsInto(col, s.fields)
+// Combines `WithFields`s to reduce expression tree.
+val reducedStruct = simplifyWithFields(struct)
+val sorted = sortStructFieldsInWithFields(reducedStruct)
+sorted
+  }.get
+}
+  }
+
+  /**
+   * Adds missing fields 

[GitHub] [spark] viirya commented on a change in pull request #29587: [SPARK-32376][SQL] Make unionByName null-filling behavior work with struct columns

2020-10-14 Thread GitBox


viirya commented on a change in pull request #29587:
URL: https://github.com/apache/spark/pull/29587#discussion_r504936830



##
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveUnion.scala
##
@@ -17,29 +17,190 @@
 
 package org.apache.spark.sql.catalyst.analysis
 
+import scala.collection.mutable
+
 import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.expressions.{Alias, Literal}
+import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.optimizer.CombineUnions
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, 
Union}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types._
 import org.apache.spark.sql.util.SchemaUtils
+import org.apache.spark.unsafe.types.UTF8String
 
 /**
  * Resolves different children of Union to a common set of columns.
  */
 object ResolveUnion extends Rule[LogicalPlan] {
-  private def unionTwoSides(
+  /**
+   * This method sorts recursively columns in a struct expression based on 
column names.
+   */
+  private def sortStructFields(expr: Expression): Expression = {
+val existingExprs = 
expr.dataType.asInstanceOf[StructType].fieldNames.zipWithIndex.map {
+  case (name, i) =>
+val fieldExpr = GetStructField(KnownNotNull(expr), i)
+if (fieldExpr.dataType.isInstanceOf[StructType]) {
+  (name, sortStructFields(fieldExpr))
+} else {
+  (name, fieldExpr)
+}
+}.sortBy(_._1).flatMap(pair => Seq(Literal(pair._1), pair._2))
+
+val newExpr = CreateNamedStruct(existingExprs)
+if (expr.nullable) {
+  If(IsNull(expr), Literal(null, newExpr.dataType), newExpr)
+} else {
+  newExpr
+}
+  }
+
+  /**
+   * Assumes input expressions are field expression of `CreateNamedStruct`. 
This method
+   * sorts the expressions based on field names.
+   */
+  private def sortFieldExprs(fieldExprs: Seq[Expression]): Seq[Expression] = {
+fieldExprs.grouped(2).map { e =>
+  Seq(e.head, e.last)
+}.toSeq.sortBy { pair =>
+  assert(pair.head.isInstanceOf[Literal])
+  pair.head.eval().asInstanceOf[UTF8String].toString
+}.flatten
+  }
+
+  /**
+   * This helper method sorts fields in a `UpdateFields` expression by field 
name.
+   */
+  private def sortStructFieldsInWithFields(expr: Expression): Expression = 
expr transformUp {
+case u: UpdateFields if u.resolved =>
+  u.evalExpr match {
+case i @ If(IsNull(_), _, CreateNamedStruct(fieldExprs)) =>
+  val sorted = sortFieldExprs(fieldExprs)
+  val newStruct = CreateNamedStruct(sorted)
+  i.copy(trueValue = Literal(null, newStruct.dataType), falseValue = 
newStruct)
+case CreateNamedStruct(fieldExprs) =>
+  val sorted = sortFieldExprs(fieldExprs)
+  val newStruct = CreateNamedStruct(sorted)
+  newStruct
+case other =>
+  throw new AnalysisException(s"`UpdateFields` has incorrect eval 
expression: $other. " +
+"Please file a bug report with this error message, stack trace, 
and the query.")
+  }
+  }
+
+  def simplifyWithFields(expr: Expression): Expression = {
+expr.transformUp {
+  case UpdateFields(UpdateFields(struct, fieldOps1), fieldOps2) =>

Review comment:
   Actually, I plan to move this optimization out of `ResolveUnion` into a
separate analyzer rule in #29812. With a complex, deeply nested schema, it is
easy to produce an inefficient expression tree that is very slow in the
analysis phase. Without this optimization, the test case in this PR cannot be
evaluated at all; with it, the query evaluates normally.
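
   The collapsing step discussed here can be sketched outside Spark. This is a
toy model only: `Expr`, `Col`, and `Update` are hypothetical stand-ins, not
Catalyst classes, and `Update` plays the role of `UpdateFields` with plain
field names in place of real field operations.

```scala
// Toy stand-ins for Catalyst expressions; these are NOT Spark classes.
sealed trait Expr
final case class Col(name: String) extends Expr
// A struct expression plus an ordered list of field names to add (assumption:
// real field operations are reduced to names for illustration).
final case class Update(struct: Expr, ops: Seq[String]) extends Expr

object CollapseUpdates {
  // Bottom-up rewrite: Update(Update(s, ops1), ops2) becomes Update(s, ops1 ++ ops2).
  def simplify(e: Expr): Expr = e match {
    case Update(inner, ops2) =>
      simplify(inner) match {
        case Update(struct, ops1) => Update(struct, ops1 ++ ops2)
        case other => Update(other, ops2)
      }
    case other => other
  }

  // Length of the chain of nested nodes, counting the leaf.
  def depth(e: Expr): Int = e match {
    case Update(inner, _) => 1 + depth(inner)
    case _ => 1
  }

  def main(args: Array[String]): Unit = {
    // Adding fields one at a time yields a chain of three nested updates.
    val chained = Update(Update(Update(Col("s"), Seq("a")), Seq("b")), Seq("c"))
    val flat = simplify(chained)
    println(s"before: depth ${depth(chained)}, after: depth ${depth(flat)}")
  }
}
```

   In this sketch the chain collapses to a single node that keeps the field
operations in their original order, which is the property the merged
expression has to preserve.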








[GitHub] [spark] viirya commented on a change in pull request #29587: [SPARK-32376][SQL] Make unionByName null-filling behavior work with struct columns

2020-10-14 Thread GitBox


viirya commented on a change in pull request #29587:
URL: https://github.com/apache/spark/pull/29587#discussion_r504799643



##
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveUnion.scala
##
@@ -17,29 +17,190 @@
 
 package org.apache.spark.sql.catalyst.analysis
 
+import scala.collection.mutable
+
 import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.expressions.{Alias, Literal}
+import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.optimizer.CombineUnions
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, 
Union}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types._
 import org.apache.spark.sql.util.SchemaUtils
+import org.apache.spark.unsafe.types.UTF8String
 
 /**
  * Resolves different children of Union to a common set of columns.
  */
 object ResolveUnion extends Rule[LogicalPlan] {
-  private def unionTwoSides(
+  /**
+   * This method sorts recursively columns in a struct expression based on 
column names.
+   */
+  private def sortStructFields(expr: Expression): Expression = {
+val existingExprs = 
expr.dataType.asInstanceOf[StructType].fieldNames.zipWithIndex.map {
+  case (name, i) =>
+val fieldExpr = GetStructField(KnownNotNull(expr), i)
+if (fieldExpr.dataType.isInstanceOf[StructType]) {
+  (name, sortStructFields(fieldExpr))
+} else {
+  (name, fieldExpr)
+}
+}.sortBy(_._1).flatMap(pair => Seq(Literal(pair._1), pair._2))
+
+val newExpr = CreateNamedStruct(existingExprs)
+if (expr.nullable) {
+  If(IsNull(expr), Literal(null, newExpr.dataType), newExpr)
+} else {
+  newExpr
+}
+  }
+
+  /**
+   * Assumes input expressions are field expression of `CreateNamedStruct`. 
This method
+   * sorts the expressions based on field names.
+   */
+  private def sortFieldExprs(fieldExprs: Seq[Expression]): Seq[Expression] = {
+fieldExprs.grouped(2).map { e =>
+  Seq(e.head, e.last)
+}.toSeq.sortBy { pair =>
+  assert(pair.head.isInstanceOf[Literal])
+  pair.head.eval().asInstanceOf[UTF8String].toString
+}.flatten
+  }
+
+  /**
+   * This helper method sorts fields in a `UpdateFields` expression by field 
name.
+   */
+  private def sortStructFieldsInWithFields(expr: Expression): Expression = 
expr transformUp {
+case u: UpdateFields if u.resolved =>
+  u.evalExpr match {
+case i @ If(IsNull(_), _, CreateNamedStruct(fieldExprs)) =>
+  val sorted = sortFieldExprs(fieldExprs)
+  val newStruct = CreateNamedStruct(sorted)
+  i.copy(trueValue = Literal(null, newStruct.dataType), falseValue = 
newStruct)
+case CreateNamedStruct(fieldExprs) =>
+  val sorted = sortFieldExprs(fieldExprs)
+  val newStruct = CreateNamedStruct(sorted)
+  newStruct
+case other =>
+  throw new AnalysisException(s"`UpdateFields` has incorrect eval 
expression: $other. " +
+"Please file a bug report with this error message, stack trace, 
and the query.")
+  }
+  }
+
+  def simplifyWithFields(expr: Expression): Expression = {
+expr.transformUp {
+  case UpdateFields(UpdateFields(struct, fieldOps1), fieldOps2) =>
+UpdateFields(struct, fieldOps1 ++ fieldOps2)
+}
+  }
+
+  /**
+   * Adds missing fields recursively into given `col` expression, based on the 
target `StructType`.
+   * This is called by `compareAndAddFields` when we find two struct columns 
with same name but
+   * different nested fields. This method will find out the missing nested 
fields from `col` to
+   * `target` struct and add these missing nested fields. Currently we don't 
support finding out
+   * missing nested fields of struct nested in array or struct nested in map.
+   */
+  private def addFields(col: NamedExpression, target: StructType): Expression 
= {
+assert(col.dataType.isInstanceOf[StructType], "Only support StructType.")
+
+val resolver = SQLConf.get.resolver
+val missingFields =
+  StructType.findMissingFields(col.dataType.asInstanceOf[StructType], 
target, resolver)
+
+// We need to sort columns in result, because we might add another column 
in other side.
+// E.g., we want to union two structs "a int, b long" and "a int, c 
string".
+// If we don't sort, we will have "a int, b long, c string" and
+// "a int, c string, b long", which are not compatible.
+if (missingFields.isEmpty) {
+  sortStructFields(col)
+} else {
+  missingFields.map { s =>
+val struct = addFieldsInto(col, s.fields)
+// Combines `WithFields`s to reduce expression tree.
+val reducedStruct = simplifyWithFields(struct)
+val sorted = sortStructFieldsInWithFields(reducedStruct)
+sorted
+  }.get
+}
+  }
+
+  /**
+   * Adds missing fields 

[GitHub] [spark] viirya commented on a change in pull request #29587: [SPARK-32376][SQL] Make unionByName null-filling behavior work with struct columns

2020-10-14 Thread GitBox


viirya commented on a change in pull request #29587:
URL: https://github.com/apache/spark/pull/29587#discussion_r504798080



##
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveUnion.scala
##
@@ -17,29 +17,190 @@
 
 package org.apache.spark.sql.catalyst.analysis
 
+import scala.collection.mutable
+
 import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.expressions.{Alias, Literal}
+import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.optimizer.CombineUnions
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, 
Union}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types._
 import org.apache.spark.sql.util.SchemaUtils
+import org.apache.spark.unsafe.types.UTF8String
 
 /**
  * Resolves different children of Union to a common set of columns.
  */
 object ResolveUnion extends Rule[LogicalPlan] {
-  private def unionTwoSides(
+  /**
+   * This method sorts recursively columns in a struct expression based on 
column names.
+   */
+  private def sortStructFields(expr: Expression): Expression = {
+val existingExprs = 
expr.dataType.asInstanceOf[StructType].fieldNames.zipWithIndex.map {
+  case (name, i) =>
+val fieldExpr = GetStructField(KnownNotNull(expr), i)
+if (fieldExpr.dataType.isInstanceOf[StructType]) {
+  (name, sortStructFields(fieldExpr))
+} else {
+  (name, fieldExpr)
+}
+}.sortBy(_._1).flatMap(pair => Seq(Literal(pair._1), pair._2))
+
+val newExpr = CreateNamedStruct(existingExprs)
+if (expr.nullable) {
+  If(IsNull(expr), Literal(null, newExpr.dataType), newExpr)
+} else {
+  newExpr
+}
+  }
+
+  /**
+   * Assumes input expressions are field expression of `CreateNamedStruct`. 
This method
+   * sorts the expressions based on field names.
+   */
+  private def sortFieldExprs(fieldExprs: Seq[Expression]): Seq[Expression] = {
+fieldExprs.grouped(2).map { e =>
+  Seq(e.head, e.last)
+}.toSeq.sortBy { pair =>
+  assert(pair.head.isInstanceOf[Literal])
+  pair.head.eval().asInstanceOf[UTF8String].toString
+}.flatten
+  }
+
+  /**
+   * This helper method sorts fields in a `UpdateFields` expression by field 
name.
+   */
+  private def sortStructFieldsInWithFields(expr: Expression): Expression = 
expr transformUp {
+case u: UpdateFields if u.resolved =>
+  u.evalExpr match {
+case i @ If(IsNull(_), _, CreateNamedStruct(fieldExprs)) =>
+  val sorted = sortFieldExprs(fieldExprs)
+  val newStruct = CreateNamedStruct(sorted)
+  i.copy(trueValue = Literal(null, newStruct.dataType), falseValue = 
newStruct)
+case CreateNamedStruct(fieldExprs) =>
+  val sorted = sortFieldExprs(fieldExprs)
+  val newStruct = CreateNamedStruct(sorted)
+  newStruct
+case other =>
+  throw new AnalysisException(s"`UpdateFields` has incorrect eval 
expression: $other. " +
+"Please file a bug report with this error message, stack trace, 
and the query.")
+  }
+  }
+
+  def simplifyWithFields(expr: Expression): Expression = {
+expr.transformUp {
+  case UpdateFields(UpdateFields(struct, fieldOps1), fieldOps2) =>
+UpdateFields(struct, fieldOps1 ++ fieldOps2)
+}
+  }
+
+  /**
+   * Adds missing fields recursively into given `col` expression, based on the 
target `StructType`.
+   * This is called by `compareAndAddFields` when we find two struct columns 
with same name but
+   * different nested fields. This method will find out the missing nested 
fields from `col` to
+   * `target` struct and add these missing nested fields. Currently we don't 
support finding out
+   * missing nested fields of struct nested in array or struct nested in map.
+   */
+  private def addFields(col: NamedExpression, target: StructType): Expression 
= {
+assert(col.dataType.isInstanceOf[StructType], "Only support StructType.")
+
+val resolver = SQLConf.get.resolver
+val missingFields =

Review comment:
   Good. Fixed.








[GitHub] [spark] viirya commented on a change in pull request #29587: [SPARK-32376][SQL] Make unionByName null-filling behavior work with struct columns

2020-10-14 Thread GitBox


viirya commented on a change in pull request #29587:
URL: https://github.com/apache/spark/pull/29587#discussion_r504796446



##
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveUnion.scala
##
@@ -17,29 +17,190 @@
 
 package org.apache.spark.sql.catalyst.analysis
 
+import scala.collection.mutable
+
 import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.expressions.{Alias, Literal}
+import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.optimizer.CombineUnions
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, 
Union}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types._
 import org.apache.spark.sql.util.SchemaUtils
+import org.apache.spark.unsafe.types.UTF8String
 
 /**
  * Resolves different children of Union to a common set of columns.
  */
 object ResolveUnion extends Rule[LogicalPlan] {
-  private def unionTwoSides(
+  /**
+   * This method sorts recursively columns in a struct expression based on 
column names.
+   */
+  private def sortStructFields(expr: Expression): Expression = {
+val existingExprs = 
expr.dataType.asInstanceOf[StructType].fieldNames.zipWithIndex.map {
+  case (name, i) =>
+val fieldExpr = GetStructField(KnownNotNull(expr), i)
+if (fieldExpr.dataType.isInstanceOf[StructType]) {
+  (name, sortStructFields(fieldExpr))
+} else {
+  (name, fieldExpr)
+}
+}.sortBy(_._1).flatMap(pair => Seq(Literal(pair._1), pair._2))
+
+val newExpr = CreateNamedStruct(existingExprs)
+if (expr.nullable) {
+  If(IsNull(expr), Literal(null, newExpr.dataType), newExpr)
+} else {
+  newExpr
+}
+  }
+
+  /**
+   * Assumes input expressions are field expression of `CreateNamedStruct`. 
This method
+   * sorts the expressions based on field names.
+   */
+  private def sortFieldExprs(fieldExprs: Seq[Expression]): Seq[Expression] = {
+fieldExprs.grouped(2).map { e =>
+  Seq(e.head, e.last)
+}.toSeq.sortBy { pair =>
+  assert(pair.head.isInstanceOf[Literal])
+  pair.head.eval().asInstanceOf[UTF8String].toString
+}.flatten
+  }
+
+  /**
+   * This helper method sorts fields in a `UpdateFields` expression by field 
name.
+   */
+  private def sortStructFieldsInWithFields(expr: Expression): Expression = 
expr transformUp {
+case u: UpdateFields if u.resolved =>
+  u.evalExpr match {
+case i @ If(IsNull(_), _, CreateNamedStruct(fieldExprs)) =>
+  val sorted = sortFieldExprs(fieldExprs)
+  val newStruct = CreateNamedStruct(sorted)
+  i.copy(trueValue = Literal(null, newStruct.dataType), falseValue = 
newStruct)
+case CreateNamedStruct(fieldExprs) =>
+  val sorted = sortFieldExprs(fieldExprs)
+  val newStruct = CreateNamedStruct(sorted)
+  newStruct
+case other =>
+  throw new AnalysisException(s"`UpdateFields` has incorrect eval 
expression: $other. " +
+"Please file a bug report with this error message, stack trace, 
and the query.")
+  }
+  }
+
+  def simplifyWithFields(expr: Expression): Expression = {
+expr.transformUp {
+  case UpdateFields(UpdateFields(struct, fieldOps1), fieldOps2) =>

Review comment:
   Yes. Without optimizing the expressions, this does not scale to deeply
nested schemas, e.g. the added test `SPARK-32376: Make unionByName
null-filling behavior work with struct columns - deep expr` in
`DataFrameSetOperationsSuite`.








[GitHub] [spark] viirya commented on a change in pull request #29587: [SPARK-32376][SQL] Make unionByName null-filling behavior work with struct columns

2020-10-14 Thread GitBox


viirya commented on a change in pull request #29587:
URL: https://github.com/apache/spark/pull/29587#discussion_r504794611



##
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
##
@@ -2740,6 +2740,19 @@ object SQLConf {
   .booleanConf
   .createWithDefault(false)
 
+  val UNION_BYNAME_STRUCT_SUPPORT_ENABLED =

Review comment:
   Ok, sounds more correct. I will remove this config.








[GitHub] [spark] viirya commented on a change in pull request #29587: [SPARK-32376][SQL] Make unionByName null-filling behavior work with struct columns

2020-10-10 Thread GitBox


viirya commented on a change in pull request #29587:
URL: https://github.com/apache/spark/pull/29587#discussion_r502837273



##
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
##
@@ -2721,6 +2721,16 @@ object SQLConf {
   .booleanConf
   .createWithDefault(false)
 
+  val UNION_BYNAME_STRUCT_SUPPORT_ENABLED =
+    buildConf("spark.sql.unionByName.structSupport.enabled")
+      .doc("When true, the `allowMissingColumns` feature of `Dataset.unionByName` supports " +
+        "nested column in struct types. Missing nested columns of struct columns with same " +
+        "name will also be filled with null values. This currently does not support nested " +
+        "columns in array and map types.")

Review comment:
   To get a consistent schema without simply sorting the fields, the top-level
case is much easier. We iterate over each top-level column of the left side,
then append the columns that exist only on the right side to the end of the
schema. Then it is done.
   
   For the nested column case, missing nested columns are filled in using
`WithFields`, which basically appends new columns to the end of the original
schema. To get the same consistent schema as in the top-level case, we either
adjust all nested columns afterwards or simply sort the nested columns.
   
   I believe it is possible to adjust the nested columns to produce a more
natural schema. But this PR is growing too big and I would like to cut it
here. If you are okay with that, I can address this in a follow-up PR. WDYT?
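
   To make the ordering problem concrete, here is a small sketch that runs
without Spark. Everything in it is hypothetical: `DType`, `StructT`,
`addMissing`, and `sortFields` are toy stand-ins rather than Spark's
`StructType` or the helpers in `ResolveUnion`. It only models schemas, so the
null-filled values themselves are not represented.

```scala
// Toy schema model; these are NOT Spark's StructType/StructField.
sealed trait DType
case object IntT extends DType
case object LongT extends DType
case object StringT extends DType
final case class StructT(fields: Seq[(String, DType)]) extends DType

object NestedUnionSketch {
  // Append the fields of `target` that `col` lacks (in Spark these would be
  // null-filled), recursing into struct fields present on both sides.
  def addMissing(col: StructT, target: StructT): StructT = {
    val targetByName = target.fields.toMap
    val existing: Seq[(String, DType)] = col.fields.map {
      case (name, s: StructT) =>
        targetByName.get(name) match {
          case Some(t: StructT) => (name, addMissing(s, t))
          case _ => (name, s)
        }
      case other => other
    }
    val have = col.fields.map(_._1).toSet
    val missing = target.fields.filterNot { case (name, _) => have(name) }
    StructT(existing ++ missing)
  }

  // Sort fields by name at every nesting level.
  def sortFields(s: StructT): StructT = {
    val sortedInner: Seq[(String, DType)] = s.fields.map {
      case (name, inner: StructT) => (name, sortFields(inner))
      case other => other
    }
    StructT(sortedInner.sortBy(_._1))
  }

  def main(args: Array[String]): Unit = {
    val left = StructT(Seq("a" -> IntT, "b" -> LongT))
    val right = StructT(Seq("a" -> IntT, "c" -> StringT))
    val l = addMissing(left, right)  // field order: a, b, c
    val r = addMissing(right, left)  // field order: a, c, b
    println(s"same before sorting: ${l == r}")
    println(s"same after sorting: ${sortFields(l) == sortFields(r)}")
  }
}
```

   Appending alone leaves the two sides with the same fields in different
orders; sorting by name at each level is the simple way to make them line up,
at the cost of a less natural field order.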








[GitHub] [spark] viirya commented on a change in pull request #29587: [SPARK-32376][SQL] Make unionByName null-filling behavior work with struct columns

2020-10-06 Thread GitBox


viirya commented on a change in pull request #29587:
URL: https://github.com/apache/spark/pull/29587#discussion_r500034164



##
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
##
@@ -2721,6 +2721,16 @@ object SQLConf {
   .booleanConf
   .createWithDefault(false)
 
+  val UNION_BYNAME_STRUCT_SUPPORT_ENABLED =
+buildConf("spark.sql.unionByName.structSupport.enabled")
+  .doc("When true, the `allowMissingColumns` feature of 
`Dataset.unionByName` supports " +
+"nested column in struct types. Missing nested columns of struct 
columns with same " +
+"name will also be filled with null values. This currently does not 
support nested " +
+"columns in array and map types.")

Review comment:
   Updated both the config doc and API doc in Dataset.
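   
   For readers of the doc text above, the null-filling behavior it describes can be modeled roughly as follows. This is a sketch under simplifying assumptions, not the Catalyst implementation: `NullFillSketch` and all types in it are hypothetical, a struct is modeled as a name-to-value map, and anything that is not a struct (including arrays and maps) is treated as an opaque leaf and left untouched.

```scala
object NullFillSketch {
  // Hypothetical schema model: either a leaf or a struct of named fields.
  sealed trait Schema
  case object LeafT extends Schema
  final case class StructT(fields: Map[String, Schema]) extends Schema

  // Hypothetical value model mirroring the schema model.
  sealed trait Value
  case object NullV extends Value
  final case class LeafV(v: Any) extends Value
  final case class StructV(fields: Map[String, Value]) extends Value

  // Fill every field the target schema expects but the value lacks with null,
  // recursing only into struct fields that share the same name.
  def fill(value: Value, target: Schema): Value = (value, target) match {
    case (StructV(have), StructT(want)) =>
      StructV(want.map { case (name, t) =>
        name -> have.get(name).map(fill(_, t)).getOrElse(NullV)
      })
    case _ => value // leaves and null structs are returned unchanged
  }
}
```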








[GitHub] [spark] viirya commented on a change in pull request #29587: [SPARK-32376][SQL] Make unionByName null-filling behavior work with struct columns

2020-10-06 Thread GitBox


viirya commented on a change in pull request #29587:
URL: https://github.com/apache/spark/pull/29587#discussion_r500033312



##
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveUnion.scala
##
@@ -17,29 +17,202 @@
 
 package org.apache.spark.sql.catalyst.analysis
 
+import scala.collection.mutable
+
 import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.expressions.{Alias, Literal}
+import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.optimizer.CombineUnions
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, 
Union}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types._
 import org.apache.spark.sql.util.SchemaUtils
+import org.apache.spark.unsafe.types.UTF8String
 
 /**
  * Resolves different children of Union to a common set of columns.
  */
 object ResolveUnion extends Rule[LogicalPlan] {
-  private def unionTwoSides(
+  /**
+   * This method sorts recursively columns in a struct expression based on 
column names.
+   */
+  private def sortStructFields(expr: Expression): Expression = {
+val existingExprs = 
expr.dataType.asInstanceOf[StructType].fieldNames.zipWithIndex.map {
+  case (name, i) =>
+val fieldExpr = GetStructField(KnownNotNull(expr), i)
+if (fieldExpr.dataType.isInstanceOf[StructType]) {
+  (name, sortStructFields(fieldExpr))
+} else {
+  (name, fieldExpr)
+}
+}.sortBy(_._1).flatMap(pair => Seq(Literal(pair._1), pair._2))
+
+val newExpr = CreateNamedStruct(existingExprs)
+if (expr.nullable) {
+  If(IsNull(expr), Literal(null, newExpr.dataType), newExpr)
+} else {
+  newExpr
+}
+  }
+
+  /**
+   * Assumes input expressions are field expression of `CreateNamedStruct`. 
This method
+   * sorts the expressions based on field names.
+   */
+  private def sortFieldExprs(fieldExprs: Seq[Expression]): Seq[Expression] = {
+fieldExprs.grouped(2).map { e =>
+  Seq(e.head, e.last)
+}.toSeq.sortBy { pair =>
+  assert(pair.head.isInstanceOf[Literal])
+  pair.head.eval().asInstanceOf[UTF8String].toString
+}.flatten
+  }
+
+  /**
+   * This helper method sorts fields in a `WithFields` expression by field 
name.
+   */
+  private def sortStructFieldsInWithFields(expr: Expression): Expression = 
expr transformUp {
+case w: WithFields if w.resolved =>
+  w.evalExpr match {
+case i @ If(IsNull(_), _, CreateNamedStruct(fieldExprs)) =>
+  val sorted = sortFieldExprs(fieldExprs)
+  val newStruct = CreateNamedStruct(sorted)
+  i.copy(trueValue = Literal(null, newStruct.dataType), falseValue = 
newStruct)
+case CreateNamedStruct(fieldExprs) =>
+  val sorted = sortFieldExprs(fieldExprs)
+  val newStruct = CreateNamedStruct(sorted)
+  newStruct
+case other =>

Review comment:
   Revised.
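   
   As background for the hunk above: `sortFieldExprs` depends on the children of `CreateNamedStruct` being a flat sequence that alternates name, value, name, value. A standalone sketch of that pairing-and-sorting step, using plain strings instead of Catalyst expressions (`SortPairsSketch` and `sortFieldPairs` are hypothetical names):

```scala
object SortPairsSketch {
  // Input alternates name, value, name, value, ...
  // In Catalyst the names would be string Literals and the values Expressions.
  def sortFieldPairs(flat: Seq[String]): Seq[String] = {
    require(flat.length % 2 == 0, "expected alternating name/value entries")
    // Group into (name, value) pairs, sort by name, then flatten back.
    flat.grouped(2).toVector.sortBy(_.head).flatten
  }
}
```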








[GitHub] [spark] viirya commented on a change in pull request #29587: [SPARK-32376][SQL] Make unionByName null-filling behavior work with struct columns

2020-10-05 Thread GitBox


viirya commented on a change in pull request #29587:
URL: https://github.com/apache/spark/pull/29587#discussion_r499980671



##
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveUnion.scala
##
@@ -17,29 +17,202 @@
 
 package org.apache.spark.sql.catalyst.analysis
 
+import scala.collection.mutable
+
 import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.expressions.{Alias, Literal}
+import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.optimizer.CombineUnions
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, 
Union}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types._
 import org.apache.spark.sql.util.SchemaUtils
+import org.apache.spark.unsafe.types.UTF8String
 
 /**
  * Resolves different children of Union to a common set of columns.
  */
 object ResolveUnion extends Rule[LogicalPlan] {
-  private def unionTwoSides(
+  /**
+   * This method sorts recursively columns in a struct expression based on 
column names.
+   */
+  private def sortStructFields(expr: Expression): Expression = {
+val existingExprs = 
expr.dataType.asInstanceOf[StructType].fieldNames.zipWithIndex.map {
+  case (name, i) =>
+val fieldExpr = GetStructField(KnownNotNull(expr), i)
+if (fieldExpr.dataType.isInstanceOf[StructType]) {
+  (name, sortStructFields(fieldExpr))
+} else {
+  (name, fieldExpr)
+}
+}.sortBy(_._1).flatMap(pair => Seq(Literal(pair._1), pair._2))
+
+val newExpr = CreateNamedStruct(existingExprs)
+if (expr.nullable) {
+  If(IsNull(expr), Literal(null, newExpr.dataType), newExpr)
+} else {
+  newExpr
+}
+  }
+
+  /**
+   * Assumes input expressions are field expression of `CreateNamedStruct`. 
This method
+   * sorts the expressions based on field names.
+   */
+  private def sortFieldExprs(fieldExprs: Seq[Expression]): Seq[Expression] = {
+fieldExprs.grouped(2).map { e =>
+  Seq(e.head, e.last)
+}.toSeq.sortBy { pair =>
+  assert(pair.head.isInstanceOf[Literal])
+  pair.head.eval().asInstanceOf[UTF8String].toString
+}.flatten
+  }
+
+  /**
+   * This helper method sorts fields in a `WithFields` expression by field 
name.
+   */
+  private def sortStructFieldsInWithFields(expr: Expression): Expression = 
expr transformUp {
+case w: WithFields if w.resolved =>
+  w.evalExpr match {
+case i @ If(IsNull(_), _, CreateNamedStruct(fieldExprs)) =>
+  val sorted = sortFieldExprs(fieldExprs)
+  val newStruct = CreateNamedStruct(sorted)
+  i.copy(trueValue = Literal(null, newStruct.dataType), falseValue = 
newStruct)
+case CreateNamedStruct(fieldExprs) =>
+  val sorted = sortFieldExprs(fieldExprs)
+  val newStruct = CreateNamedStruct(sorted)
+  newStruct
+case other =>

Review comment:
   Okay, I will revise the error message here. Thanks.








[GitHub] [spark] viirya commented on a change in pull request #29587: [SPARK-32376][SQL] Make unionByName null-filling behavior work with struct columns

2020-10-05 Thread GitBox


viirya commented on a change in pull request #29587:
URL: https://github.com/apache/spark/pull/29587#discussion_r499980569



##
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveUnion.scala
##
@@ -17,29 +17,202 @@
 
 package org.apache.spark.sql.catalyst.analysis
 
+import scala.collection.mutable
+
 import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.expressions.{Alias, Literal}
+import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.optimizer.CombineUnions
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, 
Union}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types._
 import org.apache.spark.sql.util.SchemaUtils
+import org.apache.spark.unsafe.types.UTF8String
 
 /**
  * Resolves different children of Union to a common set of columns.
  */
 object ResolveUnion extends Rule[LogicalPlan] {
-  private def unionTwoSides(
+  /**
+   * This method sorts recursively columns in a struct expression based on 
column names.
+   */
+  private def sortStructFields(expr: Expression): Expression = {
+val existingExprs = 
expr.dataType.asInstanceOf[StructType].fieldNames.zipWithIndex.map {
+  case (name, i) =>
+val fieldExpr = GetStructField(KnownNotNull(expr), i)
+if (fieldExpr.dataType.isInstanceOf[StructType]) {
+  (name, sortStructFields(fieldExpr))
+} else {
+  (name, fieldExpr)
+}
+}.sortBy(_._1).flatMap(pair => Seq(Literal(pair._1), pair._2))
+
+val newExpr = CreateNamedStruct(existingExprs)
+if (expr.nullable) {
+  If(IsNull(expr), Literal(null, newExpr.dataType), newExpr)
+} else {
+  newExpr
+}
+  }
+
+  /**
+   * Assumes input expressions are field expression of `CreateNamedStruct`. 
This method
+   * sorts the expressions based on field names.
+   */
+  private def sortFieldExprs(fieldExprs: Seq[Expression]): Seq[Expression] = {
+fieldExprs.grouped(2).map { e =>
+  Seq(e.head, e.last)
+}.toSeq.sortBy { pair =>
+  assert(pair.head.isInstanceOf[Literal])
+  pair.head.eval().asInstanceOf[UTF8String].toString
+}.flatten
+  }
+
+  /**
+   * This helper method sorts fields in a `WithFields` expression by field 
name.
+   */
+  private def sortStructFieldsInWithFields(expr: Expression): Expression = 
expr transformUp {
+case w: WithFields if w.resolved =>
+  w.evalExpr match {
+case i @ If(IsNull(_), _, CreateNamedStruct(fieldExprs)) =>
+  val sorted = sortFieldExprs(fieldExprs)
+  val newStruct = CreateNamedStruct(sorted)
+  i.copy(trueValue = Literal(null, newStruct.dataType), falseValue = 
newStruct)
+case CreateNamedStruct(fieldExprs) =>
+  val sorted = sortFieldExprs(fieldExprs)
+  val newStruct = CreateNamedStruct(sorted)
+  newStruct
+case other =>
+  throw new AnalysisException(s"`WithFields` has incorrect eval 
expression: $other")
+  }
+  }
+
+  def simplifyWithFields(expr: Expression): Expression = {

Review comment:
   Yeah, actually there is #29812 for that, but it is blocked by another PR 
that is refactoring `WithFields`.








[GitHub] [spark] viirya commented on a change in pull request #29587: [SPARK-32376][SQL] Make unionByName null-filling behavior work with struct columns

2020-10-05 Thread GitBox


viirya commented on a change in pull request #29587:
URL: https://github.com/apache/spark/pull/29587#discussion_r499980265



##
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveUnion.scala
##
@@ -17,29 +17,202 @@
 
 package org.apache.spark.sql.catalyst.analysis
 
+import scala.collection.mutable
+
 import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.expressions.{Alias, Literal}
+import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.optimizer.CombineUnions
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, 
Union}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types._
 import org.apache.spark.sql.util.SchemaUtils
+import org.apache.spark.unsafe.types.UTF8String
 
 /**
  * Resolves different children of Union to a common set of columns.
  */
 object ResolveUnion extends Rule[LogicalPlan] {
-  private def unionTwoSides(
+  /**
+   * This method sorts recursively columns in a struct expression based on 
column names.
+   */
+  private def sortStructFields(expr: Expression): Expression = {
+val existingExprs = 
expr.dataType.asInstanceOf[StructType].fieldNames.zipWithIndex.map {
+  case (name, i) =>
+val fieldExpr = GetStructField(KnownNotNull(expr), i)
+if (fieldExpr.dataType.isInstanceOf[StructType]) {
+  (name, sortStructFields(fieldExpr))
+} else {
+  (name, fieldExpr)
+}
+}.sortBy(_._1).flatMap(pair => Seq(Literal(pair._1), pair._2))
+
+val newExpr = CreateNamedStruct(existingExprs)
+if (expr.nullable) {
+  If(IsNull(expr), Literal(null, newExpr.dataType), newExpr)
+} else {
+  newExpr
+}
+  }
+
+  /**
+   * Assumes input expressions are field expression of `CreateNamedStruct`. 
This method
+   * sorts the expressions based on field names.
+   */
+  private def sortFieldExprs(fieldExprs: Seq[Expression]): Seq[Expression] = {
+fieldExprs.grouped(2).map { e =>
+  Seq(e.head, e.last)
+}.toSeq.sortBy { pair =>
+  assert(pair.head.isInstanceOf[Literal])
+  pair.head.eval().asInstanceOf[UTF8String].toString
+}.flatten
+  }
+
+  /**
+   * This helper method sorts fields in a `WithFields` expression by field 
name.
+   */
+  private def sortStructFieldsInWithFields(expr: Expression): Expression = 
expr transformUp {
+case w: WithFields if w.resolved =>
+  w.evalExpr match {
+case i @ If(IsNull(_), _, CreateNamedStruct(fieldExprs)) =>
+  val sorted = sortFieldExprs(fieldExprs)
+  val newStruct = CreateNamedStruct(sorted)
+  i.copy(trueValue = Literal(null, newStruct.dataType), falseValue = 
newStruct)
+case CreateNamedStruct(fieldExprs) =>
+  val sorted = sortFieldExprs(fieldExprs)
+  val newStruct = CreateNamedStruct(sorted)
+  newStruct
+case other =>
+  throw new AnalysisException(s"`WithFields` has incorrect eval 
expression: $other")
+  }
+  }
+
+  def simplifyWithFields(expr: Expression): Expression = {
+expr.transformUp {
+  case WithFields(structExpr, names, values) if names.distinct.length != 
names.length =>
+val newNames = mutable.ArrayBuffer.empty[String]
+val newValues = mutable.ArrayBuffer.empty[Expression]
+names.zip(values).reverse.foreach { case (name, value) =>
+  if (!newNames.contains(name)) {
+newNames += name
+newValues += value
+  }
+}
+WithFields(structExpr, names = newNames.reverse, valExprs = 
newValues.reverse)
+  case WithFields(WithFields(struct, names1, valExprs1), names2, 
valExprs2) =>
+WithFields(struct, names1 ++ names2, valExprs1 ++ valExprs2)

Review comment:
   This `simplifyWithFields` will be moved out into a separate rule: #29812. 
But for now I am leaving it here so that all added tests pass.
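   
   The second case in the hunk above (collapsing a `WithFields` nested inside another `WithFields`) can be illustrated with a small standalone model. This is only a sketch: `WithFieldsSketch`, `Expr`, `Col`, `Lit` and `collapse` are hypothetical, and the `WithFields` case class here merely mirrors the three-argument shape used in the diff, not the real Catalyst class.

```scala
object WithFieldsSketch {
  sealed trait Expr
  final case class Col(name: String) extends Expr
  final case class Lit(value: Any) extends Expr
  // Mirrors the shape WithFields(structExpr, names, valExprs) from the diff.
  final case class WithFields(struct: Expr, names: Seq[String], values: Seq[Expr]) extends Expr

  // Collapse chains of WithFields on the struct argument, bottom-up:
  // WithFields(WithFields(s, n1, v1), n2, v2) => WithFields(s, n1 ++ n2, v1 ++ v2)
  // Value expressions are not traversed in this simplified version.
  def collapse(e: Expr): Expr = e match {
    case WithFields(inner, n2, v2) =>
      collapse(inner) match {
        case WithFields(base, n1, v1) => WithFields(base, n1 ++ n2, v1 ++ v2)
        case other => WithFields(other, n2, v2)
      }
    case other => other
  }
}
```

   Appending the outer names after the inner ones preserves the order in which the fields were added.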








[GitHub] [spark] viirya commented on a change in pull request #29587: [SPARK-32376][SQL] Make unionByName null-filling behavior work with struct columns

2020-10-05 Thread GitBox


viirya commented on a change in pull request #29587:
URL: https://github.com/apache/spark/pull/29587#discussion_r499980265



##
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveUnion.scala
##
@@ -17,29 +17,202 @@
 
 package org.apache.spark.sql.catalyst.analysis
 
+import scala.collection.mutable
+
 import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.expressions.{Alias, Literal}
+import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.optimizer.CombineUnions
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, 
Union}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types._
 import org.apache.spark.sql.util.SchemaUtils
+import org.apache.spark.unsafe.types.UTF8String
 
 /**
  * Resolves different children of Union to a common set of columns.
  */
 object ResolveUnion extends Rule[LogicalPlan] {
-  private def unionTwoSides(
+  /**
+   * This method sorts recursively columns in a struct expression based on 
column names.
+   */
+  private def sortStructFields(expr: Expression): Expression = {
+val existingExprs = 
expr.dataType.asInstanceOf[StructType].fieldNames.zipWithIndex.map {
+  case (name, i) =>
+val fieldExpr = GetStructField(KnownNotNull(expr), i)
+if (fieldExpr.dataType.isInstanceOf[StructType]) {
+  (name, sortStructFields(fieldExpr))
+} else {
+  (name, fieldExpr)
+}
+}.sortBy(_._1).flatMap(pair => Seq(Literal(pair._1), pair._2))
+
+val newExpr = CreateNamedStruct(existingExprs)
+if (expr.nullable) {
+  If(IsNull(expr), Literal(null, newExpr.dataType), newExpr)
+} else {
+  newExpr
+}
+  }
+
+  /**
+   * Assumes input expressions are field expression of `CreateNamedStruct`. 
This method
+   * sorts the expressions based on field names.
+   */
+  private def sortFieldExprs(fieldExprs: Seq[Expression]): Seq[Expression] = {
+fieldExprs.grouped(2).map { e =>
+  Seq(e.head, e.last)
+}.toSeq.sortBy { pair =>
+  assert(pair.head.isInstanceOf[Literal])
+  pair.head.eval().asInstanceOf[UTF8String].toString
+}.flatten
+  }
+
+  /**
+   * This helper method sorts fields in a `WithFields` expression by field 
name.
+   */
+  private def sortStructFieldsInWithFields(expr: Expression): Expression = 
expr transformUp {
+case w: WithFields if w.resolved =>
+  w.evalExpr match {
+case i @ If(IsNull(_), _, CreateNamedStruct(fieldExprs)) =>
+  val sorted = sortFieldExprs(fieldExprs)
+  val newStruct = CreateNamedStruct(sorted)
+  i.copy(trueValue = Literal(null, newStruct.dataType), falseValue = 
newStruct)
+case CreateNamedStruct(fieldExprs) =>
+  val sorted = sortFieldExprs(fieldExprs)
+  val newStruct = CreateNamedStruct(sorted)
+  newStruct
+case other =>
+  throw new AnalysisException(s"`WithFields` has incorrect eval 
expression: $other")
+  }
+  }
+
+  def simplifyWithFields(expr: Expression): Expression = {
+expr.transformUp {
+  case WithFields(structExpr, names, values) if names.distinct.length != 
names.length =>
+val newNames = mutable.ArrayBuffer.empty[String]
+val newValues = mutable.ArrayBuffer.empty[Expression]
+names.zip(values).reverse.foreach { case (name, value) =>
+  if (!newNames.contains(name)) {
+newNames += name
+newValues += value
+  }
+}
+WithFields(structExpr, names = newNames.reverse, valExprs = 
newValues.reverse)
+  case WithFields(WithFields(struct, names1, valExprs1), names2, 
valExprs2) =>
+WithFields(struct, names1 ++ names2, valExprs1 ++ valExprs2)

Review comment:
   This part will be moved out into a separate rule: #29812. But for now I am 
leaving it here so that all added tests pass.








[GitHub] [spark] viirya commented on a change in pull request #29587: [SPARK-32376][SQL] Make unionByName null-filling behavior work with struct columns

2020-10-05 Thread GitBox


viirya commented on a change in pull request #29587:
URL: https://github.com/apache/spark/pull/29587#discussion_r499979944



##
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
##
@@ -2721,6 +2721,16 @@ object SQLConf {
   .booleanConf
   .createWithDefault(false)
 
+  val UNION_BYNAME_STRUCT_SUPPORT_ENABLED =
+buildConf("spark.sql.unionByName.structSupport.enabled")
+  .doc("When true, the `allowMissingColumns` feature of 
`Dataset.unionByName` supports " +
+"nested column in struct types. Missing nested columns of struct 
columns with same " +
+"name will also be filled with null values. This currently does not 
support nested " +
+"columns in array and map types.")

Review comment:
   Oh, right, I will update the doc.








[GitHub] [spark] viirya commented on a change in pull request #29587: [SPARK-32376][SQL] Make unionByName null-filling behavior work with struct columns

2020-09-22 Thread GitBox


viirya commented on a change in pull request #29587:
URL: https://github.com/apache/spark/pull/29587#discussion_r493101256



##
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveUnion.scala
##
@@ -17,29 +17,202 @@
 
 package org.apache.spark.sql.catalyst.analysis
 
+import scala.collection.mutable
+
 import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.expressions.{Alias, Literal}
+import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.optimizer.CombineUnions
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, 
Union}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types._
 import org.apache.spark.sql.util.SchemaUtils
+import org.apache.spark.unsafe.types.UTF8String
 
 /**
  * Resolves different children of Union to a common set of columns.
  */
 object ResolveUnion extends Rule[LogicalPlan] {
-  private def unionTwoSides(
+  /**
+   * This method sorts recursively columns in a struct expression based on 
column names.
+   */
+  private def sortStructFields(expr: Expression): Expression = {
+val existingExprs = 
expr.dataType.asInstanceOf[StructType].fieldNames.zipWithIndex.map {
+  case (name, i) =>
+val fieldExpr = GetStructField(KnownNotNull(expr), i)
+if (fieldExpr.dataType.isInstanceOf[StructType]) {
+  (name, sortStructFields(fieldExpr))
+} else {
+  (name, fieldExpr)
+}
+}.sortBy(_._1).flatMap(pair => Seq(Literal(pair._1), pair._2))
+
+val newExpr = CreateNamedStruct(existingExprs)
+if (expr.nullable) {
+  If(IsNull(expr), Literal(null, newExpr.dataType), newExpr)
+} else {
+  newExpr
+}
+  }
+
+  /**
+   * Assumes input expressions are field expression of `CreateNamedStruct`. 
This method
+   * sorts the expressions based on field names.
+   */
+  private def sortFieldExprs(fieldExprs: Seq[Expression]): Seq[Expression] = {
+fieldExprs.grouped(2).map { e =>
+  Seq(e.head, e.last)
+}.toSeq.sortBy { pair =>
+  assert(pair.head.isInstanceOf[Literal])
+  pair.head.eval().asInstanceOf[UTF8String].toString
+}.flatten
+  }
+
+  /**
+   * This helper method sorts fields in a `WithFields` expression by field 
name.
+   */
+  private def sortStructFieldsInWithFields(expr: Expression): Expression = 
expr transformUp {
+case w: WithFields if w.resolved =>
+  w.evalExpr match {
+case i @ If(IsNull(_), _, CreateNamedStruct(fieldExprs)) =>
+  val sorted = sortFieldExprs(fieldExprs)
+  val newStruct = CreateNamedStruct(sorted)
+  i.copy(trueValue = Literal(null, newStruct.dataType), falseValue = 
newStruct)
+case CreateNamedStruct(fieldExprs) =>
+  val sorted = sortFieldExprs(fieldExprs)
+  val newStruct = CreateNamedStruct(sorted)
+  newStruct
+case other =>
+  throw new AnalysisException(s"`WithFields` has incorrect eval 
expression: $other")
+  }
+  }
+
+  def simplifyWithFields(expr: Expression): Expression = {
+expr.transformUp {
+  case WithFields(structExpr, names, values) if names.distinct.length != 
names.length =>

Review comment:
   When I was working on simplifying the expression tree for the scale issue, 
I ran into this case. I changed other parts of the code later, so maybe this 
isn't hit now. Let me check.


[GitHub] [spark] viirya commented on a change in pull request #29587: [SPARK-32376][SQL] Make unionByName null-filling behavior work with struct columns

2020-09-22 Thread GitBox


viirya commented on a change in pull request #29587:
URL: https://github.com/apache/spark/pull/29587#discussion_r493103615



##
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveUnion.scala
##
@@ -17,29 +17,202 @@
 
 package org.apache.spark.sql.catalyst.analysis
 
+import scala.collection.mutable
+
 import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.expressions.{Alias, Literal}
+import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.optimizer.CombineUnions
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, 
Union}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types._
 import org.apache.spark.sql.util.SchemaUtils
+import org.apache.spark.unsafe.types.UTF8String
 
 /**
  * Resolves different children of Union to a common set of columns.
  */
 object ResolveUnion extends Rule[LogicalPlan] {
-  private def unionTwoSides(
+  /**
+   * This method sorts recursively columns in a struct expression based on 
column names.
+   */
+  private def sortStructFields(expr: Expression): Expression = {
+val existingExprs = 
expr.dataType.asInstanceOf[StructType].fieldNames.zipWithIndex.map {
+  case (name, i) =>
+val fieldExpr = GetStructField(KnownNotNull(expr), i)
+if (fieldExpr.dataType.isInstanceOf[StructType]) {
+  (name, sortStructFields(fieldExpr))
+} else {
+  (name, fieldExpr)
+}
+}.sortBy(_._1).flatMap(pair => Seq(Literal(pair._1), pair._2))
+
+val newExpr = CreateNamedStruct(existingExprs)
+if (expr.nullable) {
+  If(IsNull(expr), Literal(null, newExpr.dataType), newExpr)
+} else {
+  newExpr
+}
+  }
+
+  /**
+   * Assumes input expressions are field expression of `CreateNamedStruct`. 
This method
+   * sorts the expressions based on field names.
+   */
+  private def sortFieldExprs(fieldExprs: Seq[Expression]): Seq[Expression] = {
+fieldExprs.grouped(2).map { e =>
+  Seq(e.head, e.last)
+}.toSeq.sortBy { pair =>
+  assert(pair.head.isInstanceOf[Literal])
+  pair.head.eval().asInstanceOf[UTF8String].toString
+}.flatten
+  }
+
+  /**
+   * This helper method sorts fields in a `WithFields` expression by field 
name.
+   */
+  private def sortStructFieldsInWithFields(expr: Expression): Expression = 
expr transformUp {
+case w: WithFields if w.resolved =>
+  w.evalExpr match {
+case i @ If(IsNull(_), _, CreateNamedStruct(fieldExprs)) =>
+  val sorted = sortFieldExprs(fieldExprs)
+  val newStruct = CreateNamedStruct(sorted)
+  i.copy(trueValue = Literal(null, newStruct.dataType), falseValue = 
newStruct)
+case CreateNamedStruct(fieldExprs) =>
+  val sorted = sortFieldExprs(fieldExprs)
+  val newStruct = CreateNamedStruct(sorted)
+  newStruct
+case other =>
+  throw new AnalysisException(s"`WithFields` has incorrect eval 
expression: $other")
+  }
+  }
+
+  def simplifyWithFields(expr: Expression): Expression = {
+expr.transformUp {
+  case WithFields(structExpr, names, values) if names.distinct.length != 
names.length =>

Review comment:
   Ok, it seems not. However, this is still useful for optimizing inefficient 
`WithFields` expressions. I relied on these rules to simplify the expression 
tree and for debugging.
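   
   The duplicate-name case guarded by `names.distinct.length != names.length` can be sketched outside Catalyst as below. It follows the same idea as the hunk above (walk the pairs in reverse, keep the first occurrence seen, then restore the order), so the last value assigned to a name wins. `DedupSketch` and `dedupKeepLast` are hypothetical names, and the values are generic instead of `Expression`.

```scala
import scala.collection.mutable

object DedupSketch {
  // For repeated names keep only the last (name, value) pair, preserving the
  // relative order of the surviving pairs.
  def dedupKeepLast[V](names: Seq[String], values: Seq[V]): (Seq[String], Seq[V]) = {
    val seen = mutable.HashSet.empty[String]
    // `seen.add` returns true only the first time a name is encountered,
    // which, on the reversed sequence, is its last occurrence.
    val kept = names.zip(values).reverse.filter { case (n, _) => seen.add(n) }.reverse
    kept.unzip
  }
}
```

   Dropping overwritten assignments early keeps the expression tree from growing with entries that can never affect the result.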








[GitHub] [spark] viirya commented on a change in pull request #29587: [SPARK-32376][SQL] Make unionByName null-filling behavior work with struct columns

2020-09-22 Thread GitBox


viirya commented on a change in pull request #29587:
URL: https://github.com/apache/spark/pull/29587#discussion_r493101256



##
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveUnion.scala
##
@@ -17,29 +17,202 @@
 
 package org.apache.spark.sql.catalyst.analysis
 
+import scala.collection.mutable
+
 import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.expressions.{Alias, Literal}
+import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.optimizer.CombineUnions
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, 
Union}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types._
 import org.apache.spark.sql.util.SchemaUtils
+import org.apache.spark.unsafe.types.UTF8String
 
 /**
  * Resolves different children of Union to a common set of columns.
  */
 object ResolveUnion extends Rule[LogicalPlan] {
-  private def unionTwoSides(
+  /**
+   * This method sorts recursively columns in a struct expression based on 
column names.
+   */
+  private def sortStructFields(expr: Expression): Expression = {
+val existingExprs = 
expr.dataType.asInstanceOf[StructType].fieldNames.zipWithIndex.map {
+  case (name, i) =>
+val fieldExpr = GetStructField(KnownNotNull(expr), i)
+if (fieldExpr.dataType.isInstanceOf[StructType]) {
+  (name, sortStructFields(fieldExpr))
+} else {
+  (name, fieldExpr)
+}
+}.sortBy(_._1).flatMap(pair => Seq(Literal(pair._1), pair._2))
+
+val newExpr = CreateNamedStruct(existingExprs)
+if (expr.nullable) {
+  If(IsNull(expr), Literal(null, newExpr.dataType), newExpr)
+} else {
+  newExpr
+}
+  }
+
+  /**
+   * Assumes input expressions are field expression of `CreateNamedStruct`. 
This method
+   * sorts the expressions based on field names.
+   */
+  private def sortFieldExprs(fieldExprs: Seq[Expression]): Seq[Expression] = {
+fieldExprs.grouped(2).map { e =>
+  Seq(e.head, e.last)
+}.toSeq.sortBy { pair =>
+  assert(pair.head.isInstanceOf[Literal])
+  pair.head.eval().asInstanceOf[UTF8String].toString
+}.flatten
+  }
+
+  /**
+   * This helper method sorts fields in a `WithFields` expression by field 
name.
+   */
+  private def sortStructFieldsInWithFields(expr: Expression): Expression = 
expr transformUp {
+case w: WithFields if w.resolved =>
+  w.evalExpr match {
+case i @ If(IsNull(_), _, CreateNamedStruct(fieldExprs)) =>
+  val sorted = sortFieldExprs(fieldExprs)
+  val newStruct = CreateNamedStruct(sorted)
+  i.copy(trueValue = Literal(null, newStruct.dataType), falseValue = 
newStruct)
+case CreateNamedStruct(fieldExprs) =>
+  val sorted = sortFieldExprs(fieldExprs)
+  val newStruct = CreateNamedStruct(sorted)
+  newStruct
+case other =>
+  throw new AnalysisException(s"`WithFields` has incorrect eval 
expression: $other")
+  }
+  }
+
+  def simplifyWithFields(expr: Expression): Expression = {
+expr.transformUp {
+  case WithFields(structExpr, names, values) if names.distinct.length != 
names.length =>

Review comment:
   When I was working on simplifying the expression tree for the scale issue, 
I ran into this case. I changed other parts of the code later, so maybe this 
isn't hit now. Let me check.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] viirya commented on a change in pull request #29587: [SPARK-32376][SQL] Make unionByName null-filling behavior work with struct columns

2020-09-22 Thread GitBox


viirya commented on a change in pull request #29587:
URL: https://github.com/apache/spark/pull/29587#discussion_r491711425



##
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveUnion.scala
##
@@ -17,29 +17,202 @@
 
 package org.apache.spark.sql.catalyst.analysis
 
+import scala.collection.mutable
+
 import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.expressions.{Alias, Literal}
+import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.optimizer.CombineUnions
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, 
Union}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types._
 import org.apache.spark.sql.util.SchemaUtils
+import org.apache.spark.unsafe.types.UTF8String
 
 /**
  * Resolves different children of Union to a common set of columns.
  */
 object ResolveUnion extends Rule[LogicalPlan] {
-  private def unionTwoSides(
+  /**
+   * This method sorts recursively columns in a struct expression based on 
column names.
+   */
+  private def sortStructFields(expr: Expression): Expression = {
+val existingExprs = 
expr.dataType.asInstanceOf[StructType].fieldNames.zipWithIndex.map {
+  case (name, i) =>
+val fieldExpr = GetStructField(KnownNotNull(expr), i)
+if (fieldExpr.dataType.isInstanceOf[StructType]) {
+  (name, sortStructFields(fieldExpr))
+} else {
+  (name, fieldExpr)
+}
+}.sortBy(_._1).flatMap(pair => Seq(Literal(pair._1), pair._2))
+
+val newExpr = CreateNamedStruct(existingExprs)
+if (expr.nullable) {
+  If(IsNull(expr), Literal(null, newExpr.dataType), newExpr)
+} else {
+  newExpr
+}
+  }
+
+  /**
+   * Assumes input expressions are field expression of `CreateNamedStruct`. 
This method
+   * sorts the expressions based on field names.
+   */
+  private def sortFieldExprs(fieldExprs: Seq[Expression]): Seq[Expression] = {
+fieldExprs.grouped(2).map { e =>
+  Seq(e.head, e.last)
+}.toSeq.sortBy { pair =>
+  assert(pair.head.isInstanceOf[Literal])
+  pair.head.eval().asInstanceOf[UTF8String].toString
+}.flatten
+  }
+
+  /**
+   * This helper method sorts fields in a `WithFields` expression by field 
name.
+   */
+  private def sortStructFieldsInWithFields(expr: Expression): Expression = 
expr transformUp {
+case w: WithFields if w.resolved =>
+  w.evalExpr match {
+case i @ If(IsNull(_), _, CreateNamedStruct(fieldExprs)) =>
+  val sorted = sortFieldExprs(fieldExprs)
+  val newStruct = CreateNamedStruct(sorted)
+  i.copy(trueValue = Literal(null, newStruct.dataType), falseValue = 
newStruct)
+case CreateNamedStruct(fieldExprs) =>
+  val sorted = sortFieldExprs(fieldExprs)
+  val newStruct = CreateNamedStruct(sorted)
+  newStruct
+case other =>
+  throw new AnalysisException(s"`WithFields` has incorrect eval 
expression: $other")
+  }
+  }
+
+  def simplifyWithFields(expr: Expression): Expression = {
+expr.transformUp {
+  case WithFields(structExpr, names, values) if names.distinct.length != 
names.length =>
+val newNames = mutable.ArrayBuffer.empty[String]
+val newValues = mutable.ArrayBuffer.empty[Expression]
+names.zip(values).reverse.foreach { case (name, value) =>
+  if (!newNames.contains(name)) {
+newNames += name
+newValues += value
+  }
+}
+WithFields(structExpr, names = newNames.reverse, valExprs = 
newValues.reverse)
+  case WithFields(WithFields(struct, names1, valExprs1), names2, 
valExprs2) =>
+WithFields(struct, names1 ++ names2, valExprs1 ++ valExprs2)
+  case g @ GetStructField(WithFields(_, names, values), _, _)
+if names.contains(g.extractFieldName) =>
+names.zip(values).reverse.filter(p => p._1 == 
g.extractFieldName).head._2
+}
+  }

Review comment:
   #29812

##
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveUnion.scala
##
@@ -17,29 +17,202 @@
 
 package org.apache.spark.sql.catalyst.analysis
 
+import scala.collection.mutable
+
 import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.expressions.{Alias, Literal}
+import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.optimizer.CombineUnions
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, 
Union}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types._
 import org.apache.spark.sql.util.SchemaUtils
+import org.apache.spark.unsafe.types.UTF8String
 
 /**
  * Resolves different children of Union to a common set of columns.
  */
 object ResolveUnion extends 

[GitHub] [spark] viirya commented on a change in pull request #29587: [SPARK-32376][SQL] Make unionByName null-filling behavior work with struct columns

2020-09-20 Thread GitBox


viirya commented on a change in pull request #29587:
URL: https://github.com/apache/spark/pull/29587#discussion_r491711425



##
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveUnion.scala
##
@@ -17,29 +17,202 @@
 
 package org.apache.spark.sql.catalyst.analysis
 
+import scala.collection.mutable
+
 import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.expressions.{Alias, Literal}
+import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.optimizer.CombineUnions
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, 
Union}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types._
 import org.apache.spark.sql.util.SchemaUtils
+import org.apache.spark.unsafe.types.UTF8String
 
 /**
  * Resolves different children of Union to a common set of columns.
  */
 object ResolveUnion extends Rule[LogicalPlan] {
-  private def unionTwoSides(
+  /**
+   * This method sorts recursively columns in a struct expression based on 
column names.
+   */
+  private def sortStructFields(expr: Expression): Expression = {
+val existingExprs = 
expr.dataType.asInstanceOf[StructType].fieldNames.zipWithIndex.map {
+  case (name, i) =>
+val fieldExpr = GetStructField(KnownNotNull(expr), i)
+if (fieldExpr.dataType.isInstanceOf[StructType]) {
+  (name, sortStructFields(fieldExpr))
+} else {
+  (name, fieldExpr)
+}
+}.sortBy(_._1).flatMap(pair => Seq(Literal(pair._1), pair._2))
+
+val newExpr = CreateNamedStruct(existingExprs)
+if (expr.nullable) {
+  If(IsNull(expr), Literal(null, newExpr.dataType), newExpr)
+} else {
+  newExpr
+}
+  }
+
+  /**
+   * Assumes input expressions are field expression of `CreateNamedStruct`. 
This method
+   * sorts the expressions based on field names.
+   */
+  private def sortFieldExprs(fieldExprs: Seq[Expression]): Seq[Expression] = {
+fieldExprs.grouped(2).map { e =>
+  Seq(e.head, e.last)
+}.toSeq.sortBy { pair =>
+  assert(pair.head.isInstanceOf[Literal])
+  pair.head.eval().asInstanceOf[UTF8String].toString
+}.flatten
+  }
+
+  /**
+   * This helper method sorts fields in a `WithFields` expression by field 
name.
+   */
+  private def sortStructFieldsInWithFields(expr: Expression): Expression = 
expr transformUp {
+case w: WithFields if w.resolved =>
+  w.evalExpr match {
+case i @ If(IsNull(_), _, CreateNamedStruct(fieldExprs)) =>
+  val sorted = sortFieldExprs(fieldExprs)
+  val newStruct = CreateNamedStruct(sorted)
+  i.copy(trueValue = Literal(null, newStruct.dataType), falseValue = 
newStruct)
+case CreateNamedStruct(fieldExprs) =>
+  val sorted = sortFieldExprs(fieldExprs)
+  val newStruct = CreateNamedStruct(sorted)
+  newStruct
+case other =>
+  throw new AnalysisException(s"`WithFields` has incorrect eval 
expression: $other")
+  }
+  }
+
+  def simplifyWithFields(expr: Expression): Expression = {
+expr.transformUp {
+  case WithFields(structExpr, names, values) if names.distinct.length != 
names.length =>
+val newNames = mutable.ArrayBuffer.empty[String]
+val newValues = mutable.ArrayBuffer.empty[Expression]
+names.zip(values).reverse.foreach { case (name, value) =>
+  if (!newNames.contains(name)) {
+newNames += name
+newValues += value
+  }
+}
+WithFields(structExpr, names = newNames.reverse, valExprs = 
newValues.reverse)
+  case WithFields(WithFields(struct, names1, valExprs1), names2, 
valExprs2) =>
+WithFields(struct, names1 ++ names2, valExprs1 ++ valExprs2)
+  case g @ GetStructField(WithFields(_, names, values), _, _)
+if names.contains(g.extractFieldName) =>
+names.zip(values).reverse.filter(p => p._1 == 
g.extractFieldName).head._2
+}
+  }

Review comment:
   #29812

##
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveUnion.scala
##
@@ -17,29 +17,202 @@
 
 package org.apache.spark.sql.catalyst.analysis
 
+import scala.collection.mutable
+
 import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.expressions.{Alias, Literal}
+import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.optimizer.CombineUnions
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, 
Union}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types._
 import org.apache.spark.sql.util.SchemaUtils
+import org.apache.spark.unsafe.types.UTF8String
 
 /**
  * Resolves different children of Union to a common set of columns.
  */
 object ResolveUnion extends 

[GitHub] [spark] viirya commented on a change in pull request #29587: [SPARK-32376][SQL] Make unionByName null-filling behavior work with struct columns

2020-09-19 Thread GitBox


viirya commented on a change in pull request #29587:
URL: https://github.com/apache/spark/pull/29587#discussion_r491638577



##
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveUnion.scala
##
@@ -17,29 +17,202 @@
 
 package org.apache.spark.sql.catalyst.analysis
 
+import scala.collection.mutable
+
 import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.expressions.{Alias, Literal}
+import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.optimizer.CombineUnions
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, 
Union}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types._
 import org.apache.spark.sql.util.SchemaUtils
+import org.apache.spark.unsafe.types.UTF8String
 
 /**
  * Resolves different children of Union to a common set of columns.
  */
 object ResolveUnion extends Rule[LogicalPlan] {
-  private def unionTwoSides(
+  /**
+   * This method sorts recursively columns in a struct expression based on 
column names.
+   */
+  private def sortStructFields(expr: Expression): Expression = {
+val existingExprs = 
expr.dataType.asInstanceOf[StructType].fieldNames.zipWithIndex.map {
+  case (name, i) =>
+val fieldExpr = GetStructField(KnownNotNull(expr), i)
+if (fieldExpr.dataType.isInstanceOf[StructType]) {
+  (name, sortStructFields(fieldExpr))
+} else {
+  (name, fieldExpr)
+}
+}.sortBy(_._1).flatMap(pair => Seq(Literal(pair._1), pair._2))
+
+val newExpr = CreateNamedStruct(existingExprs)
+if (expr.nullable) {
+  If(IsNull(expr), Literal(null, newExpr.dataType), newExpr)
+} else {
+  newExpr
+}
+  }
+
+  /**
+   * Assumes input expressions are field expression of `CreateNamedStruct`. 
This method
+   * sorts the expressions based on field names.
+   */
+  private def sortFieldExprs(fieldExprs: Seq[Expression]): Seq[Expression] = {
+fieldExprs.grouped(2).map { e =>
+  Seq(e.head, e.last)
+}.toSeq.sortBy { pair =>
+  assert(pair.head.isInstanceOf[Literal])
+  pair.head.eval().asInstanceOf[UTF8String].toString
+}.flatten
+  }
+
+  /**
+   * This helper method sorts fields in a `WithFields` expression by field 
name.
+   */
+  private def sortStructFieldsInWithFields(expr: Expression): Expression = 
expr transformUp {
+case w: WithFields if w.resolved =>
+  w.evalExpr match {
+case i @ If(IsNull(_), _, CreateNamedStruct(fieldExprs)) =>
+  val sorted = sortFieldExprs(fieldExprs)
+  val newStruct = CreateNamedStruct(sorted)
+  i.copy(trueValue = Literal(null, newStruct.dataType), falseValue = 
newStruct)
+case CreateNamedStruct(fieldExprs) =>
+  val sorted = sortFieldExprs(fieldExprs)
+  val newStruct = CreateNamedStruct(sorted)
+  newStruct
+case other =>
+  throw new AnalysisException(s"`WithFields` has incorrect eval 
expression: $other")
+  }
+  }
+
+  def simplifyWithFields(expr: Expression): Expression = {
+expr.transformUp {
+  case WithFields(structExpr, names, values) if names.distinct.length != 
names.length =>
+val newNames = mutable.ArrayBuffer.empty[String]
+val newValues = mutable.ArrayBuffer.empty[Expression]
+names.zip(values).reverse.foreach { case (name, value) =>
+  if (!newNames.contains(name)) {
+newNames += name
+newValues += value
+  }
+}
+WithFields(structExpr, names = newNames.reverse, valExprs = 
newValues.reverse)
+  case WithFields(WithFields(struct, names1, valExprs1), names2, 
valExprs2) =>
+WithFields(struct, names1 ++ names2, valExprs1 ++ valExprs2)
+  case g @ GetStructField(WithFields(_, names, values), _, _)
+if names.contains(g.extractFieldName) =>
+names.zip(values).reverse.filter(p => p._1 == 
g.extractFieldName).head._2
+}
+  }

Review comment:
   This is to optimize the `WithFields` expression chain. I will put this into 
a separate PR.
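
As a rough illustration of what that optimization does, here is a minimal sketch in plain Scala. It does not use Catalyst: `FieldUpdates` is a hypothetical stand-in for a `WithFields(struct, names, values)` node, with strings in place of expressions. It only mirrors the two list-level rewrites visible in the diff above (keep the last assignment per field name, and flatten a nested update into one).

```scala
// Hypothetical stand-in for WithFields(struct, names, values);
// values are plain strings here, not Catalyst expressions.
final case class FieldUpdates(names: Seq[String], values: Seq[String])

object WithFieldsSketch {
  // Keep only the last assignment for each field name, preserving the
  // relative order of the assignments that survive.
  def dedupKeepLast(u: FieldUpdates): FieldUpdates = {
    val seen = scala.collection.mutable.HashSet.empty[String]
    val kept = u.names.zip(u.values).toList.reverse
      .filter { case (name, _) => seen.add(name) } // add returns false for repeats
      .reverse
    FieldUpdates(kept.map(_._1), kept.map(_._2))
  }

  // Collapse an inner update followed by an outer update into a single list.
  def merge(inner: FieldUpdates, outer: FieldUpdates): FieldUpdates =
    FieldUpdates(inner.names ++ outer.names, inner.values ++ outer.values)
}
```

Merging first and deduplicating afterwards keeps the "last write wins" behavior, since the outer update's assignments come later in the combined list.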








[GitHub] [spark] viirya commented on a change in pull request #29587: [SPARK-32376][SQL] Make unionByName null-filling behavior work with struct columns

2020-09-19 Thread GitBox


viirya commented on a change in pull request #29587:
URL: https://github.com/apache/spark/pull/29587#discussion_r491637817



##
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveUnion.scala
##
@@ -17,29 +17,168 @@
 
 package org.apache.spark.sql.catalyst.analysis
 
+import scala.collection.mutable
+
 import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.expressions.{Alias, Literal}
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, 
CreateNamedStruct, Expression, GetStructField, If, IsNull, KnownNotNull, 
Literal, NamedExpression, WithFields}
 import org.apache.spark.sql.catalyst.optimizer.CombineUnions
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, 
Union}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types._
 import org.apache.spark.sql.util.SchemaUtils
+import org.apache.spark.unsafe.types.UTF8String
 
 /**
  * Resolves different children of Union to a common set of columns.
  */
 object ResolveUnion extends Rule[LogicalPlan] {
-  private def unionTwoSides(
+  /**
+   * This method sorts columns in a struct expression based on column names.
+   */
+  private def sortStructFields(expr: Expression): Expression = {
+assert(expr.dataType.isInstanceOf[StructType])
+
+val existingExprs = 
expr.dataType.asInstanceOf[StructType].fieldNames.zipWithIndex.map {
+  case (name, i) => (name, GetStructField(KnownNotNull(expr), 
i).asInstanceOf[Expression])
+}.sortBy(_._1).flatMap(pair => Seq(Literal(pair._1), pair._2))
+
+val newExpr = CreateNamedStruct(existingExprs)
+if (expr.nullable) {
+  If(IsNull(expr), Literal(null, newExpr.dataType), newExpr)
+} else {
+  newExpr
+}
+  }
+
+  private def sortStructFields(fieldExprs: Seq[Expression]): Seq[Expression] = 
{
+fieldExprs.grouped(2).map { e =>
+  Seq(e.head, e.last)
+}.toSeq.sortBy { pair =>
+  assert(pair(0).isInstanceOf[Literal])
+  pair(0).eval().asInstanceOf[UTF8String].toString
+}.flatten
+  }
+
+  /**
+   * This helper method sorts fields in a `WithFields` expression by field 
name.
+   */
+  private def sortStructFieldsInWithFields(expr: Expression): Expression = 
expr transformUp {
+case w: WithFields if w.resolved =>
+  w.evalExpr match {
+case i @ If(IsNull(_), _, CreateNamedStruct(fieldExprs)) =>
+  val sorted = sortStructFields(fieldExprs)
+  val newStruct = CreateNamedStruct(sorted)
+  i.copy(trueValue = Literal(null, newStruct.dataType), falseValue = 
newStruct)
+case CreateNamedStruct(fieldExprs) =>
+  val sorted = sortStructFields(fieldExprs)
+  val newStruct = CreateNamedStruct(sorted)
+  newStruct
+case other =>
+  throw new AnalysisException(s"`WithFields` has incorrect eval 
expression: $other")
+  }
+  }
+
+  /**
+   * Adds missing fields recursively into given `col` expression, based on the 
target `StructType`.
+   * This is called by `compareAndAddFields` when we find two struct columns 
with same name but
+   * different nested fields. This method will find out the missing nested 
fields from `col` to
+   * `target` struct and add these missing nested fields. Currently we don't 
support finding out
+   * missing nested fields of struct nested in array or struct nested in map.
+   */
+  private def addFields(col: NamedExpression, target: StructType): Expression 
= {
+assert(col.dataType.isInstanceOf[StructType], "Only support StructType.")
+
+val resolver = SQLConf.get.resolver
+val missingFields =
+  StructType.findMissingFields(col.dataType.asInstanceOf[StructType], 
target, resolver)
+
+// We need to sort columns in result, because we might add another column 
in other side.
+// E.g., we want to union two structs "a int, b long" and "a int, c 
string".
+// If we don't sort, we will have "a int, b long, c string" and
+// "a int, c string, b long", which are not compatible.
+if (missingFields.isEmpty) {
+  sortStructFields(col)

Review comment:
   @maropu I got your point when I was fixing the performance issue. Yeah, 
we should. I fixed it in the latest commit. Thanks.








[GitHub] [spark] viirya commented on a change in pull request #29587: [SPARK-32376][SQL] Make unionByName null-filling behavior work with struct columns

2020-09-19 Thread GitBox


viirya commented on a change in pull request #29587:
URL: https://github.com/apache/spark/pull/29587#discussion_r491637762



##
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveUnion.scala
##
@@ -50,18 +189,29 @@ object ResolveUnion extends Rule[LogicalPlan] {
   }
 }
 
+(rightProjectList, aliased)
+  }

Review comment:
   Improved in the latest commit.








[GitHub] [spark] viirya commented on a change in pull request #29587: [SPARK-32376][SQL] Make unionByName null-filling behavior work with struct columns

2020-09-16 Thread GitBox


viirya commented on a change in pull request #29587:
URL: https://github.com/apache/spark/pull/29587#discussion_r489799250



##
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveUnion.scala
##
@@ -17,29 +17,168 @@
 
 package org.apache.spark.sql.catalyst.analysis
 
+import scala.collection.mutable
+
 import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.expressions.{Alias, Literal}
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, 
CreateNamedStruct, Expression, GetStructField, If, IsNull, KnownNotNull, 
Literal, NamedExpression, WithFields}
 import org.apache.spark.sql.catalyst.optimizer.CombineUnions
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, 
Union}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types._
 import org.apache.spark.sql.util.SchemaUtils
+import org.apache.spark.unsafe.types.UTF8String
 
 /**
  * Resolves different children of Union to a common set of columns.
  */
 object ResolveUnion extends Rule[LogicalPlan] {
-  private def unionTwoSides(
+  /**
+   * This method sorts columns in a struct expression based on column names.
+   */
+  private def sortStructFields(expr: Expression): Expression = {
+assert(expr.dataType.isInstanceOf[StructType])

Review comment:
   yes, let me remove it.








[GitHub] [spark] viirya commented on a change in pull request #29587: [SPARK-32376][SQL] Make unionByName null-filling behavior work with struct columns

2020-09-16 Thread GitBox


viirya commented on a change in pull request #29587:
URL: https://github.com/apache/spark/pull/29587#discussion_r489799141



##
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveUnion.scala
##
@@ -17,29 +17,168 @@
 
 package org.apache.spark.sql.catalyst.analysis
 
+import scala.collection.mutable
+
 import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.expressions.{Alias, Literal}
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, 
CreateNamedStruct, Expression, GetStructField, If, IsNull, KnownNotNull, 
Literal, NamedExpression, WithFields}
 import org.apache.spark.sql.catalyst.optimizer.CombineUnions
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, 
Union}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types._
 import org.apache.spark.sql.util.SchemaUtils
+import org.apache.spark.unsafe.types.UTF8String
 
 /**
  * Resolves different children of Union to a common set of columns.
  */
 object ResolveUnion extends Rule[LogicalPlan] {
-  private def unionTwoSides(
+  /**
+   * This method sorts columns in a struct expression based on column names.
+   */
+  private def sortStructFields(expr: Expression): Expression = {
+assert(expr.dataType.isInstanceOf[StructType])
+
+val existingExprs = 
expr.dataType.asInstanceOf[StructType].fieldNames.zipWithIndex.map {
+  case (name, i) => (name, GetStructField(KnownNotNull(expr), 
i).asInstanceOf[Expression])

Review comment:
   Looks redundant, thanks.








[GitHub] [spark] viirya commented on a change in pull request #29587: [SPARK-32376][SQL] Make unionByName null-filling behavior work with struct columns

2020-09-16 Thread GitBox


viirya commented on a change in pull request #29587:
URL: https://github.com/apache/spark/pull/29587#discussion_r489542367



##
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveUnion.scala
##
@@ -17,29 +17,168 @@
 
 package org.apache.spark.sql.catalyst.analysis
 
+import scala.collection.mutable
+
 import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.expressions.{Alias, Literal}
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, 
CreateNamedStruct, Expression, GetStructField, If, IsNull, KnownNotNull, 
Literal, NamedExpression, WithFields}
 import org.apache.spark.sql.catalyst.optimizer.CombineUnions
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, 
Union}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types._
 import org.apache.spark.sql.util.SchemaUtils
+import org.apache.spark.unsafe.types.UTF8String
 
 /**
  * Resolves different children of Union to a common set of columns.
  */
 object ResolveUnion extends Rule[LogicalPlan] {
-  private def unionTwoSides(
+  /**
+   * This method sorts columns in a struct expression based on column names.
+   */
+  private def sortStructFields(expr: Expression): Expression = {
+assert(expr.dataType.isInstanceOf[StructType])
+
+val existingExprs = 
expr.dataType.asInstanceOf[StructType].fieldNames.zipWithIndex.map {
+  case (name, i) => (name, GetStructField(KnownNotNull(expr), 
i).asInstanceOf[Expression])
+}.sortBy(_._1).flatMap(pair => Seq(Literal(pair._1), pair._2))
+
+val newExpr = CreateNamedStruct(existingExprs)
+if (expr.nullable) {
+  If(IsNull(expr), Literal(null, newExpr.dataType), newExpr)
+} else {
+  newExpr
+}
+  }
+
+  private def sortStructFields(fieldExprs: Seq[Expression]): Seq[Expression] = 
{
+fieldExprs.grouped(2).map { e =>
+  Seq(e.head, e.last)
+}.toSeq.sortBy { pair =>
+  assert(pair(0).isInstanceOf[Literal])
+  pair(0).eval().asInstanceOf[UTF8String].toString
+}.flatten
+  }
+
+  /**
+   * This helper method sorts fields in a `WithFields` expression by field 
name.
+   */
+  private def sortStructFieldsInWithFields(expr: Expression): Expression = 
expr transformUp {
+case w: WithFields if w.resolved =>
+  w.evalExpr match {
+case i @ If(IsNull(_), _, CreateNamedStruct(fieldExprs)) =>
+  val sorted = sortStructFields(fieldExprs)
+  val newStruct = CreateNamedStruct(sorted)
+  i.copy(trueValue = Literal(null, newStruct.dataType), falseValue = 
newStruct)
+case CreateNamedStruct(fieldExprs) =>
+  val sorted = sortStructFields(fieldExprs)
+  val newStruct = CreateNamedStruct(sorted)
+  newStruct
+case other =>
+  throw new AnalysisException(s"`WithFields` has incorrect eval 
expression: $other")
+  }
+  }
+
+  /**
+   * Adds missing fields recursively into given `col` expression, based on the 
target `StructType`.
+   * This is called by `compareAndAddFields` when we find two struct columns 
with same name but
+   * different nested fields. This method will find out the missing nested 
fields from `col` to
+   * `target` struct and add these missing nested fields. Currently we don't 
support finding out
+   * missing nested fields of struct nested in array or struct nested in map.
+   */
+  private def addFields(col: NamedExpression, target: StructType): Expression 
= {
+assert(col.dataType.isInstanceOf[StructType], "Only support StructType.")
+
+val resolver = SQLConf.get.resolver
+val missingFields =
+  StructType.findMissingFields(col.dataType.asInstanceOf[StructType], 
target, resolver)
+
+// We need to sort columns in result, because we might add another column 
in other side.
+// E.g., we want to union two structs "a int, b long" and "a int, c 
string".
+// If we don't sort, we will have "a int, b long, c string" and
+// "a int, c string, b long", which are not compatible.
+if (missingFields.isEmpty) {
+  sortStructFields(col)

Review comment:
   Oh, we need this to make sure the two sides have a consistent schema. For 
example, in the test case from @fqaiser94 in 
https://github.com/apache/spark/pull/29587#discussion_r488100532, when we add a 
field to one side, the other side still needs to sort its columns; otherwise the 
two schemas are inconsistent.
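
The ordering problem can be sketched without Spark. The snippet below is only an illustration under stated assumptions: structs are modeled as plain `(name, type)` lists, and `addMissing` / `addMissingSorted` are hypothetical helpers, not the PR's `addFields`. The field names are made up and are not taken from the linked test case.

```scala
object StructOrderSketch {
  type Field = (String, String) // (field name, type name)

  // Append the fields of `target` that `own` lacks, without reordering.
  def addMissing(own: Seq[Field], target: Seq[Field]): Seq[Field] =
    own ++ target.filterNot { case (name, _) => own.exists(_._1 == name) }

  // Same, but always sort by field name, even when nothing was missing.
  def addMissingSorted(own: Seq[Field], target: Seq[Field]): Seq[Field] =
    addMissing(own, target).sortBy(_._1)
}

object StructOrderDemo extends App {
  import StructOrderSketch._

  // The left side already has every field, so nothing is added to it.
  val left: Seq[Field]  = Seq("b" -> "long", "a" -> "int")
  val right: Seq[Field] = Seq("a" -> "int")

  // Without sorting, the sides end up as (b, a) and (a, b).
  println(addMissing(left, right) == addMissing(right, left))
  // With sorting on both sides, the schemas line up.
  println(addMissingSorted(left, right) == addMissingSorted(right, left))
}
```

The side with no missing fields is the interesting one: skipping the sort there would leave its original field order in place while the other side is rebuilt in a different order.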







[GitHub] [spark] viirya commented on a change in pull request #29587: [SPARK-32376][SQL] Make unionByName null-filling behavior work with struct columns

2020-09-16 Thread GitBox


viirya commented on a change in pull request #29587:
URL: https://github.com/apache/spark/pull/29587#discussion_r489540174



##
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveUnion.scala
##
@@ -17,29 +17,168 @@
 
 package org.apache.spark.sql.catalyst.analysis
 
+import scala.collection.mutable
+
 import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.expressions.{Alias, Literal}
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, 
CreateNamedStruct, Expression, GetStructField, If, IsNull, KnownNotNull, 
Literal, NamedExpression, WithFields}

Review comment:
   OK. :)








[GitHub] [spark] viirya commented on a change in pull request #29587: [SPARK-32376][SQL] Make unionByName null-filling behavior work with struct columns

2020-09-16 Thread GitBox


viirya commented on a change in pull request #29587:
URL: https://github.com/apache/spark/pull/29587#discussion_r489540466



##
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveUnion.scala
##
@@ -17,29 +17,168 @@
 
 package org.apache.spark.sql.catalyst.analysis
 
+import scala.collection.mutable
+
 import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.expressions.{Alias, Literal}
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, 
CreateNamedStruct, Expression, GetStructField, If, IsNull, KnownNotNull, 
Literal, NamedExpression, WithFields}
 import org.apache.spark.sql.catalyst.optimizer.CombineUnions
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, 
Union}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types._
 import org.apache.spark.sql.util.SchemaUtils
+import org.apache.spark.unsafe.types.UTF8String
 
 /**
  * Resolves different children of Union to a common set of columns.
  */
 object ResolveUnion extends Rule[LogicalPlan] {
-  private def unionTwoSides(
+  /**
+   * This method sorts columns in a struct expression based on column names.
+   */
+  private def sortStructFields(expr: Expression): Expression = {

Review comment:
   Sure, let me think about better method names.








[GitHub] [spark] viirya commented on a change in pull request #29587: [SPARK-32376][SQL] Make unionByName null-filling behavior work with struct columns

2020-09-16 Thread GitBox


viirya commented on a change in pull request #29587:
URL: https://github.com/apache/spark/pull/29587#discussion_r489539877



##
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveUnion.scala
##
@@ -17,29 +17,168 @@
 
 package org.apache.spark.sql.catalyst.analysis
 
+import scala.collection.mutable
+
 import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.expressions.{Alias, Literal}
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, 
CreateNamedStruct, Expression, GetStructField, If, IsNull, KnownNotNull, 
Literal, NamedExpression, WithFields}
 import org.apache.spark.sql.catalyst.optimizer.CombineUnions
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, 
Union}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types._
 import org.apache.spark.sql.util.SchemaUtils
+import org.apache.spark.unsafe.types.UTF8String
 
 /**
  * Resolves different children of Union to a common set of columns.
  */
 object ResolveUnion extends Rule[LogicalPlan] {
-  private def unionTwoSides(
+  /**
+   * This method sorts columns in a struct expression based on column names.
+   */
+  private def sortStructFields(expr: Expression): Expression = {
+assert(expr.dataType.isInstanceOf[StructType])
+
+val existingExprs = 
expr.dataType.asInstanceOf[StructType].fieldNames.zipWithIndex.map {
+  case (name, i) => (name, GetStructField(KnownNotNull(expr), 
i).asInstanceOf[Expression])
+}.sortBy(_._1).flatMap(pair => Seq(Literal(pair._1), pair._2))
+
+val newExpr = CreateNamedStruct(existingExprs)
+if (expr.nullable) {
+  If(IsNull(expr), Literal(null, newExpr.dataType), newExpr)
+} else {
+  newExpr
+}
+  }
+
+  private def sortStructFields(fieldExprs: Seq[Expression]): Seq[Expression] = 
{
+fieldExprs.grouped(2).map { e =>
+  Seq(e.head, e.last)
+}.toSeq.sortBy { pair =>
+  assert(pair(0).isInstanceOf[Literal])
+  pair(0).eval().asInstanceOf[UTF8String].toString
+}.flatten
+  }
+
+  /**
+   * This helper method sorts fields in a `WithFields` expression by field 
name.
+   */
+  private def sortStructFieldsInWithFields(expr: Expression): Expression = 
expr transformUp {
+case w: WithFields if w.resolved =>
+  w.evalExpr match {
+case i @ If(IsNull(_), _, CreateNamedStruct(fieldExprs)) =>
+  val sorted = sortStructFields(fieldExprs)
+  val newStruct = CreateNamedStruct(sorted)
+  i.copy(trueValue = Literal(null, newStruct.dataType), falseValue = 
newStruct)
+case CreateNamedStruct(fieldExprs) =>
+  val sorted = sortStructFields(fieldExprs)
+  val newStruct = CreateNamedStruct(sorted)
+  newStruct
+case other =>
+  throw new AnalysisException(s"`WithFields` has incorrect eval 
expression: $other")

Review comment:
   Yes, I think so. It also covers the case where we change `WithFields` but 
forget to update this code.








[GitHub] [spark] viirya commented on a change in pull request #29587: [SPARK-32376][SQL] Make unionByName null-filling behavior work with struct columns

2020-09-16 Thread GitBox


viirya commented on a change in pull request #29587:
URL: https://github.com/apache/spark/pull/29587#discussion_r489539121



##
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveUnion.scala
##
@@ -50,18 +189,29 @@ object ResolveUnion extends Rule[LogicalPlan] {
   }
 }
 
+(rightProjectList, aliased)
+  }

Review comment:
   Let me think about how we can improve it.








[GitHub] [spark] viirya commented on a change in pull request #29587: [SPARK-32376][SQL] Make unionByName null-filling behavior work with struct columns

2020-09-16 Thread GitBox


viirya commented on a change in pull request #29587:
URL: https://github.com/apache/spark/pull/29587#discussion_r489537683



##
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveUnion.scala
##
@@ -50,18 +189,29 @@ object ResolveUnion extends Rule[LogicalPlan] {
   }
 }
 
+(rightProjectList, aliased)
+  }

Review comment:
   Well, I currently extract the evaluation expression out of `WithFields`; 
otherwise it is more complicated. I think that if you use `WithFields` to 
manually change such a deep structure, you might hit an issue of the same scale.








[GitHub] [spark] viirya commented on a change in pull request #29587: [SPARK-32376][SQL] Make unionByName null-filling behavior work with struct columns

2020-09-14 Thread GitBox


viirya commented on a change in pull request #29587:
URL: https://github.com/apache/spark/pull/29587#discussion_r488324577



##
File path: 
sql/core/src/test/scala/org/apache/spark/sql/DataFrameSetOperationsSuite.scala
##
@@ -507,33 +507,156 @@ class DataFrameSetOperationsSuite extends QueryTest with 
SharedSparkSession {
   }
 
   test("SPARK-29358: Make unionByName optionally fill missing columns with 
nulls") {
-var df1 = Seq(1, 2, 3).toDF("a")
-var df2 = Seq(3, 1, 2).toDF("b")
-val df3 = Seq(2, 3, 1).toDF("c")
-val unionDf = df1.unionByName(df2.unionByName(df3, true), true)
-checkAnswer(unionDf,
-  Row(1, null, null) :: Row(2, null, null) :: Row(3, null, null) :: // df1
-Row(null, 3, null) :: Row(null, 1, null) :: Row(null, 2, null) :: // 
df2
-Row(null, null, 2) :: Row(null, null, 3) :: Row(null, null, 1) :: Nil 
// df3
-)
+Seq("true", "false").foreach { config =>
+  withSQLConf(SQLConf.UNION_BYNAME_STRUCT_SUPPORT_ENABLED.key -> config) {
+var df1 = Seq(1, 2, 3).toDF("a")
+var df2 = Seq(3, 1, 2).toDF("b")
+val df3 = Seq(2, 3, 1).toDF("c")
+val unionDf = df1.unionByName(df2.unionByName(df3, true), true)
+checkAnswer(unionDf,
+  Row(1, null, null) :: Row(2, null, null) :: Row(3, null, null) :: // 
df1
+Row(null, 3, null) :: Row(null, 1, null) :: Row(null, 2, null) :: 
// df2
+Row(null, null, 2) :: Row(null, null, 3) :: Row(null, null, 1) :: 
Nil // df3
+)
+
+df1 = Seq((1, 2)).toDF("a", "c")
+df2 = Seq((3, 4, 5)).toDF("a", "b", "c")
+checkAnswer(df1.unionByName(df2, true),
+  Row(1, 2, null) :: Row(3, 5, 4) :: Nil)
+checkAnswer(df2.unionByName(df1, true),
+  Row(3, 4, 5) :: Row(1, null, 2) :: Nil)
+
+withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
+  df2 = Seq((3, 4, 5)).toDF("a", "B", "C")
+  val union1 = df1.unionByName(df2, true)
+  val union2 = df2.unionByName(df1, true)
+
+  checkAnswer(union1, Row(1, 2, null, null) :: Row(3, null, 4, 5) :: 
Nil)
+  checkAnswer(union2, Row(3, 4, 5, null) :: Row(1, null, null, 2) :: 
Nil)
+
+  assert(union1.schema.fieldNames === Array("a", "c", "B", "C"))
+  assert(union2.schema.fieldNames === Array("a", "B", "C", "c"))
+}
+  }
+}
+  }
 
-df1 = Seq((1, 2)).toDF("a", "c")
-df2 = Seq((3, 4, 5)).toDF("a", "b", "c")
-checkAnswer(df1.unionByName(df2, true),
-  Row(1, 2, null) :: Row(3, 5, 4) :: Nil)
-checkAnswer(df2.unionByName(df1, true),
-  Row(3, 4, 5) :: Row(1, null, 2) :: Nil)
+  test("SPARK-32376: Make unionByName null-filling behavior work with struct 
columns - simple") {
+withSQLConf(SQLConf.UNION_BYNAME_STRUCT_SUPPORT_ENABLED.key -> "true") {
+  val df1 = Seq(((1, 2, 3), 0), ((2, 3, 4), 1), ((3, 4, 5), 2)).toDF("a", 
"idx")
+  val df2 = Seq(((3, 4), 0), ((1, 2), 1), ((2, 3), 2)).toDF("a", "idx")
+  val df3 = Seq(((100, 101, 102, 103), 0), ((110, 111, 112, 113), 1), 
((120, 121, 122, 123), 2))
+.toDF("a", "idx")
+
+  var unionDf = df1.unionByName(df2, true)
+
+  checkAnswer(unionDf,
+Row(Row(1, 2, 3), 0) :: Row(Row(2, 3, 4), 1) :: Row(Row(3, 4, 5), 2) ::
+  Row(Row(3, 4, null), 0) :: Row(Row(1, 2, null), 1) :: Row(Row(2, 3, 
null), 2) :: Nil
+  )
+
+  assert(unionDf.schema.toDDL == "`a` STRUCT<`_1`: INT, `_2`: INT, `_3`: 
INT>,`idx` INT")
+
+  unionDf = df1.unionByName(df2, true).unionByName(df3, true)
+
+  checkAnswer(unionDf,
+Row(Row(1, 2, 3, null), 0) ::
+  Row(Row(2, 3, 4, null), 1) ::
+  Row(Row(3, 4, 5, null), 2) :: // df1
+  Row(Row(3, 4, null, null), 0) ::
+  Row(Row(1, 2, null, null), 1) ::
+  Row(Row(2, 3, null, null), 2) :: // df2
+  Row(Row(100, 101, 102, 103), 0) ::
+  Row(Row(110, 111, 112, 113), 1) ::
+  Row(Row(120, 121, 122, 123), 2) :: Nil // df3
+  )
+  assert(unionDf.schema.toDDL ==
+"`a` STRUCT<`_1`: INT, `_2`: INT, `_3`: INT, `_4`: INT>,`idx` INT")
+}
+  }
 
-withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
-  df2 = Seq((3, 4, 5)).toDF("a", "B", "C")
-  val union1 = df1.unionByName(df2, true)
-  val union2 = df2.unionByName(df1, true)
+  test("SPARK-32376: Make unionByName null-filling behavior work with struct 
columns - nested") {
+withSQLConf(SQLConf.UNION_BYNAME_STRUCT_SUPPORT_ENABLED.key -> "true") {
+  val df1 = Seq((0, UnionClass1a(0, 1L, UnionClass2(1, "2")))).toDF("id", 
"a")
+  val df2 = Seq((1, UnionClass1b(1, 2L, UnionClass3(2, 3L)))).toDF("id", 
"a")
+
+  val expectedSchema = "`id` INT,`a` STRUCT<`a`: INT, `b`: BIGINT, " +
+"`nested`: STRUCT<`a`: INT, `b`: BIGINT, `c`: STRING>>"
+
+  var unionDf = df1.unionByName(df2, true)
+  checkAnswer(unionDf,
+Row(0, Row(0, 1, Row(1, null, "2"))) ::
+  Row(1, Row(1, 2, 

[GitHub] [spark] viirya commented on a change in pull request #29587: [SPARK-32376][SQL] Make unionByName null-filling behavior work with struct columns

2020-09-14 Thread GitBox


viirya commented on a change in pull request #29587:
URL: https://github.com/apache/spark/pull/29587#discussion_r488314435



##
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeCreator.scala
##
@@ -546,7 +547,8 @@ case class StringToMap(text: Expression, pairDelim: 
Expression, keyValueDelim: E
 case class WithFields(
 structExpr: Expression,
 names: Seq[String],
-valExprs: Seq[Expression]) extends Unevaluable {
+valExprs: Seq[Expression],
+sortOutputColumns: Boolean = false) extends Unevaluable {

Review comment:
   In the latest commit, I moved the column sorting out of `WithFields`. I 
will fix the aforementioned failing case later.








[GitHub] [spark] viirya commented on a change in pull request #29587: [SPARK-32376][SQL] Make unionByName null-filling behavior work with struct columns

2020-09-14 Thread GitBox


viirya commented on a change in pull request #29587:
URL: https://github.com/apache/spark/pull/29587#discussion_r488314435



##
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeCreator.scala
##
@@ -546,7 +547,8 @@ case class StringToMap(text: Expression, pairDelim: 
Expression, keyValueDelim: E
 case class WithFields(
 structExpr: Expression,
 names: Seq[String],
-valExprs: Seq[Expression]) extends Unevaluable {
+valExprs: Seq[Expression],
+sortOutputColumns: Boolean = false) extends Unevaluable {

Review comment:
   In the latest commit, I moved the column sorting out of `WithFields`. I 
will fix the aforementioned failing case later.








[GitHub] [spark] viirya commented on a change in pull request #29587: [SPARK-32376][SQL] Make unionByName null-filling behavior work with struct columns

2020-09-14 Thread GitBox


viirya commented on a change in pull request #29587:
URL: https://github.com/apache/spark/pull/29587#discussion_r488314435



##
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeCreator.scala
##
@@ -546,7 +547,8 @@ case class StringToMap(text: Expression, pairDelim: 
Expression, keyValueDelim: E
 case class WithFields(
 structExpr: Expression,
 names: Seq[String],
-valExprs: Seq[Expression]) extends Unevaluable {
+valExprs: Seq[Expression],
+sortOutputColumns: Boolean = false) extends Unevaluable {

Review comment:
   In the latest commit, I moved the column sorting out of `WithFields`.








[GitHub] [spark] viirya commented on a change in pull request #29587: [SPARK-32376][SQL] Make unionByName null-filling behavior work with struct columns

2020-09-14 Thread GitBox


viirya commented on a change in pull request #29587:
URL: https://github.com/apache/spark/pull/29587#discussion_r488155092



##
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeCreator.scala
##
@@ -546,7 +547,8 @@ case class StringToMap(text: Expression, pairDelim: 
Expression, keyValueDelim: E
 case class WithFields(
 structExpr: Expression,
 names: Seq[String],
-valExprs: Seq[Expression]) extends Unevaluable {
+valExprs: Seq[Expression],
+sortOutputColumns: Boolean = false) extends Unevaluable {

Review comment:
   Sounds good to me. We probably don't need an individual expression for 
that. I think a private helper method should be enough.
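
A helper of that kind could look roughly like the sketch below. It assumes the struct's fields arrive as a flat list that alternates name and value, and it uses plain stand-in types rather than Catalyst expressions. `Expr`, `Name`, `Value` and `sortFlatFields` are hypothetical names for illustration only.

```scala
object SortFieldsSketch {
  // Stand-ins for Catalyst expressions: a field-name literal or some value expression.
  sealed trait Expr
  final case class Name(value: String) extends Expr
  final case class Value(repr: String) extends Expr

  // Sorts a flat, alternating (name, value, name, value, ...) list by field name.
  def sortFlatFields(flat: Seq[Expr]): Seq[Expr] = {
    require(flat.length % 2 == 0, "expected alternating name/value entries")
    flat.grouped(2).toSeq.sortBy {
      case Seq(Name(n), _) => n
      case other => throw new IllegalArgumentException(s"not a name/value pair: $other")
    }.flatten
  }
}
```

Keeping this as a small private function means the sorting step stays local to the rule that needs it, and no new expression type has to be maintained.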








[GitHub] [spark] viirya commented on a change in pull request #29587: [SPARK-32376][SQL] Make unionByName null-filling behavior work with struct columns

2020-09-14 Thread GitBox


viirya commented on a change in pull request #29587:
URL: https://github.com/apache/spark/pull/29587#discussion_r488109612



##
File path: 
sql/core/src/test/scala/org/apache/spark/sql/DataFrameSetOperationsSuite.scala
##
@@ -507,33 +507,156 @@ class DataFrameSetOperationsSuite extends QueryTest with 
SharedSparkSession {
   }
 
   test("SPARK-29358: Make unionByName optionally fill missing columns with 
nulls") {
-var df1 = Seq(1, 2, 3).toDF("a")
-var df2 = Seq(3, 1, 2).toDF("b")
-val df3 = Seq(2, 3, 1).toDF("c")
-val unionDf = df1.unionByName(df2.unionByName(df3, true), true)
-checkAnswer(unionDf,
-  Row(1, null, null) :: Row(2, null, null) :: Row(3, null, null) :: // df1
-Row(null, 3, null) :: Row(null, 1, null) :: Row(null, 2, null) :: // 
df2
-Row(null, null, 2) :: Row(null, null, 3) :: Row(null, null, 1) :: Nil 
// df3
-)
+Seq("true", "false").foreach { config =>
+  withSQLConf(SQLConf.UNION_BYNAME_STRUCT_SUPPORT_ENABLED.key -> config) {
+var df1 = Seq(1, 2, 3).toDF("a")
+var df2 = Seq(3, 1, 2).toDF("b")
+val df3 = Seq(2, 3, 1).toDF("c")
+val unionDf = df1.unionByName(df2.unionByName(df3, true), true)
+checkAnswer(unionDf,
+  Row(1, null, null) :: Row(2, null, null) :: Row(3, null, null) :: // 
df1
+Row(null, 3, null) :: Row(null, 1, null) :: Row(null, 2, null) :: 
// df2
+Row(null, null, 2) :: Row(null, null, 3) :: Row(null, null, 1) :: 
Nil // df3
+)
+
+df1 = Seq((1, 2)).toDF("a", "c")
+df2 = Seq((3, 4, 5)).toDF("a", "b", "c")
+checkAnswer(df1.unionByName(df2, true),
+  Row(1, 2, null) :: Row(3, 5, 4) :: Nil)
+checkAnswer(df2.unionByName(df1, true),
+  Row(3, 4, 5) :: Row(1, null, 2) :: Nil)
+
+withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
+  df2 = Seq((3, 4, 5)).toDF("a", "B", "C")
+  val union1 = df1.unionByName(df2, true)
+  val union2 = df2.unionByName(df1, true)
+
+  checkAnswer(union1, Row(1, 2, null, null) :: Row(3, null, 4, 5) :: 
Nil)
+  checkAnswer(union2, Row(3, 4, 5, null) :: Row(1, null, null, 2) :: 
Nil)
+
+  assert(union1.schema.fieldNames === Array("a", "c", "B", "C"))
+  assert(union2.schema.fieldNames === Array("a", "B", "C", "c"))
+}
+  }
+}
+  }
 
-df1 = Seq((1, 2)).toDF("a", "c")
-df2 = Seq((3, 4, 5)).toDF("a", "b", "c")
-checkAnswer(df1.unionByName(df2, true),
-  Row(1, 2, null) :: Row(3, 5, 4) :: Nil)
-checkAnswer(df2.unionByName(df1, true),
-  Row(3, 4, 5) :: Row(1, null, 2) :: Nil)
+  test("SPARK-32376: Make unionByName null-filling behavior work with struct 
columns - simple") {
+withSQLConf(SQLConf.UNION_BYNAME_STRUCT_SUPPORT_ENABLED.key -> "true") {
+  val df1 = Seq(((1, 2, 3), 0), ((2, 3, 4), 1), ((3, 4, 5), 2)).toDF("a", 
"idx")
+  val df2 = Seq(((3, 4), 0), ((1, 2), 1), ((2, 3), 2)).toDF("a", "idx")
+  val df3 = Seq(((100, 101, 102, 103), 0), ((110, 111, 112, 113), 1), 
((120, 121, 122, 123), 2))
+.toDF("a", "idx")
+
+  var unionDf = df1.unionByName(df2, true)
+
+  checkAnswer(unionDf,
+Row(Row(1, 2, 3), 0) :: Row(Row(2, 3, 4), 1) :: Row(Row(3, 4, 5), 2) ::
+  Row(Row(3, 4, null), 0) :: Row(Row(1, 2, null), 1) :: Row(Row(2, 3, 
null), 2) :: Nil
+  )
+
+  assert(unionDf.schema.toDDL == "`a` STRUCT<`_1`: INT, `_2`: INT, `_3`: 
INT>,`idx` INT")
+
+  unionDf = df1.unionByName(df2, true).unionByName(df3, true)
+
+  checkAnswer(unionDf,
+Row(Row(1, 2, 3, null), 0) ::
+  Row(Row(2, 3, 4, null), 1) ::
+  Row(Row(3, 4, 5, null), 2) :: // df1
+  Row(Row(3, 4, null, null), 0) ::
+  Row(Row(1, 2, null, null), 1) ::
+  Row(Row(2, 3, null, null), 2) :: // df2
+  Row(Row(100, 101, 102, 103), 0) ::
+  Row(Row(110, 111, 112, 113), 1) ::
+  Row(Row(120, 121, 122, 123), 2) :: Nil // df3
+  )
+  assert(unionDf.schema.toDDL ==
+"`a` STRUCT<`_1`: INT, `_2`: INT, `_3`: INT, `_4`: INT>,`idx` INT")
+}
+  }
 
-withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
-  df2 = Seq((3, 4, 5)).toDF("a", "B", "C")
-  val union1 = df1.unionByName(df2, true)
-  val union2 = df2.unionByName(df1, true)
+  test("SPARK-32376: Make unionByName null-filling behavior work with struct 
columns - nested") {
+withSQLConf(SQLConf.UNION_BYNAME_STRUCT_SUPPORT_ENABLED.key -> "true") {
+  val df1 = Seq((0, UnionClass1a(0, 1L, UnionClass2(1, "2")))).toDF("id", 
"a")
+  val df2 = Seq((1, UnionClass1b(1, 2L, UnionClass3(2, 3L)))).toDF("id", 
"a")
+
+  val expectedSchema = "`id` INT,`a` STRUCT<`a`: INT, `b`: BIGINT, " +
+"`nested`: STRUCT<`a`: INT, `b`: BIGINT, `c`: STRING>>"
+
+  var unionDf = df1.unionByName(df2, true)
+  checkAnswer(unionDf,
+Row(0, Row(0, 1, Row(1, null, "2"))) ::
+  Row(1, Row(1, 2, 

[GitHub] [spark] viirya commented on a change in pull request #29587: [SPARK-32376][SQL] Make unionByName null-filling behavior work with struct columns

2020-09-13 Thread GitBox


viirya commented on a change in pull request #29587:
URL: https://github.com/apache/spark/pull/29587#discussion_r487662093



##
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveUnion.scala
##
@@ -17,29 +17,107 @@
 
 package org.apache.spark.sql.catalyst.analysis
 
+import scala.collection.mutable
+
 import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.expressions.{Alias, Literal}
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, 
Expression, Literal, NamedExpression, WithFields}
 import org.apache.spark.sql.catalyst.optimizer.CombineUnions
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, 
Union}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types._
 import org.apache.spark.sql.util.SchemaUtils
 
 /**
  * Resolves different children of Union to a common set of columns.
  */
 object ResolveUnion extends Rule[LogicalPlan] {
-  private def unionTwoSides(
+  /**
+   * Adds missing fields recursively into given `col` expression, based on the 
target `StructType`.
+   * For example, given `col` as "a struct, b int" and `target` 
as
+   * "a struct, b int, c string", this method should add 
`a.c` and `c` to
+   * `col` expression.
+   */
+  private def addFields(col: NamedExpression, target: StructType): 
Option[Expression] = {
+assert(col.dataType.isInstanceOf[StructType], "Only support StructType.")
+
+val resolver = SQLConf.get.resolver
+val missingFields =
+  StructType.findMissingFields(col.dataType.asInstanceOf[StructType], 
target, resolver)
+if (missingFields.isEmpty) {
+  None
+} else {
+  missingFields.map(s => addFieldsInto(col, "", s.fields))
+}
+  }
+
+  /**
+   * Adds missing fields recursively into given `col` expression. The missing 
fields are given
+   * in `fields`. For example, given `col` as "a struct, b int", 
and `fields` is
+   * "a struct, c string". This method will add a nested `a.c` field 
and a top-level
+   * `c` field to `col` and fill null values for them.
+   */
+  private def addFieldsInto(col: Expression, base: String, fields: 
Seq[StructField]): Expression = {
+fields.foldLeft(col) { case (currCol, field) =>
+  field.dataType match {
+case st: StructType =>
+  val resolver = SQLConf.get.resolver
+  val colField = currCol.dataType.asInstanceOf[StructType]
+.find(f => resolver(f.name, field.name))
+  if (colField.isEmpty) {
+// The whole struct is missing. Add a null.
+WithFields(currCol, s"$base${field.name}", Literal(null, st),
+  sortOutputColumns = true)
+  } else {
+addFieldsInto(currCol, s"$base${field.name}.", st.fields)
+  }
+case dt =>
+  // We need to sort columns in result, because we might add another 
column in other side.
+  // E.g., we want to union two structs "a int, b long" and "a int, c 
string".
+  // If we don't sort, we will have "a int, b long, c string" and "a 
int, c string, b long",
+  // which are not compatible.
+  WithFields(currCol, s"$base${field.name}", Literal(null, dt),
+sortOutputColumns = true)
+  }
+}
+  }
+
+  private def compareAndAddFields(

Review comment:
   Added.

##
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveUnion.scala
##
@@ -17,29 +17,107 @@
 
 package org.apache.spark.sql.catalyst.analysis
 
+import scala.collection.mutable
+
 import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.expressions.{Alias, Literal}
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, 
Expression, Literal, NamedExpression, WithFields}
 import org.apache.spark.sql.catalyst.optimizer.CombineUnions
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, 
Union}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types._
 import org.apache.spark.sql.util.SchemaUtils
 
 /**
  * Resolves different children of Union to a common set of columns.
  */
 object ResolveUnion extends Rule[LogicalPlan] {
-  private def unionTwoSides(
+  /**
+   * Adds missing fields recursively into given `col` expression, based on the 
target `StructType`.
+   * For example, given `col` as "a struct, b int" and `target` 
as
+   * "a struct, b int, c string", this method should add 
`a.c` and `c` to
+   * `col` expression.

Review comment:
   Revised the comment.






[GitHub] [spark] viirya commented on a change in pull request #29587: [SPARK-32376][SQL] Make unionByName null-filling behavior work with struct columns

2020-09-13 Thread GitBox


viirya commented on a change in pull request #29587:
URL: https://github.com/apache/spark/pull/29587#discussion_r487645676



##
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveUnion.scala
##
@@ -17,29 +17,107 @@
 
 package org.apache.spark.sql.catalyst.analysis
 
+import scala.collection.mutable
+
 import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.expressions.{Alias, Literal}
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, 
Expression, Literal, NamedExpression, WithFields}
 import org.apache.spark.sql.catalyst.optimizer.CombineUnions
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, 
Union}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types._
 import org.apache.spark.sql.util.SchemaUtils
 
 /**
  * Resolves different children of Union to a common set of columns.
  */
 object ResolveUnion extends Rule[LogicalPlan] {
-  private def unionTwoSides(
+  /**
+   * Adds missing fields recursively into given `col` expression, based on the 
target `StructType`.
+   * For example, given `col` as "a struct, b int" and `target` 
as
+   * "a struct, b int, c string", this method should add 
`a.c` and `c` to
+   * `col` expression.
+   */
+  private def addFields(col: NamedExpression, target: StructType): 
Option[Expression] = {
+assert(col.dataType.isInstanceOf[StructType], "Only support StructType.")
+
+val resolver = SQLConf.get.resolver
+val missingFields =
+  StructType.findMissingFields(col.dataType.asInstanceOf[StructType], 
target, resolver)
+if (missingFields.isEmpty) {
+  None
+} else {
+  missingFields.map(s => addFieldsInto(col, "", s.fields))
+}
+  }
+
+  /**
+   * Adds missing fields recursively into given `col` expression. The missing 
fields are given
+   * in `fields`. For example, given `col` as "a struct, b int", 
and `fields` is
+   * "a struct, c string". This method will add a nested `a.c` field 
and a top-level
+   * `c` field to `col` and fill null values for them.
+   */
+  private def addFieldsInto(col: Expression, base: String, fields: 
Seq[StructField]): Expression = {
+fields.foldLeft(col) { case (currCol, field) =>
+  field.dataType match {
+case st: StructType =>
+  val resolver = SQLConf.get.resolver
+  val colField = currCol.dataType.asInstanceOf[StructType]
+.find(f => resolver(f.name, field.name))
+  if (colField.isEmpty) {
+// The whole struct is missing. Add a null.
+WithFields(currCol, s"$base${field.name}", Literal(null, st),
+  sortOutputColumns = true)
+  } else {
+addFieldsInto(currCol, s"$base${field.name}.", st.fields)
+  }
+case dt =>
+  // We need to sort columns in result, because we might add another 
column in other side.
+  // E.g., we want to union two structs "a int, b long" and "a int, c 
string".
+  // If we don't sort, we will have "a int, b long, c string" and "a 
int, c string, b long",
+  // which are not compatible.
+  WithFields(currCol, s"$base${field.name}", Literal(null, dt),

Review comment:
   Oh, I think using `WithFields` like that is bad for performance. 
`WithFields` basically extracts the fields from a struct and reconstructs the 
struct with the new field, so you can imagine that performance will suffer when 
there are many missing columns.
   
   However, I think we currently have no better approach for filling missing 
(nested) fields in structs. We could add a performance note to the 
`unionByName` API if you think it is necessary.
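
To make the cost concern concrete, here is a back-of-the-envelope model in plain Scala. It assumes that every added field re-reads all existing fields and rebuilds the struct, and it ignores any optimizer rule that might collapse nested calls. It is a counting sketch with hypothetical names (`addNullField`, `addAll`), not a measurement of Spark.

```scala
object WithFieldsCostSketch {
  // A struct modelled as an ordered list of (name, value) pairs.
  type Struct = Vector[(String, Any)]

  // Adds one null-filled field by reading every existing field and rebuilding the struct.
  // Returns the new struct together with the number of field reads performed.
  def addNullField(struct: Struct, name: String): (Struct, Int) = {
    val extracted = struct.map { case (n, v) => (n, v) } // one read per existing field
    val filled: (String, Any) = (name, null)
    (extracted :+ filled, struct.length)
  }

  // Adds the missing fields one at a time, accumulating the total number of field reads.
  def addAll(struct: Struct, missing: Seq[String]): (Struct, Int) =
    missing.foldLeft((struct, 0)) { case ((current, reads), name) =>
      val (next, cost) = addNullField(current, name)
      (next, reads + cost)
    }
}
```

Under this model, adding k missing fields to a struct of n fields costs n + (n + 1) + ... + (n + k - 1) field reads, which grows quadratically in k.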








[GitHub] [spark] viirya commented on a change in pull request #29587: [SPARK-32376][SQL] Make unionByName null-filling behavior work with struct columns

2020-09-13 Thread GitBox


viirya commented on a change in pull request #29587:
URL: https://github.com/apache/spark/pull/29587#discussion_r487647961



##
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveUnion.scala
##
@@ -17,29 +17,107 @@
 
 package org.apache.spark.sql.catalyst.analysis
 
+import scala.collection.mutable
+
 import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.expressions.{Alias, Literal}
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, 
Expression, Literal, NamedExpression, WithFields}
 import org.apache.spark.sql.catalyst.optimizer.CombineUnions
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, 
Union}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types._
 import org.apache.spark.sql.util.SchemaUtils
 
 /**
  * Resolves different children of Union to a common set of columns.
  */
 object ResolveUnion extends Rule[LogicalPlan] {
-  private def unionTwoSides(
+  /**
+   * Adds missing fields recursively into given `col` expression, based on the 
target `StructType`.
+   * For example, given `col` as "a struct, b int" and `target` 
as
+   * "a struct, b int, c string", this method should add 
`a.c` and `c` to
+   * `col` expression.
+   */
+  private def addFields(col: NamedExpression, target: StructType): 
Option[Expression] = {
+assert(col.dataType.isInstanceOf[StructType], "Only support StructType.")
+
+val resolver = SQLConf.get.resolver
+val missingFields =
+  StructType.findMissingFields(col.dataType.asInstanceOf[StructType], 
target, resolver)
+if (missingFields.isEmpty) {
+  None
+} else {
+  missingFields.map(s => addFieldsInto(col, "", s.fields))
+}
+  }
+
+  /**
+   * Adds missing fields recursively into given `col` expression. The missing 
fields are given
+   * in `fields`. For example, given `col` as "a struct, b int", 
and `fields` is
+   * "a struct, c string". This method will add a nested `a.c` field 
and a top-level

Review comment:
   There are end-to-end tests for that. I will update this comment with the 
example.
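   To illustrate the null-filling that the doc comment above describes, here 
is a minimal sketch in plain Scala with no Spark dependency. `NullFillSketch`, 
`DType`, `Atom`, `StructT`, `Row` and `fill` are hypothetical names made up 
for this illustration, not Catalyst APIs, and the nested field names are 
assumptions because the struct type parameters in the quoted doc comment were 
lost. It assumes, as `addFieldsInto` does, that `missing` lists only fields 
that are absent from the row.

```scala
import scala.collection.immutable.SortedMap

object NullFillSketch {
  // Hypothetical stand-ins for Catalyst data types (illustration only).
  sealed trait DType
  case object Atom extends DType                                  // any non-struct type
  final case class StructT(fields: Seq[(String, DType)]) extends DType

  // A struct value; SortedMap keeps fields ordered by name, loosely
  // mirroring `sortOutputColumns = true` in the quoted diff.
  type Row = SortedMap[String, Any]

  // Fills every field listed in `missing` with null, recursing into nested
  // structs that already exist in `row`.
  def fill(row: Row, missing: StructT): Row =
    missing.fields.foldLeft(row) { case (curr, (name, dt)) =>
      (dt, curr.get(name)) match {
        case (st: StructT, Some(nested: SortedMap[_, _])) =>
          // The struct exists: only add its missing nested fields.
          curr.updated(name, fill(nested.asInstanceOf[Row], st))
        case _ =>
          // A missing leaf, or a struct that is missing entirely: add a null.
          curr.updated(name, null)
      }
    }
}

object NullFillDemo {
  import NullFillSketch._

  def main(args: Array[String]): Unit = {
    val row: Row = SortedMap[String, Any](
      "a" -> SortedMap[String, Any]("a" -> 1, "b" -> 2),
      "b" -> 3)
    val missing = StructT(Seq("a" -> StructT(Seq("c" -> Atom)), "c" -> Atom))
    // Adds a nested `a.c` and a top-level `c`, both null.
    println(fill(row, missing))
  }
}
```

   The sketch works on values rather than expressions, so it only shows the 
intended result shape, not how the analyzer builds the projection.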








[GitHub] [spark] viirya commented on a change in pull request #29587: [SPARK-32376][SQL] Make unionByName null-filling behavior work with struct columns

2020-09-13 Thread GitBox


viirya commented on a change in pull request #29587:
URL: https://github.com/apache/spark/pull/29587#discussion_r487645676



##
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveUnion.scala
##
@@ -17,29 +17,107 @@
 
 package org.apache.spark.sql.catalyst.analysis
 
+import scala.collection.mutable
+
 import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.expressions.{Alias, Literal}
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, 
Expression, Literal, NamedExpression, WithFields}
 import org.apache.spark.sql.catalyst.optimizer.CombineUnions
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, 
Union}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types._
 import org.apache.spark.sql.util.SchemaUtils
 
 /**
  * Resolves different children of Union to a common set of columns.
  */
 object ResolveUnion extends Rule[LogicalPlan] {
-  private def unionTwoSides(
+  /**
+   * Adds missing fields recursively into given `col` expression, based on the 
target `StructType`.
+   * For example, given `col` as "a struct, b int" and `target` 
as
+   * "a struct, b int, c string", this method should add 
`a.c` and `c` to
+   * `col` expression.
+   */
+  private def addFields(col: NamedExpression, target: StructType): 
Option[Expression] = {
+assert(col.dataType.isInstanceOf[StructType], "Only support StructType.")
+
+val resolver = SQLConf.get.resolver
+val missingFields =
+  StructType.findMissingFields(col.dataType.asInstanceOf[StructType], 
target, resolver)
+if (missingFields.isEmpty) {
+  None
+} else {
+  missingFields.map(s => addFieldsInto(col, "", s.fields))
+}
+  }
+
+  /**
+   * Adds missing fields recursively into given `col` expression. The missing 
fields are given
+   * in `fields`. For example, given `col` as "a struct, b int", 
and `fields` is
+   * "a struct, c string". This method will add a nested `a.c` field 
and a top-level
+   * `c` field to `col` and fill null values for them.
+   */
+  private def addFieldsInto(col: Expression, base: String, fields: 
Seq[StructField]): Expression = {
+fields.foldLeft(col) { case (currCol, field) =>
+  field.dataType match {
+case st: StructType =>
+  val resolver = SQLConf.get.resolver
+  val colField = currCol.dataType.asInstanceOf[StructType]
+.find(f => resolver(f.name, field.name))
+  if (colField.isEmpty) {
+// The whole struct is missing. Add a null.
+WithFields(currCol, s"$base${field.name}", Literal(null, st),
+  sortOutputColumns = true)
+  } else {
+addFieldsInto(currCol, s"$base${field.name}.", st.fields)
+  }
+case dt =>
+  // We need to sort columns in result, because we might add another 
column in other side.
+  // E.g., we want to union two structs "a int, b long" and "a int, c 
string".
+  // If we don't sort, we will have "a int, b long, c string" and "a 
int, c string, b long",
+  // which are not compatible.
+  WithFields(currCol, s"$base${field.name}", Literal(null, dt),

Review comment:
   Oh, I think using `WithFields` like that is bad for performance. 
`WithFields` basically extracts the fields from a struct and reconstructs the 
struct with the new field, so you can imagine that filling many missing 
columns will not perform well.
   
   However, I don't think we currently have a better approach for filling 
missing (nested) fields in structs. We could add a performance note to the 
`unionByName` API documentation.
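   As a rough illustration of the cost concern above, here is a toy model in 
plain Scala. It is not a Spark benchmark, and `WithFieldsCostSketch` and 
`addAll` are hypothetical names. It assumes each added field re-reads every 
existing field and rebuilds the struct, which is the behavior described in 
the comment. The real cost in Spark depends on code generation and optimizer 
rules that this sketch ignores.

```scala
object WithFieldsCostSketch {
  // A struct value modeled as an ordered list of (name, value) pairs.
  type Struct = Vector[(String, Any)]

  // Adds the missing fields one at a time. Each step copies all fields that
  // exist so far and appends one null-valued field. Returns the final struct
  // together with the total number of field copies performed.
  def addAll(s: Struct, missing: Seq[String]): (Struct, Long) =
    missing.foldLeft((s, 0L)) { case ((curr, copied), name) =>
      (curr :+ (name -> null), copied + curr.length)
    }
}

object WithFieldsCostDemo {
  def main(args: Array[String]): Unit = {
    val start: WithFieldsCostSketch.Struct = Vector("a" -> 1, "b" -> 2)
    val (result, copied) = WithFieldsCostSketch.addAll(start, Seq("c", "d", "e"))
    // 2 + 3 + 4 = 9 copies for a struct that ends up with 5 fields.
    println(s"fields=${result.length}, copies=$copied")
  }
}
```

   With a starting width `w` and `n` missing fields, the model copies 
`n * w + n * (n - 1) / 2` fields in total, so the work grows quadratically in 
the number of missing fields.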








[GitHub] [spark] viirya commented on a change in pull request #29587: [SPARK-32376][SQL] Make unionByName null-filling behavior work with struct columns

2020-09-12 Thread GitBox


viirya commented on a change in pull request #29587:
URL: https://github.com/apache/spark/pull/29587#discussion_r487484664



##
File path: 
sql/catalyst/src/test/scala/org/apache/spark/sql/types/StructTypeSuite.scala
##
@@ -103,4 +105,112 @@ class StructTypeSuite extends SparkFunSuite {
 val interval = "`a` INTERVAL"
 assert(fromDDL(interval).toDDL === interval)
   }
+
+  test("find missing (nested) fields") {
+val schema = StructType.fromDDL(
+  "c1 INT, c2 STRUCT>")
+val resolver = SQLConf.get.resolver
+
+val source1 = StructType.fromDDL("c1 INT")
+val missing1 = StructType.fromDDL(
+  "c2 STRUCT>")
+assert(StructType.findMissingFields(source1, schema, resolver)
+  .exists(_.sameType(missing1)))
+
+val source2 = StructType.fromDDL("c1 INT, c3 STRING")
+val missing2 = StructType.fromDDL(
+  "c2 STRUCT>")
+assert(StructType.findMissingFields(source2, schema, resolver)
+  .exists(_.sameType(missing2)))
+
+val source3 = StructType.fromDDL("c1 INT, c2 STRUCT")
+val missing3 = StructType.fromDDL(
+  "c2 STRUCT>")
+assert(StructType.findMissingFields(source3, schema, resolver)
+  .exists(_.sameType(missing3)))
+
+val source4 = StructType.fromDDL("c1 INT, c2 STRUCT>")
+val missing4 = StructType.fromDDL(
+  "c2 STRUCT>")
+assert(StructType.findMissingFields(source4, schema, resolver)
+  .exists(_.sameType(missing4)))
+
+val schemaWithArray = StructType.fromDDL(
+  "c1 INT, c2 ARRAY>")
+val source5 = StructType.fromDDL(
+  "c1 INT")
+val missing5 = StructType.fromDDL(
+  "c2 ARRAY>")
+assert(
+  StructType.findMissingFields(source5, schemaWithArray, resolver)
+.exists(_.sameType(missing5)))
+
+val schemaWithMap1 = StructType.fromDDL(
+  "c1 INT, c2 MAP, STRING>, c3 LONG")
+val source6 = StructType.fromDDL(
+  "c1 INT, c3 LONG")
+val missing6 = StructType.fromDDL(
+  "c2 MAP, STRING>")
+assert(
+  StructType.findMissingFields(source6, schemaWithMap1, resolver)
+.exists(_.sameType(missing6)))
+
+val schemaWithMap2 = StructType.fromDDL(
+  "c1 INT, c2 MAP>, c3 STRING")
+val source7 = StructType.fromDDL(
+  "c1 INT, c3 STRING")
+val missing7 = StructType.fromDDL(
+  "c2 MAP>")
+assert(
+  StructType.findMissingFields(source7, schemaWithMap2, resolver)
+.exists(_.sameType(missing7)))
+
+// Unsupported: nested struct in array, map
+val source8 = StructType.fromDDL(
+  "c1 INT, c2 ARRAY>")
+// `findMissingFields` doesn't support looking into nested struct in array 
type.
+assert(StructType.findMissingFields(source8, schemaWithArray, 
resolver).isEmpty)
+
+val source9 = StructType.fromDDL(
+  "c1 INT, c2 MAP, STRING>, c3 LONG")
+// `findMissingFields` doesn't support looking into nested struct in map 
type.
+assert(StructType.findMissingFields(source9, schemaWithMap1, 
resolver).isEmpty)
+
+val source10 = StructType.fromDDL(
+  "c1 INT, c2 MAP>, c3 STRING")
+// `findMissingFields` doesn't support looking into nested struct in map 
type.
+assert(StructType.findMissingFields(source10, schemaWithMap2, 
resolver).isEmpty)
+  }
+
+  test("find missing (nested) fields: case sensitive cases") {
+withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
+  val schema = StructType.fromDDL(
+"c1 INT, c2 STRUCT>")
+  val resolver = SQLConf.get.resolver
+
+  val source1 = StructType.fromDDL("c1 INT, C2 LONG")
+  val missing1 = StructType.fromDDL(
+"c2 STRUCT>")
+  assert(StructType.findMissingFields(source1, schema, resolver)
+.exists(_.sameType(missing1)))
+
+  val source2 = StructType.fromDDL("c2 LONG")
+  val missing2 = StructType.fromDDL(
+"c1 INT")

Review comment:
   Oh, sure. I missed those line breaks.
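   For readers following the case-sensitive test quoted above, here is a 
minimal sketch of how the resolver changes which names count as missing. It 
is plain Scala, and `ResolverSketch`, `caseSensitive`, `caseInsensitive` and 
`missingNames` are hypothetical names. The real tests obtain the resolver 
from `SQLConf.get.resolver`, and `StructType.findMissingFields` also looks 
into nested structs, which this top-level sketch does not.

```scala
object ResolverSketch {
  // Same shape as a name resolver: decides whether two column names match.
  type Resolver = (String, String) => Boolean

  val caseSensitive: Resolver = (a, b) => a == b
  val caseInsensitive: Resolver = (a, b) => a.equalsIgnoreCase(b)

  // Top-level names in `target` that no name in `source` resolves to.
  def missingNames(
      source: Seq[String],
      target: Seq[String],
      resolver: Resolver): Seq[String] =
    target.filterNot(t => source.exists(s => resolver(s, t)))
}

object ResolverDemo {
  import ResolverSketch._

  def main(args: Array[String]): Unit = {
    val source = Seq("c1", "C2")
    val target = Seq("c1", "c2")
    // Case-sensitive: `C2` does not match `c2`, so `c2` is reported missing.
    println(missingNames(source, target, caseSensitive))
    // Case-insensitive: nothing is missing.
    println(missingNames(source, target, caseInsensitive))
  }
}
```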








[GitHub] [spark] viirya commented on a change in pull request #29587: [SPARK-32376][SQL] Make unionByName null-filling behavior work with struct columns

2020-09-12 Thread GitBox


viirya commented on a change in pull request #29587:
URL: https://github.com/apache/spark/pull/29587#discussion_r487484292



##
File path: 
sql/catalyst/src/test/scala/org/apache/spark/sql/types/StructTypeSuite.scala
##
@@ -103,4 +105,112 @@ class StructTypeSuite extends SparkFunSuite {
 val interval = "`a` INTERVAL"
 assert(fromDDL(interval).toDDL === interval)
   }
+
+  test("find missing (nested) fields") {

Review comment:
   ok








[GitHub] [spark] viirya commented on a change in pull request #29587: [SPARK-32376][SQL] Make unionByName null-filling behavior work with struct columns

2020-09-12 Thread GitBox


viirya commented on a change in pull request #29587:
URL: https://github.com/apache/spark/pull/29587#discussion_r487484192



##
File path: 
sql/catalyst/src/test/scala/org/apache/spark/sql/types/StructTypeSuite.scala
##
@@ -103,4 +105,112 @@ class StructTypeSuite extends SparkFunSuite {
 val interval = "`a` INTERVAL"
 assert(fromDDL(interval).toDDL === interval)
   }
+
+  test("find missing (nested) fields") {
+val schema = StructType.fromDDL(
+  "c1 INT, c2 STRUCT>")
+val resolver = SQLConf.get.resolver
+
+val source1 = StructType.fromDDL("c1 INT")
+val missing1 = StructType.fromDDL(
+  "c2 STRUCT>")
+assert(StructType.findMissingFields(source1, schema, resolver)
+  .exists(_.sameType(missing1)))
+
+val source2 = StructType.fromDDL("c1 INT, c3 STRING")
+val missing2 = StructType.fromDDL(
+  "c2 STRUCT>")
+assert(StructType.findMissingFields(source2, schema, resolver)
+  .exists(_.sameType(missing2)))
+
+val source3 = StructType.fromDDL("c1 INT, c2 STRUCT")
+val missing3 = StructType.fromDDL(
+  "c2 STRUCT>")
+assert(StructType.findMissingFields(source3, schema, resolver)
+  .exists(_.sameType(missing3)))
+
+val source4 = StructType.fromDDL("c1 INT, c2 STRUCT>")
+val missing4 = StructType.fromDDL(
+  "c2 STRUCT>")

Review comment:
   Oops, sure.








[GitHub] [spark] viirya commented on a change in pull request #29587: [SPARK-32376][SQL] Make unionByName null-filling behavior work with struct columns

2020-09-12 Thread GitBox


viirya commented on a change in pull request #29587:
URL: https://github.com/apache/spark/pull/29587#discussion_r487463931



##
File path: 
sql/core/src/test/scala/org/apache/spark/sql/DataFrameSetOperationsSuite.scala
##
@@ -536,4 +536,71 @@ class DataFrameSetOperationsSuite extends QueryTest with 
SharedSparkSession {
   assert(union2.schema.fieldNames === Array("a", "B", "C", "c"))
 }
   }
+

Review comment:
   Added.

##
File path: 
sql/catalyst/src/test/scala/org/apache/spark/sql/types/StructTypeSuite.scala
##
@@ -103,4 +104,80 @@ class StructTypeSuite extends SparkFunSuite {
 val interval = "`a` INTERVAL"
 assert(fromDDL(interval).toDDL === interval)
   }
+
+  test("find missing (nested) fields") {
+val schema = StructType.fromDDL(
+  "c1 INT, c2 STRUCT>")
+val resolver = SQLConf.get.resolver

Review comment:
   yes, added.








[GitHub] [spark] viirya commented on a change in pull request #29587: [SPARK-32376][SQL] Make unionByName null-filling behavior work with struct columns

2020-09-12 Thread GitBox


viirya commented on a change in pull request #29587:
URL: https://github.com/apache/spark/pull/29587#discussion_r487457983



##
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveUnion.scala
##
@@ -17,29 +17,101 @@
 
 package org.apache.spark.sql.catalyst.analysis
 
+import scala.collection.mutable
+
 import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.expressions.{Alias, Literal}
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, 
Expression, Literal, NamedExpression, WithFields}
 import org.apache.spark.sql.catalyst.optimizer.CombineUnions
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, 
Union}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types._
 import org.apache.spark.sql.util.SchemaUtils
 
 /**
  * Resolves different children of Union to a common set of columns.
  */
 object ResolveUnion extends Rule[LogicalPlan] {
-  private def unionTwoSides(
+  /**
+   * Adds missing fields recursively into given `col` expression, based on the 
target `StructType`.
+   * For example, given `col` as "a struct, b int" and `target` 
as
+   * "a struct, b int, c string", this method should add 
`a.c` and `c` to
+   * `col` expression.
+   */
+  private def addFields(col: NamedExpression, target: StructType): 
Option[Expression] = {
+assert(col.dataType.isInstanceOf[StructType], "Only support StructType.")
+
+val resolver = SQLConf.get.resolver
+val missingFields =
+  StructType.findMissingFields(col.dataType.asInstanceOf[StructType], 
target, resolver)
+if (missingFields.isEmpty) {
+  None
+} else {
+  missingFields.map(s => addFieldsInto(col, "", s.fields))
+}
+  }
+
+  /**
+   * Adds missing fields recursively into given `col` expression. The missing 
fields are given
+   * in `fields`. For example, given `col` as "a struct, b int", 
and `fields` is
+   * "a struct, c string". This method will add a nested `a.c` field 
and a top-level
+   * `c` field to `col` and fill null values for them.
+   */
+  private def addFieldsInto(col: Expression, base: String, fields: 
Seq[StructField]): Expression = {
+fields.foldLeft(col) { case (currCol, field) =>
+  field.dataType match {
+case st: StructType =>
+  val resolver = SQLConf.get.resolver
+  val colField = currCol.dataType.asInstanceOf[StructType]
+.find(f => resolver(f.name, field.name))
+  if (colField.isEmpty) {
+// The whole struct is missing. Add a null.
+WithFields(currCol, s"$base${field.name}", Literal(null, st),
+  sortOutputColumns = true)
+  } else {
+addFieldsInto(currCol, s"$base${field.name}.", st.fields)
+  }
+case dt =>
+  // We need to sort columns in result, because we might add another 
column in other side.
+  // E.g., we want to union two structs "a int, b long" and "a int, c 
string".
+  // If we don't sort, we will have "a int, b long, c string" and "a 
int, c string, b long",
+  // which are not compatible.
+  WithFields(currCol, s"$base${field.name}", Literal(null, dt),
+sortOutputColumns = true)
+  }
+}
+  }
+
+  private def compareAndAddFields(
   left: LogicalPlan,
   right: LogicalPlan,
-  allowMissingCol: Boolean): LogicalPlan = {
+  allowMissingCol: Boolean): (Seq[NamedExpression], Seq[NamedExpression]) 
= {
 val resolver = SQLConf.get.resolver
 val leftOutputAttrs = left.output
 val rightOutputAttrs = right.output
 
-// Builds a project list for `right` based on `left` output names
+val aliased = mutable.ArrayBuffer.empty[Attribute]
+
 val rightProjectList = leftOutputAttrs.map { lattr =>
-  rightOutputAttrs.find { rattr => resolver(lattr.name, rattr.name) 
}.getOrElse {
+  val found = rightOutputAttrs.find { rattr => resolver(lattr.name, 
rattr.name) }
+  if (found.isDefined) {
+val foundDt = found.get.dataType
+(foundDt, lattr.dataType) match {
+  case (source: StructType, target: StructType)
+  if allowMissingCol && !source.sameType(target) =>

Review comment:
   I see. I will add more comments explaining this.
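   The code comment in the diff above explains why the output columns are 
sorted when each side adds the fields it is missing. A minimal sketch of 
that reasoning in plain Scala, using only field names, could look like this. 
`SortOrderSketch` and `appendMissing` are hypothetical names, and appending 
missing fields at the end is an assumption made for illustration.

```scala
object SortOrderSketch {
  // Appends the names that `own` lacks, in the order the other side lists them.
  def appendMissing(own: Seq[String], other: Seq[String]): Seq[String] =
    own ++ other.filterNot(own.contains)
}

object SortOrderDemo {
  import SortOrderSketch._

  def main(args: Array[String]): Unit = {
    val left = Seq("a", "b")   // struct "a int, b long"
    val right = Seq("a", "c")  // struct "a int, c string"

    val l = appendMissing(left, right)   // a, b, c
    val r = appendMissing(right, left)   // a, c, b

    println(l == r)               // false: the field orders differ
    println(l.sorted == r.sorted) // true: sorting makes both sides line up
  }
}
```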








[GitHub] [spark] viirya commented on a change in pull request #29587: [SPARK-32376][SQL] Make unionByName null-filling behavior work with struct columns

2020-09-12 Thread GitBox


viirya commented on a change in pull request #29587:
URL: https://github.com/apache/spark/pull/29587#discussion_r486761719



##
File path: 
sql/core/src/test/scala/org/apache/spark/sql/DataFrameSetOperationsSuite.scala
##
@@ -536,4 +536,71 @@ class DataFrameSetOperationsSuite extends QueryTest with 
SharedSparkSession {
   assert(union2.schema.fieldNames === Array("a", "B", "C", "c"))
 }
   }
+
+  test("SPARK-32376: Make unionByName null-filling behavior work with struct 
columns - 1") {

Review comment:
   sure.

##
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveUnion.scala
##
@@ -17,29 +17,101 @@
 
 package org.apache.spark.sql.catalyst.analysis
 
+import scala.collection.mutable
+
 import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.expressions.{Alias, Literal}
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, 
Expression, Literal, NamedExpression, WithFields}
 import org.apache.spark.sql.catalyst.optimizer.CombineUnions
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, 
Union}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types._
 import org.apache.spark.sql.util.SchemaUtils
 
 /**
  * Resolves different children of Union to a common set of columns.
  */
 object ResolveUnion extends Rule[LogicalPlan] {
-  private def unionTwoSides(
+  /**
+   * Adds missing fields recursively into given `col` expression, based on the 
target `StructType`.
+   * For example, given `col` as "a struct, b int" and `target` 
as
+   * "a struct, b int, c string", this method should add 
`a.c` and `c` to
+   * `col` expression.
+   */
+  private def addFields(col: NamedExpression, target: StructType): 
Option[Expression] = {
+assert(col.dataType.isInstanceOf[StructType], "Only support StructType.")
+
+val resolver = SQLConf.get.resolver
+val missingFields =
+  StructType.findMissingFields(col.dataType.asInstanceOf[StructType], 
target, resolver)
+if (missingFields.isEmpty) {
+  None
+} else {
+  missingFields.map(s => addFieldsInto(col, "", s.fields))
+}
+  }
+
+  /**
+   * Adds missing fields recursively into given `col` expression. The missing 
fields are given
+   * in `fields`. For example, given `col` as "a struct, b int", 
and `fields` is
+   * "a struct, c string". This method will add a nested `a.c` field 
and a top-level
+   * `c` field to `col` and fill null values for them.
+   */
+  private def addFieldsInto(col: Expression, base: String, fields: 
Seq[StructField]): Expression = {
+fields.foldLeft(col) { case (currCol, field) =>
+  field.dataType match {
+case st: StructType =>
+  val resolver = SQLConf.get.resolver
+  val colField = currCol.dataType.asInstanceOf[StructType]
+.find(f => resolver(f.name, field.name))
+  if (colField.isEmpty) {
+// The whole struct is missing. Add a null.
+WithFields(currCol, s"$base${field.name}", Literal(null, st),
+  sortOutputColumns = true)
+  } else {
+addFieldsInto(currCol, s"$base${field.name}.", st.fields)
+  }
+case dt =>
+  // We need to sort columns in result, because we might add another 
column in other side.
+  // E.g., we want to union two structs "a int, b long" and "a int, c 
string".
+  // If we don't sort, we will have "a int, b long, c string" and "a 
int, c string, b long",
+  // which are not compatible.
+  WithFields(currCol, s"$base${field.name}", Literal(null, dt),
+sortOutputColumns = true)
+  }
+}
+  }
+
+  private def compareAndAddFields(
   left: LogicalPlan,
   right: LogicalPlan,
-  allowMissingCol: Boolean): LogicalPlan = {
+  allowMissingCol: Boolean): (Seq[NamedExpression], Seq[NamedExpression]) 
= {
 val resolver = SQLConf.get.resolver
 val leftOutputAttrs = left.output
 val rightOutputAttrs = right.output
 
-// Builds a project list for `right` based on `left` output names
+val aliased = mutable.ArrayBuffer.empty[Attribute]
+
 val rightProjectList = leftOutputAttrs.map { lattr =>
-  rightOutputAttrs.find { rattr => resolver(lattr.name, rattr.name) 
}.getOrElse {
+  val found = rightOutputAttrs.find { rattr => resolver(lattr.name, 
rattr.name) }
+  if (found.isDefined) {
+val foundDt = found.get.dataType
+(foundDt, lattr.dataType) match {
+  case (source: StructType, target: StructType)
+  if allowMissingCol && !source.sameType(target) =>

Review comment:
   Hmm, I'm not sure where we could simplify the logic. Wouldn't adding 
`canMergeSchemas` make it look more complicated?

##
File path: 
sql/core/src/test/scala/org/apache/spark/sql/DataFrameSetOperationsSuite.scala
##
@@ -536,4 +536,71 @@ 

[GitHub] [spark] viirya commented on a change in pull request #29587: [SPARK-32376][SQL] Make unionByName null-filling behavior work with struct columns

2020-09-12 Thread GitBox


viirya commented on a change in pull request #29587:
URL: https://github.com/apache/spark/pull/29587#discussion_r486761719



##
File path: 
sql/core/src/test/scala/org/apache/spark/sql/DataFrameSetOperationsSuite.scala
##
@@ -536,4 +536,71 @@ class DataFrameSetOperationsSuite extends QueryTest with 
SharedSparkSession {
   assert(union2.schema.fieldNames === Array("a", "B", "C", "c"))
 }
   }
+
+  test("SPARK-32376: Make unionByName null-filling behavior work with struct 
columns - 1") {

Review comment:
   sure.

##
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveUnion.scala
##
@@ -17,29 +17,101 @@
 
 package org.apache.spark.sql.catalyst.analysis
 
+import scala.collection.mutable
+
 import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.expressions.{Alias, Literal}
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, 
Expression, Literal, NamedExpression, WithFields}
 import org.apache.spark.sql.catalyst.optimizer.CombineUnions
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, 
Union}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types._
 import org.apache.spark.sql.util.SchemaUtils
 
 /**
  * Resolves different children of Union to a common set of columns.
  */
 object ResolveUnion extends Rule[LogicalPlan] {
-  private def unionTwoSides(
+  /**
+   * Adds missing fields recursively into given `col` expression, based on the 
target `StructType`.
+   * For example, given `col` as "a struct, b int" and `target` 
as
+   * "a struct, b int, c string", this method should add 
`a.c` and `c` to
+   * `col` expression.
+   */
+  private def addFields(col: NamedExpression, target: StructType): 
Option[Expression] = {
+assert(col.dataType.isInstanceOf[StructType], "Only support StructType.")
+
+val resolver = SQLConf.get.resolver
+val missingFields =
+  StructType.findMissingFields(col.dataType.asInstanceOf[StructType], 
target, resolver)
+if (missingFields.isEmpty) {
+  None
+} else {
+  missingFields.map(s => addFieldsInto(col, "", s.fields))
+}
+  }
+
+  /**
+   * Adds missing fields recursively into given `col` expression. The missing 
fields are given
+   * in `fields`. For example, given `col` as "a struct, b int", 
and `fields` is
+   * "a struct, c string". This method will add a nested `a.c` field 
and a top-level
+   * `c` field to `col` and fill null values for them.
+   */
+  private def addFieldsInto(col: Expression, base: String, fields: 
Seq[StructField]): Expression = {
+fields.foldLeft(col) { case (currCol, field) =>
+  field.dataType match {
+case st: StructType =>
+  val resolver = SQLConf.get.resolver
+  val colField = currCol.dataType.asInstanceOf[StructType]
+.find(f => resolver(f.name, field.name))
+  if (colField.isEmpty) {
+// The whole struct is missing. Add a null.
+WithFields(currCol, s"$base${field.name}", Literal(null, st),
+  sortOutputColumns = true)
+  } else {
+addFieldsInto(currCol, s"$base${field.name}.", st.fields)
+  }
+case dt =>
+  // We need to sort columns in result, because we might add another 
column in other side.
+  // E.g., we want to union two structs "a int, b long" and "a int, c 
string".
+  // If we don't sort, we will have "a int, b long, c string" and "a 
int, c string, b long",
+  // which are not compatible.
+  WithFields(currCol, s"$base${field.name}", Literal(null, dt),
+sortOutputColumns = true)
+  }
+}
+  }
+
+  private def compareAndAddFields(
   left: LogicalPlan,
   right: LogicalPlan,
-  allowMissingCol: Boolean): LogicalPlan = {
+  allowMissingCol: Boolean): (Seq[NamedExpression], Seq[NamedExpression]) 
= {
 val resolver = SQLConf.get.resolver
 val leftOutputAttrs = left.output
 val rightOutputAttrs = right.output
 
-// Builds a project list for `right` based on `left` output names
+val aliased = mutable.ArrayBuffer.empty[Attribute]
+
 val rightProjectList = leftOutputAttrs.map { lattr =>
-  rightOutputAttrs.find { rattr => resolver(lattr.name, rattr.name) 
}.getOrElse {
+  val found = rightOutputAttrs.find { rattr => resolver(lattr.name, 
rattr.name) }
+  if (found.isDefined) {
+val foundDt = found.get.dataType
+(foundDt, lattr.dataType) match {
+  case (source: StructType, target: StructType)
+  if allowMissingCol && !source.sameType(target) =>

Review comment:
   Hmm, I'm not sure where we can simplify the logic? By adding 
`canMergeSchemas`, doesn't it look more complicated?

##
File path: 
sql/core/src/test/scala/org/apache/spark/sql/DataFrameSetOperationsSuite.scala
##
@@ -536,4 +536,71 @@ 

[GitHub] [spark] viirya commented on a change in pull request #29587: [SPARK-32376][SQL] Make unionByName null-filling behavior work with struct columns

2020-09-12 Thread GitBox


viirya commented on a change in pull request #29587:
URL: https://github.com/apache/spark/pull/29587#discussion_r486761719



##
File path: 
sql/core/src/test/scala/org/apache/spark/sql/DataFrameSetOperationsSuite.scala
##
@@ -536,4 +536,71 @@ class DataFrameSetOperationsSuite extends QueryTest with 
SharedSparkSession {
   assert(union2.schema.fieldNames === Array("a", "B", "C", "c"))
 }
   }
+
+  test("SPARK-32376: Make unionByName null-filling behavior work with struct 
columns - 1") {

Review comment:
   sure.

##
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveUnion.scala
##
@@ -17,29 +17,101 @@
 
 package org.apache.spark.sql.catalyst.analysis
 
+import scala.collection.mutable
+
 import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.expressions.{Alias, Literal}
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, 
Expression, Literal, NamedExpression, WithFields}
 import org.apache.spark.sql.catalyst.optimizer.CombineUnions
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, 
Union}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types._
 import org.apache.spark.sql.util.SchemaUtils
 
 /**
  * Resolves different children of Union to a common set of columns.
  */
 object ResolveUnion extends Rule[LogicalPlan] {
-  private def unionTwoSides(
+  /**
+   * Adds missing fields recursively into given `col` expression, based on the 
target `StructType`.
+   * For example, given `col` as "a struct, b int" and `target` 
as
+   * "a struct, b int, c string", this method should add 
`a.c` and `c` to
+   * `col` expression.
+   */
+  private def addFields(col: NamedExpression, target: StructType): 
Option[Expression] = {
+assert(col.dataType.isInstanceOf[StructType], "Only support StructType.")
+
+val resolver = SQLConf.get.resolver
+val missingFields =
+  StructType.findMissingFields(col.dataType.asInstanceOf[StructType], 
target, resolver)
+if (missingFields.isEmpty) {
+  None
+} else {
+  missingFields.map(s => addFieldsInto(col, "", s.fields))
+}
+  }
+
+  /**
+   * Adds missing fields recursively into given `col` expression. The missing 
fields are given
+   * in `fields`. For example, given `col` as "a struct, b int", 
and `fields` is
+   * "a struct, c string". This method will add a nested `a.c` field 
and a top-level
+   * `c` field to `col` and fill null values for them.
+   */
+  private def addFieldsInto(col: Expression, base: String, fields: 
Seq[StructField]): Expression = {
+fields.foldLeft(col) { case (currCol, field) =>
+  field.dataType match {
+case st: StructType =>
+  val resolver = SQLConf.get.resolver
+  val colField = currCol.dataType.asInstanceOf[StructType]
+.find(f => resolver(f.name, field.name))
+  if (colField.isEmpty) {
+// The whole struct is missing. Add a null.
+WithFields(currCol, s"$base${field.name}", Literal(null, st),
+  sortOutputColumns = true)
+  } else {
+addFieldsInto(currCol, s"$base${field.name}.", st.fields)
+  }
+case dt =>
+  // We need to sort columns in result, because we might add another 
column in other side.
+  // E.g., we want to union two structs "a int, b long" and "a int, c 
string".
+  // If we don't sort, we will have "a int, b long, c string" and "a 
int, c string, b long",
+  // which are not compatible.
+  WithFields(currCol, s"$base${field.name}", Literal(null, dt),
+sortOutputColumns = true)
+  }
+}
+  }
+
+  private def compareAndAddFields(
   left: LogicalPlan,
   right: LogicalPlan,
-  allowMissingCol: Boolean): LogicalPlan = {
+  allowMissingCol: Boolean): (Seq[NamedExpression], Seq[NamedExpression]) 
= {
 val resolver = SQLConf.get.resolver
 val leftOutputAttrs = left.output
 val rightOutputAttrs = right.output
 
-// Builds a project list for `right` based on `left` output names
+val aliased = mutable.ArrayBuffer.empty[Attribute]
+
 val rightProjectList = leftOutputAttrs.map { lattr =>
-  rightOutputAttrs.find { rattr => resolver(lattr.name, rattr.name) 
}.getOrElse {
+  val found = rightOutputAttrs.find { rattr => resolver(lattr.name, 
rattr.name) }
+  if (found.isDefined) {
+val foundDt = found.get.dataType
+(foundDt, lattr.dataType) match {
+  case (source: StructType, target: StructType)
+  if allowMissingCol && !source.sameType(target) =>

Review comment:
   Hmm, I'm not sure where we can simplify the logic? By adding 
`canMergeSchemas`, doesn't it look more complicated?






[GitHub] [spark] viirya commented on a change in pull request #29587: [SPARK-32376][SQL] Make unionByName null-filling behavior work with struct columns

2020-09-10 Thread GitBox


viirya commented on a change in pull request #29587:
URL: https://github.com/apache/spark/pull/29587#discussion_r486762671



##
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveUnion.scala
##
@@ -17,29 +17,101 @@
 
 package org.apache.spark.sql.catalyst.analysis
 
+import scala.collection.mutable
+
 import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.expressions.{Alias, Literal}
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, 
Expression, Literal, NamedExpression, WithFields}
 import org.apache.spark.sql.catalyst.optimizer.CombineUnions
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, 
Union}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types._
 import org.apache.spark.sql.util.SchemaUtils
 
 /**
  * Resolves different children of Union to a common set of columns.
  */
 object ResolveUnion extends Rule[LogicalPlan] {
-  private def unionTwoSides(
+  /**
+   * Adds missing fields recursively into given `col` expression, based on the 
target `StructType`.
+   * For example, given `col` as "a struct, b int" and `target` 
as
+   * "a struct, b int, c string", this method should add 
`a.c` and `c` to
+   * `col` expression.
+   */
+  private def addFields(col: NamedExpression, target: StructType): 
Option[Expression] = {
+assert(col.dataType.isInstanceOf[StructType], "Only support StructType.")
+
+val resolver = SQLConf.get.resolver
+val missingFields =
+  StructType.findMissingFields(col.dataType.asInstanceOf[StructType], 
target, resolver)
+if (missingFields.isEmpty) {
+  None
+} else {
+  missingFields.map(s => addFieldsInto(col, "", s.fields))
+}
+  }
+
+  /**
+   * Adds missing fields recursively into given `col` expression. The missing 
fields are given
+   * in `fields`. For example, given `col` as "a struct, b int", 
and `fields` is
+   * "a struct, c string". This method will add a nested `a.c` field 
and a top-level
+   * `c` field to `col` and fill null values for them.
+   */
+  private def addFieldsInto(col: Expression, base: String, fields: 
Seq[StructField]): Expression = {
+fields.foldLeft(col) { case (currCol, field) =>
+  field.dataType match {
+case st: StructType =>
+  val resolver = SQLConf.get.resolver
+  val colField = currCol.dataType.asInstanceOf[StructType]
+.find(f => resolver(f.name, field.name))
+  if (colField.isEmpty) {
+// The whole struct is missing. Add a null.
+WithFields(currCol, s"$base${field.name}", Literal(null, st),
+  sortOutputColumns = true)
+  } else {
+addFieldsInto(currCol, s"$base${field.name}.", st.fields)
+  }
+case dt =>
+  // We need to sort columns in result, because we might add another 
column in other side.
+  // E.g., we want to union two structs "a int, b long" and "a int, c 
string".
+  // If we don't sort, we will have "a int, b long, c string" and "a 
int, c string, b long",
+  // which are not compatible.
+  WithFields(currCol, s"$base${field.name}", Literal(null, dt),
+sortOutputColumns = true)
+  }
+}
+  }
+
+  private def compareAndAddFields(
   left: LogicalPlan,
   right: LogicalPlan,
-  allowMissingCol: Boolean): LogicalPlan = {
+  allowMissingCol: Boolean): (Seq[NamedExpression], Seq[NamedExpression]) 
= {
 val resolver = SQLConf.get.resolver
 val leftOutputAttrs = left.output
 val rightOutputAttrs = right.output
 
-// Builds a project list for `right` based on `left` output names
+val aliased = mutable.ArrayBuffer.empty[Attribute]
+
 val rightProjectList = leftOutputAttrs.map { lattr =>
-  rightOutputAttrs.find { rattr => resolver(lattr.name, rattr.name) 
}.getOrElse {
+  val found = rightOutputAttrs.find { rattr => resolver(lattr.name, 
rattr.name) }
+  if (found.isDefined) {
+val foundDt = found.get.dataType
+(foundDt, lattr.dataType) match {
+  case (source: StructType, target: StructType)
+  if allowMissingCol && !source.sameType(target) =>

Review comment:
   Hmm, I'm not sure where we can simplify the logic? By adding 
`canMergeSchemas`, doesn't it look more complicated?








[GitHub] [spark] viirya commented on a change in pull request #29587: [SPARK-32376][SQL] Make unionByName null-filling behavior work with struct columns

2020-09-10 Thread GitBox


viirya commented on a change in pull request #29587:
URL: https://github.com/apache/spark/pull/29587#discussion_r486761719



##
File path: 
sql/core/src/test/scala/org/apache/spark/sql/DataFrameSetOperationsSuite.scala
##
@@ -536,4 +536,71 @@ class DataFrameSetOperationsSuite extends QueryTest with 
SharedSparkSession {
   assert(union2.schema.fieldNames === Array("a", "B", "C", "c"))
 }
   }
+
+  test("SPARK-32376: Make unionByName null-filling behavior work with struct 
columns - 1") {

Review comment:
   sure.








[GitHub] [spark] viirya commented on a change in pull request #29587: [SPARK-32376][SQL] Make unionByName null-filling behavior work with struct columns

2020-09-02 Thread GitBox


viirya commented on a change in pull request #29587:
URL: https://github.com/apache/spark/pull/29587#discussion_r482709060



##
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala
##
@@ -641,4 +641,30 @@ object StructType extends AbstractDataType {
 fields.foreach(s => map.put(s.name, s))
 map
   }
+
+  /**
+   * Returns a `StructType` that contains missing fields recursively from 
`source` to `target`.
+   * Note that this doesn't support looking into array type and map type 
recursively.
+   */
+  def findMissingFields(source: StructType, target: StructType, resolver: 
Resolver): StructType = {

Review comment:
   OK, it sounds good.








[GitHub] [spark] viirya commented on a change in pull request #29587: [SPARK-32376][SQL] Make unionByName null-filling behavior work with struct columns

2020-09-02 Thread GitBox


viirya commented on a change in pull request #29587:
URL: https://github.com/apache/spark/pull/29587#discussion_r482707006



##
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveUnion.scala
##
@@ -50,18 +122,29 @@ object ResolveUnion extends Rule[LogicalPlan] {
   }
 }
 
+(rightProjectList, aliased)
+  }
+
+  private def unionTwoSides(
+  left: LogicalPlan,
+  right: LogicalPlan,
+  allowMissingCol: Boolean): LogicalPlan = {
+val rightOutputAttrs = right.output
+
+// Builds a project list for `right` based on `left` output names
+val (rightProjectList, aliased) = compareAndAddFields(left, right, 
allowMissingCol)
+
 // Delegates failure checks to `CheckAnalysis`
-val notFoundAttrs = rightOutputAttrs.diff(rightProjectList)
+val notFoundAttrs = rightOutputAttrs.diff(rightProjectList ++ aliased)
 val rightChild = Project(rightProjectList ++ notFoundAttrs, right)
 
 // Builds a project for `logicalPlan` based on `right` output names, if 
allowing
 // missing columns.
 val leftChild = if (allowMissingCol) {
-  val missingAttrs = notFoundAttrs.map { attr =>
-Alias(Literal(null, attr.dataType), attr.name)()
-  }
-  if (missingAttrs.nonEmpty) {
-Project(leftOutputAttrs ++ missingAttrs, left)
+  // Add missing (nested) fields to left plan.
+  val (leftProjectList, _) = compareAndAddFields(rightChild, left, 
allowMissingCol)
+  if (leftProjectList.map(_.toAttribute) != left.output) {

Review comment:
   Doesn't `leftProjectList.map(_.toAttribute) != left.output` already 
cover `leftProjectList.length != left.output.length`?
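A quick check in plain Scala, independent of Spark, suggests the length check is redundant: `Seq` equality compares elements pairwise and also requires equal lengths, so two sequences of different length are never equal. The strings below are placeholders standing in for Catalyst attributes, and the object name is made up for illustration.

```scala
// Placeholder strings stand in for Catalyst attributes (an assumption for illustration).
object SeqEqualityCheck {
  val leftOutput = Seq("a", "b")
  val sameProjectList = Seq("a", "b")
  val longerProjectList = Seq("a", "b", "c")

  // Same elements in the same order: the sequences are equal.
  val unchanged: Boolean = sameProjectList == leftOutput
  // A different length alone is enough to make the sequences unequal.
  val changed: Boolean = longerProjectList != leftOutput
}
```

So a single `!=` on the two sequences is true whenever the lengths differ, as well as when any element differs.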








[GitHub] [spark] viirya commented on a change in pull request #29587: [SPARK-32376][SQL] Make unionByName null-filling behavior work with struct columns

2020-09-02 Thread GitBox


viirya commented on a change in pull request #29587:
URL: https://github.com/apache/spark/pull/29587#discussion_r482707069



##
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveUnion.scala
##
@@ -17,29 +17,101 @@
 
 package org.apache.spark.sql.catalyst.analysis
 
+import scala.collection.mutable
+
 import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.expressions.{Alias, Literal}
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, 
Expression, Literal, NamedExpression, WithFields}
 import org.apache.spark.sql.catalyst.optimizer.CombineUnions
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, 
Union}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types._
 import org.apache.spark.sql.util.SchemaUtils
 
 /**
  * Resolves different children of Union to a common set of columns.
  */
 object ResolveUnion extends Rule[LogicalPlan] {
-  private def unionTwoSides(
+  /**
+   * Adds missing fields recursively into given `col` expression, based on the 
target `StructType`.
+   * For example, given `col` as "a struct, b int" and `target` 
as
+   * "a struct, b int, c string", this method should add 
`a.c` and `c` to
+   * `col` expression.
+   */
+  private def addFields(col: NamedExpression, target: StructType): 
Option[Expression] = {
+assert(col.dataType.isInstanceOf[StructType], "Only support StructType.")
+
+val resolver = SQLConf.get.resolver
+val missingFields =
+  StructType.findMissingFields(col.dataType.asInstanceOf[StructType], 
target, resolver)
+if (missingFields.length == 0) {
+  None
+} else {
+  Some(addFieldsInto(col, "", missingFields.fields))
+}
+  }
+
+  /**
+   *  Adds missing fields recursively into given `col` expression. The missing 
fields are given

Review comment:
   ok.








[GitHub] [spark] viirya commented on a change in pull request #29587: [SPARK-32376][SQL] Make unionByName null-filling behavior work with struct columns

2020-09-02 Thread GitBox


viirya commented on a change in pull request #29587:
URL: https://github.com/apache/spark/pull/29587#discussion_r482706359



##
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeCreator.scala
##
@@ -546,7 +547,8 @@ case class StringToMap(text: Expression, pairDelim: 
Expression, keyValueDelim: E
 case class WithFields(
 structExpr: Expression,
 names: Seq[String],
-valExprs: Seq[Expression]) extends Unevaluable {
+valExprs: Seq[Expression],
+sortColumns: Boolean = false) extends Unevaluable {

Review comment:
   I'm not certain if we want to show it or not. Let's keep it as is and 
see what others think.








[GitHub] [spark] viirya commented on a change in pull request #29587: [SPARK-32376][SQL] Make unionByName null-filling behavior work with struct columns

2020-08-31 Thread GitBox


viirya commented on a change in pull request #29587:
URL: https://github.com/apache/spark/pull/29587#discussion_r480793907



##
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeCreator.scala
##
@@ -546,7 +547,8 @@ case class StringToMap(text: Expression, pairDelim: 
Expression, keyValueDelim: E
 case class WithFields(
 structExpr: Expression,
 names: Seq[String],
-valExprs: Seq[Expression]) extends Unevaluable {
+valExprs: Seq[Expression],
+sortColumns: Boolean = false) extends Unevaluable {

Review comment:
   Changed `sortColumns` to `sortOutputColumns`. I'm not sure whether we want to
hide `sortColumns`, though.
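As background for this flag, the code comment in this PR's `ResolveUnion` gives the motivation: when each side of the union gets a different missing field appended, the field orders diverge unless the output is sorted. Below is a minimal sketch in plain Scala. It models a struct as a list of (name, type) string pairs, which is an assumption for illustration and not how `WithFields` is implemented; `fill` and `SortFieldsSketch` are hypothetical names.

```scala
object SortFieldsSketch {
  // (name, type): a hypothetical stand-in for a struct field.
  type Field = (String, String)

  // Appends the fields of `other` that `side` lacks, optionally sorting by name.
  def fill(side: Seq[Field], other: Seq[Field], sort: Boolean): Seq[Field] = {
    val names = side.map(_._1).toSet
    val filled = side ++ other.filterNot(f => names(f._1))
    if (sort) filled.sortBy(_._1) else filled
  }

  val left: Seq[Field] = Seq("a" -> "int", "b" -> "long")
  val right: Seq[Field] = Seq("a" -> "int", "c" -> "string")

  // Unsorted: left becomes (a, b, c) while right becomes (a, c, b).
  val unsortedMatch: Boolean =
    fill(left, right, sort = false) == fill(right, left, sort = false)
  // Sorted: both sides become (a, b, c).
  val sortedMatch: Boolean =
    fill(left, right, sort = true) == fill(right, left, sort = true)
}
```

In this model the two filled structs only line up when sorting is on, which matches the "a int, b long" and "a int, c string" case described in the code comment.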








[GitHub] [spark] viirya commented on a change in pull request #29587: [SPARK-32376][SQL] Make unionByName null-filling behavior work with struct columns

2020-08-31 Thread GitBox


viirya commented on a change in pull request #29587:
URL: https://github.com/apache/spark/pull/29587#discussion_r480527411



##
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveUnion.scala
##
@@ -17,29 +17,97 @@
 
 package org.apache.spark.sql.catalyst.analysis
 
+import scala.collection.mutable
+
 import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.expressions.{Alias, Literal}
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, 
Expression, Literal, NamedExpression, WithFields}
 import org.apache.spark.sql.catalyst.optimizer.CombineUnions
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, 
Union}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types._
 import org.apache.spark.sql.util.SchemaUtils
 
 /**
  * Resolves different children of Union to a common set of columns.
  */
 object ResolveUnion extends Rule[LogicalPlan] {
-  private def unionTwoSides(
+  /**
+   * Adds missing fields recursively into given `col` expression, based on the 
target `StructType`.
+   * For example, given `col` as "a struct, b int" and `target` 
as
+   * "a struct, b int, c string", this method should 
add `a.c` and `c` to
+   * `col` expression.
+   */
+  private def addFields(col: NamedExpression, target: StructType): 
Option[Expression] = {
+require(col.dataType.isInstanceOf[StructType], "Only support StructType.")
+
+val resolver = SQLConf.get.resolver
+val missingFields =
+  StructType.findMissingFields(col.dataType.asInstanceOf[StructType], 
target, resolver)
+if (missingFields.length == 0) {
+  None
+} else {
+  Some(addFieldsInto(col, "", missingFields.fields))
+}
+  }
+
+  private def addFieldsInto(col: Expression, base: String, fields: 
Seq[StructField]): Expression = {
+var currCol = col

Review comment:
   looks good. rewritten. thanks.








[GitHub] [spark] viirya commented on a change in pull request #29587: [SPARK-32376][SQL] Make unionByName null-filling behavior work with struct columns

2020-08-31 Thread GitBox


viirya commented on a change in pull request #29587:
URL: https://github.com/apache/spark/pull/29587#discussion_r480500164



##
File path: 
sql/catalyst/src/test/scala/org/apache/spark/sql/types/StructTypeSuite.scala
##
@@ -103,4 +104,30 @@ class StructTypeSuite extends SparkFunSuite {
 val interval = "`a` INTERVAL"
 assert(fromDDL(interval).toDDL === interval)
   }
+
+  test("find missing (nested) fields") {

Review comment:
   OK, sure.








[GitHub] [spark] viirya commented on a change in pull request #29587: [SPARK-32376][SQL] Make unionByName null-filling behavior work with struct columns

2020-08-31 Thread GitBox


viirya commented on a change in pull request #29587:
URL: https://github.com/apache/spark/pull/29587#discussion_r480239099



##
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveUnion.scala
##
@@ -17,29 +17,97 @@
 
 package org.apache.spark.sql.catalyst.analysis
 
+import scala.collection.mutable
+
 import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.expressions.{Alias, Literal}
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, 
Expression, Literal, NamedExpression, WithFields}
 import org.apache.spark.sql.catalyst.optimizer.CombineUnions
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, 
Union}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types._
 import org.apache.spark.sql.util.SchemaUtils
 
 /**
  * Resolves different children of Union to a common set of columns.
  */
 object ResolveUnion extends Rule[LogicalPlan] {
-  private def unionTwoSides(
+  /**
+   * Adds missing fields recursively into given `col` expression, based on the 
target `StructType`.
+   * For example, given `col` as "a struct, b int" and `target` 
as
+   * "a struct, b int, c string", this method should 
add `a.c` and `c` to
+   * `col` expression.
+   */
+  private def addFields(col: NamedExpression, target: StructType): 
Option[Expression] = {
+require(col.dataType.isInstanceOf[StructType], "Only support StructType.")
+
+val resolver = SQLConf.get.resolver
+val missingFields =
+  StructType.findMissingFields(col.dataType.asInstanceOf[StructType], 
target, resolver)
+if (missingFields.length == 0) {
+  None
+} else {
+  Some(addFieldsInto(col, "", missingFields.fields))
+}
+  }
+
+  private def addFieldsInto(col: Expression, base: String, fields: 
Seq[StructField]): Expression = {
+var currCol = col
+fields.foreach { field =>
+  field.dataType match {
+case dt: AtomicType =>
+  // We need to sort columns in result, because we might add another 
column in other side.
+  // E.g., we want to union two structs "a int, b long" and "a int, c 
string".
+  // If we don't sort, we will have "a int, b long, c string" and "a 
int, c string, b long",
+  // which are not compatible.
+  currCol = WithFields(currCol, s"$base${field.name}", Literal(null, 
dt),
+sortColumns = true)
+case st: StructType =>
+  val resolver = SQLConf.get.resolver
+  val colField = currCol.dataType.asInstanceOf[StructType]
+.find(f => resolver(f.name, field.name))
+  if (colField.isEmpty) {
+// The whole struct is missing. Add a null.
+currCol = WithFields(currCol, s"$base${field.name}", Literal(null, 
st),
+  sortColumns = true)
+  } else {
+currCol = addFieldsInto(currCol, s"$base${field.name}.", st.fields)
+  }
+  }
+}
+currCol
+  }
+
+  private def compareAndAddFields(
   left: LogicalPlan,
   right: LogicalPlan,
-  allowMissingCol: Boolean): LogicalPlan = {
+  allowMissingCol: Boolean): (Seq[NamedExpression], Seq[NamedExpression]) 
= {
 val resolver = SQLConf.get.resolver
 val leftOutputAttrs = left.output
 val rightOutputAttrs = right.output
 
-// Builds a project list for `right` based on `left` output names
+val aliased = mutable.ArrayBuffer.empty[Attribute]
+
 val rightProjectList = leftOutputAttrs.map { lattr =>
-  rightOutputAttrs.find { rattr => resolver(lattr.name, rattr.name) 
}.getOrElse {
+  val found = rightOutputAttrs.find { rattr => resolver(lattr.name, 
rattr.name) }
+  if (found.isDefined) {
+val foundDt = found.get.dataType
+(foundDt, lattr.dataType) match {
+  case (source: StructType, target: StructType)
+  if allowMissingCol && !source.sameType(target) =>
+// Having an output with same name, but different struct type.
+// We need to add missing fields.
+addFields(found.get, target).map { added =>
+  aliased += found.get
+  Alias(added, found.get.name)()
+}.getOrElse(found.get) // Data type doesn't change. We should add 
fields at other side.
+  case _ =>
+// Same struct type, or
+// unsupported: different types, array or map types, or

Review comment:
   Array and map types aren't supported by `WithFields`. I think it is 
still possible to add them to `WithFields`. Once `WithFields` supports these 
types, we can add them here too.






[GitHub] [spark] viirya commented on a change in pull request #29587: [SPARK-32376][SQL] Make unionByName null-filling behavior work with struct columns

2020-08-31 Thread GitBox


viirya commented on a change in pull request #29587:
URL: https://github.com/apache/spark/pull/29587#discussion_r480238080



##
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala
##
@@ -641,4 +641,30 @@ object StructType extends AbstractDataType {
 fields.foreach(s => map.put(s.name, s))
 map
   }
+
+  /**
+   * Returns a `StructType` that contains missing fields recursively from 
`source` to `target`.
+   * Note that this doesn't support looking into array type and map type 
recursively.

Review comment:
   I leverage `WithFields` to add missing nested fields into structs. 
`WithFields` doesn't support array or map types currently.
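To illustrate the limitation, here is a rough sketch of a recursive missing-field search that descends into structs only. It uses a tiny hypothetical schema model (`DType`, `StructT`, `ArrayT`) rather than Spark's `StructType`, and it matches names exactly instead of going through a resolver; both are simplifying assumptions.

```scala
// Hypothetical, simplified schema model; these are not Spark's classes.
object MissingFieldsSketch {
  sealed trait DType
  case object IntT extends DType
  case object StringT extends DType
  case class ArrayT(element: DType) extends DType
  case class StructT(fields: List[(String, DType)]) extends DType

  // Returns the fields present in `target` but absent from `source`,
  // descending only into struct-typed fields that exist on both sides.
  def findMissing(source: StructT, target: StructT): List[(String, DType)] =
    target.fields.flatMap { case (name, tType) =>
      source.fields.find(_._1 == name) match {
        case None => List(name -> tType) // the whole field is missing
        case Some((_, s: StructT)) =>
          tType match {
            case t: StructT =>
              val nested = findMissing(s, t)
              if (nested.isEmpty) Nil else List(name -> StructT(nested))
            case _ => Nil
          }
        case Some(_) => Nil // arrays and other types are not inspected
      }
    }

  val source = StructT(List(
    "a" -> StructT(List("x" -> IntT)),
    "arr" -> ArrayT(StructT(List("x" -> IntT)))))
  val target = StructT(List(
    "a" -> StructT(List("x" -> IntT, "y" -> StringT)),
    "arr" -> ArrayT(StructT(List("x" -> IntT, "y" -> StringT))),
    "c" -> StringT))

  // Contains a.y and c; the extra field inside `arr` goes unreported.
  val missing: List[(String, DType)] = findMissing(source, target)
}
```

In this model, the struct nested inside the array differs between the two sides, but the search never looks there, which mirrors the "doesn't support looking into array type and map type recursively" note in the doc comment.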








[GitHub] [spark] viirya commented on a change in pull request #29587: [SPARK-32376][SQL] Make unionByName null-filling behavior work with struct columns

2020-08-31 Thread GitBox


viirya commented on a change in pull request #29587:
URL: https://github.com/apache/spark/pull/29587#discussion_r480237210



##
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala
##
@@ -641,4 +641,30 @@ object StructType extends AbstractDataType {
 fields.foreach(s => map.put(s.name, s))
 map
   }
+
+  /**
+   * Returns a `StructType` that contains missing fields recursively from 
`source` to `target`.
+   * Note that this doesn't support looking into array type and map type 
recursively.
+   */
+  def findMissingFields(source: StructType, target: StructType, resolver: 
Resolver): StructType = {

Review comment:
   I feel this is a more general method related to `StructType`, so I am putting
it in `StructType`.




