This is an automated email from the ASF dual-hosted git repository. maxgekk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 630aa0bf87a [SPARK-40215][SQL] Add SQL configs to control CSV/JSON date and timestamp parsing behavior 630aa0bf87a is described below commit 630aa0bf87aee1ccea181532328e3ed25e73d63e Author: Ivan Sadikov <ivan.sadi...@databricks.com> AuthorDate: Fri Aug 26 10:23:04 2022 +0300 [SPARK-40215][SQL] Add SQL configs to control CSV/JSON date and timestamp parsing behavior ### What changes were proposed in this pull request? This is a follow-up for [SPARK-39731](https://issues.apache.org/jira/browse/SPARK-39731) and PR https://github.com/apache/spark/pull/37147. I found that it could be problematic to change `spark.sql.legacy.timeParserPolicy` to LEGACY when inferring dates and timestamps in CSV and JSON. Sometimes it is beneficial to have the time parser policy as CORRECTED but still use a more lenient date and timestamp inference (or when migrating to a newer Spark version). I added two separate configs that control this behavior: - `spark.sql.legacy.csv.enableDateTimeParsingFallback` - `spark.sql.legacy.json.enableDateTimeParsingFallback` When the configs are set to `true`, the legacy time parsing behaviour is enabled (pre Spark 3.0). With this PR, the precedence order is as follows for CSV (similar for JSON): - data source option `enableDateTimeParsingFallback` - if that is not set, check `spark.sql.legacy.{csv,json}.enableDateTimeParsingFallback` - if that is not set, check `spark.sql.legacy.timeParserPolicy` and whether or not a custom format is used. ### Why are the changes needed? The change makes it easier for users to migrate to a newer Spark version without changing global config `spark.sql.legacy.timeParserPolicy`. It also allows enabling legacy parsing for CSV and JSON separately, without changing the code or the global time parser config. ### Does this PR introduce _any_ user-facing change? No, simply adds an ability to change the behaviour specifically for CSV or JSON. 
### How was this patch tested? I added a unit test for CSV and JSON to verify the flag. Closes #37653 from sadikovi/SPARK-40215. Authored-by: Ivan Sadikov <ivan.sadi...@databricks.com> Signed-off-by: Max Gekk <max.g...@gmail.com> --- .../spark/sql/catalyst/csv/UnivocityParser.scala | 20 ++++++++------ .../spark/sql/catalyst/json/JacksonParser.scala | 20 ++++++++------ .../org/apache/spark/sql/internal/SQLConf.scala | 22 +++++++++++++++ .../sql/execution/datasources/csv/CSVSuite.scala | 32 ++++++++++++++++++++++ .../sql/execution/datasources/json/JsonSuite.scala | 32 ++++++++++++++++++++++ 5 files changed, 110 insertions(+), 16 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala index c9955d72524..9d855d1a93d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala @@ -124,15 +124,19 @@ class UnivocityParser( // dates and timestamps. // For more information, see comments for "enableDateTimeParsingFallback" option in CSVOptions. 
private val enableParsingFallbackForTimestampType = - options.enableDateTimeParsingFallback.getOrElse { - SQLConf.get.legacyTimeParserPolicy == SQLConf.LegacyBehaviorPolicy.LEGACY || - options.timestampFormatInRead.isEmpty - } + options.enableDateTimeParsingFallback + .orElse(SQLConf.get.csvEnableDateTimeParsingFallback) + .getOrElse { + SQLConf.get.legacyTimeParserPolicy == SQLConf.LegacyBehaviorPolicy.LEGACY || + options.timestampFormatInRead.isEmpty + } private val enableParsingFallbackForDateType = - options.enableDateTimeParsingFallback.getOrElse { - SQLConf.get.legacyTimeParserPolicy == SQLConf.LegacyBehaviorPolicy.LEGACY || - options.dateFormatInRead.isEmpty - } + options.enableDateTimeParsingFallback + .orElse(SQLConf.get.csvEnableDateTimeParsingFallback) + .getOrElse { + SQLConf.get.legacyTimeParserPolicy == SQLConf.LegacyBehaviorPolicy.LEGACY || + options.dateFormatInRead.isEmpty + } // Retrieve the raw record string. private def getCurrentInput: UTF8String = { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala index 06133d44c13..f8adac1ee44 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala @@ -82,15 +82,19 @@ class JacksonParser( // dates and timestamps. // For more information, see comments for "enableDateTimeParsingFallback" option in JSONOptions. 
private val enableParsingFallbackForTimestampType = - options.enableDateTimeParsingFallback.getOrElse { - SQLConf.get.legacyTimeParserPolicy == SQLConf.LegacyBehaviorPolicy.LEGACY || - options.timestampFormatInRead.isEmpty - } + options.enableDateTimeParsingFallback + .orElse(SQLConf.get.jsonEnableDateTimeParsingFallback) + .getOrElse { + SQLConf.get.legacyTimeParserPolicy == SQLConf.LegacyBehaviorPolicy.LEGACY || + options.timestampFormatInRead.isEmpty + } private val enableParsingFallbackForDateType = - options.enableDateTimeParsingFallback.getOrElse { - SQLConf.get.legacyTimeParserPolicy == SQLConf.LegacyBehaviorPolicy.LEGACY || - options.dateFormatInRead.isEmpty - } + options.enableDateTimeParsingFallback + .orElse(SQLConf.get.jsonEnableDateTimeParsingFallback) + .getOrElse { + SQLConf.get.legacyTimeParserPolicy == SQLConf.LegacyBehaviorPolicy.LEGACY || + options.dateFormatInRead.isEmpty + } /** * Create a converter which converts the JSON documents held by the `JsonParser` diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 31bdbca4a25..eb7a6a9105e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -3520,6 +3520,22 @@ object SQLConf { .booleanConf .createWithDefault(true) + val LEGACY_CSV_ENABLE_DATE_TIME_PARSING_FALLBACK = + buildConf("spark.sql.legacy.csv.enableDateTimeParsingFallback") + .internal() + .doc("When true, enable legacy date/time parsing fallback in CSV") + .version("3.4.0") + .booleanConf + .createOptional + + val LEGACY_JSON_ENABLE_DATE_TIME_PARSING_FALLBACK = + buildConf("spark.sql.legacy.json.enableDateTimeParsingFallback") + .internal() + .doc("When true, enable legacy date/time parsing fallback in JSON") + .version("3.4.0") + .booleanConf + .createOptional + val ADD_PARTITION_BATCH_SIZE = 
buildConf("spark.sql.addPartitionInBatch.size") .internal() @@ -4621,6 +4637,12 @@ class SQLConf extends Serializable with Logging { def avroFilterPushDown: Boolean = getConf(AVRO_FILTER_PUSHDOWN_ENABLED) + def jsonEnableDateTimeParsingFallback: Option[Boolean] = + getConf(LEGACY_JSON_ENABLE_DATE_TIME_PARSING_FALLBACK) + + def csvEnableDateTimeParsingFallback: Option[Boolean] = + getConf(LEGACY_CSV_ENABLE_DATE_TIME_PARSING_FALLBACK) + def integerGroupingIdEnabled: Boolean = getConf(SQLConf.LEGACY_INTEGER_GROUPING_ID) def metadataCacheTTL: Long = getConf(StaticSQLConf.METADATA_CACHE_TTL_SECONDS) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala index 0068f57a769..5c97821f11e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala @@ -2949,6 +2949,38 @@ abstract class CSVSuite ) } } + + test("SPARK-40215: enable parsing fallback for CSV in CORRECTED mode with a SQL config") { + withTempPath { path => + Seq("2020-01-01,2020-01-01").toDF() + .repartition(1) + .write.text(path.getAbsolutePath) + + for (fallbackEnabled <- Seq(true, false)) { + withSQLConf( + SQLConf.LEGACY_TIME_PARSER_POLICY.key -> "CORRECTED", + SQLConf.LEGACY_CSV_ENABLE_DATE_TIME_PARSING_FALLBACK.key -> s"$fallbackEnabled") { + val df = spark.read + .schema("date date, ts timestamp") + .option("dateFormat", "invalid") + .option("timestampFormat", "invalid") + .csv(path.getAbsolutePath) + + if (fallbackEnabled) { + checkAnswer( + df, + Seq(Row(Date.valueOf("2020-01-01"), Timestamp.valueOf("2020-01-01 00:00:00"))) + ) + } else { + checkAnswer( + df, + Seq(Row(null, null)) + ) + } + } + } + } + } } class CSVv1Suite extends CSVSuite { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala index 02225d40c83..f0801ae313e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala @@ -3322,6 +3322,38 @@ abstract class JsonSuite ) } } + + test("SPARK-40215: enable parsing fallback for JSON in CORRECTED mode with a SQL config") { + withTempPath { path => + Seq("""{"date": "2020-01-01", "ts": "2020-01-01"}""").toDF() + .repartition(1) + .write.text(path.getAbsolutePath) + + for (fallbackEnabled <- Seq(true, false)) { + withSQLConf( + SQLConf.LEGACY_TIME_PARSER_POLICY.key -> "CORRECTED", + SQLConf.LEGACY_JSON_ENABLE_DATE_TIME_PARSING_FALLBACK.key -> s"$fallbackEnabled") { + val df = spark.read + .schema("date date, ts timestamp") + .option("dateFormat", "invalid") + .option("timestampFormat", "invalid") + .json(path.getAbsolutePath) + + if (fallbackEnabled) { + checkAnswer( + df, + Seq(Row(Date.valueOf("2020-01-01"), Timestamp.valueOf("2020-01-01 00:00:00"))) + ) + } else { + checkAnswer( + df, + Seq(Row(null, null)) + ) + } + } + } + } + } } class JsonV1Suite extends JsonSuite { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org