WweiL commented on code in PR #47425:
URL: https://github.com/apache/spark/pull/47425#discussion_r1689265340
##########
connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala:
##########
@@ -170,4 +173,23 @@ private[sql] object AvroOptions extends DataSourceOptions {
// When STABLE_ID_FOR_UNION_TYPE is enabled, the option allows to configure
the prefix for fields
// of Avro Union type.
val STABLE_ID_PREFIX_FOR_UNION_TYPE =
newOption("stableIdentifierPrefixForUnionType")
+
+ /**
+ * Adds support for recursive fields. If this option is not specified or is
set to 0, recursive
+ * fields are not permitted. Setting it to 1 drops all recursive fields, 2
allows recursive
+ * fields to be recursed once, and 3 allows it to be recursed twice and so
on, up to 15.
+ * Values larger than 15 are not allowed in order avoid inadvertently
creating very large schemas.
Review Comment:
In order to?
##########
connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala:
##########
@@ -2252,6 +2252,151 @@ abstract class AvroSuite
""".stripMargin)
}
+ private def checkSparkSchemaEquals(
+ avroSchema: String, expectedSchema: StructType, recursiveFieldMaxDepth:
Int): Unit = {
+ val sparkSchema =
+ SchemaConverters.toSqlType(
+ new Schema.Parser().parse(avroSchema), false, "",
recursiveFieldMaxDepth).dataType
+
+ assert(sparkSchema === expectedSchema)
+ }
+
+ test("Translate recursive schema - 1") {
+ val avroSchema = """
+ |{
+ | "type": "record",
+ | "name": "LongList",
+ | "fields" : [
+ | {"name": "value", "type": "long"}, // each element has
a long
+ | {"name": "next", "type": ["null", "LongList"]} // optional next
element
+ | ]
+ |}
+ """.stripMargin
+ val nonRecursiveFields = new StructType().add("value", LongType, nullable
= false)
+ var expectedSchema = nonRecursiveFields
+ for (i <- 1 to 5) {
+ checkSparkSchemaEquals(avroSchema, expectedSchema, i)
+ expectedSchema = nonRecursiveFields.add("next", expectedSchema)
+ }
+ }
+
+ test("Translate recursive schema - 2") {
+ val avroSchema = """
+ |{
+ | "type": "record",
+ | "name": "LongList",
+ | "fields": [
+ | {
+ | "name": "value",
+ | "type": {
+ | "type": "record",
+ | "name": "foo",
+ | "fields": [
+ | {
+ | "name": "parent",
+ | "type": "LongList"
+ | }
+ | ]
+ | }
+ | }
+ | ]
+ |}
+ """.stripMargin
+ val nonRecursiveFields = new StructType().add("value", StructType(Seq()),
nullable = false)
+ var expectedSchema = nonRecursiveFields
+ for (i <- 1 to 5) {
+ checkSparkSchemaEquals(avroSchema, expectedSchema, i)
+ expectedSchema = new StructType().add("value",
+ new StructType().add("parent", expectedSchema, nullable = false),
nullable = false)
+ }
+ }
+
+ test("Translate recursive schema - 3") {
+ val avroSchema = """
+ |{
+ | "type": "record",
+ | "name": "LongList",
+ | "fields" : [
+ | {"name": "value", "type": "long"},
+ | {"name": "array", "type": {"type": "array", "items": "LongList"}}
+ | ]
+ |}
+ """.stripMargin
+ val nonRecursiveFields = new StructType().add("value", LongType, nullable
= false)
+ var expectedSchema = nonRecursiveFields
+ for (i <- 1 to 5) {
+ checkSparkSchemaEquals(avroSchema, expectedSchema, i)
+ expectedSchema =
+ nonRecursiveFields.add("array", new ArrayType(expectedSchema, false),
nullable = false)
+ }
+ }
+
+ test("Translate recursive schema - 4") {
Review Comment:
Can we have better names for the test?
##########
connector/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala:
##########
@@ -128,62 +135,109 @@ object SchemaConverters {
case NULL => SchemaType(NullType, nullable = true)
case RECORD =>
- if (existingRecordNames.contains(avroSchema.getFullName)) {
+ val recursiveDepth: Int =
existingRecordNames.getOrElse(avroSchema.getFullName, 0)
+ if (recursiveDepth > 0 && (recursiveFieldMaxDepth <= 0 ||
recursiveFieldMaxDepth > 15)) {
throw new IncompatibleSchemaException(s"""
- |Found recursive reference in Avro schema, which can not be
processed by Spark:
- |${avroSchema.toString(true)}
+ |Found recursive reference in Avro schema, which can not be
processed by Spark by
Review Comment:
IIRC protobuf does similar here. But this logic looks a bit weird. If we do
want to limit the max recursive depth, I feel that it should be checked in the
option and throw `IllegalArgumentException`
##########
connector/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala:
##########
@@ -128,62 +135,109 @@ object SchemaConverters {
case NULL => SchemaType(NullType, nullable = true)
case RECORD =>
- if (existingRecordNames.contains(avroSchema.getFullName)) {
+ val recursiveDepth: Int =
existingRecordNames.getOrElse(avroSchema.getFullName, 0)
+ if (recursiveDepth > 0 && (recursiveFieldMaxDepth <= 0 ||
recursiveFieldMaxDepth > 15)) {
throw new IncompatibleSchemaException(s"""
- |Found recursive reference in Avro schema, which can not be
processed by Spark:
- |${avroSchema.toString(true)}
+ |Found recursive reference in Avro schema, which can not be
processed by Spark by
+ | default: ${avroSchema.toString(true)}. Try setting the option
`recursiveFieldMaxDepth`
+ | to 1 - 15. Going beyond 15 levels of recursion is not allowed.
""".stripMargin)
- }
- val newRecordNames = existingRecordNames + avroSchema.getFullName
- val fields = avroSchema.getFields.asScala.map { f =>
- val schemaType = toSqlTypeHelper(
- f.schema(),
- newRecordNames,
- useStableIdForUnionType,
- stableIdPrefixForUnionType)
- StructField(f.name, schemaType.dataType, schemaType.nullable)
- }
+ } else if (recursiveDepth > 0 && recursiveDepth >=
recursiveFieldMaxDepth) {
+ log.info(
Review Comment:
Please try to use the new MDC logging, example:
https://github.com/apache/spark/pull/46192
##########
connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala:
##########
@@ -170,4 +173,23 @@ private[sql] object AvroOptions extends DataSourceOptions {
// When STABLE_ID_FOR_UNION_TYPE is enabled, the option allows to configure
the prefix for fields
// of Avro Union type.
val STABLE_ID_PREFIX_FOR_UNION_TYPE =
newOption("stableIdentifierPrefixForUnionType")
+
+ /**
+ * Adds support for recursive fields. If this option is not specified or is
set to 0, recursive
+ * fields are not permitted. Setting it to 1 drops all recursive fields, 2
allows recursive
+ * fields to be recursed once, and 3 allows it to be recursed twice and so
on, up to 15.
+ * Values larger than 15 are not allowed in order avoid inadvertently
creating very large schemas.
Review Comment:
I feel this should be a spark conf
##########
connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala:
##########
@@ -170,4 +173,23 @@ private[sql] object AvroOptions extends DataSourceOptions {
// When STABLE_ID_FOR_UNION_TYPE is enabled, the option allows to configure
the prefix for fields
// of Avro Union type.
val STABLE_ID_PREFIX_FOR_UNION_TYPE =
newOption("stableIdentifierPrefixForUnionType")
+
+ /**
+ * Adds support for recursive fields. If this option is not specified or is
set to 0, recursive
+ * fields are not permitted. Setting it to 1 drops all recursive fields, 2
allows recursive
+ * fields to be recursed once, and 3 allows it to be recursed twice and so
on, up to 15.
+ * Values larger than 15 are not allowed in order avoid inadvertently
creating very large schemas.
Review Comment:
Does protobuf also have max depth of 15?
##########
connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSerdeSuite.scala:
##########
@@ -228,7 +228,8 @@ object AvroSerdeSuite {
RebaseSpec(CORRECTED),
new NoopFilters,
false,
- "")
+ "",
+ -1)
Review Comment:
Should we just add a default value in the definition to prevent multiple API
change?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]