Github user ericl commented on a diff in the pull request:
https://github.com/apache/spark/pull/15995#discussion_r90088361
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala ---
@@ -129,65 +129,67 @@ case class DataSourceAnalysis(conf: CatalystConf) extends Rule[LogicalPlan] {
   override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
-    // If the InsertIntoTable command is for a partitioned HadoopFsRelation and
-    // the user has specified static partitions, we add a Project operator on top of the query
-    // to include those constant column values in the query result.
-    //
-    // Example:
-    // Let's say that we have a table "t", which is created by
-    // CREATE TABLE t (a INT, b INT, c INT) USING parquet PARTITIONED BY (b, c)
-    // The statement of "INSERT INTO TABLE t PARTITION (b=2, c) SELECT 1, 3"
-    // will be converted to "INSERT INTO TABLE t PARTITION (b, c) SELECT 1, 2, 3".
-    //
-    // Basically, we will put those partition columns having an assigned value back
-    // into the SELECT clause. The output of the SELECT clause is organized as
-    // normal_columns static_partitioning_columns dynamic_partitioning_columns.
-    // static_partitioning_columns are partitioning columns that are assigned
-    // values in the PARTITION clause (e.g. b in the above example).
-    // dynamic_partitioning_columns are partitioning columns that are not assigned
-    // values in the PARTITION clause (e.g. c in the above example).
     case insert @ logical.InsertIntoTable(
-        relation @ LogicalRelation(t: HadoopFsRelation, _, _), parts, query, overwrite, false)
-        if query.resolved && parts.exists(_._2.isDefined) =>
-
-      val projectList = convertStaticPartitions(
-        sourceAttributes = query.output,
-        providedPartitions = parts,
-        targetAttributes = relation.output,
-        targetPartitionSchema = t.partitionSchema)
-
-      // We will remove all assigned values to static partitions because they have been
-      // moved to the projectList.
-      insert.copy(partition = parts.map(p => (p._1, None)), child = Project(projectList, query))
-
+        l @ LogicalRelation(t: HadoopFsRelation, _, table), parts, query, overwrite, false)
+        if query.resolved =>
+
+      // If the InsertIntoTable command is for a partitioned HadoopFsRelation and
+      // the user has specified static partitions, we add a Project operator on top of the query
+      // to include those constant column values in the query result.
+      //
+      // Example:
+      // Let's say that we have a table "t", which is created by
+      // CREATE TABLE t (a INT, b INT, c INT) USING parquet PARTITIONED BY (b, c)
+      // The statement of "INSERT INTO TABLE t PARTITION (b=2, c) SELECT 1, 3"
+      // will be converted to "INSERT INTO TABLE t PARTITION (b, c) SELECT 1, 2, 3".
+      //
+      // Basically, we will put those partition columns having an assigned value back
+      // into the SELECT clause. The output of the SELECT clause is organized as
+      // normal_columns static_partitioning_columns dynamic_partitioning_columns.
+      // static_partitioning_columns are partitioning columns that are assigned
+      // values in the PARTITION clause (e.g. b in the above example).
+      // dynamic_partitioning_columns are partitioning columns that are not assigned
+      // values in the PARTITION clause (e.g. c in the above example).
+      val actualQuery = if (parts.exists(_._2.isDefined)) {
+        val projectList = convertStaticPartitions(
+          sourceAttributes = query.output,
+          providedPartitions = parts,
+          targetAttributes = l.output,
+          targetPartitionSchema = t.partitionSchema)
+        Project(projectList, query)
+      } else {
+        query
+      }

-    case i @ logical.InsertIntoTable(
-        l @ LogicalRelation(t: HadoopFsRelation, _, table), part, query, overwrite, false)
-        if query.resolved && t.schema.asNullable == query.schema.asNullable =>
+      // Sanity check; this should be guaranteed by `PreprocessTableInsertion`.
+      if (t.schema.asNullable != actualQuery.schema.asNullable) {
+        throw new AnalysisException(
+          s"Can not insert into ${l.simpleString} because of schema mismatch.")
--- End diff --
This seems to be triggered in the tests. Is it because the preprocessing rule (`PreprocessTableInsertion`) no longer gets a chance to run, now that the static-partition resolution is combined with this rule?
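For readers following along, the column reordering that the quoted comment describes (output organized as normal_columns ++ static_partitioning_columns ++ dynamic_partitioning_columns) can be sketched outside Spark. This is an illustrative standalone sketch, not Spark's actual `convertStaticPartitions`; the object and method names are made up, and values are modeled as plain strings:

```scala
// Sketch of the static-partition conversion described in the diff's comment.
// Given the values produced by the SELECT clause, the table's partition
// columns in order, and the statically assigned partition values, produce
// the reordered output: normal ++ static ++ dynamic columns.
object StaticPartitionSketch {
  def convert(
      sourceValues: Seq[String],      // values from the SELECT clause, in order
      partitionCols: Seq[String],     // partition columns in table order, e.g. Seq("b", "c")
      staticParts: Map[String, String] // assigned static partitions, e.g. Map("b" -> "2")
  ): Seq[String] = {
    // The SELECT supplies the normal columns followed by the dynamic
    // partition values; static partition values are not in the SELECT.
    val numDynamic = partitionCols.count(c => !staticParts.contains(c))
    val (normal, dynamic) = sourceValues.splitAt(sourceValues.length - numDynamic)
    val static = partitionCols.flatMap(staticParts.get)
    normal ++ static ++ dynamic
  }
}
```

With the comment's example, `CREATE TABLE t (a INT, b INT, c INT) ... PARTITIONED BY (b, c)` and `INSERT INTO TABLE t PARTITION (b=2, c) SELECT 1, 3`, the call `convert(Seq("1", "3"), Seq("b", "c"), Map("b" -> "2"))` yields `Seq("1", "2", "3")`, matching the rewritten `SELECT 1, 2, 3`.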