Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23208#discussion_r239598346
  
    --- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala ---
    @@ -241,32 +241,28 @@ final class DataFrameWriter[T] private[sql](ds: 
Dataset[T]) {
     
         assertNotBucketed("save")
     
    -    val cls = DataSource.lookupDataSource(source, 
df.sparkSession.sessionState.conf)
    -    if (classOf[DataSourceV2].isAssignableFrom(cls)) {
    -      val source = 
cls.getConstructor().newInstance().asInstanceOf[DataSourceV2]
    -      source match {
    -        case provider: BatchWriteSupportProvider =>
    -          val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
    -            source,
    -            df.sparkSession.sessionState.conf)
    -          val options = sessionOptions ++ extraOptions
    -
    +    val session = df.sparkSession
    +    val cls = DataSource.lookupDataSource(source, 
session.sessionState.conf)
    +    if (classOf[TableProvider].isAssignableFrom(cls)) {
    +      val provider = 
cls.getConstructor().newInstance().asInstanceOf[TableProvider]
    +      val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
    +        provider, session.sessionState.conf)
    +      val options = sessionOptions ++ extraOptions
    +      val dsOptions = new DataSourceOptions(options.asJava)
    +      provider.getTable(dsOptions) match {
    +        case table: SupportsBatchWrite =>
    +          val relation = DataSourceV2Relation.create(table, dsOptions)
    +          // TODO: revisit it. We should not create the `AppendData` 
operator for `SaveMode.Append`.
    +          // We should create new end-users APIs for the `AppendData` 
operator.
    --- End diff --
    
    Here is what my branch uses for this logic:
    
    ```scala
            val maybeTable = provider.getTable(identifier)
            val exists = maybeTable.isDefined
            (exists, mode) match {
              case (true, SaveMode.ErrorIfExists) =>
                throw new AnalysisException(s"Table already exists: 
${identifier.quotedString}")
    
              case (true, SaveMode.Overwrite) =>
                val relation = DataSourceV2Relation.create(
                  catalog.name, identifier, maybeTable.get, options)
    
                runCommand(df.sparkSession, "insertInto") {
                  OverwritePartitionsDynamic.byName(relation, df.logicalPlan)
                }
    
              case (true, SaveMode.Append) =>
                val relation = DataSourceV2Relation.create(
                  catalog.name, identifier, maybeTable.get, options)
    
                runCommand(df.sparkSession, "save") {
                  AppendData.byName(relation, df.logicalPlan)
                }
    
              case (false, SaveMode.Append) |
                   (false, SaveMode.ErrorIfExists) |
                   (false, SaveMode.Ignore) |
                   (false, SaveMode.Overwrite) =>
    
                runCommand(df.sparkSession, "save") {
                  CreateTableAsSelect(catalog, identifier, Seq.empty, 
df.logicalPlan, options,
                    ignoreIfExists = mode == SaveMode.Ignore)
                }
    
              case _ =>
              // table exists and mode is ignore
            }
    ```


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to