gengliangwang commented on code in PR #41052:
URL: https://github.com/apache/spark/pull/41052#discussion_r1206155771
##########
connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala:
##########
@@ -639,6 +640,110 @@ abstract class AvroSuite
}
}
+ test("SPARK-43380: Fix Avro data type conversion" +
+ " of decimal type to avoid producing incorrect results") {
+ withTempPath { path =>
+ val confKey = SQLConf.LEGACY_AVRO_ALLOW_READING_WITH_INCOMPATIBLE_SCHEMA.key
+ sql("SELECT 13.1234567890 a").write.format("avro").save(path.toString)
+ // With the flag disabled, we will throw an exception if there is a mismatch
+ withSQLConf(confKey -> "false") {
+ val e = intercept[SparkException] {
+ spark.read.schema("a DECIMAL(4, 3)").format("avro").load(path.toString).collect()
+ }
+ ExceptionUtils.getRootCause(e) match {
+ case ex: IncompatibleSchemaException =>
+ assert(ex.getMessage.contains(confKey))
+ case other =>
+ fail(s"Received unexpected exception", other)
+ }
+ }
+ // The following used to work, so it should still work with the flag enabled
+ checkAnswer(
+ spark.read.schema("a DECIMAL(5, 3)").format("avro").load(path.toString),
+ Row(new java.math.BigDecimal("13.123"))
+ )
+ withSQLConf(confKey -> "true") {
+ // With the flag enabled, we return a null silently, which isn't great
+ checkAnswer(
+ spark.read.schema("a DECIMAL(4, 3)").format("avro").load(path.toString),
+ Row(null)
+ )
+ checkAnswer(
+ spark.read.schema("a DECIMAL(5, 3)").format("avro").load(path.toString),
+ Row(new java.math.BigDecimal("13.123"))
+ )
+ }
+ }
+ }
+
+ test("SPARK-43380: Fix Avro data type conversion" +
+ " of DayTimeIntervalType to avoid producing incorrect results") {
+ withTempPath { path =>
+ val confKey = SQLConf.LEGACY_AVRO_ALLOW_READING_WITH_INCOMPATIBLE_SCHEMA.key
+ val schema = StructType(Array(StructField("a", DayTimeIntervalType(), false)))
+ val data = Seq(Row(java.time.Duration.ofDays(1).plusSeconds(1)))
+
+ val df = spark.createDataFrame(sparkContext.parallelize(data), schema)
+ df.write.format("avro").save(path.getCanonicalPath)
+
+ withSQLConf(confKey -> "false") {
+ Seq("Date", "Timestamp", "Timestamp_NTZ").foreach { sqlType =>
+ val e = intercept[SparkException] {
+ spark.read.schema(s"a $sqlType").format("avro").load(path.toString).collect()
+ }
+ ExceptionUtils.getRootCause(e) match {
+ case ex: IncompatibleSchemaException =>
+ assert(ex.getMessage.contains(confKey))
+ case other =>
+ fail(s"Received unexpected exception", other)
+ }
+ }
+ }
+
+ withSQLConf(confKey -> "true") {
+ val format = new java.text.SimpleDateFormat("yyyy-MM-dd")
+ checkAnswer(
+ spark.read.schema("a Date").format("avro").load(path.toString),
+ Row(format.parse("1972-09-27"))
+ )
+ }
+ }
+ }
+
+ test("SPARK-43380: Fix Avro data type conversion" +
+ " of YearMonthIntervalType to avoid producing incorrect results") {
+ withTempPath { path =>
+ val confKey = SQLConf.LEGACY_AVRO_ALLOW_READING_WITH_INCOMPATIBLE_SCHEMA.key
+ val schema = StructType(Array(StructField("a", YearMonthIntervalType(), false)))
+ val data = Seq(Row(java.time.Period.of(1, 1, 0)))
+
+ val df = spark.createDataFrame(sparkContext.parallelize(data), schema)
+ df.write.format("avro").save(path.getCanonicalPath)
+
+ withSQLConf(confKey -> "false") {
+ Seq("Date", "Timestamp", "Timestamp_NTZ").foreach { sqlType =>
+ val e = intercept[SparkException] {
Review Comment:
Let's use the method `checkError`
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]