baganokodo2022 commented on code in PR #38922: URL: https://github.com/apache/spark/pull/38922#discussion_r1043901516
########## connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/SchemaConverters.scala: ########## @@ -40,19 +40,26 @@ object SchemaConverters { * * @since 3.4.0 */ - def toSqlType(descriptor: Descriptor): SchemaType = { - toSqlTypeHelper(descriptor) + def toSqlType( + descriptor: Descriptor, + protobufOptions: ProtobufOptions = ProtobufOptions(Map.empty)): SchemaType = { + toSqlTypeHelper(descriptor, protobufOptions) } - def toSqlTypeHelper(descriptor: Descriptor): SchemaType = ScalaReflectionLock.synchronized { + def toSqlTypeHelper( + descriptor: Descriptor, + protobufOptions: ProtobufOptions): SchemaType = ScalaReflectionLock.synchronized { SchemaType( - StructType(descriptor.getFields.asScala.flatMap(structFieldFor(_, Set.empty)).toArray), + StructType(descriptor.getFields.asScala.flatMap( + structFieldFor(_, Map.empty, Map.empty, protobufOptions: ProtobufOptions)).toArray), nullable = true) } def structFieldFor( fd: FieldDescriptor, - existingRecordNames: Set[String]): Option[StructField] = { + existingRecordNames: Map[String, Int], + existingRecordTypes: Map[String, Int], Review Comment: @SandishKumarHN since it is going to be either `FIELD_NAME` or `FIELD_TYPE`, do we need to keep both Maps? ########## connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufOptions.scala: ########## @@ -38,6 +38,12 @@ private[sql] class ProtobufOptions( val parseMode: ParseMode = parameters.get("mode").map(ParseMode.fromString).getOrElse(FailFastMode) + + val circularReferenceType: String = parameters.getOrElse("circularReferenceType", "FIELD_NAME") Review Comment: > Yes @SandishKumarHN you are right. That is discovered from a very complex Proto schema shared across many micro services. 
########## connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/SchemaConverters.scala: ########## @@ -92,14 +109,38 @@ object SchemaConverters { MapType(keyType, valueType, valueContainsNull = false).defaultConcreteType, nullable = false)) case MESSAGE => - if (existingRecordNames.contains(fd.getFullName)) { - throw QueryCompilationErrors.foundRecursionInProtobufSchema(fd.toString()) + // User can set circularReferenceDepth of 0 or 1 or 2. + // Going beyond 3 levels of recursion is not allowed. + if (protobufOptions.circularReferenceType.equals("FIELD_TYPE")) { + if (existingRecordTypes.contains(fd.getType.name()) && + (protobufOptions.circularReferenceDepth < 0 || + protobufOptions.circularReferenceDepth >= 3)) { + throw QueryCompilationErrors.foundRecursionInProtobufSchema(fd.toString()) + } else if (existingRecordTypes.contains(fd.getType.name()) && Review Comment: @SandishKumarHN and @rangadi , should we error out on `-1`, the default value, unless users specifically override? 0 -> drop all recursed fields once encountered 1 -> allow the same field name (type) to be entered twice. 2 -> allow the same field name (type) to be entered 3 times. thoughts? ########## connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/SchemaConverters.scala: ########## @@ -92,14 +109,38 @@ object SchemaConverters { MapType(keyType, valueType, valueContainsNull = false).defaultConcreteType, nullable = false)) case MESSAGE => - if (existingRecordNames.contains(fd.getFullName)) { - throw QueryCompilationErrors.foundRecursionInProtobufSchema(fd.toString()) + // User can set circularReferenceDepth of 0 or 1 or 2. + // Going beyond 3 levels of recursion is not allowed. 
+ if (protobufOptions.circularReferenceType.equals("FIELD_TYPE")) { + if (existingRecordTypes.contains(fd.getType.name()) && + (protobufOptions.circularReferenceDepth < 0 || + protobufOptions.circularReferenceDepth >= 3)) { + throw QueryCompilationErrors.foundRecursionInProtobufSchema(fd.toString()) + } else if (existingRecordTypes.contains(fd.getType.name()) && Review Comment: In my back-ported branch, ``` val recordName = circularReferenceType match { case CircularReferenceTypes.FIELD_NAME => fd.getFullName case CircularReferenceTypes.FIELD_TYPE => fd.getFullName().substring(0, fd.getFullName().lastIndexOf(".")) } if (circularReferenceTolerance < 0 && existingRecordNames(recordName) > 0) { // no tolerance on circular reference logError(s"circular reference in protobuf schema detected [no tolerance] - ${recordName}") throw new IllegalStateException(s"circular reference in protobuf schema detected [no tolerance] - ${recordName}") } if (existingRecordNames(recordName) > (circularReferenceTolerance max 0) ) { // stop navigation and drop the repetitive field logInfo(s"circular reference in protobuf schema detected [max tolerance breached] field dropped - ${recordName} = ${existingRecordNames(recordName)}") Some(NullType) } else { val newRecordNames: Map[String, Int] = existingRecordNames + (recordName -> (1 + existingRecordNames(recordName))) Option( fd.getMessageType.getFields.asScala .flatMap(structFieldFor(_, newRecordNames, protobufOptions)) .toSeq) .filter(_.nonEmpty) .map(StructType.apply) }``` ########## connector/protobuf/src/test/resources/protobuf/functions_suite.proto: ########## @@ -170,4 +170,41 @@ message timeStampMsg { message durationMsg { string key = 1; Duration duration = 2; -} \ No newline at end of file +} + +message OneOfEvent { Review Comment: nice ########## connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufOptions.scala: ########## @@ -38,6 +38,12 @@ private[sql] class ProtobufOptions( val parseMode: ParseMode = 
parameters.get("mode").map(ParseMode.fromString).getOrElse(FailFastMode) + + val circularReferenceType: String = parameters.getOrElse("circularReferenceType", "FIELD_NAME") Review Comment: Hi @rangadi , under certain circumstances dropping fields with data seems inevitable when dealing with circular references. We can't tell which fields are intended to be kept. One example is the parent-child relationship in an RDB data model, considering IC -> EM -> EM2 -> Director -> Senior Director -> VP -> CTO -> CEO, which are all `Employee` type, assuming the relationship is bi-directional. The longest path for `level-1` circular reference on `FIELD_NAME` is IC -> EM -> EM2 -> Director -> Senior Director -> VP -> CTO -> CEO -> CTO -> VP -> Senior Director -> Director -> EM2 -> EM -> IC. In reality, data scientists may just want to keep 2 levels of circular reference on `FIELD_TYPE`, IC -> EM -> EM2, or EM2 -> Director -> Senior Director. This greatly reduces redundant data in the warehouse. Hope it makes sense. Thanks Xinyu -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org