xkrogen commented on a change in pull request #31333:
URL: https://github.com/apache/spark/pull/31333#discussion_r568782744



##########
File path: external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
##########
@@ -64,30 +65,35 @@ private[sql] class AvroDeserializer(
   private val timestampRebaseFunc = DataSourceUtils.creteTimestampRebaseFuncInRead(
     datetimeRebaseMode, "Avro")
 
-  private val converter: Any => Option[Any] = rootCatalystType match {
-    // A shortcut for empty schema.
-    case st: StructType if st.isEmpty =>
-      (data: Any) => Some(InternalRow.empty)
-
-    case st: StructType =>
-      val resultRow = new SpecificInternalRow(st.map(_.dataType))
-      val fieldUpdater = new RowUpdater(resultRow)
-      val applyFilters = filters.skipRow(resultRow, _)
-      val writer = getRecordWriter(rootAvroType, st, Nil, applyFilters)
-      (data: Any) => {
-        val record = data.asInstanceOf[GenericRecord]
-        val skipRow = writer(fieldUpdater, record)
-        if (skipRow) None else Some(resultRow)
-      }
+  private val converter: Any => Option[Any] = try {
+    rootCatalystType match {
+      // A shortcut for empty schema.
+      case st: StructType if st.isEmpty =>
+        (_: Any) => Some(InternalRow.empty)
+
+      case st: StructType =>
+        val resultRow = new SpecificInternalRow(st.map(_.dataType))
+        val fieldUpdater = new RowUpdater(resultRow)
+        val applyFilters = filters.skipRow(resultRow, _)
+        val writer = getRecordWriter(rootAvroType, st, Nil, Nil, applyFilters)
+        (data: Any) => {
+          val record = data.asInstanceOf[GenericRecord]
+          val skipRow = writer(fieldUpdater, record)
+          if (skipRow) None else Some(resultRow)
+        }
 
-    case _ =>
-      val tmpRow = new SpecificInternalRow(Seq(rootCatalystType))
-      val fieldUpdater = new RowUpdater(tmpRow)
-      val writer = newWriter(rootAvroType, rootCatalystType, Nil)
-      (data: Any) => {
-        writer(fieldUpdater, 0, data)
-        Some(tmpRow.get(0, rootCatalystType))
-      }
+      case _ =>
+        val tmpRow = new SpecificInternalRow(Seq(rootCatalystType))
+        val fieldUpdater = new RowUpdater(tmpRow)
+        val writer = newWriter(rootAvroType, rootCatalystType, Nil, Nil)
+        (data: Any) => {
+          writer(fieldUpdater, 0, data)
+          Some(tmpRow.get(0, rootCatalystType))
+        }
+    }
+  } catch {
+    case ise: IncompatibleSchemaException => throw new IncompatibleSchemaException(
+      s"Cannot convert Avro type $rootAvroType to Catalyst type $rootCatalystType.", ise)

Review comment:
       Thanks for the tip, this is much better! 👍🏼




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to