MaxGekk commented on a change in pull request #28477:
URL: https://github.com/apache/spark/pull/28477#discussion_r421714734
##########
File path:
external/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
##########
@@ -46,17 +47,40 @@ class AvroSerializer(
rootCatalystType: DataType,
rootAvroType: Schema,
nullable: Boolean,
- rebaseDateTime: Boolean) extends Logging {
+ datetimeRebaseMode: LegacyBehaviorPolicy.Value) extends Logging {
Review comment:
How about adding a type alias to LegacyBehaviorPolicy:
```scala
object LegacyBehaviorPolicy extends Enumeration {
type LegacyBehaviorPolicy = Value
val EXCEPTION, LEGACY, CORRECTED = Value
}
```
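With that alias in place, the signature above could drop the `.Value` suffix. A minimal sketch (the import path is assumed here and depends on where `LegacyBehaviorPolicy` is actually defined; `DataType`, `Schema` and `Logging` come from the existing file):
```scala
// Import path is illustrative, not taken from the PR.
import org.apache.spark.sql.internal.LegacyBehaviorPolicy.LegacyBehaviorPolicy

class AvroSerializer(
    rootCatalystType: DataType,
    rootAvroType: Schema,
    nullable: Boolean,
    datetimeRebaseMode: LegacyBehaviorPolicy) extends Logging {
  // ...
}
```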
##########
File path:
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
##########
@@ -2528,57 +2528,63 @@ object SQLConf {
       .booleanConf
       .createWithDefault(false)
-  val LEGACY_PARQUET_REBASE_DATETIME_IN_WRITE =
-    buildConf("spark.sql.legacy.parquet.rebaseDateTimeInWrite.enabled")
-      .internal()
-      .doc("When true, rebase dates/timestamps from Proleptic Gregorian calendar " +
-        "to the hybrid calendar (Julian + Gregorian) in write. " +
-        "The rebasing is performed by converting micros/millis/days to " +
-        "a local date/timestamp in the source calendar, interpreting the resulted date/" +
-        "timestamp in the target calendar, and getting the number of micros/millis/days " +
-        "since the epoch 1970-01-01 00:00:00Z.")
+  val LEGACY_PARQUET_REBASE_MODE_IN_WRITE =
+    buildConf("spark.sql.legacy.parquet.datetimeRebaseModeInWrite")
+      .internal()
+      .doc("When LEGACY, Spark will rebase dates/timestamps from Proleptic Gregorian calendar " +
+        "to the legacy hybrid (Julian + Gregorian) calendar when writing Parquet files. " +
+        "When CORRECTED, Spark will not do rebase and write the dates/timestamps as it is. " +
+        "When EXCEPTION, which is the default, Spark will fail the writing if it sees " +
+        "ancient dates/timestamps that are ambiguous between the two calendars.")
       .version("3.0.0")
-      .booleanConf
-      .createWithDefault(false)
-
-  val LEGACY_PARQUET_REBASE_DATETIME_IN_READ =
-    buildConf("spark.sql.legacy.parquet.rebaseDateTimeInRead.enabled")
-      .internal()
-      .doc("When true, rebase dates/timestamps " +
-        "from the hybrid calendar to Proleptic Gregorian calendar in read. " +
-        "The rebasing is performed by converting micros/millis/days to " +
-        "a local date/timestamp in the source calendar, interpreting the resulted date/" +
-        "timestamp in the target calendar, and getting the number of micros/millis/days " +
-        "since the epoch 1970-01-01 00:00:00Z.")
+      .stringConf
+      .transform(_.toUpperCase(Locale.ROOT))
+      .checkValues(LegacyBehaviorPolicy.values.map(_.toString))
+      .createWithDefault(LegacyBehaviorPolicy.EXCEPTION.toString)
+
+  val LEGACY_PARQUET_REBASE_MODE_IN_READ =
+    buildConf("spark.sql.legacy.parquet.datetimeRebaseModeInRead")
+      .internal()
+      .doc("When LEGACY, Spark will rebase dates/timestamps from Proleptic Gregorian calendar " +
Review comment:
Actually, it is the opposite here: "from the hybrid to the Proleptic Gregorian" calendar.
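For reference, one possible rewording of the read-side doc, just a sketch that mirrors the direction stated in the removed description:
```scala
.doc("When LEGACY, Spark will rebase dates/timestamps from the legacy hybrid " +
  "(Julian + Gregorian) calendar to Proleptic Gregorian calendar when reading Parquet files. " +
  "When CORRECTED, Spark will not do rebase and read the dates/timestamps as it is. " +
  "When EXCEPTION, which is the default, Spark will fail the reading if it sees " +
  "ancient dates/timestamps that are ambiguous between the two calendars.")
```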
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
##########
@@ -84,17 +86,45 @@ object DataSourceUtils {
     case _ => false
   }
-  def needRebaseDateTime(lookupFileMeta: String => String): Option[Boolean] = {
+  def datetimeRebaseMode(
+      lookupFileMeta: String => String, modeByConfig: String): LegacyBehaviorPolicy.Value = {
     if (Utils.isTesting &&
       SQLConf.get.getConfString("spark.test.forceNoRebase", "") == "true") {
-      return Some(false)
+      return LegacyBehaviorPolicy.CORRECTED
     }
-    // If there is no version, we return None and let the caller side to decide.
+    // If there is no version, we return the mode specified by the config.
     Option(lookupFileMeta(SPARK_VERSION_METADATA_KEY)).map { version =>
       // Files written by Spark 2.4 and earlier follow the legacy hybrid calendar and we need to
       // rebase the datetime values.
       // Files written by Spark 3.0 and latter may also need the rebase if they were written with
-      // the "rebaseInWrite" config enabled.
-      version < "3.0.0" || lookupFileMeta(SPARK_LEGACY_DATETIME) != null
-    }
+      // the "LEGACY" rebase mode.
+      if (version < "3.0.0" || lookupFileMeta(SPARK_LEGACY_DATETIME) != null) {
+        LegacyBehaviorPolicy.LEGACY
+      } else {
+        LegacyBehaviorPolicy.CORRECTED
+      }
+    }.getOrElse(LegacyBehaviorPolicy.withName(modeByConfig))
+  }
+
+  def newRebaseExceptionInRead(format: String): SparkUpgradeException = {
+    new SparkUpgradeException("3.0", "reading dates before 1582-10-15 or timestamps before " +
+      s"1900-01-01 from $format files can be ambiguous, as the files may be written by Spark 2.x " +
Review comment:
1900-01-01 is a floating time point (a local date). We could point out a concrete timestamp instead, e.g. 1900-01-01 00:00:00Z.
##########
File path:
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
##########
@@ -2528,57 +2528,63 @@ object SQLConf {
       .booleanConf
       .createWithDefault(false)
-  val LEGACY_PARQUET_REBASE_DATETIME_IN_WRITE =
-    buildConf("spark.sql.legacy.parquet.rebaseDateTimeInWrite.enabled")
-      .internal()
-      .doc("When true, rebase dates/timestamps from Proleptic Gregorian calendar " +
-        "to the hybrid calendar (Julian + Gregorian) in write. " +
-        "The rebasing is performed by converting micros/millis/days to " +
-        "a local date/timestamp in the source calendar, interpreting the resulted date/" +
-        "timestamp in the target calendar, and getting the number of micros/millis/days " +
-        "since the epoch 1970-01-01 00:00:00Z.")
+  val LEGACY_PARQUET_REBASE_MODE_IN_WRITE =
+    buildConf("spark.sql.legacy.parquet.datetimeRebaseModeInWrite")
+      .internal()
+      .doc("When LEGACY, Spark will rebase dates/timestamps from Proleptic Gregorian calendar " +
+        "to the legacy hybrid (Julian + Gregorian) calendar when writing Parquet files. " +
+        "When CORRECTED, Spark will not do rebase and write the dates/timestamps as it is. " +
+        "When EXCEPTION, which is the default, Spark will fail the writing if it sees " +
+        "ancient dates/timestamps that are ambiguous between the two calendars.")
       .version("3.0.0")
-      .booleanConf
-      .createWithDefault(false)
-
-  val LEGACY_PARQUET_REBASE_DATETIME_IN_READ =
-    buildConf("spark.sql.legacy.parquet.rebaseDateTimeInRead.enabled")
-      .internal()
-      .doc("When true, rebase dates/timestamps " +
-        "from the hybrid calendar to Proleptic Gregorian calendar in read. " +
-        "The rebasing is performed by converting micros/millis/days to " +
-        "a local date/timestamp in the source calendar, interpreting the resulted date/" +
-        "timestamp in the target calendar, and getting the number of micros/millis/days " +
-        "since the epoch 1970-01-01 00:00:00Z.")
+      .stringConf
+      .transform(_.toUpperCase(Locale.ROOT))
+      .checkValues(LegacyBehaviorPolicy.values.map(_.toString))
+      .createWithDefault(LegacyBehaviorPolicy.EXCEPTION.toString)
+
+  val LEGACY_PARQUET_REBASE_MODE_IN_READ =
+    buildConf("spark.sql.legacy.parquet.datetimeRebaseModeInRead")
+      .internal()
+      .doc("When LEGACY, Spark will rebase dates/timestamps from Proleptic Gregorian calendar " +
+        "to the legacy hybrid (Julian + Gregorian) calendar when reading Parquet files. " +
+        "When CORRECTED, Spark will not do rebase and read the dates/timestamps as it is. " +
+        "When EXCEPTION, which is the default, Spark will fail the reading if it sees " +
+        "ancient dates/timestamps that are ambiguous between the two calendars. This config is " +
+        "only affective if the writer info (like Spark, Hive) of the Parquet files is unknown.")
       .version("3.0.0")
-      .booleanConf
-      .createWithDefault(false)
+      .stringConf
+      .transform(_.toUpperCase(Locale.ROOT))
+      .checkValues(LegacyBehaviorPolicy.values.map(_.toString))
+      .createWithDefault(LegacyBehaviorPolicy.EXCEPTION.toString)
-  val LEGACY_AVRO_REBASE_DATETIME_IN_WRITE =
-    buildConf("spark.sql.legacy.avro.rebaseDateTimeInWrite.enabled")
+  val LEGACY_AVRO_REBASE_MODE_IN_WRITE =
+    buildConf("spark.sql.legacy.avro.datetimeRebaseModeInWrite")
       .internal()
-      .doc("When true, rebase dates/timestamps from Proleptic Gregorian calendar " +
-        "to the hybrid calendar (Julian + Gregorian) in write. " +
-        "The rebasing is performed by converting micros/millis/days to " +
-        "a local date/timestamp in the source calendar, interpreting the resulted date/" +
-        "timestamp in the target calendar, and getting the number of micros/millis/days " +
-        "since the epoch 1970-01-01 00:00:00Z.")
+      .doc("When LEGACY, Spark will rebase dates/timestamps from Proleptic Gregorian calendar " +
+        "to the legacy hybrid (Julian + Gregorian) calendar when writing Avro files. " +
+        "When CORRECTED, Spark will not do rebase and write the dates/timestamps as it is. " +
+        "When EXCEPTION, which is the default, Spark will fail the writing if it sees " +
+        "ancient dates/timestamps that are ambiguous between the two calendars.")
       .version("3.0.0")
-      .booleanConf
-      .createWithDefault(false)
-
-  val LEGACY_AVRO_REBASE_DATETIME_IN_READ =
-    buildConf("spark.sql.legacy.avro.rebaseDateTimeInRead.enabled")
-      .internal()
-      .doc("When true, rebase dates/timestamps " +
-        "from the hybrid calendar to Proleptic Gregorian calendar in read. " +
-        "The rebasing is performed by converting micros/millis/days to " +
-        "a local date/timestamp in the source calendar, interpreting the resulted date/" +
-        "timestamp in the target calendar, and getting the number of micros/millis/days " +
-        "since the epoch 1970-01-01 00:00:00Z.")
+      .stringConf
+      .transform(_.toUpperCase(Locale.ROOT))
+      .checkValues(LegacyBehaviorPolicy.values.map(_.toString))
+      .createWithDefault(LegacyBehaviorPolicy.EXCEPTION.toString)
+
+  val LEGACY_AVRO_REBASE_MODE_IN_READ =
+    buildConf("spark.sql.legacy.avro.datetimeRebaseModeInRead")
+      .internal()
+      .doc("When LEGACY, Spark will rebase dates/timestamps from Proleptic Gregorian calendar " +
Review comment:
Please fix this one too; the rebase direction for reads should be the other way around as well.
##########
File path:
sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
##########
@@ -102,14 +103,14 @@
   // The timezone conversion to apply to int96 timestamps. Null if no conversion.
private final ZoneId convertTz;
private static final ZoneId UTC = ZoneOffset.UTC;
- private final boolean rebaseDateTime;
+ private final String datetimeRebaseMode;
Review comment:
Did you consider passing the enum's id instead, e.g. `LegacyBehaviorPolicy.EXCEPTION.id`? That could be less expensive than comparing Strings.
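For example, the Scala side could precompute the ids once and hand the Java reader an `int`, so per-value checks become integer comparisons rather than `String.equals`. A rough sketch (object and field names are hypothetical):
```scala
// Assumes LegacyBehaviorPolicy (an Enumeration) is in scope; .id is its built-in Int ordinal.
object RebaseModeIds {
  val exception: Int = LegacyBehaviorPolicy.EXCEPTION.id
  val legacy: Int = LegacyBehaviorPolicy.LEGACY.id
  val corrected: Int = LegacyBehaviorPolicy.CORRECTED.id
}
```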
##########
File path:
sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
##########
@@ -132,7 +133,7 @@ public VectorizedColumnReader(
if (totalValueCount == 0) {
throw new IOException("totalValueCount == 0");
}
- this.rebaseDateTime = rebaseDateTime;
+ this.datetimeRebaseMode = datetimeRebaseMode;
Review comment:
Should we add an assert to make sure that datetimeRebaseMode is either `CORRECTED`, `EXCEPTION`, or `LEGACY` (in upper case)?
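A minimal caller-side sketch of such a guard, assuming the constructor keeps taking the mode as a `String` (the helper name is hypothetical):
```scala
// Assumes LegacyBehaviorPolicy is in scope; rejects anything outside the three known modes.
def checkedRebaseMode(mode: String): String = {
  assert(LegacyBehaviorPolicy.values.map(_.toString).contains(mode),
    s"Unexpected datetime rebase mode: $mode")
  mode
}
```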
##########
File path:
sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java
##########
@@ -21,12 +21,14 @@
import java.nio.ByteOrder;
import org.apache.parquet.bytes.ByteBufferInputStream;
+import org.apache.parquet.column.values.ValuesReader;
+import org.apache.parquet.io.api.Binary;
import org.apache.parquet.io.ParquetDecodingException;
+
import org.apache.spark.sql.catalyst.util.RebaseDateTime;
+import org.apache.spark.sql.execution.datasources.DataSourceUtils;
import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
-import org.apache.parquet.column.values.ValuesReader;
-import org.apache.parquet.io.api.Binary;
Review comment:
nit: remove the blank line
##########
File path:
sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java
##########
@@ -86,13 +88,17 @@ public final void readIntegers(int total, WritableColumnVector c, int rowId) {
   // iterates the values twice: check if we need to rebase first, then go to the optimized branch
   // if rebase is not needed.
   @Override
-  public final void readIntegersWithRebase(int total, WritableColumnVector c, int rowId) {
+  public final void readIntegersWithRebase(
+      int total, WritableColumnVector c, int rowId, boolean failIfRebase) {
     int requiredBytes = total * 4;
     ByteBuffer buffer = getBuffer(requiredBytes);
     boolean rebase = false;
     for (int i = 0; i < total; i += 1) {
       rebase |= buffer.getInt(buffer.position() + i * 4) < RebaseDateTime.lastSwitchJulianDay();
     }
+    if (rebase && failIfRebase) {
+      throw DataSourceUtils.newRebaseExceptionInRead("Parquet");
+    }
     if (rebase) {
Review comment:
nit: How about:
```java
if (rebase) {
  if (failIfRebase) {
    throw DataSourceUtils.newRebaseExceptionInRead("Parquet");
  } else {
    for (int i = 0; i < total; i += 1) {
      c.putInt(rowId + i, RebaseDateTime.rebaseJulianToGregorianDays(buffer.getInt()));
    }
  }
} else {
```
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
##########
@@ -84,17 +86,45 @@ object DataSourceUtils {
case _ => false
}
- def needRebaseDateTime(lookupFileMeta: String => String): Option[Boolean] = {
+ def datetimeRebaseMode(
Review comment:
Should it be formatted like this instead (one parameter per line)?
```scala
def datetimeRebaseMode(
lookupFileMeta: String => String,
modeByConfig: String): LegacyBehaviorPolicy.Value = {
```
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
##########
@@ -84,17 +86,45 @@ object DataSourceUtils {
     case _ => false
   }
-  def needRebaseDateTime(lookupFileMeta: String => String): Option[Boolean] = {
+  def datetimeRebaseMode(
+      lookupFileMeta: String => String, modeByConfig: String): LegacyBehaviorPolicy.Value = {
     if (Utils.isTesting &&
       SQLConf.get.getConfString("spark.test.forceNoRebase", "") == "true") {
-      return Some(false)
+      return LegacyBehaviorPolicy.CORRECTED
     }
-    // If there is no version, we return None and let the caller side to decide.
+    // If there is no version, we return the mode specified by the config.
     Option(lookupFileMeta(SPARK_VERSION_METADATA_KEY)).map { version =>
       // Files written by Spark 2.4 and earlier follow the legacy hybrid calendar and we need to
       // rebase the datetime values.
       // Files written by Spark 3.0 and latter may also need the rebase if they were written with
-      // the "rebaseInWrite" config enabled.
-      version < "3.0.0" || lookupFileMeta(SPARK_LEGACY_DATETIME) != null
-    }
+      // the "LEGACY" rebase mode.
+      if (version < "3.0.0" || lookupFileMeta(SPARK_LEGACY_DATETIME) != null) {
+        LegacyBehaviorPolicy.LEGACY
+      } else {
+        LegacyBehaviorPolicy.CORRECTED
+      }
+    }.getOrElse(LegacyBehaviorPolicy.withName(modeByConfig))
+  }
+
+  def newRebaseExceptionInRead(format: String): SparkUpgradeException = {
+    new SparkUpgradeException("3.0", "reading dates before 1582-10-15 or timestamps before " +
+      s"1900-01-01 from $format files can be ambiguous, as the files may be written by Spark 2.x " +
+      "or legacy versions of Hive, which uses a legacy hybrid calendar that is different from " +
+      "Spark 3.0+'s Proleptic Gregorian calendar. See more details in SPARK-31404. You can set " +
+      s"${SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_READ.key} to 'LEGACY' to rebase the datetime " +
+      "values w.r.t. the calendar switch during reading, or set " +
+      s"${SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_READ.key} to 'CORRECTED' to read the datetime " +
+      "values as it is.", null)
+  }
+
+  def newRebaseExceptionInWrite(format: String): SparkUpgradeException = {
+    new SparkUpgradeException("3.0", "writing dates before 1582-10-15 or timestamps before " +
+      s"1900-01-01 into $format files can be dangerous, as the files may be read by Spark 2.x " +
+      "or legacy versions of Hive later, which uses a legacy hybrid calendar that is different " +
+      "from Spark 3.0+'s Proleptic Gregorian calendar. See more details in SPARK-31404. You can " +
+      s"set ${SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_READ.key} to 'LEGACY' to rebase the datetime " +
Review comment:
This should be LEGACY_PARQUET_REBASE_MODE_IN_WRITE.
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
##########
@@ -84,17 +86,45 @@ object DataSourceUtils {
     case _ => false
   }
-  def needRebaseDateTime(lookupFileMeta: String => String): Option[Boolean] = {
+  def datetimeRebaseMode(
+      lookupFileMeta: String => String, modeByConfig: String): LegacyBehaviorPolicy.Value = {
     if (Utils.isTesting &&
       SQLConf.get.getConfString("spark.test.forceNoRebase", "") == "true") {
-      return Some(false)
+      return LegacyBehaviorPolicy.CORRECTED
     }
-    // If there is no version, we return None and let the caller side to decide.
+    // If there is no version, we return the mode specified by the config.
     Option(lookupFileMeta(SPARK_VERSION_METADATA_KEY)).map { version =>
       // Files written by Spark 2.4 and earlier follow the legacy hybrid calendar and we need to
       // rebase the datetime values.
       // Files written by Spark 3.0 and latter may also need the rebase if they were written with
-      // the "rebaseInWrite" config enabled.
-      version < "3.0.0" || lookupFileMeta(SPARK_LEGACY_DATETIME) != null
-    }
+      // the "LEGACY" rebase mode.
+      if (version < "3.0.0" || lookupFileMeta(SPARK_LEGACY_DATETIME) != null) {
+        LegacyBehaviorPolicy.LEGACY
+      } else {
+        LegacyBehaviorPolicy.CORRECTED
+      }
+    }.getOrElse(LegacyBehaviorPolicy.withName(modeByConfig))
+  }
+
+  def newRebaseExceptionInRead(format: String): SparkUpgradeException = {
+    new SparkUpgradeException("3.0", "reading dates before 1582-10-15 or timestamps before " +
+      s"1900-01-01 from $format files can be ambiguous, as the files may be written by Spark 2.x " +
+      "or legacy versions of Hive, which uses a legacy hybrid calendar that is different from " +
+      "Spark 3.0+'s Proleptic Gregorian calendar. See more details in SPARK-31404. You can set " +
+      s"${SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_READ.key} to 'LEGACY' to rebase the datetime " +
+      "values w.r.t. the calendar switch during reading, or set " +
+      s"${SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_READ.key} to 'CORRECTED' to read the datetime " +
+      "values as it is.", null)
+  }
+
+  def newRebaseExceptionInWrite(format: String): SparkUpgradeException = {
+    new SparkUpgradeException("3.0", "writing dates before 1582-10-15 or timestamps before " +
+      s"1900-01-01 into $format files can be dangerous, as the files may be read by Spark 2.x " +
+      "or legacy versions of Hive later, which uses a legacy hybrid calendar that is different " +
+      "from Spark 3.0+'s Proleptic Gregorian calendar. See more details in SPARK-31404. You can " +
+      s"set ${SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_READ.key} to 'LEGACY' to rebase the datetime " +
+      "values w.r.t. the calendar switch during writing, to get maximum interoperability, or set " +
+      s"${SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_READ.key} to 'CORRECTED' to write the datetime " +
Review comment:
This one should use the IN_WRITE config too.
##########
File path:
sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DateTimeRebaseBenchmark.scala
##########
@@ -161,9 +161,10 @@ object DateTimeRebaseBenchmark extends SqlBasedBenchmark {
       Seq(true, false).foreach { modernDates =>
         Seq(false, true).foreach { rebase =>
           benchmark.addCase(caseName(modernDates, dateTime, Some(rebase)), 1) { _ =>
+            val mode = if (rebase) "LEGACY" else "CORRECTED"
             withSQLConf(
               SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> getOutputType(dateTime),
-              SQLConf.LEGACY_PARQUET_REBASE_DATETIME_IN_WRITE.key -> rebase.toString) {
+              SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_WRITE.key -> mode.toString) {
Review comment:
`mode` is already a String, so there is no need to call `.toString` here.
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
##########
@@ -84,17 +86,45 @@ object DataSourceUtils {
     case _ => false
   }
-  def needRebaseDateTime(lookupFileMeta: String => String): Option[Boolean] = {
+  def datetimeRebaseMode(
+      lookupFileMeta: String => String, modeByConfig: String): LegacyBehaviorPolicy.Value = {
     if (Utils.isTesting &&
       SQLConf.get.getConfString("spark.test.forceNoRebase", "") == "true") {
-      return Some(false)
+      return LegacyBehaviorPolicy.CORRECTED
     }
-    // If there is no version, we return None and let the caller side to decide.
+    // If there is no version, we return the mode specified by the config.
     Option(lookupFileMeta(SPARK_VERSION_METADATA_KEY)).map { version =>
       // Files written by Spark 2.4 and earlier follow the legacy hybrid calendar and we need to
       // rebase the datetime values.
       // Files written by Spark 3.0 and latter may also need the rebase if they were written with
-      // the "rebaseInWrite" config enabled.
-      version < "3.0.0" || lookupFileMeta(SPARK_LEGACY_DATETIME) != null
-    }
+      // the "LEGACY" rebase mode.
+      if (version < "3.0.0" || lookupFileMeta(SPARK_LEGACY_DATETIME) != null) {
+        LegacyBehaviorPolicy.LEGACY
+      } else {
+        LegacyBehaviorPolicy.CORRECTED
+      }
+    }.getOrElse(LegacyBehaviorPolicy.withName(modeByConfig))
+  }
+
+  def newRebaseExceptionInRead(format: String): SparkUpgradeException = {
+    new SparkUpgradeException("3.0", "reading dates before 1582-10-15 or timestamps before " +
+      s"1900-01-01 from $format files can be ambiguous, as the files may be written by Spark 2.x " +
+      "or legacy versions of Hive, which uses a legacy hybrid calendar that is different from " +
+      "Spark 3.0+'s Proleptic Gregorian calendar. See more details in SPARK-31404. You can set " +
+      s"${SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_READ.key} to 'LEGACY' to rebase the datetime " +
Review comment:
The function can be called from Avro too, right? In that case the Parquet config mentioned in the message may not be relevant for Avro.
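One way to keep the message format-agnostic would be to let the caller pass the relevant config key, so the Avro path can point to its own config. A rough sketch (the extra parameter and the shortened message are my assumptions, not part of the PR):
```scala
// Sketch only: the caller supplies the config key to mention in the error message.
def newRebaseExceptionInRead(format: String, configKey: String): SparkUpgradeException = {
  new SparkUpgradeException("3.0", s"reading ancient dates/timestamps from $format files can " +
    s"be ambiguous. You can set $configKey to 'LEGACY' to rebase the datetime values w.r.t. " +
    s"the calendar switch during reading, or set $configKey to 'CORRECTED' to read them as is.",
    null)
}
```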
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala
##########
@@ -138,6 +144,28 @@ class ParquetWriteSupport extends WriteSupport[InternalRow] with Logging {
     }
   }
+  private val dateRebaseFunc: Int => Int = datetimeRebaseMode match {
+    case LegacyBehaviorPolicy.EXCEPTION =>
+      days: Int =>
+        if (days < RebaseDateTime.lastSwitchGregorianDay) {
+          throw DataSourceUtils.newRebaseExceptionInWrite("Parquet")
+        }
+        days
+    case LegacyBehaviorPolicy.LEGACY => RebaseDateTime.rebaseGregorianToJulianDays
+    case LegacyBehaviorPolicy.CORRECTED => identity[Int]
+  }
+
+  private val timestampRebaseFunc: Long => Long = datetimeRebaseMode match {
+    case LegacyBehaviorPolicy.EXCEPTION =>
+      micros: Long =>
+        if (micros < RebaseDateTime.lastSwitchGregorianTs) {
+          throw DataSourceUtils.newRebaseExceptionInWrite("Parquet")
+        }
+        micros
+    case LegacyBehaviorPolicy.LEGACY => RebaseDateTime.rebaseGregorianToJulianMicros
+    case LegacyBehaviorPolicy.CORRECTED => identity[Long]
+  }
Review comment:
This code is repeated elsewhere. Maybe move it to some common place.
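For instance, both writers could obtain the functions from a shared helper keyed by the rebase mode and the format name. A rough sketch with hypothetical names (only `RebaseDateTime` and `DataSourceUtils` are taken from the PR; the `LegacyBehaviorPolicy` import is assumed to be in scope):
```scala
import org.apache.spark.sql.catalyst.util.RebaseDateTime
import org.apache.spark.sql.execution.datasources.DataSourceUtils

object RebaseFunctions {
  // Builds the day-rebase function for the write path; the format name only feeds the error.
  def dateRebaseFuncInWrite(mode: LegacyBehaviorPolicy.Value, format: String): Int => Int =
    mode match {
      case LegacyBehaviorPolicy.EXCEPTION => (days: Int) => {
        if (days < RebaseDateTime.lastSwitchGregorianDay) {
          throw DataSourceUtils.newRebaseExceptionInWrite(format)
        }
        days
      }
      case LegacyBehaviorPolicy.LEGACY => RebaseDateTime.rebaseGregorianToJulianDays
      case LegacyBehaviorPolicy.CORRECTED => identity[Int]
    }

  // The micros variant for timestamps has the same shape.
  def timestampRebaseFuncInWrite(mode: LegacyBehaviorPolicy.Value, format: String): Long => Long =
    mode match {
      case LegacyBehaviorPolicy.EXCEPTION => (micros: Long) => {
        if (micros < RebaseDateTime.lastSwitchGregorianTs) {
          throw DataSourceUtils.newRebaseExceptionInWrite(format)
        }
        micros
      }
      case LegacyBehaviorPolicy.LEGACY => RebaseDateTime.rebaseGregorianToJulianMicros
      case LegacyBehaviorPolicy.CORRECTED => identity[Long]
    }
}
```
ParquetWriteSupport would then just call something like `RebaseFunctions.dateRebaseFuncInWrite(datetimeRebaseMode, "Parquet")`, and the Avro serializer the same with "Avro".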
##########
File path:
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
##########
@@ -892,41 +893,67 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
         val path3_0_rebase = paths(1).getCanonicalPath
         if (dt == "date") {
           val df = Seq(dataStr).toDF("str").select($"str".cast("date").as("date"))
-          df.write.parquet(path3_0)
-          withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_DATETIME_IN_WRITE.key -> "true") {
+
+          // By default we should fail to write ancient datetime values.
+          var e = intercept[SparkException](df.write.parquet(path3_0))
+          assert(e.getCause.getCause.getCause.isInstanceOf[SparkUpgradeException])
+          // By default we should fail to read ancient datetime values.
+          e = intercept[SparkException](spark.read.parquet(path2_4).collect())
+          assert(e.getCause.isInstanceOf[SparkUpgradeException])
+
+          withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_WRITE.key -> CORRECTED.toString) {
+            df.write.mode("overwrite").parquet(path3_0)
+          }
+          withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_WRITE.key -> LEGACY.toString) {
             df.write.parquet(path3_0_rebase)
           }
-          checkAnswer(
-            spark.read.format("parquet").load(path2_4, path3_0, path3_0_rebase),
-            1.to(3).map(_ => Row(java.sql.Date.valueOf(dataStr))))
+
+          // For Parquet files written by Spark 3.0, we know the writer info and don't need the
+          // config to guide the rebase behavior.
+          withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_READ.key -> LEGACY.toString) {
+            checkAnswer(
+              spark.read.format("parquet").load(path2_4, path3_0, path3_0_rebase),
+              1.to(3).map(_ => Row(java.sql.Date.valueOf(dataStr))))
+          }
         } else {
           val df = Seq(dataStr).toDF("str").select($"str".cast("timestamp").as("ts"))
           withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> dt) {
-            df.write.parquet(path3_0)
-            withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_DATETIME_IN_WRITE.key -> "true") {
+            // By default we should fail to write ancient datetime values.
+            var e = intercept[SparkException](df.write.parquet(path3_0))
+            assert(e.getCause.getCause.getCause.isInstanceOf[SparkUpgradeException])
+            // By default we should fail to read ancient datetime values.
+            e = intercept[SparkException](spark.read.parquet(path2_4).collect())
+            assert(e.getCause.isInstanceOf[SparkUpgradeException])
+
+            withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_WRITE.key -> CORRECTED.toString) {
+              df.write.mode("overwrite").parquet(path3_0)
+            }
+            withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_WRITE.key -> LEGACY.toString) {
              df.write.parquet(path3_0_rebase)
            }
          }
-          checkAnswer(
-            spark.read.format("parquet").load(path2_4, path3_0, path3_0_rebase),
-            1.to(3).map(_ => Row(java.sql.Timestamp.valueOf(dataStr))))
+          // For Parquet files written by Spark 3.0, we know the writer info and don't need the
+          // config to guide the rebase behavior.
+          withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_READ.key -> LEGACY.toString) {
+            checkAnswer(
+              spark.read.format("parquet").load(path2_4, path3_0, path3_0_rebase),
+              1.to(3).map(_ => Row(java.sql.Timestamp.valueOf(dataStr))))
+          }
         }
       }
     }
-    Seq(false, true).foreach { vectorized =>
+    Seq(true).foreach { vectorized =>
Review comment:
Is it just for debugging?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]