zeruibao commented on code in PR #41052:
URL: https://github.com/apache/spark/pull/41052#discussion_r1207262643


##########
connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala:
##########
@@ -639,6 +640,110 @@ abstract class AvroSuite
     }
   }
 
+  test("SPARK-43380: Fix Avro data type conversion" +
+      " of decimal type to avoid producing incorrect results") {
+    withTempPath { path =>
+      val confKey = SQLConf.LEGACY_AVRO_ALLOW_READING_WITH_INCOMPATIBLE_SCHEMA.key
+      sql("SELECT 13.1234567890 a").write.format("avro").save(path.toString)
+      // With the flag disabled, we will throw an exception if there is a mismatch
+      withSQLConf(confKey -> "false") {
+        val e = intercept[SparkException] {
+          spark.read.schema("a DECIMAL(4, 3)").format("avro").load(path.toString).collect()
+        }
+        ExceptionUtils.getRootCause(e) match {
+          case ex: IncompatibleSchemaException =>
+            assert(ex.getMessage.contains(confKey))
+          case other =>
+            fail(s"Received unexpected exception", other)
+        }
+      }
+      // The following used to work, so it should still work with the flag enabled
+      checkAnswer(
+        spark.read.schema("a DECIMAL(5, 3)").format("avro").load(path.toString),
+        Row(new java.math.BigDecimal("13.123"))
+      )
+      withSQLConf(confKey -> "true") {
+        // With the flag enabled, we return a null silently, which isn't great
+        checkAnswer(
+          spark.read.schema("a DECIMAL(4, 3)").format("avro").load(path.toString),
+          Row(null)
+        )
+        checkAnswer(
+          spark.read.schema("a DECIMAL(5, 3)").format("avro").load(path.toString),
+          Row(new java.math.BigDecimal("13.123"))
+        )
+      }
+    }
+  }
+
+  test("SPARK-43380: Fix Avro data type conversion" +
+    " of DayTimeIntervalType to avoid producing incorrect results") {
+    withTempPath { path =>
+      val confKey = SQLConf.LEGACY_AVRO_ALLOW_READING_WITH_INCOMPATIBLE_SCHEMA.key
+      val schema = StructType(Array(StructField("a", DayTimeIntervalType(), false)))
+      val data = Seq(Row(java.time.Duration.ofDays(1).plusSeconds(1)))
+
+      val df = spark.createDataFrame(sparkContext.parallelize(data), schema)
+      df.write.format("avro").save(path.getCanonicalPath)
+
+      withSQLConf(confKey -> "false") {
+        Seq("Date", "Timestamp", "Timestamp_NTZ").foreach { sqlType =>
+          val e = intercept[SparkException] {
+            spark.read.schema(s"a $sqlType").format("avro").load(path.toString).collect()
+          }
+          ExceptionUtils.getRootCause(e) match {
+            case ex: IncompatibleSchemaException =>
+              assert(ex.getMessage.contains(confKey))
+            case other =>
+              fail(s"Received unexpected exception", other)
+          }
+        }
+      }
+
+      withSQLConf(confKey -> "true") {
+        val format = new java.text.SimpleDateFormat("yyyy-MM-dd")
+        checkAnswer(
+          spark.read.schema("a Date").format("avro").load(path.toString),
+          Row(format.parse("1972-09-27"))
+        )
+      }
+    }
+  }
+
+  test("SPARK-43380: Fix Avro data type conversion" +
+    " of YearMonthIntervalType to avoid producing incorrect results") {
+    withTempPath { path =>
+      val confKey = SQLConf.LEGACY_AVRO_ALLOW_READING_WITH_INCOMPATIBLE_SCHEMA.key
+      val schema = StructType(Array(StructField("a", YearMonthIntervalType(), false)))
+      val data = Seq(Row(java.time.Period.of(1, 1, 0)))
+
+      val df = spark.createDataFrame(sparkContext.parallelize(data), schema)
+      df.write.format("avro").save(path.getCanonicalPath)
+
+      withSQLConf(confKey -> "false") {
+        Seq("Date", "Timestamp", "Timestamp_NTZ").foreach { sqlType =>
+          val e = intercept[SparkException] {

Review Comment:
   sg!



##########
connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala:
##########
@@ -117,6 +118,24 @@ private[sql] class AvroDeserializer(
     val incompatibleMsg = errorPrefix +
         s"schema is incompatible (avroType = $avroType, sqlType = ${catalystType.sql})"
 
+    val realDataType = SchemaConverters.toSqlType(avroType).dataType
+    val confKey = SQLConf.LEGACY_AVRO_ALLOW_READING_WITH_INCOMPATIBLE_SCHEMA
+    val preventReadingIncorrectType = !SQLConf.get.getConf(confKey)
+    def incorrectTypeException(provided: DataType): IncompatibleSchemaException = {
+      new IncompatibleSchemaException(errorPrefix + "the original encoded data type is " +
+        s"${realDataType.catalogString}, however you're trying to read the field as " +
+        s"${provided.catalogString}, which would lead to an incorrect answer. To allow reading " +
+        s"this field, enable the SQL configuration: `${confKey.key}`.")
+    }
+
+    def lowerPrecisionException(provided: DataType): IncompatibleSchemaException = {

Review Comment:
   sg!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to