brkyvz commented on code in PR #41052:
URL: https://github.com/apache/spark/pull/41052#discussion_r1213796477


##########
connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala:
##########
@@ -117,178 +119,260 @@ private[sql] class AvroDeserializer(
     val incompatibleMsg = errorPrefix +
         s"schema is incompatible (avroType = $avroType, sqlType = 
${catalystType.sql})"
 
-    (avroType.getType, catalystType) match {
-      case (NULL, NullType) => (updater, ordinal, _) =>
-        updater.setNullAt(ordinal)
+    val confKey = SQLConf.LEGACY_AVRO_ALLOW_INCOMPATIBLE_SCHEMA
+    val preventReadingIncorrectType = !SQLConf.get.getConf(confKey)
 
+    val logicalDataType = SchemaConverters.toSqlType(avroType).dataType
+    avroType.getType match {
+      case NULL =>
+        (logicalDataType, catalystType) match {
+          case (_, NullType) => (updater, ordinal, _) =>
+            updater.setNullAt(ordinal)
+          case _ => throw new IncompatibleSchemaException(incompatibleMsg)
+        }
       // TODO: we can avoid boxing if future version of avro provide primitive 
accessors.
-      case (BOOLEAN, BooleanType) => (updater, ordinal, value) =>
-        updater.setBoolean(ordinal, value.asInstanceOf[Boolean])
-
-      case (INT, IntegerType) => (updater, ordinal, value) =>
-        updater.setInt(ordinal, value.asInstanceOf[Int])
-
-      case (INT, DateType) => (updater, ordinal, value) =>
-        updater.setInt(ordinal, dateRebaseFunc(value.asInstanceOf[Int]))
-
-      case (LONG, LongType) => (updater, ordinal, value) =>
-        updater.setLong(ordinal, value.asInstanceOf[Long])
-
-      case (LONG, TimestampType) => avroType.getLogicalType match {
-        // For backward compatibility, if the Avro type is Long and it is not 
logical type
-        // (the `null` case), the value is processed as timestamp type with 
millisecond precision.
-        case null | _: TimestampMillis => (updater, ordinal, value) =>
-          val millis = value.asInstanceOf[Long]
-          val micros = DateTimeUtils.millisToMicros(millis)
-          updater.setLong(ordinal, timestampRebaseFunc(micros))
-        case _: TimestampMicros => (updater, ordinal, value) =>
-          val micros = value.asInstanceOf[Long]
-          updater.setLong(ordinal, timestampRebaseFunc(micros))
-        case other => throw new IncompatibleSchemaException(errorPrefix +
-          s"Avro logical type $other cannot be converted to SQL type 
${TimestampType.sql}.")
-      }
-
-      case (LONG, TimestampNTZType) => avroType.getLogicalType match {
-        // To keep consistent with TimestampType, if the Avro type is Long and 
it is not
-        // logical type (the `null` case), the value is processed as 
TimestampNTZ
-        // with millisecond precision.
-        case null | _: LocalTimestampMillis => (updater, ordinal, value) =>
-          val millis = value.asInstanceOf[Long]
-          val micros = DateTimeUtils.millisToMicros(millis)
-          updater.setLong(ordinal, micros)
-        case _: LocalTimestampMicros => (updater, ordinal, value) =>
-          val micros = value.asInstanceOf[Long]
-          updater.setLong(ordinal, micros)
-        case other => throw new IncompatibleSchemaException(errorPrefix +
-          s"Avro logical type $other cannot be converted to SQL type 
${TimestampNTZType.sql}.")
-      }
-
-      // Before we upgrade Avro to 1.8 for logical type support, spark-avro 
converts Long to Date.
-      // For backward compatibility, we still keep this conversion.
-      case (LONG, DateType) => (updater, ordinal, value) =>
-        updater.setInt(ordinal, (value.asInstanceOf[Long] / 
MILLIS_PER_DAY).toInt)
-
-      case (FLOAT, FloatType) => (updater, ordinal, value) =>
-        updater.setFloat(ordinal, value.asInstanceOf[Float])
-
-      case (DOUBLE, DoubleType) => (updater, ordinal, value) =>
-        updater.setDouble(ordinal, value.asInstanceOf[Double])
-
-      case (STRING, StringType) => (updater, ordinal, value) =>
-        val str = value match {
-          case s: String => UTF8String.fromString(s)
-          case s: Utf8 =>
-            val bytes = new Array[Byte](s.getByteLength)
-            System.arraycopy(s.getBytes, 0, bytes, 0, s.getByteLength)
-            UTF8String.fromBytes(bytes)
+      case BOOLEAN =>
+        (logicalDataType, catalystType) match {
+          case (_, BooleanType) => (updater, ordinal, value) =>
+            updater.setBoolean(ordinal, value.asInstanceOf[Boolean])
+          case _ => throw new IncompatibleSchemaException(incompatibleMsg)
         }
-        updater.set(ordinal, str)
-
-      case (ENUM, StringType) => (updater, ordinal, value) =>
-        updater.set(ordinal, UTF8String.fromString(value.toString))
-
-      case (FIXED, BinaryType) => (updater, ordinal, value) =>
-        updater.set(ordinal, value.asInstanceOf[GenericFixed].bytes().clone())
-
-      case (BYTES, BinaryType) => (updater, ordinal, value) =>
-        val bytes = value match {
-          case b: ByteBuffer =>
-            val bytes = new Array[Byte](b.remaining)
-            b.get(bytes)
-            // Do not forget to reset the position
-            b.rewind()
-            bytes
-          case b: Array[Byte] => b
-          case other =>
-            throw new RuntimeException(errorPrefix + s"$other is not a valid 
avro binary.")
+      case INT =>
+        (logicalDataType, catalystType) match {
+          case (IntegerType, IntegerType) => (updater, ordinal, value) =>
+            updater.setInt(ordinal, value.asInstanceOf[Int])
+          case (IntegerType, DateType) => (updater, ordinal, value) =>
+            updater.setInt(ordinal, dateRebaseFunc(value.asInstanceOf[Int]))
+          case (DateType, DateType) => (updater, ordinal, value) =>
+            updater.setInt(ordinal, dateRebaseFunc(value.asInstanceOf[Int]))
+          case (_: YearMonthIntervalType, _: YearMonthIntervalType) => 
(updater, ordinal, value) =>
+            updater.setInt(ordinal, value.asInstanceOf[Int])
+          case (_: YearMonthIntervalType, _) if preventReadingIncorrectType =>
+            throw QueryCompilationErrors.avroIncorrectTypeError(
+              toFieldStr(avroPath), toFieldStr(catalystPath),
+              logicalDataType.catalogString, catalystType.catalogString, 
confKey.key)
+          case _ if !preventReadingIncorrectType => (updater, ordinal, value) 
=>
+            updater.setInt(ordinal, value.asInstanceOf[Int])
+          case _ => throw new IncompatibleSchemaException(incompatibleMsg)
         }
-        updater.set(ordinal, bytes)
-
-      case (FIXED, _: DecimalType) => (updater, ordinal, value) =>
-        val d = avroType.getLogicalType.asInstanceOf[LogicalTypes.Decimal]
-        val bigDecimal = 
decimalConversions.fromFixed(value.asInstanceOf[GenericFixed], avroType, d)
-        val decimal = createDecimal(bigDecimal, d.getPrecision, d.getScale)
-        updater.setDecimal(ordinal, decimal)
-
-      case (BYTES, _: DecimalType) => (updater, ordinal, value) =>
-        val d = avroType.getLogicalType.asInstanceOf[LogicalTypes.Decimal]
-        val bigDecimal = 
decimalConversions.fromBytes(value.asInstanceOf[ByteBuffer], avroType, d)
-        val decimal = createDecimal(bigDecimal, d.getPrecision, d.getScale)
-        updater.setDecimal(ordinal, decimal)
-
-      case (RECORD, st: StructType) =>
-        // Avro datasource doesn't accept filters with nested attributes. See 
SPARK-32328.
-        // We can always return `false` from `applyFilters` for nested records.
-        val writeRecord =
-          getRecordWriter(avroType, st, avroPath, catalystPath, applyFilters = 
_ => false)
-        (updater, ordinal, value) =>
-          val row = new SpecificInternalRow(st)
-          writeRecord(new RowUpdater(row), value.asInstanceOf[GenericRecord])
-          updater.set(ordinal, row)
-
-      case (ARRAY, ArrayType(elementType, containsNull)) =>
-        val avroElementPath = avroPath :+ "element"
-        val elementWriter = newWriter(avroType.getElementType, elementType,
-          avroElementPath, catalystPath :+ "element")
-        (updater, ordinal, value) =>
-          val collection = value.asInstanceOf[java.util.Collection[Any]]
-          val result = createArrayData(elementType, collection.size())
-          val elementUpdater = new ArrayDataUpdater(result)
-
-          var i = 0
-          val iter = collection.iterator()
-          while (iter.hasNext) {
-            val element = iter.next()
-            if (element == null) {
-              if (!containsNull) {
-                throw new RuntimeException(
-                  s"Array value at path ${toFieldStr(avroElementPath)} is not 
allowed to be null")
-              } else {
-                elementUpdater.setNullAt(i)
-              }
-            } else {
-              elementWriter(elementUpdater, i, element)
-            }
-            i += 1
+      case LONG =>
+        (logicalDataType, catalystType) match {
+          case (LongType, LongType) => (updater, ordinal, value) =>

Review Comment:
   Did we lose the `(LONG, DateType)` support here? The removed code above handled this case:
   ```
   // Before we upgrade Avro to 1.8 for logical type support, spark-avro converts Long to Date.
   // For backward compatibility, we still keep this conversion.
   case (LONG, DateType) => (updater, ordinal, value) =>
     updater.setInt(ordinal, (value.asInstanceOf[Long] / MILLIS_PER_DAY).toInt)
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to