xuanyuanking commented on a change in pull request #29712:
URL: https://github.com/apache/spark/pull/29712#discussion_r486890695



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
##########
@@ -251,24 +255,25 @@ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan]
           partitionColumns = table.partitionColumnNames,
           bucketSpec = table.bucketSpec,
           className = table.provider.get,
-          options = table.storage.properties ++ pathOption,
+          options = extraOptions.asCaseSensitiveMap.asScala.toMap
+            ++ table.storage.properties ++ pathOption,

Review comment:
       Thanks, done in cc250b9.
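
For context on the merge order: Scala's `++` on maps lets right-hand keys win, so the
table's own storage properties and path override any conflicting extra option. A
minimal, self-contained sketch of that precedence (the option names and paths below
are made up for illustration, not taken from the PR):

    object MergePrecedenceSketch {
      def main(args: Array[String]): Unit = {
        val extraOptions = Map("mergeSchema" -> "true", "path" -> "/user/override")
        val tableProperties = Map("serialization.format" -> "1")
        val pathOption = Some("path" -> "/warehouse/db.db/t")

        // Same shape as the diff above: extra options first, then table
        // properties and the catalog path, which take precedence on conflicts.
        val options = extraOptions ++ tableProperties ++ pathOption
        assert(options("path") == "/warehouse/db.db/t")
        assert(options("mergeSchema") == "true")
      }
    }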

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
##########
@@ -190,4 +192,15 @@ object DataSourceUtils {
     case LegacyBehaviorPolicy.LEGACY => RebaseDateTime.rebaseGregorianToJulianMicros
     case LegacyBehaviorPolicy.CORRECTED => identity[Long]
   }
+
+  private[sql] def checkDuplicateOptions(

Review comment:
       Ah yeah, thanks for the reminder, done in cc250b9.
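
The hunk above cuts off before the body of checkDuplicateOptions, so for readers of
this thread, here is a hypothetical sketch of what such a duplicate-option check
could look like (a guess at the shape, not the code from cc250b9; the real method
would throw Spark's AnalysisException rather than IllegalArgumentException):

    private def checkDuplicateOptions(
        extraOptions: Map[String, String],
        tableProperties: Map[String, String]): Unit = {
      // A key only counts as a conflict when both sides define it with
      // different values; identical values are allowed to overlap.
      val conflicts = tableProperties.collect {
        case (k, v) if extraOptions.get(k).exists(_ != v) => k
      }
      if (conflicts.nonEmpty) {
        throw new IllegalArgumentException(
          s"Conflicting values for option(s): ${conflicts.mkString(", ")}")
      }
    }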

##########
File path: sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
##########
@@ -1190,4 +1190,24 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSparkSession with
     verifyLoadFails(df.write.option("path", path).format("parquet").save(path))
     verifyLoadFails(df.write.option("path", path).format("parquet").save(""))
   }
+
+  test("DataFrameReader.table take the specified options for V1 relation") {

Review comment:
       Sure, done in cc250b9.
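
As a usage sketch of the behavior this test covers (assuming a SparkSession named
`spark` and an existing table `t`; `mergeSchema` is just an example option):

    val df = spark.read
      .option("mergeSchema", "true")
      .table("t")

With the change, such options are passed through to the underlying V1 relation
instead of being dropped.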

##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
##########
@@ -2732,6 +2732,17 @@ object SQLConf {
       .booleanConf
       .createWithDefault(false)
 
+  val LEGACY_EXTRA_OPTIONS_BEHAVIOR =
+    buildConf("spark.sql.legacy.extraOptionsBehavior.enabled")
+      .internal()
+      .doc("When true, the extra options will be ignored for 
DataFrameReader.table(). If set it " +
+        "to false, which is the default, Spark will check if the extra options 
have the same " +
+        "key, but the value is different with the table properties. If the 
check passes, the " +
+        "extra options will be merged. Otherwise, an exception will be 
thrown.")

Review comment:
       Thanks, done in 05aa245.
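
For readers following along, the escape hatch described in the doc string would be
used like this (assuming a SparkSession named `spark`):

    // Opt back into the old behavior: options passed to
    // DataFrameReader.table() are ignored rather than checked and merged.
    spark.conf.set("spark.sql.legacy.extraOptionsBehavior.enabled", "true")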

##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
##########
@@ -2732,6 +2732,17 @@ object SQLConf {
       .booleanConf
       .createWithDefault(false)
 
+  val LEGACY_EXTRA_OPTIONS_BEHAVIOR =
+    buildConf("spark.sql.legacy.extraOptionsBehavior.enabled")
+      .internal()
+      .doc("When true, the extra options will be ignored for 
DataFrameReader.table(). If set it " +
+        "to false, which is the default, Spark will check if the extra options 
have the same " +
+        "key, but the value is different with the table properties. If the 
check passes, the " +

Review comment:
       Thanks, done in 05aa245.

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
##########
@@ -190,4 +194,33 @@ object DataSourceUtils {
     case LegacyBehaviorPolicy.LEGACY => RebaseDateTime.rebaseGregorianToJulianMicros
     case LegacyBehaviorPolicy.CORRECTED => identity[Long]
   }
+
+  def generateDatasourceOptions(
+      extraOptionsMap: CaseInsensitiveStringMap, table: CatalogTable): Map[String, String] = {

Review comment:
       Thanks, done in f3f8c8d.

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
##########
@@ -190,4 +194,33 @@ object DataSourceUtils {
     case LegacyBehaviorPolicy.LEGACY => RebaseDateTime.rebaseGregorianToJulianMicros
     case LegacyBehaviorPolicy.CORRECTED => identity[Long]
   }
+
+  def generateDatasourceOptions(
+      extraOptionsMap: CaseInsensitiveStringMap, table: CatalogTable): Map[String, String] = {
+    val pathOption = table.storage.locationUri.map("path" -> CatalogUtils.URIToString(_))
+    val options = table.storage.properties ++ pathOption
+    if (!SQLConf.get.getConf(SQLConf.LEGACY_EXTRA_OPTIONS_BEHAVIOR)) {
+      // Check for the same key with different values
+      table.storage.properties.foreach { case (k, v) =>
+        if (extraOptionsMap.containsKey(k) && extraOptionsMap.get(k) != v) {
+          throw new AnalysisException(
+            s"Failed to create datasource for the table ${table.identifier.table} since the table " +

Review comment:
       Copy that, rephrased in f3f8c8d.

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
##########
@@ -190,4 +194,33 @@ object DataSourceUtils {
     case LegacyBehaviorPolicy.LEGACY => RebaseDateTime.rebaseGregorianToJulianMicros
     case LegacyBehaviorPolicy.CORRECTED => identity[Long]
   }
+
+  def generateDatasourceOptions(
+      extraOptionsMap: CaseInsensitiveStringMap, table: CatalogTable): Map[String, String] = {
+    val pathOption = table.storage.locationUri.map("path" -> CatalogUtils.URIToString(_))
+    val options = table.storage.properties ++ pathOption
+    if (!SQLConf.get.getConf(SQLConf.LEGACY_EXTRA_OPTIONS_BEHAVIOR)) {
+      // Check for the same key with different values
+      table.storage.properties.foreach { case (k, v) =>
+        if (extraOptionsMap.containsKey(k) && extraOptionsMap.get(k) != v) {
+          throw new AnalysisException(
+            s"Failed to create datasource for the table ${table.identifier.table} since the table " +
+              s"property has the duplicated key $k with input options. To fix this, you can " +

Review comment:
       Done in f3f8c8d.

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
##########
@@ -190,4 +194,33 @@ object DataSourceUtils {
     case LegacyBehaviorPolicy.LEGACY => RebaseDateTime.rebaseGregorianToJulianMicros
     case LegacyBehaviorPolicy.CORRECTED => identity[Long]
   }
+
+  def generateDatasourceOptions(
+      extraOptionsMap: CaseInsensitiveStringMap, table: CatalogTable): Map[String, String] = {
+    val pathOption = table.storage.locationUri.map("path" -> CatalogUtils.URIToString(_))
+    val options = table.storage.properties ++ pathOption
+    if (!SQLConf.get.getConf(SQLConf.LEGACY_EXTRA_OPTIONS_BEHAVIOR)) {
+      // Check for the same key with different values
+      table.storage.properties.foreach { case (k, v) =>
+        if (extraOptionsMap.containsKey(k) && extraOptionsMap.get(k) != v) {
+          throw new AnalysisException(
+            s"Failed to create datasource for the table ${table.identifier.table} since the table " +
+              s"property has the duplicated key $k with input options. To fix this, you can " +
+              "roll back to the legacy behavior of ignoring the input options by setting the " +

Review comment:
       Rephrased in f3f8c8d.

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
##########
@@ -190,4 +194,33 @@ object DataSourceUtils {
     case LegacyBehaviorPolicy.LEGACY => RebaseDateTime.rebaseGregorianToJulianMicros
     case LegacyBehaviorPolicy.CORRECTED => identity[Long]
   }
+
+  def generateDatasourceOptions(
+      extraOptionsMap: CaseInsensitiveStringMap, table: CatalogTable): Map[String, String] = {
+    val pathOption = table.storage.locationUri.map("path" -> CatalogUtils.URIToString(_))
+    val options = table.storage.properties ++ pathOption
+    if (!SQLConf.get.getConf(SQLConf.LEGACY_EXTRA_OPTIONS_BEHAVIOR)) {
+      // Check for the same key with different values
+      table.storage.properties.foreach { case (k, v) =>
+        if (extraOptionsMap.containsKey(k) && extraOptionsMap.get(k) != v) {
+          throw new AnalysisException(
+            s"Failed to create datasource for the table ${table.identifier.table} since the table " +
+              s"property has the duplicated key $k with input options. To fix this, you can " +
+              "roll back to the legacy behavior of ignoring the input options by setting the " +
+              s"config ${SQLConf.LEGACY_EXTRA_OPTIONS_BEHAVIOR.key} to `true`, or address the " +
+              s"conflicts of the same key.")
+        }
+      }
+      // To keep the original key from the table properties, we filter all case-insensitive
+      // duplicate keys out of the extra options.
+      val caseInsensitiveDuplicateKeys =

Review comment:
       Thanks, done in f3f8c8d.
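
Since the hunk is truncated at `caseInsensitiveDuplicateKeys`, here is a hypothetical
completion of that filtering step (a sketch of the idea, not the code from f3f8c8d).
It reuses `extraOptionsMap`, `table`, and `options` from the hunk and assumes
`import java.util.Locale` and `import scala.collection.JavaConverters._` are in scope:

    // Keys already present in the table properties, lower-cased for
    // case-insensitive comparison.
    val lowerCasedTableKeys =
      table.storage.properties.keySet.map(_.toLowerCase(Locale.ROOT))
    // Drop extra-option keys that collide case-insensitively with a table
    // property, so the property's original key casing survives the merge.
    val caseInsensitiveDuplicateKeys = extraOptionsMap.asScala.keySet
      .filter(k => lowerCasedTableKeys.contains(k.toLowerCase(Locale.ROOT)))
    extraOptionsMap.asScala.toMap
      .filter { case (k, _) => !caseInsensitiveDuplicateKeys.contains(k) } ++ options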



