cloud-fan commented on a change in pull request #29712:
URL: https://github.com/apache/spark/pull/29712#discussion_r486895542
##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
##########
@@ -2732,6 +2732,17 @@ object SQLConf {
       .booleanConf
       .createWithDefault(false)
+  val LEGACY_EXTRA_OPTIONS_BEHAVIOR =
+    buildConf("spark.sql.legacy.extraOptionsBehavior.enabled")
+      .internal()
+      .doc("When true, the extra options will be ignored for DataFrameReader.table(). If set it " +
+        "to false, which is the default, Spark will check if the extra options have the same " +
+        "key, but the value is different with the table properties. If the check passes, the " +
+        "extra options will be merged. Otherwise, an exception will be thrown.")
Review comment:
`will be merged with the table serde properties as the scan options`
##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
##########
@@ -2732,6 +2732,17 @@ object SQLConf {
       .booleanConf
       .createWithDefault(false)
+  val LEGACY_EXTRA_OPTIONS_BEHAVIOR =
+    buildConf("spark.sql.legacy.extraOptionsBehavior.enabled")
+      .internal()
+      .doc("When true, the extra options will be ignored for DataFrameReader.table(). If set it " +
+        "to false, which is the default, Spark will check if the extra options have the same " +
+        "key, but the value is different with the table properties. If the check passes, the " +
Review comment:
table serde properties.
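Putting these two suggestions together, the doc string might read roughly like this (a sketch of the suggested wording only, not a final patch):

```scala
.doc("When true, the extra options will be ignored for DataFrameReader.table(). If set it " +
  "to false, which is the default, Spark will check if the extra options have the same " +
  "key, but the value is different with the table serde properties. If the check passes, the " +
  "extra options will be merged with the table serde properties as the scan options. " +
  "Otherwise, an exception will be thrown.")
```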
##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
##########
@@ -190,4 +194,33 @@ object DataSourceUtils {
     case LegacyBehaviorPolicy.LEGACY =>
       RebaseDateTime.rebaseGregorianToJulianMicros
     case LegacyBehaviorPolicy.CORRECTED => identity[Long]
   }
+
+  def generateDatasourceOptions(
+      extraOptionsMap: CaseInsensitiveStringMap, table: CatalogTable): Map[String, String] = {
Review comment:
`extraOptionsMap` -> `extraOptions`
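i.e. the signature would become something like:

```scala
def generateDatasourceOptions(
    extraOptions: CaseInsensitiveStringMap, table: CatalogTable): Map[String, String]
```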
##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
##########
@@ -190,4 +194,33 @@ object DataSourceUtils {
     case LegacyBehaviorPolicy.LEGACY =>
       RebaseDateTime.rebaseGregorianToJulianMicros
     case LegacyBehaviorPolicy.CORRECTED => identity[Long]
   }
+
+  def generateDatasourceOptions(
+      extraOptionsMap: CaseInsensitiveStringMap, table: CatalogTable): Map[String, String] = {
+    val pathOption = table.storage.locationUri.map("path" -> CatalogUtils.URIToString(_))
+    val options = table.storage.properties ++ pathOption
+    if (!SQLConf.get.getConf(SQLConf.LEGACY_EXTRA_OPTIONS_BEHAVIOR)) {
+      // Check the same key with different values
+      table.storage.properties.foreach { case (k, v) =>
+        if (extraOptionsMap.containsKey(k) && extraOptionsMap.get(k) != v) {
+          throw new AnalysisException(
+            s"Fail to create datasource for the table ${table.identifier.table} since the table " +
Review comment:
`create datasource` -> `resolve data source`
##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
##########
@@ -190,4 +194,33 @@ object DataSourceUtils {
     case LegacyBehaviorPolicy.LEGACY =>
       RebaseDateTime.rebaseGregorianToJulianMicros
     case LegacyBehaviorPolicy.CORRECTED => identity[Long]
   }
+
+  def generateDatasourceOptions(
+      extraOptionsMap: CaseInsensitiveStringMap, table: CatalogTable): Map[String, String] = {
+    val pathOption = table.storage.locationUri.map("path" -> CatalogUtils.URIToString(_))
+    val options = table.storage.properties ++ pathOption
+    if (!SQLConf.get.getConf(SQLConf.LEGACY_EXTRA_OPTIONS_BEHAVIOR)) {
+      // Check the same key with different values
+      table.storage.properties.foreach { case (k, v) =>
+        if (extraOptionsMap.containsKey(k) && extraOptionsMap.get(k) != v) {
+          throw new AnalysisException(
+            s"Fail to create datasource for the table ${table.identifier.table} since the table " +
Review comment:
`table.identifier.table` -> `table.identifier`, so that we include the database name as well.
##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
##########
@@ -190,4 +194,33 @@ object DataSourceUtils {
     case LegacyBehaviorPolicy.LEGACY =>
       RebaseDateTime.rebaseGregorianToJulianMicros
     case LegacyBehaviorPolicy.CORRECTED => identity[Long]
   }
+
+  def generateDatasourceOptions(
+      extraOptionsMap: CaseInsensitiveStringMap, table: CatalogTable): Map[String, String] = {
+    val pathOption = table.storage.locationUri.map("path" -> CatalogUtils.URIToString(_))
+    val options = table.storage.properties ++ pathOption
+    if (!SQLConf.get.getConf(SQLConf.LEGACY_EXTRA_OPTIONS_BEHAVIOR)) {
+      // Check the same key with different values
+      table.storage.properties.foreach { case (k, v) =>
+        if (extraOptionsMap.containsKey(k) && extraOptionsMap.get(k) != v) {
+          throw new AnalysisException(
+            s"Fail to create datasource for the table ${table.identifier.table} since the table " +
+            s"property has the duplicated key $k with input options. To fix this, you can " +
Review comment:
table serde property
##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
##########
@@ -190,4 +194,33 @@ object DataSourceUtils {
     case LegacyBehaviorPolicy.LEGACY =>
       RebaseDateTime.rebaseGregorianToJulianMicros
     case LegacyBehaviorPolicy.CORRECTED => identity[Long]
   }
+
+  def generateDatasourceOptions(
+      extraOptionsMap: CaseInsensitiveStringMap, table: CatalogTable): Map[String, String] = {
+    val pathOption = table.storage.locationUri.map("path" -> CatalogUtils.URIToString(_))
+    val options = table.storage.properties ++ pathOption
+    if (!SQLConf.get.getConf(SQLConf.LEGACY_EXTRA_OPTIONS_BEHAVIOR)) {
+      // Check the same key with different values
+      table.storage.properties.foreach { case (k, v) =>
+        if (extraOptionsMap.containsKey(k) && extraOptionsMap.get(k) != v) {
+          throw new AnalysisException(
+            s"Fail to create datasource for the table ${table.identifier.table} since the table " +
+            s"property has the duplicated key $k with input options. To fix this, you can " +
Review comment:
`input options` -> `extra options specified for this scan operation.`
##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
##########
@@ -190,4 +194,33 @@ object DataSourceUtils {
     case LegacyBehaviorPolicy.LEGACY =>
       RebaseDateTime.rebaseGregorianToJulianMicros
     case LegacyBehaviorPolicy.CORRECTED => identity[Long]
   }
+
+  def generateDatasourceOptions(
+      extraOptionsMap: CaseInsensitiveStringMap, table: CatalogTable): Map[String, String] = {
+    val pathOption = table.storage.locationUri.map("path" -> CatalogUtils.URIToString(_))
+    val options = table.storage.properties ++ pathOption
+    if (!SQLConf.get.getConf(SQLConf.LEGACY_EXTRA_OPTIONS_BEHAVIOR)) {
+      // Check the same key with different values
+      table.storage.properties.foreach { case (k, v) =>
+        if (extraOptionsMap.containsKey(k) && extraOptionsMap.get(k) != v) {
+          throw new AnalysisException(
+            s"Fail to create datasource for the table ${table.identifier.table} since the table " +
+            s"property has the duplicated key $k with input options. To fix this, you can " +
+            "rollback to the legacy behavior of ignoring the input options by setting the " +
Review comment:
`input options` -> `extra options`
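With the wording suggestions above applied, the message could read roughly like this (a sketch, not final wording; note that rolling back to the legacy behavior presumably means setting the config to `true`, not `false`):

```scala
throw new AnalysisException(
  s"Fail to resolve data source for the table ${table.identifier} since the table " +
  s"serde property has the duplicated key $k with extra options specified for this " +
  "scan operation. To fix this, you can rollback to the legacy behavior of ignoring " +
  "the extra options by setting the config " +
  s"${SQLConf.LEGACY_EXTRA_OPTIONS_BEHAVIOR.key} to `true`, or address the conflicts " +
  "of the same config.")
```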
##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
##########
@@ -190,4 +194,33 @@ object DataSourceUtils {
     case LegacyBehaviorPolicy.LEGACY =>
       RebaseDateTime.rebaseGregorianToJulianMicros
     case LegacyBehaviorPolicy.CORRECTED => identity[Long]
   }
+
+  def generateDatasourceOptions(
+      extraOptionsMap: CaseInsensitiveStringMap, table: CatalogTable): Map[String, String] = {
+    val pathOption = table.storage.locationUri.map("path" -> CatalogUtils.URIToString(_))
+    val options = table.storage.properties ++ pathOption
+    if (!SQLConf.get.getConf(SQLConf.LEGACY_EXTRA_OPTIONS_BEHAVIOR)) {
+      // Check the same key with different values
+      table.storage.properties.foreach { case (k, v) =>
+        if (extraOptionsMap.containsKey(k) && extraOptionsMap.get(k) != v) {
+          throw new AnalysisException(
+            s"Fail to create datasource for the table ${table.identifier.table} since the table " +
+            s"property has the duplicated key $k with input options. To fix this, you can " +
+            "rollback to the legacy behavior of ignoring the input options by setting the " +
+            s"config ${SQLConf.LEGACY_EXTRA_OPTIONS_BEHAVIOR.key} to `false`, or address the " +
+            s"conflicts of the same config.")
+        }
+      }
+      // To keep the original key from table properties, here we filter all case insensitive
+      // duplicate keys out from extra options.
+      val caseInsensitiveDuplicateKeys =
Review comment:
nit: `lowerCasedDuplicatedKeys`
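For illustration, the filtering could look along these lines (a sketch only, assuming `CaseInsensitiveStringMap.keySet` returns lower-cased keys and `asCaseSensitiveMap` returns the original ones; not the PR's actual code):

```scala
import java.util.Locale
import scala.collection.JavaConverters._

// Keys that appear (case-insensitively) in both the table serde properties and
// the extra options; the table's original key casing should win for these.
val lowerCasedDuplicatedKeys =
  table.storage.properties.keySet.map(_.toLowerCase(Locale.ROOT))
    .intersect(extraOptionsMap.keySet.asScala)
// Drop the duplicated keys from the extra options before merging them in.
extraOptionsMap.asCaseSensitiveMap.asScala.filterNot {
  case (k, _) => lowerCasedDuplicatedKeys.contains(k.toLowerCase(Locale.ROOT))
}.toMap ++ options
```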
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]