baganokodo2022 commented on code in PR #38922:
URL: https://github.com/apache/spark/pull/38922#discussion_r1043901516


##########
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/SchemaConverters.scala:
##########
@@ -40,19 +40,26 @@ object SchemaConverters {
    *
    * @since 3.4.0
    */
-  def toSqlType(descriptor: Descriptor): SchemaType = {
-    toSqlTypeHelper(descriptor)
+  def toSqlType(
+      descriptor: Descriptor,
+      protobufOptions: ProtobufOptions = ProtobufOptions(Map.empty)): 
SchemaType = {
+    toSqlTypeHelper(descriptor, protobufOptions)
   }
 
-  def toSqlTypeHelper(descriptor: Descriptor): SchemaType = 
ScalaReflectionLock.synchronized {
+  def toSqlTypeHelper(
+      descriptor: Descriptor,
+      protobufOptions: ProtobufOptions): SchemaType = 
ScalaReflectionLock.synchronized {
     SchemaType(
-      StructType(descriptor.getFields.asScala.flatMap(structFieldFor(_, 
Set.empty)).toArray),
+      StructType(descriptor.getFields.asScala.flatMap(
+        structFieldFor(_, Map.empty, Map.empty, protobufOptions: 
ProtobufOptions)).toArray),
       nullable = true)
   }
 
   def structFieldFor(
       fd: FieldDescriptor,
-      existingRecordNames: Set[String]): Option[StructField] = {
+      existingRecordNames: Map[String, Int],
+      existingRecordTypes: Map[String, Int],

Review Comment:
   @SandishKumarHN since it is going to be either `FIELD_NAME` or `FIELD_TYPE`, 
do we need to keep both Maps?



##########
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufOptions.scala:
##########
@@ -38,6 +38,12 @@ private[sql] class ProtobufOptions(
 
   val parseMode: ParseMode =
     parameters.get("mode").map(ParseMode.fromString).getOrElse(FailFastMode)
+
+  val circularReferenceType: String = 
parameters.getOrElse("circularReferenceType", "FIELD_NAME")

Review Comment:
   > 
   
   Yes @SandishKumarHN, you are right. That was discovered from a very complex 
Proto schema shared across many microservices.



##########
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/SchemaConverters.scala:
##########
@@ -92,14 +109,38 @@ object SchemaConverters {
             MapType(keyType, valueType, valueContainsNull = 
false).defaultConcreteType,
             nullable = false))
       case MESSAGE =>
-        if (existingRecordNames.contains(fd.getFullName)) {
-          throw 
QueryCompilationErrors.foundRecursionInProtobufSchema(fd.toString())
+        // User can set circularReferenceDepth of 0 or 1 or 2.
+        // Going beyond 3 levels of recursion is not allowed.
+        if (protobufOptions.circularReferenceType.equals("FIELD_TYPE")) {
+          if (existingRecordTypes.contains(fd.getType.name()) &&
+            (protobufOptions.circularReferenceDepth < 0 ||
+              protobufOptions.circularReferenceDepth >= 3)) {
+            throw 
QueryCompilationErrors.foundRecursionInProtobufSchema(fd.toString())
+          } else if (existingRecordTypes.contains(fd.getType.name()) &&

Review Comment:
   @SandishKumarHN and @rangadi, should we error out on `-1`, the default value, 
unless users specifically override it?
   0 -> drop all recursed fields once encountered
   1 -> allow the same field name (type) to be entered twice.
   2 -> allow the same field name (type) to be entered 3 times.
   
   thoughts?



##########
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/SchemaConverters.scala:
##########
@@ -92,14 +109,38 @@ object SchemaConverters {
             MapType(keyType, valueType, valueContainsNull = 
false).defaultConcreteType,
             nullable = false))
       case MESSAGE =>
-        if (existingRecordNames.contains(fd.getFullName)) {
-          throw 
QueryCompilationErrors.foundRecursionInProtobufSchema(fd.toString())
+        // User can set circularReferenceDepth of 0 or 1 or 2.
+        // Going beyond 3 levels of recursion is not allowed.
+        if (protobufOptions.circularReferenceType.equals("FIELD_TYPE")) {
+          if (existingRecordTypes.contains(fd.getType.name()) &&
+            (protobufOptions.circularReferenceDepth < 0 ||
+              protobufOptions.circularReferenceDepth >= 3)) {
+            throw 
QueryCompilationErrors.foundRecursionInProtobufSchema(fd.toString())
+          } else if (existingRecordTypes.contains(fd.getType.name()) &&

Review Comment:
   In my back-ported branch,
   ```
           val recordName = circularReferenceType match {
             case CircularReferenceTypes.FIELD_NAME =>
               fd.getFullName
             case CircularReferenceTypes.FIELD_TYPE =>
               fd.getFullName().substring(0, fd.getFullName().lastIndexOf(".")) 
           }
           
           if (circularReferenceTolerance < 0 && 
existingRecordNames(recordName) > 0) {
             // no tolerance on circular reference
             logError(s"circular reference in protobuf schema detected [no 
tolerance] - ${recordName}")
             throw new IllegalStateException(s"circular reference in protobuf 
schema detected [no tolerance] - ${recordName}")
           }
   
           if (existingRecordNames(recordName) > (circularReferenceTolerance 
max 0) ) {
             // stop navigation and drop the repetitive field
             logInfo(s"circular reference in protobuf schema detected [max 
tolerance breached] field dropped - ${recordName} = 
${existingRecordNames(recordName)}")
             Some(NullType)
           } else {
             val newRecordNames: Map[String, Int] = existingRecordNames +  
               (recordName -> (1 + existingRecordNames(recordName)))
             Option(
               fd.getMessageType.getFields.asScala
                 .flatMap(structFieldFor(_, newRecordNames, protobufOptions))
                 .toSeq)
               .filter(_.nonEmpty)
               .map(StructType.apply)
           }```



##########
connector/protobuf/src/test/resources/protobuf/functions_suite.proto:
##########
@@ -170,4 +170,41 @@ message timeStampMsg {
 message durationMsg {
   string key = 1;
   Duration duration = 2;
-}
\ No newline at end of file
+}
+
+message OneOfEvent {

Review Comment:
   nice 



##########
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufOptions.scala:
##########
@@ -38,6 +38,12 @@ private[sql] class ProtobufOptions(
 
   val parseMode: ParseMode =
     parameters.get("mode").map(ParseMode.fromString).getOrElse(FailFastMode)
+
+  val circularReferenceType: String = 
parameters.getOrElse("circularReferenceType", "FIELD_NAME")

Review Comment:
   Hi @rangadi , under certain circumstances dropping fields with data seems 
inevitable when dealing with circular references. We can't tell which fields 
are intended to be kept. One example is the parent-child relationship in a RDB 
data model, considering IC -> EM -> EM2 -> Director -> Senior Director -> VP -> 
CTO -> CEO, which are all `Employee` type, assuming the relationship is 
bi-directional.  The longest path for `level-1` circular reference on 
`FIELD_NAME` is IC -> EM -> EM2 -> Director -> Senior Director -> VP -> CTO -> 
CEO -> CTO -> VP -> Senior Director -> Director -> EM2 -> EM -> IC. In reality, 
data scientists may just want to keep 2 levels of circular reference on 
`FIELD_TYPE` , IC -> EM -> EM2, or EM2 -> Director -> Senior Director. This 
greatly reduces redundant data in the warehouse.
   
   Hope it makes sense.
   
   Thanks
   Xinyu



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to