sadikovi commented on code in PR #42618:
URL: https://github.com/apache/spark/pull/42618#discussion_r1302356178
##########
connector/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala:
##########
@@ -142,18 +143,30 @@ object SchemaConverters {
if (avroSchema.getTypes.asScala.exists(_.getType == NULL)) {
// In case of a union with null, eliminate it and make a recursive
call
val remainingUnionTypes = AvroUtils.nonNullUnionBranches(avroSchema)
- if (remainingUnionTypes.size == 1) {
- toSqlTypeHelper(remainingUnionTypes.head, existingRecordNames,
avroOptions)
- .copy(nullable = true)
- } else {
- toSqlTypeHelper(
- Schema.createUnion(remainingUnionTypes.asJava),
- existingRecordNames,
- avroOptions).copy(nullable = true)
- }
+ toSqlTypeHelper(
+ Schema.createUnion(remainingUnionTypes.asJava),
+ existingRecordNames,
+ avroOptions).copy(nullable = true)
} else avroSchema.getTypes.asScala.map(_.getType).toSeq match {
case Seq(t1) =>
- toSqlTypeHelper(avroSchema.getTypes.get(0), existingRecordNames,
avroOptions)
+ // If spark.sql.avro.alwaysConvertUnionToStructType is set to
false (default),
+ // we convert Avro union with a single primitive type into a
primitive Spark type
+ // instead of a StructType.
+ if (!SQLConf.get.avroAlwaysConvertUnionToStruct) {
+ toSqlTypeHelper(avroSchema.getTypes.get(0), existingRecordNames,
avroOptions)
+ } else {
+ val singleton = avroSchema.getTypes.get(0)
+ val schemaType = toSqlTypeHelper(singleton, existingRecordNames,
avroOptions)
+ val fieldName = if (avroOptions.useStableIdForUnionType) {
+ s"member_${singleton.getName.toLowerCase(Locale.ROOT)}"
+ } else {
+ s"member0"
Review Comment:
Could you explore whether there is a way to avoid duplicating the logic that
builds the stable identifier field names?
##########
connector/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala:
##########
@@ -142,18 +143,30 @@ object SchemaConverters {
if (avroSchema.getTypes.asScala.exists(_.getType == NULL)) {
// In case of a union with null, eliminate it and make a recursive
call
val remainingUnionTypes = AvroUtils.nonNullUnionBranches(avroSchema)
- if (remainingUnionTypes.size == 1) {
- toSqlTypeHelper(remainingUnionTypes.head, existingRecordNames,
avroOptions)
- .copy(nullable = true)
- } else {
- toSqlTypeHelper(
- Schema.createUnion(remainingUnionTypes.asJava),
- existingRecordNames,
- avroOptions).copy(nullable = true)
- }
+ toSqlTypeHelper(
+ Schema.createUnion(remainingUnionTypes.asJava),
+ existingRecordNames,
+ avroOptions).copy(nullable = true)
} else avroSchema.getTypes.asScala.map(_.getType).toSeq match {
case Seq(t1) =>
- toSqlTypeHelper(avroSchema.getTypes.get(0), existingRecordNames,
avroOptions)
+ // If spark.sql.avro.alwaysConvertUnionToStructType is set to
false (default),
+ // we convert Avro union with a single primitive type into a
primitive Spark type
+ // instead of a StructType.
+ if (!SQLConf.get.avroAlwaysConvertUnionToStruct) {
+ toSqlTypeHelper(avroSchema.getTypes.get(0), existingRecordNames,
avroOptions)
Review Comment:
I was going to suggest doing this in the Avro schema conversion for the Avro data
source, since that is where it is required: the top-level schema is always
expected to be a struct.
For example, we change this code in AvroUtils:
```scala
// Converts Avro schema to sql type and ensures that the top level data type is
// either an Avro record or a complex union that both result in a conversion to
// StructType
def convertAvroToSqlSchema(avroSchema: Schema, avroOptions: AvroOptions):
StructType = {
SchemaConverters.toSqlTypeHelper(avroSchema, Set.empty,
avroOptions).dataType match {
case t: StructType => t
case _ => throw new RuntimeException(
s"""Avro schema cannot be converted to a Spark SQL StructType:
|
|${avroSchema.toString(true)}
|""".stripMargin)
}
}
```
to something like this:
```scala
// Converts Avro schema to sql type and ensures that the top level data type is
// either an Avro record or a complex union that both result in a conversion to
// StructType
def convertAvroToSqlSchema(avroSchema: Schema, avroOptions: AvroOptions):
StructType = {
SchemaConverters.toSqlTypeHelper(avroSchema, Set.empty,
avroOptions).dataType match {
case t: StructType => t
case t: AtomicType => StructType(StructField("value", t, nullable =
true))
case _ => throw new RuntimeException(
s"""Avro schema cannot be converted to a Spark SQL StructType:
|
|${avroSchema.toString(true)}
|""".stripMargin)
}
}
```
Can we also check that this change will not affect the `from_avro` SQL function?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]