This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new bfafad4d47b4 [SPARK-46440][SQL] Set the rebase configs to the `CORRECTED` mode by default
bfafad4d47b4 is described below

commit bfafad4d47b4f60e93d17ccc3a8dcc8bae03cf9a
Author: Max Gekk <max.g...@gmail.com>
AuthorDate: Mon Dec 18 11:41:29 2023 +0300

    [SPARK-46440][SQL] Set the rebase configs to the `CORRECTED` mode by default
    
    ### What changes were proposed in this pull request?
    In the PR, I propose to set all rebase-related SQL configs to the `CORRECTED` mode by default. Here are the affected configs (a short usage sketch follows the list):
    - spark.sql.parquet.int96RebaseModeInWrite
    - spark.sql.parquet.datetimeRebaseModeInWrite
    - spark.sql.parquet.int96RebaseModeInRead
    - spark.sql.parquet.datetimeRebaseModeInRead
    - spark.sql.avro.datetimeRebaseModeInWrite
    - spark.sql.avro.datetimeRebaseModeInRead
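    
    All of these configs accept the same three values: `LEGACY` (rebase ancient dates/timestamps between the Proleptic Gregorian and the hybrid Julian calendars), `CORRECTED` (write/read the values as is), and `EXCEPTION` (fail on ambiguous ancient values). A minimal sketch of overriding one of them per session; the particular config and value below are only an illustration:
    ```
    // Illustration only: any of the six configs listed above can be
    // overridden the same way at the session level.
    spark.conf.set("spark.sql.avro.datetimeRebaseModeInWrite", "LEGACY")
    ```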
    
    ### Why are the changes needed?
    The configs were set to the `EXCEPTION` mode to give users a chance to choose a proper mode for compatibility with old Spark versions (<= 2.4.5). Those versions cannot detect the rebase mode from the meta information in Parquet and Avro files. Since those versions have fallen out of broad use, Spark starting from version 4.0.0 will write/read ancient datetimes without rebasing and without raising any exceptions. This should be more convenient for users.
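    
    A quick illustration of the new behavior (a sketch for spark-shell; the path and column name are arbitrary):
    ```
    // With the new CORRECTED defaults this completes without an exception;
    // previously it failed with SparkUpgradeException unless the rebase
    // configs were set explicitly.
    import spark.implicits._
    Seq(java.sql.Date.valueOf("1001-01-01")).toDF("dt")
      .write.mode("overwrite").parquet("/tmp/ancient_dates")
    spark.read.parquet("/tmp/ancient_dates").show()
    ```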
    
    ### Does this PR introduce _any_ user-facing change?
    Yes, it can, if a user's code expects an exception while reading/writing ancient datetimes.
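    Code that relies on the old failing behavior can restore it by setting the corresponding configs back explicitly, for example:
    ```
    // Restore the pre-4.0.0 default for Parquet datetime writes only; the
    // other rebase configs can be reset to EXCEPTION the same way if needed.
    spark.conf.set("spark.sql.parquet.datetimeRebaseModeInWrite", "EXCEPTION")
    ```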
    
    ### How was this patch tested?
    By running the affected test suites:
    ```
    $ build/sbt "test:testOnly *ParquetRebaseDatetimeV1Suite"
    $ build/sbt "test:testOnly *ParquetRebaseDatetimeV2Suite"
    $ build/sbt "test:testOnly *RebaseDateTimeSuite"
    $ build/sbt "test:testOnly *AvroV1Suite"
    $ build/sbt "test:testOnly *AvroV2Suite"
    ```
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No.
    
    Closes #44388 from MaxGekk/set-rebase-conf-corrected.
    
    Authored-by: Max Gekk <max.g...@gmail.com>
    Signed-off-by: Max Gekk <max.g...@gmail.com>
---
 .../test/scala/org/apache/spark/sql/avro/AvroSuite.scala | 16 ++++++++++------
 .../scala/org/apache/spark/sql/internal/SQLConf.scala    | 12 ++++++------
 .../datasources/parquet/ParquetRebaseDatetimeSuite.scala |  6 +++---
 3 files changed, 19 insertions(+), 15 deletions(-)

diff --git a/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala b/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
index e904ecdb3886..8a5634235639 100644
--- a/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
+++ b/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
@@ -2347,13 +2347,17 @@ abstract class AvroSuite
               |  ]
               |}""".stripMargin
 
-          // By default we should fail to write ancient datetime values.
-          val e = intercept[SparkException] {
-            df.write.format("avro").option("avroSchema", avroSchema).save(path3_x)
+          withSQLConf(SQLConf.AVRO_REBASE_MODE_IN_WRITE.key -> EXCEPTION.toString) {
+            val e = intercept[SparkException] {
+              df.write.format("avro").option("avroSchema", avroSchema).save(path3_x)
+            }
+            assert(e.getCause.getCause.isInstanceOf[SparkUpgradeException])
           }
-          assert(e.getCause.getCause.isInstanceOf[SparkUpgradeException])
           checkDefaultLegacyRead(oldPath)
 
+          // By default we should not fail to write ancient datetime values.
+          df.write.format("avro").option("avroSchema", avroSchema).mode("overwrite").save(path3_x)
+
           withSQLConf(SQLConf.AVRO_REBASE_MODE_IN_WRITE.key -> CORRECTED.toString) {
             df.write.format("avro").option("avroSchema", avroSchema).mode("overwrite").save(path3_x)
           }
@@ -2378,9 +2382,9 @@ abstract class AvroSuite
     }
     def successInRead(path: String): Unit = spark.read.format("avro").load(path).collect()
     Seq(
-      // By default we should fail to read ancient datetime values when parquet files don't
+      // By default we should not fail to read ancient datetime values when parquet files don't
       // contain Spark version.
-      "2_4_5" -> failInRead _,
+      "2_4_5" -> successInRead _,
       "2_4_6" -> successInRead _,
       "3_2_0" -> successInRead _
     ).foreach { case (version, checkDefaultRead) =>
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 9918d583d49e..23217f94d64e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -4069,7 +4069,7 @@ object SQLConf {
       .stringConf
       .transform(_.toUpperCase(Locale.ROOT))
       .checkValues(LegacyBehaviorPolicy.values.map(_.toString))
-      .createWithDefault(LegacyBehaviorPolicy.EXCEPTION.toString)
+      .createWithDefault(LegacyBehaviorPolicy.CORRECTED.toString)
 
   val PARQUET_REBASE_MODE_IN_WRITE =
     buildConf("spark.sql.parquet.datetimeRebaseModeInWrite")
@@ -4087,7 +4087,7 @@ object SQLConf {
       .stringConf
       .transform(_.toUpperCase(Locale.ROOT))
       .checkValues(LegacyBehaviorPolicy.values.map(_.toString))
-      .createWithDefault(LegacyBehaviorPolicy.EXCEPTION.toString)
+      .createWithDefault(LegacyBehaviorPolicy.CORRECTED.toString)
 
   val PARQUET_INT96_REBASE_MODE_IN_READ =
     buildConf("spark.sql.parquet.int96RebaseModeInRead")
@@ -4103,7 +4103,7 @@ object SQLConf {
       .stringConf
       .transform(_.toUpperCase(Locale.ROOT))
       .checkValues(LegacyBehaviorPolicy.values.map(_.toString))
-      .createWithDefault(LegacyBehaviorPolicy.EXCEPTION.toString)
+      .createWithDefault(LegacyBehaviorPolicy.CORRECTED.toString)
 
   val PARQUET_REBASE_MODE_IN_READ =
     buildConf("spark.sql.parquet.datetimeRebaseModeInRead")
@@ -4122,7 +4122,7 @@ object SQLConf {
       .stringConf
       .transform(_.toUpperCase(Locale.ROOT))
       .checkValues(LegacyBehaviorPolicy.values.map(_.toString))
-      .createWithDefault(LegacyBehaviorPolicy.EXCEPTION.toString)
+      .createWithDefault(LegacyBehaviorPolicy.CORRECTED.toString)
 
   val AVRO_REBASE_MODE_IN_WRITE =
     buildConf("spark.sql.avro.datetimeRebaseModeInWrite")
@@ -4137,7 +4137,7 @@ object SQLConf {
       .stringConf
       .transform(_.toUpperCase(Locale.ROOT))
       .checkValues(LegacyBehaviorPolicy.values.map(_.toString))
-      .createWithDefault(LegacyBehaviorPolicy.EXCEPTION.toString)
+      .createWithDefault(LegacyBehaviorPolicy.CORRECTED.toString)
 
   val AVRO_REBASE_MODE_IN_READ =
     buildConf("spark.sql.avro.datetimeRebaseModeInRead")
@@ -4153,7 +4153,7 @@ object SQLConf {
       .stringConf
       .transform(_.toUpperCase(Locale.ROOT))
       .checkValues(LegacyBehaviorPolicy.values.map(_.toString))
-      .createWithDefault(LegacyBehaviorPolicy.EXCEPTION.toString)
+      .createWithDefault(LegacyBehaviorPolicy.CORRECTED.toString)
 
   val SCRIPT_TRANSFORMATION_EXIT_TIMEOUT =
     buildConf("spark.sql.scriptTransformation.exitTimeoutInSeconds")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRebaseDatetimeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRebaseDatetimeSuite.scala
index 4f9064113454..35775ebbcbab 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRebaseDatetimeSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRebaseDatetimeSuite.scala
@@ -173,9 +173,9 @@ abstract class ParquetRebaseDatetimeSuite
       assert(e.getCause.isInstanceOf[SparkUpgradeException])
     }
     Seq(
-      // By default we should fail to read ancient datetime values when parquet files don't
+      // By default we should not fail to read ancient datetime values when parquet files don't
       // contain Spark version.
-      "2_4_5" -> failInRead _,
+      "2_4_5" -> successInRead _,
       "2_4_6" -> successInRead _,
       "3_2_0" -> successInRead _).foreach { case (version, checkDefaultRead) =>
       withAllParquetReaders {
@@ -201,7 +201,7 @@ abstract class ParquetRebaseDatetimeSuite
       }
     }
     Seq(
-      "2_4_5" -> failInRead _,
+      "2_4_5" -> successInRead _,
       "2_4_6" -> successInRead _).foreach { case (version, checkDefaultRead) =>
       withAllParquetReaders {
         Seq("plain", "dict").foreach { enc =>


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
