uros-b commented on code in PR #56617:
URL: https://github.com/apache/spark/pull/56617#discussion_r3466225890
##########
sql/core/src/main/scala/org/apache/spark/sql/classic/DataFrameWriter.scala:
##########
@@ -168,66 +168,90 @@ final class DataFrameWriter[T] private[sql](ds:
Dataset[T]) extends sql.DataFram
import
org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits._
val catalogManager = df.sparkSession.sessionState.catalogManager
+
+ def createTableAsSelectCommand(
+ catalog: TableCatalog, ident: Identifier, ignoreIfExists: Boolean):
LogicalPlan = {
+ val tableSpec = UnresolvedTableSpec(
+ properties = Map.empty,
+ provider = Some(source),
+ optionExpression = OptionList(Seq.empty),
+ location = extraOptions.get("path"),
+ comment = extraOptions.get(TableCatalog.PROP_COMMENT),
+ collation = extraOptions.get(TableCatalog.PROP_COLLATION),
+ serde = None,
+ external = false,
+ constraints = Seq.empty)
+ CreateTableAsSelect(
+ UnresolvedIdentifier(
+ catalog.name +: ident.namespace.toImmutableArraySeq :+ ident.name),
+ partitioningAsV2,
+ df.queryExecution.analyzed,
+ tableSpec,
+ finalOptions,
+ ignoreIfExists = ignoreIfExists)
+ }
+
+ def appendOrOverwriteCommand(
+ table: Table,
+ catalog: Option[CatalogPlugin],
+ ident: Option[Identifier]): LogicalPlan = {
+ checkPartitioningMatchesV2Table(table)
+ val relation = DataSourceV2Relation.create(table, catalog, ident,
dsOptions)
+ if (curmode == SaveMode.Append) {
+ AppendData.byName(relation, df.logicalPlan, finalOptions,
_withSchemaEvolution)
+ } else {
+ // Truncate the table. TableCapabilityCheck will throw a nice
exception if this
+ // isn't supported
+ OverwriteByExpression.byName(
+ relation, df.logicalPlan, Literal(true), finalOptions,
_withSchemaEvolution)
+ }
+ }
+
curmode match {
case SaveMode.Append | SaveMode.Overwrite =>
- val (table, catalog, ident) = provider match {
- case supportsExtract: SupportsCatalogOptions =>
+ provider match {
+ case supportsExtract: SupportsCatalogOptions
+ if supportsExtract.useCatalogResolution(dsOptions) =>
val ident = supportsExtract.extractIdentifier(dsOptions)
val catalog = CatalogV2Util.getTableProviderCatalog(
supportsExtract, catalogManager, dsOptions)
-
- (catalog.loadTable(ident), Some(catalog), Some(ident))
+ val tableOpt =
+ try Some(catalog.loadTable(ident))
Review Comment:
Question: are we using loadTable(ident) without write privileges here? The
new Append/Overwrite branch calls catalog.loadTable(ident) without passing
getWritePrivileges, unlike saveAsTable which uses loadTable(ident,
getWritePrivileges).
I see that this is a pre-existing issue, but is write authorization properly
enforced here?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]