brkyvz commented on a change in pull request #25507: [SPARK-28667][SQL] Support 
InsertInto through the V2SessionCatalog 
URL: https://github.com/apache/spark/pull/25507#discussion_r317670533
 
 

 ##########
 File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 ##########
 @@ -770,40 +760,35 @@ class Analyzer(
 
   object ResolveInsertInto extends Rule[LogicalPlan] {
     override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators 
{
-      case i @ InsertIntoStatement(
-          UnresolvedRelation(CatalogObjectIdentifier(Some(tableCatalog), 
ident)), _, _, _, _)
-          if i.query.resolved =>
-        loadTable(tableCatalog, ident)
-            .map(DataSourceV2Relation.create)
-            .map(relation => {
-              // ifPartitionNotExists is append with validation, but 
validation is not supported
-              if (i.ifPartitionNotExists) {
-                throw new AnalysisException(
-                  s"Cannot write, IF NOT EXISTS is not supported for table: 
${relation.table.name}")
-              }
-
-              val partCols = partitionColumnNames(relation.table)
-              validatePartitionSpec(partCols, i.partitionSpec)
+      case i @ InsertIntoStatement(u: UnresolvedRelation, _, _, _, _) if 
i.query.resolved =>
+        lookupV2Relation(u) match {
+          case scala.Right(Some(v2Table: Table)) =>
+            val relation = DataSourceV2Relation.create(v2Table)
+            // ifPartitionNotExists is append with validation, but validation 
is not supported
+            if (i.ifPartitionNotExists) {
+              throw new AnalysisException(
+                s"Cannot write, IF NOT EXISTS is not supported for table: 
${relation.table.name}")
+            }
 
-              val staticPartitions = 
i.partitionSpec.filter(_._2.isDefined).mapValues(_.get)
-              val query = addStaticPartitionColumns(relation, i.query, 
staticPartitions)
-              val dynamicPartitionOverwrite = partCols.size > 
staticPartitions.size &&
-                  conf.partitionOverwriteMode == PartitionOverwriteMode.DYNAMIC
+            val partCols = partitionColumnNames(relation.table)
+            validatePartitionSpec(partCols, i.partitionSpec)
 
-              if (!i.overwrite) {
-                AppendData.byPosition(relation, query)
-              } else if (dynamicPartitionOverwrite) {
-                OverwritePartitionsDynamic.byPosition(relation, query)
-              } else {
-                OverwriteByExpression.byPosition(
-                  relation, query, staticDeleteExpression(relation, 
staticPartitions))
-              }
-            })
-            .getOrElse(i)
+            val staticPartitions = 
i.partitionSpec.filter(_._2.isDefined).mapValues(_.get)
+            val query = addStaticPartitionColumns(relation, i.query, 
staticPartitions)
+            val dynamicPartitionOverwrite = partCols.size > 
staticPartitions.size &&
+              conf.partitionOverwriteMode == PartitionOverwriteMode.DYNAMIC
 
-      case i @ InsertIntoStatement(UnresolvedRelation(AsTableIdentifier(_)), 
_, _, _, _)
-          if i.query.resolved =>
-        InsertIntoTable(i.table, i.partitionSpec, i.query, i.overwrite, 
i.ifPartitionNotExists)
+            if (!i.overwrite) {
+              AppendData.byPosition(relation, query)
+            } else if (dynamicPartitionOverwrite) {
+              OverwritePartitionsDynamic.byPosition(relation, query)
+            } else {
+              OverwriteByExpression.byPosition(
+                relation, query, staticDeleteExpression(relation, 
staticPartitions))
+            }
+          case _ =>
+            InsertIntoTable(i.table, i.partitionSpec, i.query, i.overwrite, 
i.ifPartitionNotExists)
 
 Review comment:
   I was looking at ALTER TABLE as well, and it seems like `lookupV2Relation` 
will need to support `Either` in cases where we do have a catalog name. I was 
thinking of something like:
   ```scala
   Either[(CatalogPlugin, Ident, Option[Table]), Option[(Ident, Table)]] match {
     case Left => // Catalog found, table may or may not be found
     case Right => // Table may be resolved as V2 or left for V1
   }
   ```

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to