Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21305#discussion_r200231214 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala --- @@ -2120,6 +2122,99 @@ class Analyzer( } } + /** + * Resolves columns of an output table from the data in a logical plan. This rule will: + * + * - Reorder columns when the write is by name + * - Insert safe casts when data types do not match + * - Insert aliases when column names do not match + * - Detect plans that are not compatible with the output table and throw AnalysisException + */ + object ResolveOutputRelation extends Rule[LogicalPlan] { + override def apply(plan: LogicalPlan): LogicalPlan = plan transform { + case append @ AppendData(table: NamedRelation, query, isByName) + if table.resolved && query.resolved && !append.resolved => + val projection = resolveOutputColumns(table.name, table.output, query, isByName) + + if (projection != query) { + append.copy(query = projection) + } else { + append + } + } + + def resolveOutputColumns( + tableName: String, + expected: Seq[Attribute], + query: LogicalPlan, + byName: Boolean): LogicalPlan = { + + if (expected.size < query.output.size) { + throw new AnalysisException( + s"""Cannot write to '$tableName', too many data columns: + |Table columns: ${expected.map(_.name).mkString(", ")} + |Data columns: ${query.output.map(_.name).mkString(", ")}""".stripMargin) + } + + val errors = new mutable.ArrayBuffer[String]() + val resolved: Seq[NamedExpression] = if (byName) { + expected.flatMap { outAttr => + query.resolveQuoted(outAttr.name, resolver) match { + case Some(inAttr) if inAttr.nullable && !outAttr.nullable => + errors += s"Cannot write nullable values to non-null column '${outAttr.name}'" + None + + case Some(inAttr) if !outAttr.dataType.sameType(inAttr.dataType) => + Some(upcast(inAttr, outAttr)) + + case Some(inAttr) => + Some(inAttr) // matches nullability, datatype, and name + + case _ 
=> + errors += s"Cannot find data for output column '${outAttr.name}'" + None + } + } + + } else { + if (expected.size > query.output.size) { + throw new AnalysisException( + s"""Cannot write to '$tableName', not enough data columns: + |Table columns: ${expected.map(_.name).mkString(", ")} + |Data columns: ${query.output.map(_.name).mkString(", ")}""".stripMargin) + } + + query.output.zip(expected).flatMap { + case (inAttr, outAttr) if inAttr.nullable && !outAttr.nullable => + errors += s"Cannot write nullable values to non-null column '${outAttr.name}'" + None + + case (inAttr, outAttr) + if !inAttr.dataType.sameType(outAttr.dataType) || inAttr.name != outAttr.name => + Some(upcast(inAttr, outAttr)) + + case (inAttr, _) => + Some(inAttr) // matches nullability, datatype, and name + } + } + + if (errors.nonEmpty) { + throw new AnalysisException( + s"Cannot write incompatible data to table '$tableName':\n- ${errors.mkString("\n- ")}") + } + + Project(resolved, query) + } + + private def upcast(inAttr: NamedExpression, outAttr: Attribute): NamedExpression = { + Alias( + UpCast(inAttr, outAttr.dataType, Seq()), outAttr.name --- End diff -- Since Upcast is being used in more places, I want to raise https://github.com/apache/spark/pull/21586 again. The current Upcast has a bug that it forbids string-to-int, but allows string-to-boolean. Changing it is a behavior change, so we should make sure the behavior is reasonable. What's our expectation of Upcast here? Are we expecting a runtime error when casting string to other types?
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org