HyukjinKwon commented on pull request #28526:
URL: https://github.com/apache/spark/pull/28526#issuecomment-628455770


   Many conflicts:
   
   ```diff
   diff --cc external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
   index 27206edb287,1d18594fd34..00000000000
   --- a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
   +++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
   @@@ -110,22 -116,13 +116,29 @@@ class AvroDeserializer
            case (LONG, TimestampType) => avroType.getLogicalType match {
              // For backward compatibility, if the Avro type is Long and it is not logical type
              // (the `null` case), the value is processed as timestamp type with millisecond precision.
   ++<<<<<<< HEAD
    +        case null | _: TimestampMillis if rebaseDateTime => (updater, ordinal, value) =>
    +          val millis = value.asInstanceOf[Long]
    +          val micros = DateTimeUtils.fromMillis(millis)
    +          val rebasedMicros = rebaseJulianToGregorianMicros(micros)
    +          updater.setLong(ordinal, rebasedMicros)
    +        case null | _: TimestampMillis => (updater, ordinal, value) =>
    +          val millis = value.asInstanceOf[Long]
    +          val micros = DateTimeUtils.fromMillis(millis)
    +          updater.setLong(ordinal, micros)
    +        case _: TimestampMicros if rebaseDateTime => (updater, ordinal, value) =>
    +          val micros = value.asInstanceOf[Long]
    +          val rebasedMicros = rebaseJulianToGregorianMicros(micros)
    +          updater.setLong(ordinal, rebasedMicros)
   ++=======
   +         case null | _: TimestampMillis => (updater, ordinal, value) =>
   +           val millis = value.asInstanceOf[Long]
   +           val micros = DateTimeUtils.millisToMicros(millis)
   +           updater.setLong(ordinal, timestampRebaseFunc(micros))
   ++>>>>>>> fd2d55c9919... [SPARK-31405][SQL] Fail by default when reading/writing legacy datetime values from/to Parquet/Avro files
            case _: TimestampMicros => (updater, ordinal, value) =>
              val micros = value.asInstanceOf[Long]
   -           updater.setLong(ordinal, micros)
   +           updater.setLong(ordinal, timestampRebaseFunc(micros))
            case other => throw new IncompatibleSchemaException(
              s"Cannot convert Avro logical type ${other} to Catalyst Timestamp type.")
          }
   diff --cc external/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
   index dc232168fd2,21c5dec6239..00000000000
   --- a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
   +++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
   @@@ -155,15 -160,10 +160,22 @@@ class AvroSerializer
            case (TimestampType, LONG) => avroType.getLogicalType match {
                // For backward compatibility, if the Avro type is Long and it is not logical type
                // (the `null` case), output the timestamp value as with millisecond precision.
   ++<<<<<<< HEAD
    +          case null | _: TimestampMillis if rebaseDateTime => (getter, ordinal) =>
    +            val micros = getter.getLong(ordinal)
    +            val rebasedMicros = rebaseGregorianToJulianMicros(micros)
    +            DateTimeUtils.toMillis(rebasedMicros)
    +          case null | _: TimestampMillis => (getter, ordinal) =>
    +            DateTimeUtils.toMillis(getter.getLong(ordinal))
    +          case _: TimestampMicros if rebaseDateTime => (getter, ordinal) =>
    +            rebaseGregorianToJulianMicros(getter.getLong(ordinal))
    +          case _: TimestampMicros => (getter, ordinal) => getter.getLong(ordinal)
   ++=======
   +           case null | _: TimestampMillis => (getter, ordinal) =>
   +             DateTimeUtils.microsToMillis(timestampRebaseFunc(getter.getLong(ordinal)))
   +           case _: TimestampMicros => (getter, ordinal) =>
   +             timestampRebaseFunc(getter.getLong(ordinal))
   ++>>>>>>> fd2d55c9919... [SPARK-31405][SQL] Fail by default when reading/writing legacy datetime values from/to Parquet/Avro files
                case other => throw new IncompatibleSchemaException(
                  s"Cannot convert Catalyst Timestamp type to Avro logical type ${other}")
              }
   diff --cc sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
   index 6c18280ce4d,aeaf884c7d1..00000000000
   --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
   +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
   @@@ -2509,57 -2520,71 +2509,83 @@@ object SQLConf
         .booleanConf
         .createWithDefault(false)
   
   ++<<<<<<< HEAD
    +  val LEGACY_PARQUET_REBASE_DATETIME_IN_WRITE =
    +    buildConf("spark.sql.legacy.parquet.rebaseDateTimeInWrite.enabled")
    +      .internal()
    +      .doc("When true, rebase dates/timestamps from Proleptic Gregorian calendar " +
    +        "to the hybrid calendar (Julian + Gregorian) in write. " +
    +        "The rebasing is performed by converting micros/millis/days to " +
    +        "a local date/timestamp in the source calendar, interpreting the resulted date/" +
    +        "timestamp in the target calendar, and getting the number of micros/millis/days " +
    +        "since the epoch 1970-01-01 00:00:00Z.")
   -       .version("3.0.0")
   -       .booleanConf
   -       .createWithDefault(false)
   -
   -   val LEGACY_PARQUET_REBASE_DATETIME_IN_READ =
   -     buildConf("spark.sql.legacy.parquet.rebaseDateTimeInRead.enabled")
   ++=======
   +    val LEGACY_INTEGER_GROUPING_ID =
   +     buildConf("spark.sql.legacy.integerGroupingId")
           .internal()
   -       .doc("When true, rebase dates/timestamps " +
   -         "from the hybrid calendar to Proleptic Gregorian calendar in read. " +
   -         "The rebasing is performed by converting micros/millis/days to " +
   -         "a local date/timestamp in the source calendar, interpreting the resulted date/" +
   -         "timestamp in the target calendar, and getting the number of micros/millis/days " +
   -         "since the epoch 1970-01-01 00:00:00Z.")
   -       .version("3.0.0")
   +       .doc("When true, grouping_id() returns int values instead of long values.")
   +       .version("3.1.0")
           .booleanConf
           .createWithDefault(false)
   
   -   val LEGACY_AVRO_REBASE_DATETIME_IN_WRITE =
   -     buildConf("spark.sql.legacy.avro.rebaseDateTimeInWrite.enabled")
   +   val LEGACY_PARQUET_REBASE_MODE_IN_WRITE =
   +     buildConf("spark.sql.legacy.parquet.datetimeRebaseModeInWrite")
           .internal()
   -       .doc("When true, rebase dates/timestamps from Proleptic Gregorian calendar " +
   -         "to the hybrid calendar (Julian + Gregorian) in write. " +
   -         "The rebasing is performed by converting micros/millis/days to " +
   -         "a local date/timestamp in the source calendar, interpreting the resulted date/" +
   -         "timestamp in the target calendar, and getting the number of micros/millis/days " +
   -         "since the epoch 1970-01-01 00:00:00Z.")
   +       .doc("When LEGACY, Spark will rebase dates/timestamps from Proleptic Gregorian calendar " +
   +         "to the legacy hybrid (Julian + Gregorian) calendar when writing Parquet files. " +
   +         "When CORRECTED, Spark will not do rebase and write the dates/timestamps as it is. " +
   +         "When EXCEPTION, which is the default, Spark will fail the writing if it sees " +
   +         "ancient dates/timestamps that are ambiguous between the two calendars.")
   ++>>>>>>> fd2d55c9919... [SPARK-31405][SQL] Fail by default when reading/writing legacy datetime values from/to Parquet/Avro files
           .version("3.0.0")
   -       .booleanConf
   -       .createWithDefault(false)
   +       .stringConf
   +       .transform(_.toUpperCase(Locale.ROOT))
   +       .checkValues(LegacyBehaviorPolicy.values.map(_.toString))
   +       .createWithDefault(LegacyBehaviorPolicy.EXCEPTION.toString)
   +
   +   val LEGACY_PARQUET_REBASE_MODE_IN_READ =
   +     buildConf("spark.sql.legacy.parquet.datetimeRebaseModeInRead")
   +       .internal()
   +       .doc("When LEGACY, Spark will rebase dates/timestamps from the legacy hybrid (Julian + " +
   +         "Gregorian) calendar to Proleptic Gregorian calendar when reading Parquet files. " +
   +         "When CORRECTED, Spark will not do rebase and read the dates/timestamps as it is. " +
   +         "When EXCEPTION, which is the default, Spark will fail the reading if it sees " +
   +         "ancient dates/timestamps that are ambiguous between the two calendars. This config is " +
   +         "only effective if the writer info (like Spark, Hive) of the Parquet files is unknown.")
   +       .version("3.0.0")
   +       .stringConf
   +       .transform(_.toUpperCase(Locale.ROOT))
   +       .checkValues(LegacyBehaviorPolicy.values.map(_.toString))
   +       .createWithDefault(LegacyBehaviorPolicy.EXCEPTION.toString)
   
   -   val LEGACY_AVRO_REBASE_DATETIME_IN_READ =
   -     buildConf("spark.sql.legacy.avro.rebaseDateTimeInRead.enabled")
   +   val LEGACY_AVRO_REBASE_MODE_IN_WRITE =
   +     buildConf("spark.sql.legacy.avro.datetimeRebaseModeInWrite")
           .internal()
   -       .doc("When true, rebase dates/timestamps " +
   -         "from the hybrid calendar to Proleptic Gregorian calendar in read. " +
   -         "The rebasing is performed by converting micros/millis/days to " +
   -         "a local date/timestamp in the source calendar, interpreting the resulted date/" +
   -         "timestamp in the target calendar, and getting the number of micros/millis/days " +
   -         "since the epoch 1970-01-01 00:00:00Z.")
   +       .doc("When LEGACY, Spark will rebase dates/timestamps from Proleptic Gregorian calendar " +
   +         "to the legacy hybrid (Julian + Gregorian) calendar when writing Avro files. " +
   +         "When CORRECTED, Spark will not do rebase and write the dates/timestamps as it is. " +
   +         "When EXCEPTION, which is the default, Spark will fail the writing if it sees " +
   +         "ancient dates/timestamps that are ambiguous between the two calendars.")
           .version("3.0.0")
   -       .booleanConf
   -       .createWithDefault(false)
   +       .stringConf
   +       .transform(_.toUpperCase(Locale.ROOT))
   +       .checkValues(LegacyBehaviorPolicy.values.map(_.toString))
   +       .createWithDefault(LegacyBehaviorPolicy.EXCEPTION.toString)
   +
   +   val LEGACY_AVRO_REBASE_MODE_IN_READ =
   +     buildConf("spark.sql.legacy.avro.datetimeRebaseModeInRead")
   +       .internal()
   +       .doc("When LEGACY, Spark will rebase dates/timestamps from the legacy hybrid (Julian + " +
   +         "Gregorian) calendar to Proleptic Gregorian calendar when reading Avro files. " +
   +         "When CORRECTED, Spark will not do rebase and read the dates/timestamps as it is. " +
   +         "When EXCEPTION, which is the default, Spark will fail the reading if it sees " +
   +         "ancient dates/timestamps that are ambiguous between the two calendars. This config is " +
   +         "only effective if the writer info (like Spark, Hive) of the Avro files is unknown.")
   +       .version("3.0.0")
   +       .stringConf
   +       .transform(_.toUpperCase(Locale.ROOT))
   +       .checkValues(LegacyBehaviorPolicy.values.map(_.toString))
   +       .createWithDefault(LegacyBehaviorPolicy.EXCEPTION.toString)
   
       /**
        * Holds information about keys that have been deprecated.
   @@@ -3139,9 -3166,7 +3165,13 @@@ class SQLConf extends Serializable wit
   
       def csvFilterPushDown: Boolean = getConf(CSV_FILTER_PUSHDOWN_ENABLED)
   
   ++<<<<<<< HEAD
    +  def parquetRebaseDateTimeInReadEnabled: Boolean = {
    +    getConf(SQLConf.LEGACY_PARQUET_REBASE_DATETIME_IN_READ)
    +  }
   ++=======
   +   def integerGroupingIdEnabled: Boolean = getConf(SQLConf.LEGACY_INTEGER_GROUPING_ID)
   ++>>>>>>> fd2d55c9919... [SPARK-31405][SQL] Fail by default when reading/writing legacy datetime values from/to Parquet/Avro files
   
       /** ********************** SQLConf functionality methods ************ */
   
   diff --cc sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
   index 11ce11dd721,3e409ab9a50..00000000000
   --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
   +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
   @@@ -324,20 -352,20 +352,32 @@@ public class VectorizedColumnReader
               }
             }
           } else if (originalType == OriginalType.TIMESTAMP_MILLIS) {
   -           if (rebaseDateTime) {
   +           if ("CORRECTED".equals(datetimeRebaseMode)) {
               for (int i = rowId; i < rowId + num; ++i) {
                 if (!column.isNullAt(i)) {
   ++<<<<<<< HEAD
    +                long julianMillis = dictionary.decodeToLong(dictionaryIds.getDictId(i));
    +                long julianMicros = DateTimeUtils.fromMillis(julianMillis);
    +                long gregorianMicros = RebaseDateTime.rebaseJulianToGregorianMicros(julianMicros);
    +                column.putLong(i, gregorianMicros);
   ++=======
   +                 long gregorianMillis = dictionary.decodeToLong(dictionaryIds.getDictId(i));
   +                 column.putLong(i, DateTimeUtils.millisToMicros(gregorianMillis));
   ++>>>>>>> fd2d55c9919... [SPARK-31405][SQL] Fail by default when reading/writing legacy datetime values from/to Parquet/Avro files
                 }
               }
             } else {
   +             final boolean failIfRebase = "EXCEPTION".equals(datetimeRebaseMode);
               for (int i = rowId; i < rowId + num; ++i) {
                 if (!column.isNullAt(i)) {
   ++<<<<<<< HEAD
    +                long gregorianMillis = dictionary.decodeToLong(dictionaryIds.getDictId(i));
    +                column.putLong(i, DateTimeUtils.fromMillis(gregorianMillis));
   ++=======
   +                 long julianMillis = dictionary.decodeToLong(dictionaryIds.getDictId(i));
   +                 long julianMicros = DateTimeUtils.millisToMicros(julianMillis);
   +                 column.putLong(i, rebaseMicros(julianMicros, failIfRebase));
   ++>>>>>>> fd2d55c9919... [SPARK-31405][SQL] Fail by default when reading/writing legacy datetime values from/to Parquet/Avro files
                 }
               }
             }
   @@@ -485,27 -514,29 +526,38 @@@
           defColumn.readLongs(
             num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
         } else if (originalType == OriginalType.TIMESTAMP_MICROS) {
   -       if (rebaseDateTime) {
   -         defColumn.readLongsWithRebase(
   -           num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
   -       } else {
   +       if ("CORRECTED".equals(datetimeRebaseMode)) {
             defColumn.readLongs(
               num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
   +       } else {
   +         boolean failIfRebase = "EXCEPTION".equals(datetimeRebaseMode);
   +         defColumn.readLongsWithRebase(
   +           num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn, failIfRebase);
         }
       } else if (originalType == OriginalType.TIMESTAMP_MILLIS) {
   -       if (rebaseDateTime) {
   +       if ("CORRECTED".equals(datetimeRebaseMode)) {
           for (int i = 0; i < num; i++) {
             if (defColumn.readInteger() == maxDefLevel) {
   ++<<<<<<< HEAD
    +            long micros = DateTimeUtils.fromMillis(dataColumn.readLong());
    +            column.putLong(rowId + i, RebaseDateTime.rebaseJulianToGregorianMicros(micros));
   ++=======
   +             column.putLong(rowId + i, DateTimeUtils.millisToMicros(dataColumn.readLong()));
   ++>>>>>>> fd2d55c9919... [SPARK-31405][SQL] Fail by default when reading/writing legacy datetime values from/to Parquet/Avro files
             } else {
               column.putNull(rowId + i);
             }
           }
         } else {
   +         final boolean failIfRebase = "EXCEPTION".equals(datetimeRebaseMode);
           for (int i = 0; i < num; i++) {
             if (defColumn.readInteger() == maxDefLevel) {
   ++<<<<<<< HEAD
    +            column.putLong(rowId + i, DateTimeUtils.fromMillis(dataColumn.readLong()));
   ++=======
   +             long julianMicros = DateTimeUtils.millisToMicros(dataColumn.readLong());
   +             column.putLong(rowId + i, rebaseMicros(julianMicros, failIfRebase));
   ++>>>>>>> fd2d55c9919... [SPARK-31405][SQL] Fail by default when reading/writing legacy datetime values from/to Parquet/Avro files
             } else {
               column.putNull(rowId + i);
             }
   diff --cc sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
   index 08fbca2995c,201ee16faeb..00000000000
   --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
   +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
   @@@ -291,19 -289,10 +289,26 @@@ private[parquet] class ParquetRowConver
             }
   
           case TimestampType if parquetType.getOriginalType == OriginalType.TIMESTAMP_MILLIS =>
   ++<<<<<<< HEAD
    +        if (rebaseDateTime) {
    +          new ParquetPrimitiveConverter(updater) {
    +            override def addLong(value: Long): Unit = {
    +              val micros = DateTimeUtils.fromMillis(value)
    +              val rebased = rebaseJulianToGregorianMicros(micros)
    +              updater.setLong(rebased)
    +            }
    +          }
    +        } else {
    +          new ParquetPrimitiveConverter(updater) {
    +            override def addLong(value: Long): Unit = {
    +              updater.setLong(DateTimeUtils.fromMillis(value))
    +            }
   ++=======
   +         new ParquetPrimitiveConverter(updater) {
   +           override def addLong(value: Long): Unit = {
   +             val micros = DateTimeUtils.millisToMicros(value)
   +             updater.setLong(timestampRebaseFunc(micros))
   ++>>>>>>> fd2d55c9919... [SPARK-31405][SQL] Fail by default when reading/writing legacy datetime values from/to Parquet/Avro files
             }
           }
   
   diff --cc sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala
   index e367b9cc774,6c333671d59..00000000000
   --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala
   +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala
   @@@ -187,24 -198,15 +198,29 @@@ class ParquetWriteSupport extends Write
                   buf.order(ByteOrder.LITTLE_ENDIAN).putLong(timeOfDayNanos).putInt(julianDay)
                   recordConsumer.addBinary(Binary.fromReusedByteArray(timestampBuffer))
   
   -           case SQLConf.ParquetOutputTimestampType.TIMESTAMP_MICROS if rebaseDateTime =>
   -             (row: SpecializedGetters, ordinal: Int) =>
   -               val rebasedMicros = rebaseGregorianToJulianMicros(row.getLong(ordinal))
   -               recordConsumer.addLong(rebasedMicros)
   -
               case SQLConf.ParquetOutputTimestampType.TIMESTAMP_MICROS =>
                 (row: SpecializedGetters, ordinal: Int) =>
   ++<<<<<<< HEAD
    +              recordConsumer.addLong(row.getLong(ordinal))
    +
    +          case SQLConf.ParquetOutputTimestampType.TIMESTAMP_MILLIS if rebaseDateTime =>
    +            (row: SpecializedGetters, ordinal: Int) =>
    +              val rebasedMicros = rebaseGregorianToJulianMicros(row.getLong(ordinal))
    +              val millis = DateTimeUtils.toMillis(rebasedMicros)
    +              recordConsumer.addLong(millis)
    +
    +          case SQLConf.ParquetOutputTimestampType.TIMESTAMP_MILLIS =>
    +            (row: SpecializedGetters, ordinal: Int) =>
    +              val millis = DateTimeUtils.toMillis(row.getLong(ordinal))
   ++=======
   +               val micros = row.getLong(ordinal)
   +               recordConsumer.addLong(timestampRebaseFunc(micros))
   +
   +           case SQLConf.ParquetOutputTimestampType.TIMESTAMP_MILLIS =>
   +             (row: SpecializedGetters, ordinal: Int) =>
   +               val micros = row.getLong(ordinal)
   +               val millis = DateTimeUtils.microsToMillis(timestampRebaseFunc(micros))
   ++>>>>>>> fd2d55c9919... [SPARK-31405][SQL] Fail by default when reading/writing legacy datetime values from/to Parquet/Avro files
                   recordConsumer.addLong(millis)
             }
   ```
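   
   FWIW, the incoming side replaces the boolean `rebaseDateTimeIn*.enabled` flags with the string-valued `datetimeRebaseModeIn*` confs (`EXCEPTION` / `CORRECTED` / `LEGACY`), which is where most of these conflicts come from. As a minimal sketch of how the new read-side conf is meant to be used (assuming a running `SparkSession` named `spark`; the path below is just a hypothetical example of files written by an old Spark version):
   
   ```scala
   // Hypothetical illustration only, based on the confs introduced by SPARK-31405.
   // Opt back into the old rebasing behaviour for legacy Parquet files:
   spark.conf.set("spark.sql.legacy.parquet.datetimeRebaseModeInRead", "LEGACY")
   val df = spark.read.parquet("/data/old-parquet")  // example path, not from this PR
   
   // With the default EXCEPTION mode, reading ancient dates/timestamps that are
   // ambiguous between the two calendars is expected to fail instead of being
   // silently rebased, per the conf docs in the diff above.
   ```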




