xkrogen commented on a change in pull request #31133:
URL: https://github.com/apache/spark/pull/31133#discussion_r558485200
##########
File path:
sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
##########
@@ -1883,6 +1883,60 @@ class HiveDDLSuite
}
}
+ test("SPARK-26836: support Avro schema evolution") {
+ withTable("t") {
+ val originalSchema =
+ """
+ |{
+ | "namespace": "test",
+ | "name": "some_schema",
+ | "type": "record",
+ | "fields": [
+ | {
+ | "name": "col2",
+ | "type": "string"
+ | }
+ | ]
+ |}
+ """.stripMargin
+ sql(
+ s"""
+ |CREATE TABLE t PARTITIONED BY (ds string)
+ |ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
+ |WITH SERDEPROPERTIES ('avro.schema.literal'='$originalSchema')
+ |STORED AS
+ |INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
+ |OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
+ """.stripMargin)
+ sql("INSERT INTO t partition (ds='1981-01-07') VALUES ('col2_value')")
+ val evolvedSchema =
+ """
+ |{
+ | "namespace": "test",
+ | "name": "some_schema",
+ | "type": "record",
+ | "fields": [
+ | {
+ | "name": "col1",
+ | "type": "string",
+ | "default": "col1_default"
+ | },
+ | {
+ | "name": "col2",
+ | "type": "string"
+ | }
+ | ]
+ |}
+ """.stripMargin
+ sql(s"""ALTER TABLE t SET SERDEPROPERTIES ('avro.schema.literal'='$evolvedSchema')""")
+ sql("INSERT INTO t partition (ds='1983-04-27') VALUES ('col1_value', 'col2_value')")
+ withSQLConf(SQLConf.HIVE_AVRO_SCHEMA_EVOLUTION_ENABLED.key -> "true") {
+ checkAnswer(spark.table("t"), Row("col1_default", "col2_value", "1981-01-07")
+ :: Row("col1_value", "col2_value", "1983-04-27") :: Nil)
+ }
Review comment:
Maybe also add a `checkAnswer` with the new schema evolution config disabled, so the test pins down the legacy behavior as well?
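For illustration, such a check could look roughly like the sketch below (not part of the PR; the expected rows for the disabled path are an assumption that would need to be confirmed against the actual legacy behavior):

```scala
// Hypothetical addition to the test: read back with the new flag disabled.
// The 1983-04-27 partition was written with the evolved schema, so it is
// assumed to read back unchanged; what the legacy path returns for the
// 1981-01-07 partition (written with the original schema) is the open question.
withSQLConf(SQLConf.HIVE_AVRO_SCHEMA_EVOLUTION_ENABLED.key -> "false") {
  checkAnswer(
    spark.table("t").where("ds = '1983-04-27'"),
    Row("col1_value", "col2_value", "1983-04-27") :: Nil)
}
```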
##########
File path: sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
##########
@@ -248,11 +250,17 @@ class HadoopTableReader(
// SPARK-13709: For SerDes like AvroSerDe, some essential information (e.g. Avro schema
// information) may be defined in table properties. Here we should merge table properties
// and partition properties before initializing the deserializer. Note that partition
- // properties take a higher priority here. For example, a partition may have a different
- // SerDe as the one defined in table properties.
+ // properties take a higher priority here, except for the Avro table properties when
+ // "spark.sql.hive.avroSchemaEvolution.enabled" is set, because in that case the
+ // properties given at the table level are used (see SPARK-26836 for details).
+ // For example, a partition may have a different SerDe than the one defined in table
+ // properties.
val props = new Properties(tableProperties)
- partProps.asScala.foreach {
- case (key, value) => props.setProperty(key, value)
+ partProps.asScala.foreach { case (key, value) =>
+ if (!avroSchemaEvolutionEnabled ||
+ !avroTablePropertyKeys.contains(key) ||
!tableProperties.containsKey(key)) {
Review comment:
Can we rephrase this as `!(avroSchemaEvolutionEnabled && avroTablePropertyKeys.contains(key) && tableProperties.containsKey(key))`? I think the disjunction of negations is more confusing.
You can also consider something like:
```
partProps.asScala
  .filterNot { case (key, _) =>
    avroSchemaEvolutionEnabled && avroTablePropertyKeys.contains(key) && ... }
  .foreach { case (key, value) => props.setProperty(key, value) }
```
but I'm not sure if that is more readable or not.
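The two predicate shapes are equivalent by De Morgan's law (`!a || !b || !c` is the same as `!(a && b && c)`), which a small standalone Scala check (independent of the PR's code) can confirm exhaustively:

```scala
// Exhaustively verify De Morgan's law over all 2^3 boolean combinations.
object DeMorganCheck {
  def main(args: Array[String]): Unit = {
    val bools = Seq(true, false)
    for (a <- bools; b <- bools; c <- bools) {
      val disjunctionOfNegations = !a || !b || !c
      val negatedConjunction = !(a && b && c)
      assert(disjunctionOfNegations == negatedConjunction,
        s"mismatch at ($a, $b, $c)")
    }
    println("equivalent for all 8 combinations")
  }
}
```

Since both forms short-circuit on the first deciding operand, the choice between them is purely about readability, not behavior.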
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]