uros-b commented on code in PR #56617:
URL: https://github.com/apache/spark/pull/56617#discussion_r3466225890


##########
sql/core/src/main/scala/org/apache/spark/sql/classic/DataFrameWriter.scala:
##########
@@ -168,66 +168,90 @@ final class DataFrameWriter[T] private[sql](ds: 
Dataset[T]) extends sql.DataFram
 
       import 
org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits._
       val catalogManager = df.sparkSession.sessionState.catalogManager
+
+      def createTableAsSelectCommand(
+          catalog: TableCatalog, ident: Identifier, ignoreIfExists: Boolean): 
LogicalPlan = {
+        val tableSpec = UnresolvedTableSpec(
+          properties = Map.empty,
+          provider = Some(source),
+          optionExpression = OptionList(Seq.empty),
+          location = extraOptions.get("path"),
+          comment = extraOptions.get(TableCatalog.PROP_COMMENT),
+          collation = extraOptions.get(TableCatalog.PROP_COLLATION),
+          serde = None,
+          external = false,
+          constraints = Seq.empty)
+        CreateTableAsSelect(
+          UnresolvedIdentifier(
+            catalog.name +: ident.namespace.toImmutableArraySeq :+ ident.name),
+          partitioningAsV2,
+          df.queryExecution.analyzed,
+          tableSpec,
+          finalOptions,
+          ignoreIfExists = ignoreIfExists)
+      }
+
+      def appendOrOverwriteCommand(
+          table: Table,
+          catalog: Option[CatalogPlugin],
+          ident: Option[Identifier]): LogicalPlan = {
+        checkPartitioningMatchesV2Table(table)
+        val relation = DataSourceV2Relation.create(table, catalog, ident, 
dsOptions)
+        if (curmode == SaveMode.Append) {
+          AppendData.byName(relation, df.logicalPlan, finalOptions, 
_withSchemaEvolution)
+        } else {
+          // Truncate the table. TableCapabilityCheck will throw a nice 
exception if this
+          // isn't supported
+          OverwriteByExpression.byName(
+            relation, df.logicalPlan, Literal(true), finalOptions, 
_withSchemaEvolution)
+        }
+      }
+
       curmode match {
         case SaveMode.Append | SaveMode.Overwrite =>
-          val (table, catalog, ident) = provider match {
-            case supportsExtract: SupportsCatalogOptions =>
+          provider match {
+            case supportsExtract: SupportsCatalogOptions
+                if supportsExtract.useCatalogResolution(dsOptions) =>
               val ident = supportsExtract.extractIdentifier(dsOptions)
               val catalog = CatalogV2Util.getTableProviderCatalog(
                 supportsExtract, catalogManager, dsOptions)
-
-              (catalog.loadTable(ident), Some(catalog), Some(ident))
+              val tableOpt =
+                try Some(catalog.loadTable(ident))

Review Comment:
   Question: are we using loadTable(ident) without write privileges here? The 
new Append/Overwrite branch calls catalog.loadTable(ident) without passing 
getWritePrivileges, unlike saveAsTable which uses loadTable(ident, 
getWritePrivileges).
   
   I see that this is a pre-existing issue, but is write authorization properly 
enforced here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to