rdblue commented on a change in pull request #25465: [SPARK-28747][SQL] merge 
the two data source v2 fallback configs
URL: https://github.com/apache/spark/pull/25465#discussion_r315814833
 
 

 ##########
 File path: sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
 ##########
 @@ -251,64 +251,62 @@ final class DataFrameWriter[T] private[sql](ds: 
Dataset[T]) {
 
     assertNotBucketed("save")
 
-    val session = df.sparkSession
-    val cls = DataSource.lookupDataSource(source, session.sessionState.conf)
-    val canUseV2 = canUseV2Source(session, cls) && partitioningColumns.isEmpty
-
-    // In Data Source V2 project, partitioning is still under development.
-    // Here we fallback to V1 if partitioning columns are specified.
-    // TODO(SPARK-26778): use V2 implementations when partitioning feature is 
supported.
-    if (canUseV2) {
-      val provider = 
cls.getConstructor().newInstance().asInstanceOf[TableProvider]
-      val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
-        provider, session.sessionState.conf)
-      val options = sessionOptions ++ extraOptions
-      val dsOptions = new CaseInsensitiveStringMap(options.asJava)
-
-      import 
org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits._
-      provider.getTable(dsOptions) match {
-        // TODO (SPARK-27815): To not break existing tests, here we treat file 
source as a special
-        // case, and pass the save mode to file source directly. This hack 
should be removed.
-        case table: FileTable =>
-          val write = 
table.newWriteBuilder(dsOptions).asInstanceOf[FileWriteBuilder]
-            .mode(modeForDSV1) // should not change default mode for file 
source.
-            .withQueryId(UUID.randomUUID().toString)
-            .withInputDataSchema(df.logicalPlan.schema)
-            .buildForBatch()
-          // The returned `Write` can be null, which indicates that we can 
skip writing.
-          if (write != null) {
+    lookupV2Provider() match {
+      // TODO(SPARK-26778): use V2 implementations when partition columns are 
specified
+      case Some(provider) if partitioningColumns.isEmpty =>
+        saveToV2Source(provider)
+
+      case _ => saveToV1Source()
+    }
+  }
+
+  private def saveToV2Source(provider: TableProvider): Unit = {
+    val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
+      provider, df.sparkSession.sessionState.conf)
+    val options = sessionOptions ++ extraOptions
+    val dsOptions = new CaseInsensitiveStringMap(options.asJava)
+
+    import 
org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits._
+    provider.getTable(dsOptions) match {
+      // TODO (SPARK-27815): To not break existing tests, here we treat file 
source as a special
+      // case, and pass the save mode to file source directly. This hack 
should be removed.
+      case table: FileTable =>
+        val write = 
table.newWriteBuilder(dsOptions).asInstanceOf[FileWriteBuilder]
+          .mode(modeForDSV1) // should not change default mode for file source.
+          .withQueryId(UUID.randomUUID().toString)
+          .withInputDataSchema(df.logicalPlan.schema)
+          .buildForBatch()
+        // The returned `Write` can be null, which indicates that we can skip 
writing.
+        if (write != null) {
+          runCommand(df.sparkSession, "save") {
+            WriteToDataSourceV2(write, df.logicalPlan)
+          }
+        }
+
+      case table: SupportsWrite if table.supports(BATCH_WRITE) =>
+        lazy val relation = DataSourceV2Relation.create(table, dsOptions)
+        modeForDSV2 match {
+          case SaveMode.Append =>
             runCommand(df.sparkSession, "save") {
-              WriteToDataSourceV2(write, df.logicalPlan)
+              AppendData.byName(relation, df.logicalPlan)
             }
-          }
 
-        case table: SupportsWrite if table.supports(BATCH_WRITE) =>
-          lazy val relation = DataSourceV2Relation.create(table, dsOptions)
-          modeForDSV2 match {
-            case SaveMode.Append =>
-              runCommand(df.sparkSession, "save") {
-                AppendData.byName(relation, df.logicalPlan)
-              }
-
-            case SaveMode.Overwrite if table.supportsAny(TRUNCATE, 
OVERWRITE_BY_FILTER) =>
-              // truncate the table
-              runCommand(df.sparkSession, "save") {
-                OverwriteByExpression.byName(relation, df.logicalPlan, 
Literal(true))
-              }
-
-            case other =>
-              throw new AnalysisException(s"TableProvider implementation 
$source cannot be " +
-                s"written with $other mode, please use Append or Overwrite " +
-                "modes instead.")
-          }
+          case SaveMode.Overwrite if table.supportsAny(TRUNCATE, 
OVERWRITE_BY_FILTER) =>
+            // truncate the table
+            runCommand(df.sparkSession, "save") {
+              OverwriteByExpression.byName(relation, df.logicalPlan, 
Literal(true))
+            }
 
-        // Streaming also uses the data source V2 API. So it may be that the 
data source implements
-        // v2, but has no v2 implementation for batch writes. In that case, we 
fall back to saving
-        // as though it's a V1 source.
-        case _ => saveToV1Source()
-      }
-    } else {
-      saveToV1Source()
+          case other =>
+            throw new AnalysisException(s"TableProvider implementation $source 
cannot be " +
+              s"written with $other mode, please use Append or Overwrite " +
+              "modes instead.")
+        }
+
+      // Streaming also uses the data source V2 API. So it may be that the 
data source
+      // implements v2, but has no v2 implementation for batch writes. In that 
case, we fall
+      // back to saving as though it's a V1 source.
 
 Review comment:
   This doesn't look like a required change: it only changes the comment's 
indentation and rewraps it. Can you revert this part to avoid future git 
conflicts?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to