cloud-fan commented on a change in pull request #25465: [SPARK-28747][SQL] 
merge the two data source v2 fallback configs
URL: https://github.com/apache/spark/pull/25465#discussion_r314392192
 
 

 ##########
 File path: sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
 ##########
 @@ -251,64 +251,62 @@ final class DataFrameWriter[T] private[sql](ds: 
Dataset[T]) {
 
     assertNotBucketed("save")
 
-    val session = df.sparkSession
-    val cls = DataSource.lookupDataSource(source, session.sessionState.conf)
-    val canUseV2 = canUseV2Source(session, cls) && partitioningColumns.isEmpty
-
-    // In Data Source V2 project, partitioning is still under development.
-    // Here we fallback to V1 if partitioning columns are specified.
-    // TODO(SPARK-26778): use V2 implementations when partitioning feature is 
supported.
-    if (canUseV2) {
-      val provider = 
cls.getConstructor().newInstance().asInstanceOf[TableProvider]
-      val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
-        provider, session.sessionState.conf)
-      val options = sessionOptions ++ extraOptions
-      val dsOptions = new CaseInsensitiveStringMap(options.asJava)
-
-      import 
org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits._
-      provider.getTable(dsOptions) match {
-        // TODO (SPARK-27815): To not break existing tests, here we treat file 
source as a special
-        // case, and pass the save mode to file source directly. This hack 
should be removed.
-        case table: FileTable =>
-          val write = 
table.newWriteBuilder(dsOptions).asInstanceOf[FileWriteBuilder]
-            .mode(modeForDSV1) // should not change default mode for file 
source.
-            .withQueryId(UUID.randomUUID().toString)
-            .withInputDataSchema(df.logicalPlan.schema)
-            .buildForBatch()
-          // The returned `Write` can be null, which indicates that we can 
skip writing.
-          if (write != null) {
+    lookupV2Provider() match {
+      // TODO(SPARK-26778): use V2 implementations when partition columns are 
specified
+      case Some(provider) if partitioningColumns.isEmpty =>
+        saveToV2Source(provider)
+
+      case _ => saveToV1Source()
+    }
+  }
+
+  private def saveToV2Source(provider: TableProvider): Unit = {
 
 Review comment:
   You can review it with while space change ignored if you want to double-check
   
![image](https://user-images.githubusercontent.com/3182036/63109968-f1067580-bfbc-11e9-815c-ff1768c07b09.png)
   
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to