cloud-fan commented on a change in pull request #25465: [SPARK-28747][SQL] merge the two data source v2 fallback configs
URL: https://github.com/apache/spark/pull/25465#discussion_r315962976
##########
File path: sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
##########
@@ -251,64 +251,62 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
     assertNotBucketed("save")
-    val session = df.sparkSession
-    val cls = DataSource.lookupDataSource(source, session.sessionState.conf)
-    val canUseV2 = canUseV2Source(session, cls) && partitioningColumns.isEmpty
-
-    // In Data Source V2 project, partitioning is still under development.
-    // Here we fallback to V1 if partitioning columns are specified.
-    // TODO(SPARK-26778): use V2 implementations when partitioning feature is supported.
-    if (canUseV2) {
-      val provider = cls.getConstructor().newInstance().asInstanceOf[TableProvider]
-      val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
-        provider, session.sessionState.conf)
-      val options = sessionOptions ++ extraOptions
-      val dsOptions = new CaseInsensitiveStringMap(options.asJava)
-
-      import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits._
-      provider.getTable(dsOptions) match {
-        // TODO (SPARK-27815): To not break existing tests, here we treat file source as a special
-        // case, and pass the save mode to file source directly. This hack should be removed.
-        case table: FileTable =>
-          val write = table.newWriteBuilder(dsOptions).asInstanceOf[FileWriteBuilder]
-            .mode(modeForDSV1) // should not change default mode for file source.
-            .withQueryId(UUID.randomUUID().toString)
-            .withInputDataSchema(df.logicalPlan.schema)
-            .buildForBatch()
-          // The returned `Write` can be null, which indicates that we can skip writing.
-          if (write != null) {
+    lookupV2Provider() match {
+      // TODO(SPARK-26778): use V2 implementations when partition columns are specified
+      case Some(provider) if partitioningColumns.isEmpty =>
+        saveToV2Source(provider)
+
+      case _ => saveToV1Source()
+    }
+  }
+
+  private def saveToV2Source(provider: TableProvider): Unit = {
+    val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
+      provider, df.sparkSession.sessionState.conf)
+    val options = sessionOptions ++ extraOptions
+    val dsOptions = new CaseInsensitiveStringMap(options.asJava)
+
+    import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits._
+    provider.getTable(dsOptions) match {
+      // TODO (SPARK-27815): To not break existing tests, here we treat file source as a special
+      // case, and pass the save mode to file source directly. This hack should be removed.
+      case table: FileTable =>
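For context, the refactor above replaces the boolean `canUseV2` flag with an `Option`-returning lookup plus a guarded pattern match, so the V1 fallback becomes the match's default case. The sketch below renders that dispatch shape in a minimal, self-contained form; `SimpleWriter` and the registry-backed `lookupV2Provider` are hypothetical stand-ins for illustration, not Spark's actual implementation.

object FallbackDispatchSketch {

  // Stand-in for Spark's TableProvider; illustration only.
  trait TableProvider { def name: String }

  final class SimpleWriter(
      source: String,
      partitioningColumns: Seq[String],
      registry: Map[String, TableProvider]) {

    // Mirrors the shape of lookupV2Provider(): None means "no V2 source found".
    private def lookupV2Provider(): Option[TableProvider] = registry.get(source)

    def save(): Unit = lookupV2Provider() match {
      // Fall back to V1 when partition columns are specified (cf. SPARK-26778).
      case Some(provider) if partitioningColumns.isEmpty => saveToV2Source(provider)
      case _ => saveToV1Source()
    }

    private def saveToV2Source(provider: TableProvider): Unit =
      println(s"writing via V2 provider: ${provider.name}")

    private def saveToV1Source(): Unit =
      println("falling back to the V1 write path")
  }

  def main(args: Array[String]): Unit = {
    val registry = Map[String, TableProvider](
      "parquet" -> new TableProvider { val name = "parquet" })
    new SimpleWriter("parquet", Nil, registry).save()         // V2 path
    new SimpleWriter("parquet", Seq("year"), registry).save() // V1: partitioned
    new SimpleWriter("custom", Nil, registry).save()          // V1: unknown source
  }
}

One nice property of this shape is that every new reason to fall back becomes another guard or case in one match, rather than another conjunct on a boolean computed far from its use.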
Review comment:
good point!
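The `FileTable` case being discussed chains a fluent builder whose `buildForBatch()` may return null to signal that writing can be skipped. Below is a hedged, self-contained sketch of that builder shape; all names (`SketchWriteBuilder`, `BatchWrite`) are invented for illustration and are not Spark's API.

import java.util.UUID

object NullableWriteSketch {

  trait BatchWrite { def commit(): Unit }

  // Fluent builder in the style of the FileWriteBuilder chain in the diff.
  final class SketchWriteBuilder(rows: Seq[String]) {
    private var saveMode: String = "errorifexists"
    private var queryId: String = ""

    def mode(m: String): SketchWriteBuilder = { saveMode = m; this }
    def withQueryId(id: String): SketchWriteBuilder = { queryId = id; this }

    // Returns null when there is nothing to write, mirroring the comment in
    // the diff: "The returned `Write` can be null, which indicates that we
    // can skip writing."
    def buildForBatch(): BatchWrite =
      if (rows.isEmpty) null
      else new BatchWrite {
        def commit(): Unit =
          println(s"query $queryId committed ${rows.size} rows (mode=$saveMode)")
      }
  }

  def main(args: Array[String]): Unit = {
    val write = new SketchWriteBuilder(Seq("a", "b"))
      .mode("append")
      .withQueryId(UUID.randomUUID().toString)
      .buildForBatch()
    if (write != null) write.commit() else println("skip writing")
  }
}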