rdblue commented on a change in pull request #24832: [SPARK-27845][SQL]
DataSourceV2: InsertTable
URL: https://github.com/apache/spark/pull/24832#discussion_r300789916
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala
##########
@@ -118,6 +122,95 @@ case class DataSourceResolution(
if newColumns.forall(_.name.size == 1) =>
// only top-level adds are supported using AlterTableAddColumnsCommand
AlterTableAddColumnsCommand(table, newColumns.map(convertToStructField))
+
+    case i @ InsertTableStatement(UnresolvedRelation(CatalogObjectIdentifier(Some(catalog), ident)),
+        _, _, _, _) if i.query.resolved =>
+      loadTable(catalog, ident)
+          .map(DataSourceV2Relation.create)
+          .map(table => {
+            // ifPartitionNotExists is append with validation, but validation is not supported
+            if (i.ifPartitionNotExists) {
+              throw new AnalysisException(
+                s"Cannot write, IF NOT EXISTS is not supported for table: ${table.table.name}")
+            }
+
+            val staticPartitions = i.partition.filter(_._2.isDefined).mapValues(_.get)
+
+            val resolver = conf.resolver
+
+            // add any static value as a literal column
+            val staticPartitionProjectList = {
+              // check that the data column counts match
+              val numColumns = table.output.size
+              if (numColumns > staticPartitions.size + i.query.output.size) {
+                throw new AnalysisException(s"Cannot write: too many columns")
+              } else if (numColumns < staticPartitions.size + i.query.output.size) {
+                throw new AnalysisException(s"Cannot write: not enough columns")
+              }
+
+              val staticNames = staticPartitions.keySet
+
+              // for each static name, find the column name it will replace and check for unknowns.
+              val outputNameToStaticName = staticNames.map(staticName =>
+                table.output.find(col => resolver(col.name, staticName)) match {
+                  case Some(attr) =>
+                    attr.name -> staticName
+                  case _ =>
+                    throw new AnalysisException(
+                      s"Cannot add static value for unknown column: $staticName")
+                }).toMap
+
+              // for each output column, add the static value as a literal
+              // or use the next input column
+              val queryColumns = i.query.output.iterator
+              table.output.map { col =>
+                outputNameToStaticName.get(col.name).flatMap(staticPartitions.get) match {
+                  case Some(staticValue) =>
+                    Alias(Cast(Literal(staticValue), col.dataType), col.name)()
+                  case _ =>
+                    queryColumns.next
+                }
+              }
+            }
+
+            val dynamicPartitionOverwrite = table.table.partitioning.size > 0 &&
+                staticPartitions.size < table.table.partitioning.size &&
+                conf.partitionOverwriteMode == PartitionOverwriteMode.DYNAMIC
+
+            val query =
+              if (staticPartitions.isEmpty) {
+                i.query
+              } else {
+                Project(staticPartitionProjectList, i.query)
+              }
+
+            val deleteExpr =
+              if (staticPartitions.isEmpty) {
+                Literal(true)
+              } else {
+                staticPartitions.map { case (name, value) =>
+                  query.output.find(col => resolver(col.name, name)) match {
+                    case Some(attr) =>
+                      EqualTo(attr, Cast(Literal(value), attr.dataType))
+                    case None =>
+                      throw new AnalysisException(s"Unknown static partition column: $name")
+                  }
+                }.toSeq.reduce(And)
+              }
+
+            if (!i.overwrite) {
+              AppendData.byPosition(table, query)
+            } else if (dynamicPartitionOverwrite) {
+              OverwritePartitionsDynamic.byPosition(table, query)
+            } else {
+              OverwriteByExpression.byPosition(table, query, deleteExpr)
+            }
+          })
+          .getOrElse(i)
+
+    case i @ InsertTableStatement(UnresolvedRelation(AsTableIdentifier(_)), _, _, _, _)
Review comment:
This also needs to handle the case where the identifier is owned by the
session catalog (no default catalog, no catalog in the identifier), but the
table provider is a v2 implementation. To do that, I think this should load the
table using the v2 session catalog, then check the table instance. If it is a
`CatalogTableAsV2`, then convert to the old `InsertIntoTable` like this rule
currently does. Otherwise, it should convert to a v2 plan because it has a v2
table implementation.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]