Github user ericl commented on a diff in the pull request:

    https://github.com/apache/spark/pull/15995#discussion_r90088361
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala ---
    @@ -129,65 +129,67 @@ case class DataSourceAnalysis(conf: CatalystConf) extends Rule[LogicalPlan] {
       }
     
       override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    -    // If the InsertIntoTable command is for a partitioned HadoopFsRelation and
    -    // the user has specified static partitions, we add a Project operator on top of the query
    -    // to include those constant column values in the query result.
    -    //
    -    // Example:
    -    // Let's say that we have a table "t", which is created by
    -    // CREATE TABLE t (a INT, b INT, c INT) USING parquet PARTITIONED BY (b, c)
    -    // The statement "INSERT INTO TABLE t PARTITION (b=2, c) SELECT 1, 3"
    -    // will be converted to "INSERT INTO TABLE t PARTITION (b, c) SELECT 1, 2, 3".
    -    //
    -    // Basically, we will put those partition columns having an assigned value back
    -    // into the SELECT clause. The output of the SELECT clause is organized as
    -    // normal_columns static_partitioning_columns dynamic_partitioning_columns.
    -    // static_partitioning_columns are partitioning columns having assigned
    -    // values in the PARTITION clause (e.g. b in the above example).
    -    // dynamic_partitioning_columns are partitioning columns that are not assigned
    -    // values in the PARTITION clause (e.g. c in the above example).
         case insert @ logical.InsertIntoTable(
    -      relation @ LogicalRelation(t: HadoopFsRelation, _, _), parts, query, overwrite, false)
    -      if query.resolved && parts.exists(_._2.isDefined) =>
    -
    -      val projectList = convertStaticPartitions(
    -        sourceAttributes = query.output,
    -        providedPartitions = parts,
    -        targetAttributes = relation.output,
    -        targetPartitionSchema = t.partitionSchema)
    -
    -      // We will remove all values assigned to static partitions because they have been
    -      // moved to the projectList.
    -      insert.copy(partition = parts.map(p => (p._1, None)), child = Project(projectList, query))
    -
    +      l @ LogicalRelation(t: HadoopFsRelation, _, table), parts, query, overwrite, false)
    +      if query.resolved =>
    +
    +      // If the InsertIntoTable command is for a partitioned HadoopFsRelation and
    +      // the user has specified static partitions, we add a Project operator on top of the query
    +      // to include those constant column values in the query result.
    +      //
    +      // Example:
    +      // Let's say that we have a table "t", which is created by
    +      // CREATE TABLE t (a INT, b INT, c INT) USING parquet PARTITIONED BY (b, c)
    +      // The statement "INSERT INTO TABLE t PARTITION (b=2, c) SELECT 1, 3"
    +      // will be converted to "INSERT INTO TABLE t PARTITION (b, c) SELECT 1, 2, 3".
    +      //
    +      // Basically, we will put those partition columns having an assigned value back
    +      // into the SELECT clause. The output of the SELECT clause is organized as
    +      // normal_columns static_partitioning_columns dynamic_partitioning_columns.
    +      // static_partitioning_columns are partitioning columns having assigned
    +      // values in the PARTITION clause (e.g. b in the above example).
    +      // dynamic_partitioning_columns are partitioning columns that are not assigned
    +      // values in the PARTITION clause (e.g. c in the above example).
    +      val actualQuery = if (parts.exists(_._2.isDefined)) {
    +        val projectList = convertStaticPartitions(
    +          sourceAttributes = query.output,
    +          providedPartitions = parts,
    +          targetAttributes = l.output,
    +          targetPartitionSchema = t.partitionSchema)
    +        Project(projectList, query)
    +      } else {
    +        query
    +      }
     
    -    case i @ logical.InsertIntoTable(
    -           l @ LogicalRelation(t: HadoopFsRelation, _, table), part, query, overwrite, false)
    -        if query.resolved && t.schema.asNullable == query.schema.asNullable =>
    +      // Sanity check, this should be guaranteed by `PreprocessTableInsertion`
    +      if (t.schema.asNullable != actualQuery.schema.asNullable) {
    +        throw new AnalysisException(
    +          s"Can not insert into ${l.simpleString} because of schema mismatch.")
    --- End diff --
    
    This seems to be triggered in the tests. Is it because the `PreprocessTableInsertion` rule no longer gets a chance to run now that the static-partition resolution is combined with this rule?
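
    A minimal, self-contained Scala sketch of the column reordering described
    by the moved comment may help when reasoning about this. It is not Spark's
    actual `convertStaticPartitions` (which operates on Catalyst attributes and
    casts static values to the partition schema); the `PartitionSpec` alias and
    `convert` helper below are hypothetical stand-ins that only illustrate the
    reordering:

    object StaticPartitionSketch {
      // A partition spec maps each partition column to an optional static value,
      // e.g. PARTITION (b=2, c) becomes Seq("b" -> Some("2"), "c" -> None).
      type PartitionSpec = Seq[(String, Option[String])]

      // The query output is normal_columns ++ dynamic_partitioning_columns;
      // splice the static values in between to produce
      // normal_columns static_partitioning_columns dynamic_partitioning_columns.
      def convert(selectList: Seq[String], parts: PartitionSpec): Seq[String] = {
        val (static, dynamic) = parts.partition(_._2.isDefined)
        val (normal, dynamicValues) =
          selectList.splitAt(selectList.length - dynamic.length)
        normal ++ static.flatMap(_._2) ++ dynamicValues
      }

      def main(args: Array[String]): Unit = {
        // INSERT INTO TABLE t PARTITION (b=2, c) SELECT 1, 3
        val parts: PartitionSpec = Seq("b" -> Some("2"), "c" -> None)
        // Prints List(1, 2, 3), matching the rewritten statement
        // INSERT INTO TABLE t PARTITION (b, c) SELECT 1, 2, 3.
        println(convert(Seq("1", "3"), parts))
      }
    }

    If `PreprocessTableInsertion` indeed no longer runs before this rule, a
    schema mismatch it used to repair would now surface as the
    `AnalysisException` thrown by the sanity check above.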

