Repository: spark
Updated Branches:
  refs/heads/branch-2.4 9031c7848 -> a9a8d3a4b


[SPARK-25425][SQL][BACKPORT-2.4] Extra options should override session options in DataSource V2

## What changes were proposed in this pull request?

In this PR, I propose making extra options override session options in DataSource
V2. Extra options are more specific, since they are set per operation via
`.option()`, so they should take precedence over the more generic session options.
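
The fix hinges on Scala's `Map#++` semantics: when two maps are concatenated, entries from the right-hand operand win on duplicate keys, so the operand order determines precedence. A minimal standalone sketch (the option name here is hypothetical, for illustration only):

```scala
// Scala's `++` on immutable maps keeps the right-hand side's value
// whenever both maps contain the same key.
val sessionOptions = Map("optionA" -> "true")
val extraOptions = Map("optionA" -> "false")

// Before this patch: extraOptions ++ sessionOptions -- session options won.
assert((extraOptions ++ sessionOptions)("optionA") == "true")

// After this patch: sessionOptions ++ extraOptions -- extra options win.
assert((sessionOptions ++ extraOptions)("optionA") == "false")
```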

## How was this patch tested?

Added tests for read and write paths.
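
To illustrate the behavior the tests exercise, here is a hedged usage sketch. The source class and config key below are hypothetical; for sources implementing `SessionConfigSupport`, session options follow the `spark.datasource.<keyPrefix>.<option>` naming pattern:

```scala
// Session-wide default for a hypothetical V2 source whose
// SessionConfigSupport.keyPrefix is "mySource".
spark.conf.set("spark.datasource.mySource.path", "/session/default/path")

// The per-read option is more specific; after this patch it wins.
val df = spark.read
  .format("org.example.MySourceV2") // hypothetical DataSourceV2 implementation
  .option("path", "/explicit/path") // overrides the session-level "path"
  .load()
```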

Closes #22474 from MaxGekk/session-options-2.4.

Authored-by: Maxim Gekk <maxim.g...@databricks.com>
Signed-off-by: Dongjoon Hyun <dongj...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a9a8d3a4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a9a8d3a4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a9a8d3a4

Branch: refs/heads/branch-2.4
Commit: a9a8d3a4b92be89defd82d5f2eeb3f9af45c687d
Parents: 9031c78
Author: Maxim Gekk <maxim.g...@databricks.com>
Authored: Wed Sep 19 16:53:26 2018 -0700
Committer: Dongjoon Hyun <dongj...@apache.org>
Committed: Wed Sep 19 16:53:26 2018 -0700

----------------------------------------------------------------------
 .../org/apache/spark/sql/DataFrameReader.scala  |  2 +-
 .../org/apache/spark/sql/DataFrameWriter.scala  |  8 +++--
 .../sql/sources/v2/DataSourceV2Suite.scala      | 33 ++++++++++++++++++++
 .../sources/v2/SimpleWritableDataSource.scala   |  7 ++++-
 4 files changed, 45 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/a9a8d3a4/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index 371ec70..27a1af2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -202,7 +202,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
           DataSourceOptions.PATHS_KEY -> objectMapper.writeValueAsString(paths.toArray)
         }
         Dataset.ofRows(sparkSession, DataSourceV2Relation.create(
-          ds, extraOptions.toMap ++ sessionOptions + pathsOption,
+          ds, sessionOptions ++ extraOptions.toMap + pathsOption,
           userSpecifiedSchema = userSpecifiedSchema))
       } else {
         loadV1Source(paths: _*)

http://git-wip-us.apache.org/repos/asf/spark/blob/a9a8d3a4/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 4aeddfd..80ade7c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -241,10 +241,12 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
       val source = cls.newInstance().asInstanceOf[DataSourceV2]
       source match {
         case ws: WriteSupport =>
-          val options = extraOptions ++
-              DataSourceV2Utils.extractSessionConfigs(source, df.sparkSession.sessionState.conf)
+          val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
+            source,
+            df.sparkSession.sessionState.conf)
+          val options = sessionOptions ++ extraOptions
+          val relation = DataSourceV2Relation.create(source, options)
 
-          val relation = DataSourceV2Relation.create(source, options.toMap)
           if (mode == SaveMode.Append) {
             runCommand(df.sparkSession, "save") {
               AppendData.byName(relation, df.logicalPlan)

http://git-wip-us.apache.org/repos/asf/spark/blob/a9a8d3a4/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
index 12beca2..bafde50 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.sources.v2
 
+import java.io.File
 import java.util.{ArrayList, List => JList}
 
 import test.org.apache.spark.sql.sources.v2._
@@ -322,6 +323,38 @@ class DataSourceV2Suite extends QueryTest with SharedSQLContext {
     checkCanonicalizedOutput(df, 2, 2)
     checkCanonicalizedOutput(df.select('i), 2, 1)
   }
+
+  test("SPARK-25425: extra options should override sessions options during 
reading") {
+    val prefix = "spark.datasource.userDefinedDataSource."
+    val optionName = "optionA"
+    withSQLConf(prefix + optionName -> "true") {
+      val df = spark
+        .read
+        .option(optionName, false)
+        .format(classOf[DataSourceV2WithSessionConfig].getName).load()
+      val options = df.queryExecution.optimizedPlan.collectFirst {
+        case d: DataSourceV2Relation => d.options
+      }
+      assert(options.get.get(optionName) == Some("false"))
+    }
+  }
+
+  test("SPARK-25425: extra options should override sessions options during 
writing") {
+    withTempPath { path =>
+      val sessionPath = path.getCanonicalPath
+      withSQLConf("spark.datasource.simpleWritableDataSource.path" -> 
sessionPath) {
+        withTempPath { file =>
+          val optionPath = file.getCanonicalPath
+          val format = classOf[SimpleWritableDataSource].getName
+
+          val df = Seq((1L, 2L)).toDF("i", "j")
+          df.write.format(format).option("path", optionPath).save()
+          assert(!new File(sessionPath).exists)
+          checkAnswer(spark.read.format(format).option("path", optionPath).load(), df)
+        }
+      }
+    }
+  }
 }
 
 class SimpleSinglePartitionSource extends DataSourceV2 with ReadSupport {

http://git-wip-us.apache.org/repos/asf/spark/blob/a9a8d3a4/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala
index e1b8e9c..654c62d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala
@@ -38,10 +38,15 @@ import org.apache.spark.util.SerializableConfiguration
  * Each task writes data to `target/_temporary/jobId/$jobId-$partitionId-$attemptNumber`.
  * Each job moves files from `target/_temporary/jobId/` to `target`.
  */
-class SimpleWritableDataSource extends DataSourceV2 with ReadSupport with WriteSupport {
+class SimpleWritableDataSource extends DataSourceV2
+  with ReadSupport
+  with WriteSupport
+  with SessionConfigSupport {
 
   private val schema = new StructType().add("i", "long").add("j", "long")
 
+  override def keyPrefix: String = "simpleWritableDataSource"
+
   class Reader(path: String, conf: Configuration) extends DataSourceReader {
     override def readSchema(): StructType = schema
 

