HyukjinKwon commented on pull request #28526:
URL: https://github.com/apache/spark/pull/28526#issuecomment-628455770
Many ..
```diff
diff --cc external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
index 27206edb287,1d18594fd34..00000000000
--- a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
+++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
@@@ -110,22 -116,13 +116,29 @@@ class AvroDeserializer
      case (LONG, TimestampType) => avroType.getLogicalType match {
        // For backward compatibility, if the Avro type is Long and it is not logical type
        // (the `null` case), the value is processed as timestamp type with millisecond precision.
++<<<<<<< HEAD
+       case null | _: TimestampMillis if rebaseDateTime => (updater, ordinal, value) =>
+         val millis = value.asInstanceOf[Long]
+         val micros = DateTimeUtils.fromMillis(millis)
+         val rebasedMicros = rebaseJulianToGregorianMicros(micros)
+         updater.setLong(ordinal, rebasedMicros)
+       case null | _: TimestampMillis => (updater, ordinal, value) =>
+         val millis = value.asInstanceOf[Long]
+         val micros = DateTimeUtils.fromMillis(millis)
+         updater.setLong(ordinal, micros)
+       case _: TimestampMicros if rebaseDateTime => (updater, ordinal, value) =>
+         val micros = value.asInstanceOf[Long]
+         val rebasedMicros = rebaseJulianToGregorianMicros(micros)
+         updater.setLong(ordinal, rebasedMicros)
++=======
+       case null | _: TimestampMillis => (updater, ordinal, value) =>
+         val millis = value.asInstanceOf[Long]
+         val micros = DateTimeUtils.millisToMicros(millis)
+         updater.setLong(ordinal, timestampRebaseFunc(micros))
++>>>>>>> fd2d55c9919... [SPARK-31405][SQL] Fail by default when reading/writing legacy datetime values from/to Parquet/Avro files
        case _: TimestampMicros => (updater, ordinal, value) =>
          val micros = value.asInstanceOf[Long]
-         updater.setLong(ordinal, micros)
+         updater.setLong(ordinal, timestampRebaseFunc(micros))
        case other => throw new IncompatibleSchemaException(
          s"Cannot convert Avro logical type ${other} to Catalyst Timestamp type.")
      }
diff --cc external/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
index dc232168fd2,21c5dec6239..00000000000
--- a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
+++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
@@@ -155,15 -160,10 +160,22 @@@ class AvroSerializer
      case (TimestampType, LONG) => avroType.getLogicalType match {
        // For backward compatibility, if the Avro type is Long and it is not logical type
        // (the `null` case), output the timestamp value as with millisecond precision.
++<<<<<<< HEAD
+       case null | _: TimestampMillis if rebaseDateTime => (getter, ordinal) =>
+         val micros = getter.getLong(ordinal)
+         val rebasedMicros = rebaseGregorianToJulianMicros(micros)
+         DateTimeUtils.toMillis(rebasedMicros)
+       case null | _: TimestampMillis => (getter, ordinal) =>
+         DateTimeUtils.toMillis(getter.getLong(ordinal))
+       case _: TimestampMicros if rebaseDateTime => (getter, ordinal) =>
+         rebaseGregorianToJulianMicros(getter.getLong(ordinal))
+       case _: TimestampMicros => (getter, ordinal) =>
          getter.getLong(ordinal)
++=======
+       case null | _: TimestampMillis => (getter, ordinal) =>
+         DateTimeUtils.microsToMillis(timestampRebaseFunc(getter.getLong(ordinal)))
+       case _: TimestampMicros => (getter, ordinal) =>
+         timestampRebaseFunc(getter.getLong(ordinal))
++>>>>>>> fd2d55c9919... [SPARK-31405][SQL] Fail by default when reading/writing legacy datetime values from/to Parquet/Avro files
        case other => throw new IncompatibleSchemaException(
          s"Cannot convert Catalyst Timestamp type to Avro logical type ${other}")
      }
diff --cc sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 6c18280ce4d,aeaf884c7d1..00000000000
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@@ -2509,57 -2520,71 +2509,83 @@@ object SQLConf
      .booleanConf
      .createWithDefault(false)
++<<<<<<< HEAD
+   val LEGACY_PARQUET_REBASE_DATETIME_IN_WRITE =
+     buildConf("spark.sql.legacy.parquet.rebaseDateTimeInWrite.enabled")
+       .internal()
+       .doc("When true, rebase dates/timestamps from Proleptic Gregorian calendar " +
+         "to the hybrid calendar (Julian + Gregorian) in write. " +
+         "The rebasing is performed by converting micros/millis/days to " +
+         "a local date/timestamp in the source calendar, interpreting the resulted date/" +
+         "timestamp in the target calendar, and getting the number of micros/millis/days " +
+         "since the epoch 1970-01-01 00:00:00Z.")
-       .version("3.0.0")
-       .booleanConf
-       .createWithDefault(false)
-
-   val LEGACY_PARQUET_REBASE_DATETIME_IN_READ =
-     buildConf("spark.sql.legacy.parquet.rebaseDateTimeInRead.enabled")
++=======
+   val LEGACY_INTEGER_GROUPING_ID =
+     buildConf("spark.sql.legacy.integerGroupingId")
      .internal()
-     .doc("When true, rebase dates/timestamps " +
-       "from the hybrid calendar to Proleptic Gregorian calendar in read. " +
-       "The rebasing is performed by converting micros/millis/days to " +
-       "a local date/timestamp in the source calendar, interpreting the resulted date/" +
-       "timestamp in the target calendar, and getting the number of micros/millis/days " +
-       "since the epoch 1970-01-01 00:00:00Z.")
-     .version("3.0.0")
+     .doc("When true, grouping_id() returns int values instead of long values.")
+     .version("3.1.0")
      .booleanConf
      .createWithDefault(false)
-   val LEGACY_AVRO_REBASE_DATETIME_IN_WRITE =
-     buildConf("spark.sql.legacy.avro.rebaseDateTimeInWrite.enabled")
+   val LEGACY_PARQUET_REBASE_MODE_IN_WRITE =
+     buildConf("spark.sql.legacy.parquet.datetimeRebaseModeInWrite")
      .internal()
-     .doc("When true, rebase dates/timestamps from Proleptic Gregorian calendar " +
-       "to the hybrid calendar (Julian + Gregorian) in write. " +
-       "The rebasing is performed by converting micros/millis/days to " +
-       "a local date/timestamp in the source calendar, interpreting the resulted date/" +
-       "timestamp in the target calendar, and getting the number of micros/millis/days " +
-       "since the epoch 1970-01-01 00:00:00Z.")
+     .doc("When LEGACY, Spark will rebase dates/timestamps from Proleptic Gregorian calendar " +
+       "to the legacy hybrid (Julian + Gregorian) calendar when writing Parquet files. " +
+       "When CORRECTED, Spark will not do rebase and write the dates/timestamps as it is. " +
+       "When EXCEPTION, which is the default, Spark will fail the writing if it sees " +
+       "ancient dates/timestamps that are ambiguous between the two calendars.")
++>>>>>>> fd2d55c9919... [SPARK-31405][SQL] Fail by default when reading/writing legacy datetime values from/to Parquet/Avro files
      .version("3.0.0")
-     .booleanConf
-     .createWithDefault(false)
+     .stringConf
+     .transform(_.toUpperCase(Locale.ROOT))
+     .checkValues(LegacyBehaviorPolicy.values.map(_.toString))
+     .createWithDefault(LegacyBehaviorPolicy.EXCEPTION.toString)
+
+   val LEGACY_PARQUET_REBASE_MODE_IN_READ =
+     buildConf("spark.sql.legacy.parquet.datetimeRebaseModeInRead")
+       .internal()
+       .doc("When LEGACY, Spark will rebase dates/timestamps from the legacy hybrid (Julian + " +
+         "Gregorian) calendar to Proleptic Gregorian calendar when reading Parquet files. " +
+         "When CORRECTED, Spark will not do rebase and read the dates/timestamps as it is. " +
+         "When EXCEPTION, which is the default, Spark will fail the reading if it sees " +
+         "ancient dates/timestamps that are ambiguous between the two calendars. This config is " +
+         "only effective if the writer info (like Spark, Hive) of the Parquet files is unknown.")
+       .version("3.0.0")
+       .stringConf
+       .transform(_.toUpperCase(Locale.ROOT))
+       .checkValues(LegacyBehaviorPolicy.values.map(_.toString))
+       .createWithDefault(LegacyBehaviorPolicy.EXCEPTION.toString)
-   val LEGACY_AVRO_REBASE_DATETIME_IN_READ =
-     buildConf("spark.sql.legacy.avro.rebaseDateTimeInRead.enabled")
+   val LEGACY_AVRO_REBASE_MODE_IN_WRITE =
+     buildConf("spark.sql.legacy.avro.datetimeRebaseModeInWrite")
      .internal()
-     .doc("When true, rebase dates/timestamps " +
-       "from the hybrid calendar to Proleptic Gregorian calendar in read. " +
-       "The rebasing is performed by converting micros/millis/days to " +
-       "a local date/timestamp in the source calendar, interpreting the resulted date/" +
-       "timestamp in the target calendar, and getting the number of micros/millis/days " +
-       "since the epoch 1970-01-01 00:00:00Z.")
+     .doc("When LEGACY, Spark will rebase dates/timestamps from Proleptic Gregorian calendar " +
+       "to the legacy hybrid (Julian + Gregorian) calendar when writing Avro files. " +
+       "When CORRECTED, Spark will not do rebase and write the dates/timestamps as it is. " +
+       "When EXCEPTION, which is the default, Spark will fail the writing if it sees " +
+       "ancient dates/timestamps that are ambiguous between the two calendars.")
      .version("3.0.0")
-     .booleanConf
-     .createWithDefault(false)
+     .stringConf
+     .transform(_.toUpperCase(Locale.ROOT))
+     .checkValues(LegacyBehaviorPolicy.values.map(_.toString))
+     .createWithDefault(LegacyBehaviorPolicy.EXCEPTION.toString)
+
+   val LEGACY_AVRO_REBASE_MODE_IN_READ =
+     buildConf("spark.sql.legacy.avro.datetimeRebaseModeInRead")
+       .internal()
+       .doc("When LEGACY, Spark will rebase dates/timestamps from the legacy hybrid (Julian + " +
+         "Gregorian) calendar to Proleptic Gregorian calendar when reading Avro files. " +
+         "When CORRECTED, Spark will not do rebase and read the dates/timestamps as it is. " +
+         "When EXCEPTION, which is the default, Spark will fail the reading if it sees " +
+         "ancient dates/timestamps that are ambiguous between the two calendars. This config is " +
+         "only effective if the writer info (like Spark, Hive) of the Avro files is unknown.")
+       .version("3.0.0")
+       .stringConf
+       .transform(_.toUpperCase(Locale.ROOT))
+       .checkValues(LegacyBehaviorPolicy.values.map(_.toString))
+       .createWithDefault(LegacyBehaviorPolicy.EXCEPTION.toString)
  /**
   * Holds information about keys that have been deprecated.
@@@ -3139,9 -3166,7 +3165,13 @@@ class SQLConf extends Serializable wit
    def csvFilterPushDown: Boolean = getConf(CSV_FILTER_PUSHDOWN_ENABLED)
++<<<<<<< HEAD
+   def parquetRebaseDateTimeInReadEnabled: Boolean = {
+     getConf(SQLConf.LEGACY_PARQUET_REBASE_DATETIME_IN_READ)
+   }
++=======
+   def integerGroupingIdEnabled: Boolean = getConf(SQLConf.LEGACY_INTEGER_GROUPING_ID)
++>>>>>>> fd2d55c9919... [SPARK-31405][SQL] Fail by default when reading/writing legacy datetime values from/to Parquet/Avro files
    /** ********************** SQLConf functionality methods ************ */
diff --cc sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
index 11ce11dd721,3e409ab9a50..00000000000
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
@@@ -324,20 -352,20 +352,32 @@@ public class VectorizedColumnReader
            }
          }
        } else if (originalType == OriginalType.TIMESTAMP_MILLIS) {
-         if (rebaseDateTime) {
+         if ("CORRECTED".equals(datetimeRebaseMode)) {
            for (int i = rowId; i < rowId + num; ++i) {
              if (!column.isNullAt(i)) {
++<<<<<<< HEAD
+               long julianMillis = dictionary.decodeToLong(dictionaryIds.getDictId(i));
+               long julianMicros = DateTimeUtils.fromMillis(julianMillis);
+               long gregorianMicros = RebaseDateTime.rebaseJulianToGregorianMicros(julianMicros);
+               column.putLong(i, gregorianMicros);
++=======
+               long gregorianMillis = dictionary.decodeToLong(dictionaryIds.getDictId(i));
+               column.putLong(i, DateTimeUtils.millisToMicros(gregorianMillis));
++>>>>>>> fd2d55c9919... [SPARK-31405][SQL] Fail by default when reading/writing legacy datetime values from/to Parquet/Avro files
              }
            }
          } else {
+           final boolean failIfRebase = "EXCEPTION".equals(datetimeRebaseMode);
            for (int i = rowId; i < rowId + num; ++i) {
              if (!column.isNullAt(i)) {
++<<<<<<< HEAD
+               long gregorianMillis = dictionary.decodeToLong(dictionaryIds.getDictId(i));
+               column.putLong(i, DateTimeUtils.fromMillis(gregorianMillis));
++=======
+               long julianMillis = dictionary.decodeToLong(dictionaryIds.getDictId(i));
+               long julianMicros = DateTimeUtils.millisToMicros(julianMillis);
+               column.putLong(i, rebaseMicros(julianMicros, failIfRebase));
++>>>>>>> fd2d55c9919... [SPARK-31405][SQL] Fail by default when reading/writing legacy datetime values from/to Parquet/Avro files
              }
            }
          }
@@@ -485,27 -514,29 +526,38 @@@
        defColumn.readLongs(
          num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
      } else if (originalType == OriginalType.TIMESTAMP_MICROS) {
-       if (rebaseDateTime) {
-         defColumn.readLongsWithRebase(
-           num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
-       } else {
+       if ("CORRECTED".equals(datetimeRebaseMode)) {
          defColumn.readLongs(
            num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
+       } else {
+         boolean failIfRebase = "EXCEPTION".equals(datetimeRebaseMode);
+         defColumn.readLongsWithRebase(
+           num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn, failIfRebase);
        }
      } else if (originalType == OriginalType.TIMESTAMP_MILLIS) {
-       if (rebaseDateTime) {
+       if ("CORRECTED".equals(datetimeRebaseMode)) {
          for (int i = 0; i < num; i++) {
            if (defColumn.readInteger() == maxDefLevel) {
++<<<<<<< HEAD
+             long micros = DateTimeUtils.fromMillis(dataColumn.readLong());
+             column.putLong(rowId + i, RebaseDateTime.rebaseJulianToGregorianMicros(micros));
++=======
+             column.putLong(rowId + i, DateTimeUtils.millisToMicros(dataColumn.readLong()));
++>>>>>>> fd2d55c9919... [SPARK-31405][SQL] Fail by default when reading/writing legacy datetime values from/to Parquet/Avro files
            } else {
              column.putNull(rowId + i);
            }
          }
        } else {
+         final boolean failIfRebase = "EXCEPTION".equals(datetimeRebaseMode);
          for (int i = 0; i < num; i++) {
            if (defColumn.readInteger() == maxDefLevel) {
++<<<<<<< HEAD
+             column.putLong(rowId + i, DateTimeUtils.fromMillis(dataColumn.readLong()));
++=======
+             long julianMicros = DateTimeUtils.millisToMicros(dataColumn.readLong());
+             column.putLong(rowId + i, rebaseMicros(julianMicros, failIfRebase));
++>>>>>>> fd2d55c9919... [SPARK-31405][SQL] Fail by default when reading/writing legacy datetime values from/to Parquet/Avro files
            } else {
              column.putNull(rowId + i);
            }
diff --cc sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
index 08fbca2995c,201ee16faeb..00000000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
@@@ -291,19 -289,10 +289,26 @@@ private[parquet] class ParquetRowConver
        }
      case TimestampType if parquetType.getOriginalType == OriginalType.TIMESTAMP_MILLIS =>
++<<<<<<< HEAD
+       if (rebaseDateTime) {
+         new ParquetPrimitiveConverter(updater) {
+           override def addLong(value: Long): Unit = {
+             val micros = DateTimeUtils.fromMillis(value)
+             val rebased = rebaseJulianToGregorianMicros(micros)
+             updater.setLong(rebased)
+           }
+         }
+       } else {
+         new ParquetPrimitiveConverter(updater) {
+           override def addLong(value: Long): Unit = {
+             updater.setLong(DateTimeUtils.fromMillis(value))
+           }
++=======
+       new ParquetPrimitiveConverter(updater) {
+         override def addLong(value: Long): Unit = {
+           val micros = DateTimeUtils.millisToMicros(value)
+           updater.setLong(timestampRebaseFunc(micros))
++>>>>>>> fd2d55c9919... [SPARK-31405][SQL] Fail by default when reading/writing legacy datetime values from/to Parquet/Avro files
        }
      }
diff --cc sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala
index e367b9cc774,6c333671d59..00000000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala
@@@ -187,24 -198,15 +198,29 @@@ class ParquetWriteSupport extends Write
            buf.order(ByteOrder.LITTLE_ENDIAN).putLong(timeOfDayNanos).putInt(julianDay)
            recordConsumer.addBinary(Binary.fromReusedByteArray(timestampBuffer))
-       case SQLConf.ParquetOutputTimestampType.TIMESTAMP_MICROS if rebaseDateTime =>
-         (row: SpecializedGetters, ordinal: Int) =>
-           val rebasedMicros = rebaseGregorianToJulianMicros(row.getLong(ordinal))
-           recordConsumer.addLong(rebasedMicros)
-
        case SQLConf.ParquetOutputTimestampType.TIMESTAMP_MICROS =>
          (row: SpecializedGetters, ordinal: Int) =>
++<<<<<<< HEAD
+           recordConsumer.addLong(row.getLong(ordinal))
+
+       case SQLConf.ParquetOutputTimestampType.TIMESTAMP_MILLIS if rebaseDateTime =>
+         (row: SpecializedGetters, ordinal: Int) =>
+           val rebasedMicros = rebaseGregorianToJulianMicros(row.getLong(ordinal))
+           val millis = DateTimeUtils.toMillis(rebasedMicros)
+           recordConsumer.addLong(millis)
+
+       case SQLConf.ParquetOutputTimestampType.TIMESTAMP_MILLIS =>
+         (row: SpecializedGetters, ordinal: Int) =>
+           val millis = DateTimeUtils.toMillis(row.getLong(ordinal))
++=======
+           val micros = row.getLong(ordinal)
+           recordConsumer.addLong(timestampRebaseFunc(micros))
+
+       case SQLConf.ParquetOutputTimestampType.TIMESTAMP_MILLIS =>
+         (row: SpecializedGetters, ordinal: Int) =>
+           val micros = row.getLong(ordinal)
+           val millis = DateTimeUtils.microsToMillis(timestampRebaseFunc(micros))
++>>>>>>> fd2d55c9919... [SPARK-31405][SQL] Fail by default when reading/writing legacy datetime values from/to Parquet/Avro files
            recordConsumer.addLong(millis)
      }
```
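
For context on why every hunk conflicts the same way: branch-3.0 (HEAD) still gates rebasing on the boolean `rebaseDateTime` flag, while SPARK-31405 replaces that flag with a rebase function derived from the new LEGACY/CORRECTED/EXCEPTION mode configs added in the SQLConf hunk above. A rough sketch of what that mode-to-function mapping looks like on the read path is below. It is illustrative only, not the actual helper from the patch: the name `rebaseMicrosFuncFor` and the exception wording are made up here, and the real EXCEPTION check is not literally a round-trip comparison.

```scala
import org.apache.spark.sql.catalyst.util.RebaseDateTime

// Illustrative sketch (assumed shape, not Spark's actual wiring): map a
// datetimeRebaseMode value to the micros-rebase function the read path applies
// instead of branching on the old boolean flag.
def rebaseMicrosFuncFor(mode: String): Long => Long = mode match {
  case "LEGACY" =>
    // Always rebase from the hybrid Julian+Gregorian calendar to Proleptic Gregorian.
    micros => RebaseDateTime.rebaseJulianToGregorianMicros(micros)
  case "CORRECTED" =>
    // Trust the stored value as-is.
    micros => micros
  case "EXCEPTION" =>
    micros => {
      // Fail only for values that differ between the two calendars, approximating the
      // "ambiguous ancient dates/timestamps" wording in the new config docs.
      if (RebaseDateTime.rebaseJulianToGregorianMicros(micros) != micros) {
        throw new IllegalStateException(
          s"Timestamp $micros is ambiguous between the Julian and Proleptic Gregorian calendars")
      }
      micros
    }
}
```

On the user side, the escape hatch after the backport becomes the new string configs rather than the old booleans, e.g. `spark.conf.set("spark.sql.legacy.parquet.datetimeRebaseModeInRead", "LEGACY")` before reading files written by old Spark versions.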