GitHub user rdblue commented on a diff in the pull request:
https://github.com/apache/spark/pull/21305#discussion_r200423206
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ---
@@ -2120,6 +2122,99 @@ class Analyzer(
}
}
+  /**
+   * Resolves columns of an output table from the data in a logical plan. This rule will:
+   *
+   * - Reorder columns when the write is by name
+   * - Insert safe casts when data types do not match
+   * - Insert aliases when column names do not match
+   * - Detect plans that are not compatible with the output table and throw AnalysisException
+   */
+  object ResolveOutputRelation extends Rule[LogicalPlan] {
+    override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+      case append @ AppendData(table: NamedRelation, query, isByName)
+          if table.resolved && query.resolved && !append.resolved =>
+        val projection = resolveOutputColumns(table.name, table.output, query, isByName)
+
+        if (projection != query) {
+          append.copy(query = projection)
+        } else {
+          append
+        }
+    }
+
+    def resolveOutputColumns(
+        tableName: String,
+        expected: Seq[Attribute],
+        query: LogicalPlan,
+        byName: Boolean): LogicalPlan = {
+
+      if (expected.size < query.output.size) {
+        throw new AnalysisException(
+          s"""Cannot write to '$tableName', too many data columns:
+             |Table columns: ${expected.map(_.name).mkString(", ")}
+             |Data columns: ${query.output.map(_.name).mkString(", ")}""".stripMargin)
+      }
+
+      val errors = new mutable.ArrayBuffer[String]()
+      val resolved: Seq[NamedExpression] = if (byName) {
+        expected.flatMap { outAttr =>
+          query.resolveQuoted(outAttr.name, resolver) match {
+            case Some(inAttr) if inAttr.nullable && !outAttr.nullable =>
+              errors += s"Cannot write nullable values to non-null column '${outAttr.name}'"
+              None
+
+            case Some(inAttr) if !outAttr.dataType.sameType(inAttr.dataType) =>
+              Some(upcast(inAttr, outAttr))
+
+            case Some(inAttr) =>
+              Some(inAttr) // matches nullability, datatype, and name
+
+            case _ =>
+              errors += s"Cannot find data for output column '${outAttr.name}'"
+              None
+          }
+        }
+
+      } else {
+        if (expected.size > query.output.size) {
+          throw new AnalysisException(
+            s"""Cannot write to '$tableName', not enough data columns:
+               |Table columns: ${expected.map(_.name).mkString(", ")}
+               |Data columns: ${query.output.map(_.name).mkString(", ")}""".stripMargin)
+        }
+
+        query.output.zip(expected).flatMap {
+          case (inAttr, outAttr) if inAttr.nullable && !outAttr.nullable =>
+            errors += s"Cannot write nullable values to non-null column '${outAttr.name}'"
+            None
+
+          case (inAttr, outAttr)
+              if !inAttr.dataType.sameType(outAttr.dataType) || inAttr.name != outAttr.name =>
+            Some(upcast(inAttr, outAttr))
+
+          case (inAttr, _) =>
+            Some(inAttr) // matches nullability, datatype, and name
+        }
+      }
+
+      if (errors.nonEmpty) {
+        throw new AnalysisException(
+          s"Cannot write incompatible data to table '$tableName':\n- ${errors.mkString("\n- ")}")
+      }
+
+      Project(resolved, query)
+    }
+
+    private def upcast(inAttr: NamedExpression, outAttr: Attribute): NamedExpression = {
+      Alias(
+        UpCast(inAttr, outAttr.dataType, Seq()), outAttr.name
--- End diff --
The purpose of `UpCast` here is to prevent Spark from automatically inserting casts that could lose information, like `long` to `int` or `string` to `int`.

I would support the same check for `string` to `boolean`, to catch destructive problems caused by accidental column alignment (in SQL) or similar mistakes. The main problem here is that Spark inserts casts instead of alerting the user that there's a problem. Because the write succeeds, it may be a while before the user realizes the mistake, by which point the original data can't be recovered.
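
To illustrate the difference, here is a rough spark-shell style sketch (not part of this patch; it assumes an implicit `SparkSession` named `spark`, and the exception text is paraphrased and may vary by Spark version):

```scala
import spark.implicits._

val longs = Seq(3000000000L).toDF("id")

// A plain Cast is inserted silently and narrows the value, losing data:
longs.selectExpr("cast(id AS int) AS id").first()
// [-1294967296]  <- the long value wrapped around with no warning

// UpCast, which Dataset encoders already use, rejects the lossy conversion
// when the plan is analyzed, before any data is produced:
longs.as[Int]
// throws AnalysisException: cannot up cast `id` from bigint to int (message paraphrased)
```

The rule above takes the same approach on the write path: an incompatibility surfaces as an `AnalysisException` at analysis time rather than as a silent cast buried in the written data.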