Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21305#discussion_r200824639
  
    --- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 ---
    @@ -2120,6 +2122,99 @@ class Analyzer(
         }
       }
     
    +  /**
    +   * Resolves columns of an output table from the data in a logical plan. 
This rule will:
    +   *
    +   * - Reorder columns when the write is by name
    +   * - Insert safe casts when data types do not match
    +   * - Insert aliases when column names do not match
    +   * - Detect plans that are not compatible with the output table and 
throw AnalysisException
    +   */
    +  object ResolveOutputRelation extends Rule[LogicalPlan] {
    +    override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    +      case append @ AppendData(table: NamedRelation, query, isByName)
    +          if table.resolved && query.resolved && !append.resolved =>
    +        val projection = resolveOutputColumns(table.name, table.output, 
query, isByName)
    +
    +        if (projection != query) {
    +          append.copy(query = projection)
    +        } else {
    +          append
    +        }
    +    }
    +
    +    def resolveOutputColumns(
    +        tableName: String,
    +        expected: Seq[Attribute],
    +        query: LogicalPlan,
    +        byName: Boolean): LogicalPlan = {
    +
    +      if (expected.size < query.output.size) {
    +        throw new AnalysisException(
    +          s"""Cannot write to '$tableName', too many data columns:
    +             |Table columns: ${expected.map(_.name).mkString(", ")}
    +             |Data columns: ${query.output.map(_.name).mkString(", 
")}""".stripMargin)
    +      }
    +
    +      val errors = new mutable.ArrayBuffer[String]()
    +      val resolved: Seq[NamedExpression] = if (byName) {
    +        expected.flatMap { outAttr =>
    +          query.resolveQuoted(outAttr.name, resolver) match {
    +            case Some(inAttr) if inAttr.nullable && !outAttr.nullable =>
    +              errors += s"Cannot write nullable values to non-null column 
'${outAttr.name}'"
    +              None
    +
    +            case Some(inAttr) if 
!outAttr.dataType.sameType(inAttr.dataType) =>
    +              Some(upcast(inAttr, outAttr))
    +
    +            case Some(inAttr) =>
    +              Some(inAttr) // matches nullability, datatype, and name
    +
    +            case _ =>
    +              errors += s"Cannot find data for output column 
'${outAttr.name}'"
    +              None
    +          }
    +        }
    +
    +      } else {
    +        if (expected.size > query.output.size) {
    --- End diff --
    
    That check is the other direction: not enough columns.
    
When matching by position, we need to have the same number of columns so we 
add this check (we already know that there aren't too many columns, so this 
checks for too few). When matching by name, we can call out specific columns 
that are missing, which is why we do the validation differently for the two 
cases.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to