hkulyc commented on code in PR #47425:
URL: https://github.com/apache/spark/pull/47425#discussion_r1720363091
##########
connector/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala:
##########
@@ -128,62 +138,110 @@ object SchemaConverters {
case NULL => SchemaType(NullType, nullable = true)
case RECORD =>
- if (existingRecordNames.contains(avroSchema.getFullName)) {
+ val recursiveDepth: Int =
existingRecordNames.getOrElse(avroSchema.getFullName, 0)
+ if (recursiveDepth > 0 && recursiveFieldMaxDepth <= 0) {
throw new IncompatibleSchemaException(s"""
- |Found recursive reference in Avro schema, which can not be
processed by Spark:
- |${avroSchema.toString(true)}
+ |Found recursive reference in Avro schema, which can not be
processed by Spark by
+ | default: ${avroSchema.toString(true)}. Try setting the option
`recursiveFieldMaxDepth`
+ | to 1 - $RECURSIVE_FIELD_MAX_DEPTH_LIMIT.
""".stripMargin)
- }
- val newRecordNames = existingRecordNames + avroSchema.getFullName
- val fields = avroSchema.getFields.asScala.map { f =>
- val schemaType = toSqlTypeHelper(
- f.schema(),
- newRecordNames,
- useStableIdForUnionType,
- stableIdPrefixForUnionType)
- StructField(f.name, schemaType.dataType, schemaType.nullable)
- }
+ } else if (recursiveDepth > 0 && recursiveDepth >=
recursiveFieldMaxDepth) {
+ logInfo(
+ log"The field ${MDC(FIELD_NAME, avroSchema.getFullName)} of type "
+
+ log"${MDC(FIELD_TYPE, avroSchema.getType.getName)} is dropped at
recursive depth " +
+ log"${MDC(RECURSIVE_DEPTH, recursiveDepth)}."
+ )
+ null
+ } else {
+ val newRecordNames =
+ existingRecordNames + (avroSchema.getFullName -> (recursiveDepth +
1))
+ val fields = avroSchema.getFields.asScala.map { f =>
+ val schemaType = toSqlTypeHelper(
+ f.schema(),
+ newRecordNames,
+ useStableIdForUnionType,
+ stableIdPrefixForUnionType,
+ recursiveFieldMaxDepth)
+ if (schemaType == null) {
+ null
+ }
+ else {
+ StructField(f.name, schemaType.dataType, schemaType.nullable)
+ }
+ }.filter(_ != null).toSeq
Review Comment:
We need to drop the null entries (the fields whose converted schema type is
null) because StructType does not accept an array that contains null values.
One alternative is to wrap each null value in a StructField(..., nullable =
true), but we are not doing that for Protobuf; there, we simply drop the
fields.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]