cloud-fan commented on a change in pull request #29712:
URL: https://github.com/apache/spark/pull/29712#discussion_r486895542



##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
##########
@@ -2732,6 +2732,17 @@ object SQLConf {
       .booleanConf
       .createWithDefault(false)
 
+  val LEGACY_EXTRA_OPTIONS_BEHAVIOR =
+    buildConf("spark.sql.legacy.extraOptionsBehavior.enabled")
+      .internal()
+      .doc("When true, the extra options will be ignored for 
DataFrameReader.table(). If set it " +
+        "to false, which is the default, Spark will check if the extra options 
have the same " +
+        "key, but the value is different with the table properties. If the 
check passes, the " +
+        "extra options will be merged. Otherwise, an exception will be 
thrown.")

Review comment:
       `will be merged with the table serde properties as the scan options`

##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
##########
@@ -2732,6 +2732,17 @@ object SQLConf {
       .booleanConf
       .createWithDefault(false)
 
+  val LEGACY_EXTRA_OPTIONS_BEHAVIOR =
+    buildConf("spark.sql.legacy.extraOptionsBehavior.enabled")
+      .internal()
+      .doc("When true, the extra options will be ignored for 
DataFrameReader.table(). If set it " +
+        "to false, which is the default, Spark will check if the extra options 
have the same " +
+        "key, but the value is different with the table properties. If the 
check passes, the " +

Review comment:
       nit: `the table properties` -> `the table serde properties`

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
##########
@@ -190,4 +194,33 @@ object DataSourceUtils {
     case LegacyBehaviorPolicy.LEGACY => 
RebaseDateTime.rebaseGregorianToJulianMicros
     case LegacyBehaviorPolicy.CORRECTED => identity[Long]
   }
+
+  def generateDatasourceOptions(
+      extraOptionsMap: CaseInsensitiveStringMap, table: CatalogTable): 
Map[String, String] = {

Review comment:
       `extraOptionsMap` -> `extraOptions`

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
##########
@@ -190,4 +194,33 @@ object DataSourceUtils {
     case LegacyBehaviorPolicy.LEGACY => 
RebaseDateTime.rebaseGregorianToJulianMicros
     case LegacyBehaviorPolicy.CORRECTED => identity[Long]
   }
+
+  def generateDatasourceOptions(
+      extraOptionsMap: CaseInsensitiveStringMap, table: CatalogTable): 
Map[String, String] = {
+    val pathOption = table.storage.locationUri.map("path" -> 
CatalogUtils.URIToString(_))
+    val options = table.storage.properties ++ pathOption
+    if (!SQLConf.get.getConf(SQLConf.LEGACY_EXTRA_OPTIONS_BEHAVIOR)) {
+      // Check the same key with different values
+      table.storage.properties.foreach { case (k, v) =>
+        if (extraOptionsMap.containsKey(k) && extraOptionsMap.get(k) != v) {
+          throw new AnalysisException(
+            s"Fail to create datasource for the table 
${table.identifier.table} since the table " +

Review comment:
       `create datasource` -> `resolve data source`

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
##########
@@ -190,4 +194,33 @@ object DataSourceUtils {
     case LegacyBehaviorPolicy.LEGACY => 
RebaseDateTime.rebaseGregorianToJulianMicros
     case LegacyBehaviorPolicy.CORRECTED => identity[Long]
   }
+
+  def generateDatasourceOptions(
+      extraOptionsMap: CaseInsensitiveStringMap, table: CatalogTable): 
Map[String, String] = {
+    val pathOption = table.storage.locationUri.map("path" -> 
CatalogUtils.URIToString(_))
+    val options = table.storage.properties ++ pathOption
+    if (!SQLConf.get.getConf(SQLConf.LEGACY_EXTRA_OPTIONS_BEHAVIOR)) {
+      // Check the same key with different values
+      table.storage.properties.foreach { case (k, v) =>
+        if (extraOptionsMap.containsKey(k) && extraOptionsMap.get(k) != v) {
+          throw new AnalysisException(
+            s"Fail to create datasource for the table 
${table.identifier.table} since the table " +

Review comment:
       `table.identifier.table` -> `table.identifier`, so that we include the 
database name as well.

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
##########
@@ -190,4 +194,33 @@ object DataSourceUtils {
     case LegacyBehaviorPolicy.LEGACY => 
RebaseDateTime.rebaseGregorianToJulianMicros
     case LegacyBehaviorPolicy.CORRECTED => identity[Long]
   }
+
+  def generateDatasourceOptions(
+      extraOptionsMap: CaseInsensitiveStringMap, table: CatalogTable): 
Map[String, String] = {
+    val pathOption = table.storage.locationUri.map("path" -> 
CatalogUtils.URIToString(_))
+    val options = table.storage.properties ++ pathOption
+    if (!SQLConf.get.getConf(SQLConf.LEGACY_EXTRA_OPTIONS_BEHAVIOR)) {
+      // Check the same key with different values
+      table.storage.properties.foreach { case (k, v) =>
+        if (extraOptionsMap.containsKey(k) && extraOptionsMap.get(k) != v) {
+          throw new AnalysisException(
+            s"Fail to create datasource for the table 
${table.identifier.table} since the table " +
+              s"property has the duplicated key $k with input options. To fix 
this, you can " +

Review comment:
       nit: `table property` -> `table serde property`

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
##########
@@ -190,4 +194,33 @@ object DataSourceUtils {
     case LegacyBehaviorPolicy.LEGACY => 
RebaseDateTime.rebaseGregorianToJulianMicros
     case LegacyBehaviorPolicy.CORRECTED => identity[Long]
   }
+
+  def generateDatasourceOptions(
+      extraOptionsMap: CaseInsensitiveStringMap, table: CatalogTable): 
Map[String, String] = {
+    val pathOption = table.storage.locationUri.map("path" -> 
CatalogUtils.URIToString(_))
+    val options = table.storage.properties ++ pathOption
+    if (!SQLConf.get.getConf(SQLConf.LEGACY_EXTRA_OPTIONS_BEHAVIOR)) {
+      // Check the same key with different values
+      table.storage.properties.foreach { case (k, v) =>
+        if (extraOptionsMap.containsKey(k) && extraOptionsMap.get(k) != v) {
+          throw new AnalysisException(
+            s"Fail to create datasource for the table 
${table.identifier.table} since the table " +
+              s"property has the duplicated key $k with input options. To fix 
this, you can " +

Review comment:
       `input options` -> `extra options specified for this scan operation.`

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
##########
@@ -190,4 +194,33 @@ object DataSourceUtils {
     case LegacyBehaviorPolicy.LEGACY => 
RebaseDateTime.rebaseGregorianToJulianMicros
     case LegacyBehaviorPolicy.CORRECTED => identity[Long]
   }
+
+  def generateDatasourceOptions(
+      extraOptionsMap: CaseInsensitiveStringMap, table: CatalogTable): 
Map[String, String] = {
+    val pathOption = table.storage.locationUri.map("path" -> 
CatalogUtils.URIToString(_))
+    val options = table.storage.properties ++ pathOption
+    if (!SQLConf.get.getConf(SQLConf.LEGACY_EXTRA_OPTIONS_BEHAVIOR)) {
+      // Check the same key with different values
+      table.storage.properties.foreach { case (k, v) =>
+        if (extraOptionsMap.containsKey(k) && extraOptionsMap.get(k) != v) {
+          throw new AnalysisException(
+            s"Fail to create datasource for the table 
${table.identifier.table} since the table " +
+              s"property has the duplicated key $k with input options. To fix 
this, you can " +
+              "rollback to the legacy behavior of ignoring the input options 
by setting the " +

Review comment:
       `input options` -> `extra options`

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
##########
@@ -190,4 +194,33 @@ object DataSourceUtils {
     case LegacyBehaviorPolicy.LEGACY => 
RebaseDateTime.rebaseGregorianToJulianMicros
     case LegacyBehaviorPolicy.CORRECTED => identity[Long]
   }
+
+  def generateDatasourceOptions(
+      extraOptionsMap: CaseInsensitiveStringMap, table: CatalogTable): 
Map[String, String] = {
+    val pathOption = table.storage.locationUri.map("path" -> 
CatalogUtils.URIToString(_))
+    val options = table.storage.properties ++ pathOption
+    if (!SQLConf.get.getConf(SQLConf.LEGACY_EXTRA_OPTIONS_BEHAVIOR)) {
+      // Check the same key with different values
+      table.storage.properties.foreach { case (k, v) =>
+        if (extraOptionsMap.containsKey(k) && extraOptionsMap.get(k) != v) {
+          throw new AnalysisException(
+            s"Fail to create datasource for the table 
${table.identifier.table} since the table " +
+              s"property has the duplicated key $k with input options. To fix 
this, you can " +
+              "rollback to the legacy behavior of ignoring the input options 
by setting the " +
+              s"config ${SQLConf.LEGACY_EXTRA_OPTIONS_BEHAVIOR.key} to 
`false`, or address the " +
+              s"conflicts of the same config.")
+        }
+      }
+      // To keep the original key from table properties, here we filter all 
case insensitive
+      // duplicate keys out from extra options.
+      val caseInsensitiveDuplicateKeys =

Review comment:
       nit: `lowerCasedDuplicatedKeys`

##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
##########
@@ -2732,6 +2732,17 @@ object SQLConf {
       .booleanConf
       .createWithDefault(false)
 
+  val LEGACY_EXTRA_OPTIONS_BEHAVIOR =
+    buildConf("spark.sql.legacy.extraOptionsBehavior.enabled")
+      .internal()
+      .doc("When true, the extra options will be ignored for 
DataFrameReader.table(). If set it " +
+        "to false, which is the default, Spark will check if the extra options 
have the same " +
+        "key, but the value is different with the table properties. If the 
check passes, the " +
+        "extra options will be merged. Otherwise, an exception will be 
thrown.")

Review comment:
       `will be merged with the table serde properties as the scan options`

##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
##########
@@ -2732,6 +2732,17 @@ object SQLConf {
       .booleanConf
       .createWithDefault(false)
 
+  val LEGACY_EXTRA_OPTIONS_BEHAVIOR =
+    buildConf("spark.sql.legacy.extraOptionsBehavior.enabled")
+      .internal()
+      .doc("When true, the extra options will be ignored for 
DataFrameReader.table(). If set it " +
+        "to false, which is the default, Spark will check if the extra options 
have the same " +
+        "key, but the value is different with the table properties. If the 
check passes, the " +

Review comment:
       table serde properties.

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
##########
@@ -190,4 +194,33 @@ object DataSourceUtils {
     case LegacyBehaviorPolicy.LEGACY => 
RebaseDateTime.rebaseGregorianToJulianMicros
     case LegacyBehaviorPolicy.CORRECTED => identity[Long]
   }
+
+  def generateDatasourceOptions(
+      extraOptionsMap: CaseInsensitiveStringMap, table: CatalogTable): 
Map[String, String] = {

Review comment:
       `extraOptionsMap` -> `extraOptions`

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
##########
@@ -190,4 +194,33 @@ object DataSourceUtils {
     case LegacyBehaviorPolicy.LEGACY => 
RebaseDateTime.rebaseGregorianToJulianMicros
     case LegacyBehaviorPolicy.CORRECTED => identity[Long]
   }
+
+  def generateDatasourceOptions(
+      extraOptionsMap: CaseInsensitiveStringMap, table: CatalogTable): 
Map[String, String] = {
+    val pathOption = table.storage.locationUri.map("path" -> 
CatalogUtils.URIToString(_))
+    val options = table.storage.properties ++ pathOption
+    if (!SQLConf.get.getConf(SQLConf.LEGACY_EXTRA_OPTIONS_BEHAVIOR)) {
+      // Check the same key with different values
+      table.storage.properties.foreach { case (k, v) =>
+        if (extraOptionsMap.containsKey(k) && extraOptionsMap.get(k) != v) {
+          throw new AnalysisException(
+            s"Fail to create datasource for the table 
${table.identifier.table} since the table " +

Review comment:
       `create datasource` -> `resolve data source`

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
##########
@@ -190,4 +194,33 @@ object DataSourceUtils {
     case LegacyBehaviorPolicy.LEGACY => 
RebaseDateTime.rebaseGregorianToJulianMicros
     case LegacyBehaviorPolicy.CORRECTED => identity[Long]
   }
+
+  def generateDatasourceOptions(
+      extraOptionsMap: CaseInsensitiveStringMap, table: CatalogTable): 
Map[String, String] = {
+    val pathOption = table.storage.locationUri.map("path" -> 
CatalogUtils.URIToString(_))
+    val options = table.storage.properties ++ pathOption
+    if (!SQLConf.get.getConf(SQLConf.LEGACY_EXTRA_OPTIONS_BEHAVIOR)) {
+      // Check the same key with different values
+      table.storage.properties.foreach { case (k, v) =>
+        if (extraOptionsMap.containsKey(k) && extraOptionsMap.get(k) != v) {
+          throw new AnalysisException(
+            s"Fail to create datasource for the table 
${table.identifier.table} since the table " +

Review comment:
       `table.identifier.table` -> `table.identifier`, so that we include the 
database name as well.

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
##########
@@ -190,4 +194,33 @@ object DataSourceUtils {
     case LegacyBehaviorPolicy.LEGACY => 
RebaseDateTime.rebaseGregorianToJulianMicros
     case LegacyBehaviorPolicy.CORRECTED => identity[Long]
   }
+
+  def generateDatasourceOptions(
+      extraOptionsMap: CaseInsensitiveStringMap, table: CatalogTable): 
Map[String, String] = {
+    val pathOption = table.storage.locationUri.map("path" -> 
CatalogUtils.URIToString(_))
+    val options = table.storage.properties ++ pathOption
+    if (!SQLConf.get.getConf(SQLConf.LEGACY_EXTRA_OPTIONS_BEHAVIOR)) {
+      // Check the same key with different values
+      table.storage.properties.foreach { case (k, v) =>
+        if (extraOptionsMap.containsKey(k) && extraOptionsMap.get(k) != v) {
+          throw new AnalysisException(
+            s"Fail to create datasource for the table 
${table.identifier.table} since the table " +
+              s"property has the duplicated key $k with input options. To fix 
this, you can " +

Review comment:
       table serde property

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
##########
@@ -190,4 +194,33 @@ object DataSourceUtils {
     case LegacyBehaviorPolicy.LEGACY => 
RebaseDateTime.rebaseGregorianToJulianMicros
     case LegacyBehaviorPolicy.CORRECTED => identity[Long]
   }
+
+  def generateDatasourceOptions(
+      extraOptionsMap: CaseInsensitiveStringMap, table: CatalogTable): 
Map[String, String] = {
+    val pathOption = table.storage.locationUri.map("path" -> 
CatalogUtils.URIToString(_))
+    val options = table.storage.properties ++ pathOption
+    if (!SQLConf.get.getConf(SQLConf.LEGACY_EXTRA_OPTIONS_BEHAVIOR)) {
+      // Check the same key with different values
+      table.storage.properties.foreach { case (k, v) =>
+        if (extraOptionsMap.containsKey(k) && extraOptionsMap.get(k) != v) {
+          throw new AnalysisException(
+            s"Fail to create datasource for the table 
${table.identifier.table} since the table " +
+              s"property has the duplicated key $k with input options. To fix 
this, you can " +

Review comment:
       `input options` -> `extra options specified for this scan operation.`

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
##########
@@ -190,4 +194,33 @@ object DataSourceUtils {
     case LegacyBehaviorPolicy.LEGACY => 
RebaseDateTime.rebaseGregorianToJulianMicros
     case LegacyBehaviorPolicy.CORRECTED => identity[Long]
   }
+
+  def generateDatasourceOptions(
+      extraOptionsMap: CaseInsensitiveStringMap, table: CatalogTable): 
Map[String, String] = {
+    val pathOption = table.storage.locationUri.map("path" -> 
CatalogUtils.URIToString(_))
+    val options = table.storage.properties ++ pathOption
+    if (!SQLConf.get.getConf(SQLConf.LEGACY_EXTRA_OPTIONS_BEHAVIOR)) {
+      // Check the same key with different values
+      table.storage.properties.foreach { case (k, v) =>
+        if (extraOptionsMap.containsKey(k) && extraOptionsMap.get(k) != v) {
+          throw new AnalysisException(
+            s"Fail to create datasource for the table 
${table.identifier.table} since the table " +
+              s"property has the duplicated key $k with input options. To fix 
this, you can " +
+              "rollback to the legacy behavior of ignoring the input options 
by setting the " +

Review comment:
       `input options` -> `extra options`

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
##########
@@ -190,4 +194,33 @@ object DataSourceUtils {
     case LegacyBehaviorPolicy.LEGACY => 
RebaseDateTime.rebaseGregorianToJulianMicros
     case LegacyBehaviorPolicy.CORRECTED => identity[Long]
   }
+
+  def generateDatasourceOptions(
+      extraOptionsMap: CaseInsensitiveStringMap, table: CatalogTable): 
Map[String, String] = {
+    val pathOption = table.storage.locationUri.map("path" -> 
CatalogUtils.URIToString(_))
+    val options = table.storage.properties ++ pathOption
+    if (!SQLConf.get.getConf(SQLConf.LEGACY_EXTRA_OPTIONS_BEHAVIOR)) {
+      // Check the same key with different values
+      table.storage.properties.foreach { case (k, v) =>
+        if (extraOptionsMap.containsKey(k) && extraOptionsMap.get(k) != v) {
+          throw new AnalysisException(
+            s"Fail to create datasource for the table 
${table.identifier.table} since the table " +
+              s"property has the duplicated key $k with input options. To fix 
this, you can " +
+              "rollback to the legacy behavior of ignoring the input options 
by setting the " +
+              s"config ${SQLConf.LEGACY_EXTRA_OPTIONS_BEHAVIOR.key} to 
`false`, or address the " +
+              s"conflicts of the same config.")
+        }
+      }
+      // To keep the original key from table properties, here we filter all 
case insensitive
+      // duplicate keys out from extra options.
+      val caseInsensitiveDuplicateKeys =

Review comment:
       nit: `lowerCasedDuplicatedKeys`

##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
##########
@@ -2732,6 +2732,17 @@ object SQLConf {
       .booleanConf
       .createWithDefault(false)
 
+  val LEGACY_EXTRA_OPTIONS_BEHAVIOR =
+    buildConf("spark.sql.legacy.extraOptionsBehavior.enabled")
+      .internal()
+      .doc("When true, the extra options will be ignored for 
DataFrameReader.table(). If set it " +
+        "to false, which is the default, Spark will check if the extra options 
have the same " +
+        "key, but the value is different with the table properties. If the 
check passes, the " +
+        "extra options will be merged. Otherwise, an exception will be 
thrown.")

Review comment:
       `will be merged with the table serde properties as the scan options`

##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
##########
@@ -2732,6 +2732,17 @@ object SQLConf {
       .booleanConf
       .createWithDefault(false)
 
+  val LEGACY_EXTRA_OPTIONS_BEHAVIOR =
+    buildConf("spark.sql.legacy.extraOptionsBehavior.enabled")
+      .internal()
+      .doc("When true, the extra options will be ignored for 
DataFrameReader.table(). If set it " +
+        "to false, which is the default, Spark will check if the extra options 
have the same " +
+        "key, but the value is different with the table properties. If the 
check passes, the " +

Review comment:
       table serde properties.

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
##########
@@ -190,4 +194,33 @@ object DataSourceUtils {
     case LegacyBehaviorPolicy.LEGACY => 
RebaseDateTime.rebaseGregorianToJulianMicros
     case LegacyBehaviorPolicy.CORRECTED => identity[Long]
   }
+
+  def generateDatasourceOptions(
+      extraOptionsMap: CaseInsensitiveStringMap, table: CatalogTable): 
Map[String, String] = {

Review comment:
       `extraOptionsMap` -> `extraOptions`

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
##########
@@ -190,4 +194,33 @@ object DataSourceUtils {
     case LegacyBehaviorPolicy.LEGACY => 
RebaseDateTime.rebaseGregorianToJulianMicros
     case LegacyBehaviorPolicy.CORRECTED => identity[Long]
   }
+
+  def generateDatasourceOptions(
+      extraOptionsMap: CaseInsensitiveStringMap, table: CatalogTable): 
Map[String, String] = {
+    val pathOption = table.storage.locationUri.map("path" -> 
CatalogUtils.URIToString(_))
+    val options = table.storage.properties ++ pathOption
+    if (!SQLConf.get.getConf(SQLConf.LEGACY_EXTRA_OPTIONS_BEHAVIOR)) {
+      // Check the same key with different values
+      table.storage.properties.foreach { case (k, v) =>
+        if (extraOptionsMap.containsKey(k) && extraOptionsMap.get(k) != v) {
+          throw new AnalysisException(
+            s"Fail to create datasource for the table 
${table.identifier.table} since the table " +

Review comment:
       `create datasource` -> `resolve data source`

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
##########
@@ -190,4 +194,33 @@ object DataSourceUtils {
     case LegacyBehaviorPolicy.LEGACY => 
RebaseDateTime.rebaseGregorianToJulianMicros
     case LegacyBehaviorPolicy.CORRECTED => identity[Long]
   }
+
+  def generateDatasourceOptions(
+      extraOptionsMap: CaseInsensitiveStringMap, table: CatalogTable): 
Map[String, String] = {
+    val pathOption = table.storage.locationUri.map("path" -> 
CatalogUtils.URIToString(_))
+    val options = table.storage.properties ++ pathOption
+    if (!SQLConf.get.getConf(SQLConf.LEGACY_EXTRA_OPTIONS_BEHAVIOR)) {
+      // Check the same key with different values
+      table.storage.properties.foreach { case (k, v) =>
+        if (extraOptionsMap.containsKey(k) && extraOptionsMap.get(k) != v) {
+          throw new AnalysisException(
+            s"Fail to create datasource for the table 
${table.identifier.table} since the table " +

Review comment:
       `table.identifier.table` -> `table.identifier`, so that we include the 
database name as well.

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
##########
@@ -190,4 +194,33 @@ object DataSourceUtils {
     case LegacyBehaviorPolicy.LEGACY => 
RebaseDateTime.rebaseGregorianToJulianMicros
     case LegacyBehaviorPolicy.CORRECTED => identity[Long]
   }
+
+  def generateDatasourceOptions(
+      extraOptionsMap: CaseInsensitiveStringMap, table: CatalogTable): 
Map[String, String] = {
+    val pathOption = table.storage.locationUri.map("path" -> 
CatalogUtils.URIToString(_))
+    val options = table.storage.properties ++ pathOption
+    if (!SQLConf.get.getConf(SQLConf.LEGACY_EXTRA_OPTIONS_BEHAVIOR)) {
+      // Check the same key with different values
+      table.storage.properties.foreach { case (k, v) =>
+        if (extraOptionsMap.containsKey(k) && extraOptionsMap.get(k) != v) {
+          throw new AnalysisException(
+            s"Fail to create datasource for the table 
${table.identifier.table} since the table " +
+              s"property has the duplicated key $k with input options. To fix 
this, you can " +

Review comment:
       table serde property

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
##########
@@ -190,4 +194,33 @@ object DataSourceUtils {
     case LegacyBehaviorPolicy.LEGACY => 
RebaseDateTime.rebaseGregorianToJulianMicros
     case LegacyBehaviorPolicy.CORRECTED => identity[Long]
   }
+
+  def generateDatasourceOptions(
+      extraOptionsMap: CaseInsensitiveStringMap, table: CatalogTable): 
Map[String, String] = {
+    val pathOption = table.storage.locationUri.map("path" -> 
CatalogUtils.URIToString(_))
+    val options = table.storage.properties ++ pathOption
+    if (!SQLConf.get.getConf(SQLConf.LEGACY_EXTRA_OPTIONS_BEHAVIOR)) {
+      // Check the same key with different values
+      table.storage.properties.foreach { case (k, v) =>
+        if (extraOptionsMap.containsKey(k) && extraOptionsMap.get(k) != v) {
+          throw new AnalysisException(
+            s"Fail to create datasource for the table 
${table.identifier.table} since the table " +
+              s"property has the duplicated key $k with input options. To fix 
this, you can " +

Review comment:
       `input options` -> `extra options specified for this scan operation.`

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
##########
@@ -190,4 +194,33 @@ object DataSourceUtils {
     case LegacyBehaviorPolicy.LEGACY => 
RebaseDateTime.rebaseGregorianToJulianMicros
     case LegacyBehaviorPolicy.CORRECTED => identity[Long]
   }
+
+  def generateDatasourceOptions(
+      extraOptionsMap: CaseInsensitiveStringMap, table: CatalogTable): 
Map[String, String] = {
+    val pathOption = table.storage.locationUri.map("path" -> 
CatalogUtils.URIToString(_))
+    val options = table.storage.properties ++ pathOption
+    if (!SQLConf.get.getConf(SQLConf.LEGACY_EXTRA_OPTIONS_BEHAVIOR)) {
+      // Check the same key with different values
+      table.storage.properties.foreach { case (k, v) =>
+        if (extraOptionsMap.containsKey(k) && extraOptionsMap.get(k) != v) {
+          throw new AnalysisException(
+            s"Fail to create datasource for the table 
${table.identifier.table} since the table " +
+              s"property has the duplicated key $k with input options. To fix 
this, you can " +
+              "rollback to the legacy behavior of ignoring the input options 
by setting the " +

Review comment:
       `input options` -> `extra options`

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
##########
@@ -190,4 +194,33 @@ object DataSourceUtils {
     case LegacyBehaviorPolicy.LEGACY => 
RebaseDateTime.rebaseGregorianToJulianMicros
     case LegacyBehaviorPolicy.CORRECTED => identity[Long]
   }
+
+  def generateDatasourceOptions(
+      extraOptionsMap: CaseInsensitiveStringMap, table: CatalogTable): 
Map[String, String] = {
+    val pathOption = table.storage.locationUri.map("path" -> 
CatalogUtils.URIToString(_))
+    val options = table.storage.properties ++ pathOption
+    if (!SQLConf.get.getConf(SQLConf.LEGACY_EXTRA_OPTIONS_BEHAVIOR)) {
+      // Check the same key with different values
+      table.storage.properties.foreach { case (k, v) =>
+        if (extraOptionsMap.containsKey(k) && extraOptionsMap.get(k) != v) {
+          throw new AnalysisException(
+            s"Fail to create datasource for the table 
${table.identifier.table} since the table " +
+              s"property has the duplicated key $k with input options. To fix 
this, you can " +
+              "rollback to the legacy behavior of ignoring the input options 
by setting the " +
+              s"config ${SQLConf.LEGACY_EXTRA_OPTIONS_BEHAVIOR.key} to 
`false`, or address the " +
+              s"conflicts of the same config.")
+        }
+      }
+      // To keep the original key from table properties, here we filter all 
case insensitive
+      // duplicate keys out from extra options.
+      val caseInsensitiveDuplicateKeys =

Review comment:
       nit: `lowerCasedDuplicatedKeys`

##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
##########
@@ -2732,6 +2732,17 @@ object SQLConf {
       .booleanConf
       .createWithDefault(false)
 
+  val LEGACY_EXTRA_OPTIONS_BEHAVIOR =
+    buildConf("spark.sql.legacy.extraOptionsBehavior.enabled")
+      .internal()
+      .doc("When true, the extra options will be ignored for 
DataFrameReader.table(). If set it " +
+        "to false, which is the default, Spark will check if the extra options 
have the same " +
+        "key, but the value is different with the table properties. If the 
check passes, the " +
+        "extra options will be merged. Otherwise, an exception will be 
thrown.")

Review comment:
       `will be merged with the table serde properties as the scan options`

##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
##########
@@ -2732,6 +2732,17 @@ object SQLConf {
       .booleanConf
       .createWithDefault(false)
 
+  val LEGACY_EXTRA_OPTIONS_BEHAVIOR =
+    buildConf("spark.sql.legacy.extraOptionsBehavior.enabled")
+      .internal()
+      .doc("When true, the extra options will be ignored for 
DataFrameReader.table(). If set it " +
+        "to false, which is the default, Spark will check if the extra options 
have the same " +
+        "key, but the value is different with the table properties. If the 
check passes, the " +

Review comment:
       table serde properties.

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
##########
@@ -190,4 +194,33 @@ object DataSourceUtils {
     case LegacyBehaviorPolicy.LEGACY => 
RebaseDateTime.rebaseGregorianToJulianMicros
     case LegacyBehaviorPolicy.CORRECTED => identity[Long]
   }
+
+  def generateDatasourceOptions(
+      extraOptionsMap: CaseInsensitiveStringMap, table: CatalogTable): 
Map[String, String] = {

Review comment:
       `extraOptionsMap` -> `extraOptions`

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
##########
@@ -190,4 +194,33 @@ object DataSourceUtils {
     case LegacyBehaviorPolicy.LEGACY => 
RebaseDateTime.rebaseGregorianToJulianMicros
     case LegacyBehaviorPolicy.CORRECTED => identity[Long]
   }
+
+  def generateDatasourceOptions(
+      extraOptionsMap: CaseInsensitiveStringMap, table: CatalogTable): 
Map[String, String] = {
+    val pathOption = table.storage.locationUri.map("path" -> 
CatalogUtils.URIToString(_))
+    val options = table.storage.properties ++ pathOption
+    if (!SQLConf.get.getConf(SQLConf.LEGACY_EXTRA_OPTIONS_BEHAVIOR)) {
+      // Check the same key with different values
+      table.storage.properties.foreach { case (k, v) =>
+        if (extraOptionsMap.containsKey(k) && extraOptionsMap.get(k) != v) {
+          throw new AnalysisException(
+            s"Fail to create datasource for the table 
${table.identifier.table} since the table " +

Review comment:
       `create datasource` -> `resolve data source`

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
##########
@@ -190,4 +194,33 @@ object DataSourceUtils {
     case LegacyBehaviorPolicy.LEGACY => 
RebaseDateTime.rebaseGregorianToJulianMicros
     case LegacyBehaviorPolicy.CORRECTED => identity[Long]
   }
+
+  def generateDatasourceOptions(
+      extraOptionsMap: CaseInsensitiveStringMap, table: CatalogTable): 
Map[String, String] = {
+    val pathOption = table.storage.locationUri.map("path" -> 
CatalogUtils.URIToString(_))
+    val options = table.storage.properties ++ pathOption
+    if (!SQLConf.get.getConf(SQLConf.LEGACY_EXTRA_OPTIONS_BEHAVIOR)) {
+      // Check the same key with different values
+      table.storage.properties.foreach { case (k, v) =>
+        if (extraOptionsMap.containsKey(k) && extraOptionsMap.get(k) != v) {
+          throw new AnalysisException(
+            s"Fail to create datasource for the table 
${table.identifier.table} since the table " +

Review comment:
       `table.identifier.table`  -> `table.identifier`, so that we include 
database name as well.

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
##########
@@ -190,4 +194,33 @@ object DataSourceUtils {
     case LegacyBehaviorPolicy.LEGACY => 
RebaseDateTime.rebaseGregorianToJulianMicros
     case LegacyBehaviorPolicy.CORRECTED => identity[Long]
   }
+
+  def generateDatasourceOptions(
+      extraOptionsMap: CaseInsensitiveStringMap, table: CatalogTable): 
Map[String, String] = {
+    val pathOption = table.storage.locationUri.map("path" -> 
CatalogUtils.URIToString(_))
+    val options = table.storage.properties ++ pathOption
+    if (!SQLConf.get.getConf(SQLConf.LEGACY_EXTRA_OPTIONS_BEHAVIOR)) {
+      // Check the same key with different values
+      table.storage.properties.foreach { case (k, v) =>
+        if (extraOptionsMap.containsKey(k) && extraOptionsMap.get(k) != v) {
+          throw new AnalysisException(
+            s"Fail to create datasource for the table 
${table.identifier.table} since the table " +
+              s"property has the duplicated key $k with input options. To fix 
this, you can " +

Review comment:
       table serde property

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
##########
@@ -190,4 +194,33 @@ object DataSourceUtils {
     case LegacyBehaviorPolicy.LEGACY => 
RebaseDateTime.rebaseGregorianToJulianMicros
     case LegacyBehaviorPolicy.CORRECTED => identity[Long]
   }
+
+  def generateDatasourceOptions(
+      extraOptionsMap: CaseInsensitiveStringMap, table: CatalogTable): 
Map[String, String] = {
+    val pathOption = table.storage.locationUri.map("path" -> 
CatalogUtils.URIToString(_))
+    val options = table.storage.properties ++ pathOption
+    if (!SQLConf.get.getConf(SQLConf.LEGACY_EXTRA_OPTIONS_BEHAVIOR)) {
+      // Check the same key with different values
+      table.storage.properties.foreach { case (k, v) =>
+        if (extraOptionsMap.containsKey(k) && extraOptionsMap.get(k) != v) {
+          throw new AnalysisException(
+            s"Fail to create datasource for the table 
${table.identifier.table} since the table " +
+              s"property has the duplicated key $k with input options. To fix 
this, you can " +

Review comment:
       `input options` -> `extra options specified for this scan operation.`

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
##########
@@ -190,4 +194,33 @@ object DataSourceUtils {
     case LegacyBehaviorPolicy.LEGACY => 
RebaseDateTime.rebaseGregorianToJulianMicros
     case LegacyBehaviorPolicy.CORRECTED => identity[Long]
   }
+
+  def generateDatasourceOptions(
+      extraOptionsMap: CaseInsensitiveStringMap, table: CatalogTable): 
Map[String, String] = {
+    val pathOption = table.storage.locationUri.map("path" -> 
CatalogUtils.URIToString(_))
+    val options = table.storage.properties ++ pathOption
+    if (!SQLConf.get.getConf(SQLConf.LEGACY_EXTRA_OPTIONS_BEHAVIOR)) {
+      // Check the same key with different values
+      table.storage.properties.foreach { case (k, v) =>
+        if (extraOptionsMap.containsKey(k) && extraOptionsMap.get(k) != v) {
+          throw new AnalysisException(
+            s"Fail to create datasource for the table 
${table.identifier.table} since the table " +
+              s"property has the duplicated key $k with input options. To fix 
this, you can " +
+              "rollback to the legacy behavior of ignoring the input options 
by setting the " +

Review comment:
       `input options` -> `extra options`

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
##########
@@ -190,4 +194,33 @@ object DataSourceUtils {
     case LegacyBehaviorPolicy.LEGACY => 
RebaseDateTime.rebaseGregorianToJulianMicros
     case LegacyBehaviorPolicy.CORRECTED => identity[Long]
   }
+
+  def generateDatasourceOptions(
+      extraOptionsMap: CaseInsensitiveStringMap, table: CatalogTable): 
Map[String, String] = {
+    val pathOption = table.storage.locationUri.map("path" -> 
CatalogUtils.URIToString(_))
+    val options = table.storage.properties ++ pathOption
+    if (!SQLConf.get.getConf(SQLConf.LEGACY_EXTRA_OPTIONS_BEHAVIOR)) {
+      // Check the same key with different values
+      table.storage.properties.foreach { case (k, v) =>
+        if (extraOptionsMap.containsKey(k) && extraOptionsMap.get(k) != v) {
+          throw new AnalysisException(
+            s"Fail to create datasource for the table 
${table.identifier.table} since the table " +
+              s"property has the duplicated key $k with input options. To fix 
this, you can " +
+              "rollback to the legacy behavior of ignoring the input options 
by setting the " +
+              s"config ${SQLConf.LEGACY_EXTRA_OPTIONS_BEHAVIOR.key} to 
`true`, or address the " +
+              s"conflicts of the same key.")
+        }
+      }
+      // To keep the original key from table properties, here we filter all 
case insensitive
+      // duplicate keys out from extra options.
+      val caseInsensitiveDuplicateKeys =

Review comment:
       nit: `lowerCasedDuplicatedKeys`

##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
##########
@@ -2732,6 +2732,17 @@ object SQLConf {
       .booleanConf
       .createWithDefault(false)
 
+  val LEGACY_EXTRA_OPTIONS_BEHAVIOR =
+    buildConf("spark.sql.legacy.extraOptionsBehavior.enabled")
+      .internal()
+      .doc("When true, the extra options will be ignored for 
DataFrameReader.table(). If it is set " +
+        "to false, which is the default, Spark will check if the extra options 
have the same " +
+        "key, but the value is different from the table properties. If the 
check passes, the " +
+        "extra options will be merged. Otherwise, an exception will be 
thrown.")

Review comment:
       `will be merged with the table serde properties as the scan options`

##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
##########
@@ -2732,6 +2732,17 @@ object SQLConf {
       .booleanConf
       .createWithDefault(false)
 
+  val LEGACY_EXTRA_OPTIONS_BEHAVIOR =
+    buildConf("spark.sql.legacy.extraOptionsBehavior.enabled")
+      .internal()
+      .doc("When true, the extra options will be ignored for 
DataFrameReader.table(). If it is set " +
+        "to false, which is the default, Spark will check if the extra options 
have the same " +
+        "key, but the value is different from the table properties. If the 
check passes, the " +

Review comment:
       table serde properties.

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
##########
@@ -190,4 +194,33 @@ object DataSourceUtils {
     case LegacyBehaviorPolicy.LEGACY => 
RebaseDateTime.rebaseGregorianToJulianMicros
     case LegacyBehaviorPolicy.CORRECTED => identity[Long]
   }
+
+  def generateDatasourceOptions(
+      extraOptionsMap: CaseInsensitiveStringMap, table: CatalogTable): 
Map[String, String] = {

Review comment:
       `extraOptionsMap` -> `extraOptions`

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
##########
@@ -190,4 +194,33 @@ object DataSourceUtils {
     case LegacyBehaviorPolicy.LEGACY => 
RebaseDateTime.rebaseGregorianToJulianMicros
     case LegacyBehaviorPolicy.CORRECTED => identity[Long]
   }
+
+  def generateDatasourceOptions(
+      extraOptionsMap: CaseInsensitiveStringMap, table: CatalogTable): 
Map[String, String] = {
+    val pathOption = table.storage.locationUri.map("path" -> 
CatalogUtils.URIToString(_))
+    val options = table.storage.properties ++ pathOption
+    if (!SQLConf.get.getConf(SQLConf.LEGACY_EXTRA_OPTIONS_BEHAVIOR)) {
+      // Check the same key with different values
+      table.storage.properties.foreach { case (k, v) =>
+        if (extraOptionsMap.containsKey(k) && extraOptionsMap.get(k) != v) {
+          throw new AnalysisException(
+            s"Fail to create datasource for the table 
${table.identifier.table} since the table " +

Review comment:
       `create datasource` -> `resolve data source`

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
##########
@@ -190,4 +194,33 @@ object DataSourceUtils {
     case LegacyBehaviorPolicy.LEGACY => 
RebaseDateTime.rebaseGregorianToJulianMicros
     case LegacyBehaviorPolicy.CORRECTED => identity[Long]
   }
+
+  def generateDatasourceOptions(
+      extraOptionsMap: CaseInsensitiveStringMap, table: CatalogTable): 
Map[String, String] = {
+    val pathOption = table.storage.locationUri.map("path" -> 
CatalogUtils.URIToString(_))
+    val options = table.storage.properties ++ pathOption
+    if (!SQLConf.get.getConf(SQLConf.LEGACY_EXTRA_OPTIONS_BEHAVIOR)) {
+      // Check the same key with different values
+      table.storage.properties.foreach { case (k, v) =>
+        if (extraOptionsMap.containsKey(k) && extraOptionsMap.get(k) != v) {
+          throw new AnalysisException(
+            s"Fail to create datasource for the table 
${table.identifier.table} since the table " +

Review comment:
       `table.identifier.table`  -> `table.identifier`, so that we include 
database name as well.

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
##########
@@ -190,4 +194,33 @@ object DataSourceUtils {
     case LegacyBehaviorPolicy.LEGACY => 
RebaseDateTime.rebaseGregorianToJulianMicros
     case LegacyBehaviorPolicy.CORRECTED => identity[Long]
   }
+
+  def generateDatasourceOptions(
+      extraOptionsMap: CaseInsensitiveStringMap, table: CatalogTable): 
Map[String, String] = {
+    val pathOption = table.storage.locationUri.map("path" -> 
CatalogUtils.URIToString(_))
+    val options = table.storage.properties ++ pathOption
+    if (!SQLConf.get.getConf(SQLConf.LEGACY_EXTRA_OPTIONS_BEHAVIOR)) {
+      // Check the same key with different values
+      table.storage.properties.foreach { case (k, v) =>
+        if (extraOptionsMap.containsKey(k) && extraOptionsMap.get(k) != v) {
+          throw new AnalysisException(
+            s"Fail to create datasource for the table 
${table.identifier.table} since the table " +
+              s"property has the duplicated key $k with input options. To fix 
this, you can " +

Review comment:
       table serde property

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
##########
@@ -190,4 +194,33 @@ object DataSourceUtils {
     case LegacyBehaviorPolicy.LEGACY => 
RebaseDateTime.rebaseGregorianToJulianMicros
     case LegacyBehaviorPolicy.CORRECTED => identity[Long]
   }
+
+  def generateDatasourceOptions(
+      extraOptionsMap: CaseInsensitiveStringMap, table: CatalogTable): 
Map[String, String] = {
+    val pathOption = table.storage.locationUri.map("path" -> 
CatalogUtils.URIToString(_))
+    val options = table.storage.properties ++ pathOption
+    if (!SQLConf.get.getConf(SQLConf.LEGACY_EXTRA_OPTIONS_BEHAVIOR)) {
+      // Check the same key with different values
+      table.storage.properties.foreach { case (k, v) =>
+        if (extraOptionsMap.containsKey(k) && extraOptionsMap.get(k) != v) {
+          throw new AnalysisException(
+            s"Fail to create datasource for the table 
${table.identifier.table} since the table " +
+              s"property has the duplicated key $k with input options. To fix 
this, you can " +

Review comment:
       `input options` -> `extra options specified for this scan operation.`

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
##########
@@ -190,4 +194,33 @@ object DataSourceUtils {
     case LegacyBehaviorPolicy.LEGACY => 
RebaseDateTime.rebaseGregorianToJulianMicros
     case LegacyBehaviorPolicy.CORRECTED => identity[Long]
   }
+
+  def generateDatasourceOptions(
+      extraOptionsMap: CaseInsensitiveStringMap, table: CatalogTable): 
Map[String, String] = {
+    val pathOption = table.storage.locationUri.map("path" -> 
CatalogUtils.URIToString(_))
+    val options = table.storage.properties ++ pathOption
+    if (!SQLConf.get.getConf(SQLConf.LEGACY_EXTRA_OPTIONS_BEHAVIOR)) {
+      // Check the same key with different values
+      table.storage.properties.foreach { case (k, v) =>
+        if (extraOptionsMap.containsKey(k) && extraOptionsMap.get(k) != v) {
+          throw new AnalysisException(
+            s"Fail to create datasource for the table 
${table.identifier.table} since the table " +
+              s"property has the duplicated key $k with input options. To fix 
this, you can " +
+              "rollback to the legacy behavior of ignoring the input options 
by setting the " +

Review comment:
       `input options` -> `extra options`

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
##########
@@ -190,4 +194,33 @@ object DataSourceUtils {
     case LegacyBehaviorPolicy.LEGACY => 
RebaseDateTime.rebaseGregorianToJulianMicros
     case LegacyBehaviorPolicy.CORRECTED => identity[Long]
   }
+
+  def generateDatasourceOptions(
+      extraOptionsMap: CaseInsensitiveStringMap, table: CatalogTable): 
Map[String, String] = {
+    val pathOption = table.storage.locationUri.map("path" -> 
CatalogUtils.URIToString(_))
+    val options = table.storage.properties ++ pathOption
+    if (!SQLConf.get.getConf(SQLConf.LEGACY_EXTRA_OPTIONS_BEHAVIOR)) {
+      // Check the same key with different values
+      table.storage.properties.foreach { case (k, v) =>
+        if (extraOptionsMap.containsKey(k) && extraOptionsMap.get(k) != v) {
+          throw new AnalysisException(
+            s"Fail to create datasource for the table 
${table.identifier.table} since the table " +
+              s"property has the duplicated key $k with input options. To fix 
this, you can " +
+              "rollback to the legacy behavior of ignoring the input options 
by setting the " +
+              s"config ${SQLConf.LEGACY_EXTRA_OPTIONS_BEHAVIOR.key} to 
`true`, or address the " +
+              s"conflicts of the same key.")
+        }
+      }
+      // To keep the original key from table properties, here we filter all 
case insensitive
+      // duplicate keys out from extra options.
+      val caseInsensitiveDuplicateKeys =

Review comment:
       nit: `lowerCasedDuplicatedKeys`

##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
##########
@@ -2732,6 +2732,17 @@ object SQLConf {
       .booleanConf
       .createWithDefault(false)
 
+  val LEGACY_EXTRA_OPTIONS_BEHAVIOR =
+    buildConf("spark.sql.legacy.extraOptionsBehavior.enabled")
+      .internal()
+      .doc("When true, the extra options will be ignored for 
DataFrameReader.table(). If it is set " +
+        "to false, which is the default, Spark will check if the extra options 
have the same " +
+        "key, but the value is different from the table properties. If the 
check passes, the " +
+        "extra options will be merged. Otherwise, an exception will be 
thrown.")

Review comment:
       `will be merged with the table serde properties as the scan options`

##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
##########
@@ -2732,6 +2732,17 @@ object SQLConf {
       .booleanConf
       .createWithDefault(false)
 
+  val LEGACY_EXTRA_OPTIONS_BEHAVIOR =
+    buildConf("spark.sql.legacy.extraOptionsBehavior.enabled")
+      .internal()
+      .doc("When true, the extra options will be ignored for 
DataFrameReader.table(). If it is set " +
+        "to false, which is the default, Spark will check if the extra options 
have the same " +
+        "key, but the value is different from the table properties. If the 
check passes, the " +

Review comment:
       table serde properties.

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
##########
@@ -190,4 +194,33 @@ object DataSourceUtils {
     case LegacyBehaviorPolicy.LEGACY => 
RebaseDateTime.rebaseGregorianToJulianMicros
     case LegacyBehaviorPolicy.CORRECTED => identity[Long]
   }
+
+  def generateDatasourceOptions(
+      extraOptionsMap: CaseInsensitiveStringMap, table: CatalogTable): 
Map[String, String] = {

Review comment:
       `extraOptionsMap` -> `extraOptions`

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
##########
@@ -190,4 +194,33 @@ object DataSourceUtils {
     case LegacyBehaviorPolicy.LEGACY => 
RebaseDateTime.rebaseGregorianToJulianMicros
     case LegacyBehaviorPolicy.CORRECTED => identity[Long]
   }
+
+  def generateDatasourceOptions(
+      extraOptionsMap: CaseInsensitiveStringMap, table: CatalogTable): 
Map[String, String] = {
+    val pathOption = table.storage.locationUri.map("path" -> 
CatalogUtils.URIToString(_))
+    val options = table.storage.properties ++ pathOption
+    if (!SQLConf.get.getConf(SQLConf.LEGACY_EXTRA_OPTIONS_BEHAVIOR)) {
+      // Check the same key with different values
+      table.storage.properties.foreach { case (k, v) =>
+        if (extraOptionsMap.containsKey(k) && extraOptionsMap.get(k) != v) {
+          throw new AnalysisException(
+            s"Fail to create datasource for the table 
${table.identifier.table} since the table " +

Review comment:
       `create datasource` -> `resolve data source`

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
##########
@@ -190,4 +194,33 @@ object DataSourceUtils {
     case LegacyBehaviorPolicy.LEGACY => 
RebaseDateTime.rebaseGregorianToJulianMicros
     case LegacyBehaviorPolicy.CORRECTED => identity[Long]
   }
+
+  def generateDatasourceOptions(
+      extraOptionsMap: CaseInsensitiveStringMap, table: CatalogTable): 
Map[String, String] = {
+    val pathOption = table.storage.locationUri.map("path" -> 
CatalogUtils.URIToString(_))
+    val options = table.storage.properties ++ pathOption
+    if (!SQLConf.get.getConf(SQLConf.LEGACY_EXTRA_OPTIONS_BEHAVIOR)) {
+      // Check the same key with different values
+      table.storage.properties.foreach { case (k, v) =>
+        if (extraOptionsMap.containsKey(k) && extraOptionsMap.get(k) != v) {
+          throw new AnalysisException(
+            s"Fail to create datasource for the table 
${table.identifier.table} since the table " +

Review comment:
       `table.identifier.table`  -> `table.identifier`, so that we include 
database name as well.

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
##########
@@ -190,4 +194,33 @@ object DataSourceUtils {
     case LegacyBehaviorPolicy.LEGACY => 
RebaseDateTime.rebaseGregorianToJulianMicros
     case LegacyBehaviorPolicy.CORRECTED => identity[Long]
   }
+
+  def generateDatasourceOptions(
+      extraOptionsMap: CaseInsensitiveStringMap, table: CatalogTable): 
Map[String, String] = {
+    val pathOption = table.storage.locationUri.map("path" -> 
CatalogUtils.URIToString(_))
+    val options = table.storage.properties ++ pathOption
+    if (!SQLConf.get.getConf(SQLConf.LEGACY_EXTRA_OPTIONS_BEHAVIOR)) {
+      // Check the same key with different values
+      table.storage.properties.foreach { case (k, v) =>
+        if (extraOptionsMap.containsKey(k) && extraOptionsMap.get(k) != v) {
+          throw new AnalysisException(
+            s"Fail to create datasource for the table 
${table.identifier.table} since the table " +
+              s"property has the duplicated key $k with input options. To fix 
this, you can " +

Review comment:
       table serde property

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
##########
@@ -190,4 +194,33 @@ object DataSourceUtils {
     case LegacyBehaviorPolicy.LEGACY => 
RebaseDateTime.rebaseGregorianToJulianMicros
     case LegacyBehaviorPolicy.CORRECTED => identity[Long]
   }
+
+  def generateDatasourceOptions(
+      extraOptionsMap: CaseInsensitiveStringMap, table: CatalogTable): 
Map[String, String] = {
+    val pathOption = table.storage.locationUri.map("path" -> 
CatalogUtils.URIToString(_))
+    val options = table.storage.properties ++ pathOption
+    if (!SQLConf.get.getConf(SQLConf.LEGACY_EXTRA_OPTIONS_BEHAVIOR)) {
+      // Check the same key with different values
+      table.storage.properties.foreach { case (k, v) =>
+        if (extraOptionsMap.containsKey(k) && extraOptionsMap.get(k) != v) {
+          throw new AnalysisException(
+            s"Fail to create datasource for the table 
${table.identifier.table} since the table " +
+              s"property has the duplicated key $k with input options. To fix 
this, you can " +

Review comment:
       `input options` -> `extra options specified for this scan operation.`

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
##########
@@ -190,4 +194,33 @@ object DataSourceUtils {
     case LegacyBehaviorPolicy.LEGACY => 
RebaseDateTime.rebaseGregorianToJulianMicros
     case LegacyBehaviorPolicy.CORRECTED => identity[Long]
   }
+
+  def generateDatasourceOptions(
+      extraOptionsMap: CaseInsensitiveStringMap, table: CatalogTable): 
Map[String, String] = {
+    val pathOption = table.storage.locationUri.map("path" -> 
CatalogUtils.URIToString(_))
+    val options = table.storage.properties ++ pathOption
+    if (!SQLConf.get.getConf(SQLConf.LEGACY_EXTRA_OPTIONS_BEHAVIOR)) {
+      // Check the same key with different values
+      table.storage.properties.foreach { case (k, v) =>
+        if (extraOptionsMap.containsKey(k) && extraOptionsMap.get(k) != v) {
+          throw new AnalysisException(
+            s"Fail to create datasource for the table 
${table.identifier.table} since the table " +
+              s"property has the duplicated key $k with input options. To fix 
this, you can " +
+              "rollback to the legacy behavior of ignoring the input options 
by setting the " +

Review comment:
       `input options` -> `extra options`

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
##########
@@ -190,4 +194,33 @@ object DataSourceUtils {
     case LegacyBehaviorPolicy.LEGACY => 
RebaseDateTime.rebaseGregorianToJulianMicros
     case LegacyBehaviorPolicy.CORRECTED => identity[Long]
   }
+
+  def generateDatasourceOptions(
+      extraOptionsMap: CaseInsensitiveStringMap, table: CatalogTable): 
Map[String, String] = {
+    val pathOption = table.storage.locationUri.map("path" -> 
CatalogUtils.URIToString(_))
+    val options = table.storage.properties ++ pathOption
+    if (!SQLConf.get.getConf(SQLConf.LEGACY_EXTRA_OPTIONS_BEHAVIOR)) {
+      // Check the same key with different values
+      table.storage.properties.foreach { case (k, v) =>
+        if (extraOptionsMap.containsKey(k) && extraOptionsMap.get(k) != v) {
+          throw new AnalysisException(
+            s"Fail to create datasource for the table 
${table.identifier.table} since the table " +
+              s"property has the duplicated key $k with input options. To fix 
this, you can " +
+              "rollback to the legacy behavior of ignoring the input options 
by setting the " +
+              s"config ${SQLConf.LEGACY_EXTRA_OPTIONS_BEHAVIOR.key} to 
`true`, or address the " +
+              s"conflicts of the same key.")
+        }
+      }
+      // To keep the original key from table properties, here we filter all 
case insensitive
+      // duplicate keys out from extra options.
+      val caseInsensitiveDuplicateKeys =

Review comment:
       nit: `lowerCasedDuplicatedKeys`




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to