Github user rdblue commented on a diff in the pull request:
https://github.com/apache/spark/pull/23208#discussion_r239598346

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala ---
@@ -241,32 +241,28 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
     assertNotBucketed("save")

-    val cls = DataSource.lookupDataSource(source, df.sparkSession.sessionState.conf)
-    if (classOf[DataSourceV2].isAssignableFrom(cls)) {
-      val source = cls.getConstructor().newInstance().asInstanceOf[DataSourceV2]
-      source match {
-        case provider: BatchWriteSupportProvider =>
-          val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
-            source,
-            df.sparkSession.sessionState.conf)
-          val options = sessionOptions ++ extraOptions
-
+    val session = df.sparkSession
+    val cls = DataSource.lookupDataSource(source, session.sessionState.conf)
+    if (classOf[TableProvider].isAssignableFrom(cls)) {
+      val provider = cls.getConstructor().newInstance().asInstanceOf[TableProvider]
+      val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
+        provider, session.sessionState.conf)
+      val options = sessionOptions ++ extraOptions
+      val dsOptions = new DataSourceOptions(options.asJava)
+      provider.getTable(dsOptions) match {
+        case table: SupportsBatchWrite =>
+          val relation = DataSourceV2Relation.create(table, dsOptions)
+          // TODO: revisit it. We should not create the `AppendData` operator for `SaveMode.Append`.
+          // We should create new end-users APIs for the `AppendData` operator.
--- End diff --

Here is what my branch uses for this logic:

```scala
val maybeTable = provider.getTable(identifier)
val exists = maybeTable.isDefined

(exists, mode) match {
  case (true, SaveMode.ErrorIfExists) =>
    throw new AnalysisException(s"Table already exists: ${identifier.quotedString}")

  case (true, SaveMode.Overwrite) =>
    val relation = DataSourceV2Relation.create(
      catalog.name, identifier, maybeTable.get, options)

    runCommand(df.sparkSession, "insertInto") {
      OverwritePartitionsDynamic.byName(relation, df.logicalPlan)
    }

  case (true, SaveMode.Append) =>
    val relation = DataSourceV2Relation.create(
      catalog.name, identifier, maybeTable.get, options)

    runCommand(df.sparkSession, "save") {
      AppendData.byName(relation, df.logicalPlan)
    }

  case (false, SaveMode.Append) |
       (false, SaveMode.ErrorIfExists) |
       (false, SaveMode.Ignore) |
       (false, SaveMode.Overwrite) =>
    runCommand(df.sparkSession, "save") {
      CreateTableAsSelect(catalog, identifier, Seq.empty, df.logicalPlan, options,
        ignoreIfExists = mode == SaveMode.Ignore)
    }

  case _ =>
    // table exists and mode is ignore
}
```
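For reference, here is a minimal, hedged sketch (not from the PR) of the caller-side code that would reach this branch of `DataFrameWriter.save()`. The source name `com.example.v2source` and the output path are hypothetical; the sketch only assumes a class that implements `TableProvider` and returns a table supporting batch writes.

```scala
import org.apache.spark.sql.{SaveMode, SparkSession}

object V2SaveExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("v2-save-sketch")
      .getOrCreate()
    import spark.implicits._

    val df = Seq((1, "a"), (2, "b")).toDF("id", "value")

    df.write
      .format("com.example.v2source")        // hypothetical TableProvider implementation
      .option("path", "/tmp/v2-save-demo")   // passed through to the source as DataSourceOptions
      .mode(SaveMode.Append)                 // the mode that the proposed logic maps to AppendData
      .save()

    spark.stop()
  }
}
```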