zeruibao commented on code in PR #41521:
URL: https://github.com/apache/spark/pull/41521#discussion_r1224523551


##########
connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala:
##########
@@ -160,6 +160,14 @@ private[sql] class AvroDeserializer(
         (logicalDataType, catalystType) match {
           case (LongType, LongType) => (updater, ordinal, value) =>
             updater.setLong(ordinal, value.asInstanceOf[Long])
+          case (_, LongType) => avroType.getLogicalType match {
+            case _: TimestampMicros | _: TimestampMillis |

Review Comment:
   I just followed the behavior of this test suite in the runtime repo:
   ```scala
   Seq(
     "time-millis",
     "time-micros",
     "timestamp-micros",
     "local-timestamp-millis",
     "local-timestamp-micros"
   ).foreach { timeLogicalType =>
     val timestampSchema = timestampSchemaTemplate.format(timeLogicalType)

     if (DatabricksEdgeConfigs.edgeModeEnabled) {
       test(s"$timeLogicalType type write support of Avro Hive serde") {
         Seq(true, false).foreach { useNativeAvro =>
           withSQLConf(DatabricksSQLConf.CONVERT_METASTORE_AVRO, useNativeAvro) {
             val tableName = "tab1"

             withTable(tableName) {
               // Creates the (non-)partitioned Avro table
               versionSpark.sql(
                 s"""
                    |CREATE TABLE $tableName
                    |ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
                    |STORED AS
                    |  INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
                    |  OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
                    |TBLPROPERTIES ('avro.schema.literal' = '$timestampSchema')
                  """.stripMargin
               )

               val insertStmt = s"INSERT OVERWRITE TABLE $tableName SELECT 1000"
               val result = versionSpark.sql("SELECT 1000").collect()
               versionSpark.sql(insertStmt)
               assert(versionSpark.table(tableName).collect() === result)
             }
           }
         }
       }
     }
   }
   ```
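
   For context, here is a minimal local sketch (not part of this PR) of the case the hunk above touches: an Avro long annotated with `timestamp-micros` being read into a catalyst `LongType` column. It assumes a local `SparkSession`, the `spark-avro` module on the classpath, and a hypothetical scratch path; whether the final read succeeds or errors depends on the conversion rules this PR introduces, so the snippet only sets up the scenario.
   ```scala
   import java.sql.Timestamp

   import org.apache.spark.sql.SparkSession

   object TimestampMicrosToLongSketch {
     def main(args: Array[String]): Unit = {
       val spark = SparkSession.builder()
         .master("local[1]")
         .appName("avro-timestamp-micros-to-long")
         .getOrCreate()
       import spark.implicits._

       // Hypothetical scratch location for the intermediate Avro file.
       val path = "/tmp/avro-ts-micros-sketch"

       // Spark encodes TimestampType as an Avro long annotated with
       // timestamp-micros by default, so this produces the Avro side of the case.
       Seq(Timestamp.valueOf("2023-06-09 00:00:00"))
         .toDF("ts")
         .write.format("avro").mode("overwrite").save(path)

       // Reading the same file back with a user-supplied LongType column hits the
       // (_, LongType) branch where avroType.getLogicalType is TimestampMicros.
       spark.read.schema("ts LONG").format("avro").load(path).show(false)

       spark.stop()
     }
   }
   ```
   The quoted Hive-serde test checks the same mapping end to end through the Avro SerDe; the sketch is just a lighter-weight way to hit the native reader path directly.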



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
