Github user rdblue commented on a diff in the pull request:
https://github.com/apache/spark/pull/12239#discussion_r60611545
--- Diff:
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
---
@@ -414,8 +414,42 @@ class Analyzer(
}
def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
- case i @ InsertIntoTable(u: UnresolvedRelation, _, _, _, _) =>
- i.copy(table = EliminateSubqueryAliases(getTable(u)))
+ case i @ InsertIntoTable(u: UnresolvedRelation, parts, child, _, _) if child.resolved =>
+ val table = getTable(u)
+ // adding the table's partitions or validate the query's partition info
+ table match {
+ case relation: PartitionedRelation if relation.partitionColumns.nonEmpty =>
+ val tablePartitionNames = relation.partitionColumns.map(_.name)
+ if (parts.keys.nonEmpty) {
+ // the query's partitioning must match the table's partitioning
+ // this is set for queries like: insert into ... partition (one = "a", two = <expr>)
+ if (tablePartitionNames.size != parts.keySet.size) {
+ throw new AnalysisException(
+ s"""Requested partitioning does not match the ${u.tableIdentifier} table:
+ |Requested partitions: ${parts.keys.mkString(",")}
+ |Table partitions: ${tablePartitionNames.mkString(",")}""".stripMargin)
+ }
+ // assumes partition columns are correctly placed at the end of the child's output
+ i.copy(table = EliminateSubqueryAliases(table))
+ } else {
+ // Set up the table's partition scheme with all dynamic partitions by moving partition
+ // columns to the end of the column list, in partition order.
+ val (inputPartCols, columns) = child.output.partition { attr =>
+ tablePartitionNames.contains(attr.name)
+ }
+ // All partition columns are dynamic because this InsertIntoTable had no partitioning
+ val partColumns = tablePartitionNames.map { name =>
--- End diff --
This does two things:
# Reorders the input partition columns to match the table's partition order.
There's a test case for this, and it catches a small bug where the wrong data
column is used when the order of the input partition columns doesn't match the
order of the table's partitioning.
# Throws an exception if a partition column is missing from the data to be
written.
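
The two behaviors described above can be illustrated with a minimal, Spark-free
sketch. The quoted diff is cut off at the `tablePartitionNames.map` call, so
this is not the PR's code: `Column`, `PartitionReorder`, `reorder`, and the use
of `IllegalArgumentException` (rather than Spark's `AnalysisException`) are
hypothetical stand-ins chosen only to keep the example self-contained, and
names are compared exactly, ignoring any case-sensitivity rules the analyzer
may apply.

```scala
// Hypothetical stand-in for a Catalyst attribute; only the name matters here.
case class Column(name: String)

object PartitionReorder {
  /**
   * Returns `output` with the partition columns moved to the end,
   * in the order given by `tablePartitionNames`.
   */
  def reorder(output: Seq[Column], tablePartitionNames: Seq[String]): Seq[Column] = {
    // Split the input into partition columns and ordinary data columns.
    val (inputPartCols, dataCols) =
      output.partition(col => tablePartitionNames.contains(col.name))

    // Look up each partition column by name, in table order.
    // A partition column that is absent from the input is an error.
    val partCols = tablePartitionNames.map { name =>
      inputPartCols.find(_.name == name).getOrElse(
        throw new IllegalArgumentException(
          s"Cannot find partition column '$name' in the data to be written"))
    }

    dataCols ++ partCols
  }
}
```

For example, an input with columns `two, a, one, b` written to a table
partitioned by `(one, two)` comes back as `a, b, one, two`, while an input that
lacks `two` raises the exception.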