SandishKumarHN commented on code in PR #38922:
URL: https://github.com/apache/spark/pull/38922#discussion_r1043814379


##########
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/SchemaConverters.scala:
##########
@@ -92,14 +109,38 @@ object SchemaConverters {
             MapType(keyType, valueType, valueContainsNull = 
false).defaultConcreteType,
             nullable = false))
       case MESSAGE =>
-        if (existingRecordNames.contains(fd.getFullName)) {
-          throw 
QueryCompilationErrors.foundRecursionInProtobufSchema(fd.toString())
+        // User can set circularReferenceDepth of 0 or 1 or 2.
+        // Going beyond 3 levels of recursion is not allowed.

Review Comment:
   @rangadi The user can specify the maximum allowed recursion depth for a field by setting the `circularReferenceDepth` option to 0, 1, or 2: 0 allows the field to be recursed once, 1 twice, and 2 thrice. Values greater than 2 are not allowed. If `circularReferenceDepth` is not specified, it defaults to -1, which disables recursive fields entirely.
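
The depth semantics described above (0 allows one recursion, 1 two, 2 three, -1 disables recursion) can be sketched as a small predicate. This is an illustrative sketch only, not the PR's actual code; `allowField` and `recursions` (the number of times the message has already re-appeared on the current path) are hypothetical names.

```scala
// Illustrative sketch of the circularReferenceDepth semantics described above.
// `recursions` = how many times the message type has re-appeared on the path so far.
object CircularDepthSketch {
  def allowField(recursions: Int, circularReferenceDepth: Int): Boolean = {
    require(circularReferenceDepth >= -1 && circularReferenceDepth <= 2,
      "circularReferenceDepth must be -1, 0, 1, or 2")
    // Depth -1 forbids any recursion; depth d allows d + 1 recursions.
    recursions <= circularReferenceDepth + 1
  }
}
```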



##########
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufOptions.scala:
##########
@@ -38,6 +38,12 @@ private[sql] class ProtobufOptions(
 
   val parseMode: ParseMode =
     parameters.get("mode").map(ParseMode.fromString).getOrElse(FailFastMode)
+
+  val circularReferenceType: String = 
parameters.getOrElse("circularReferenceType", "FIELD_NAME")

Review Comment:
   @rangadi We already have the field_name recursion check: using `fd.getFullName` we detect the recursion and throw an error. Another option is to detect recursion through the field type. Example below.
   
   ```
   message A {
     B b = 1;
   }
   
   message B {
     A c = 1;
   }
   ```
   With the field_name recursion check the chain is ```A.B.C```, so no recursion is detected.
   With the field_type recursion check the chain is ```MESSAGE.MESSAGE.MESSAGE```, so recursion is found and we either throw an error or drop fields beyond the configured recursion depth.
   However, the field_type check will also throw an error for the case below, even though there is no actual recursion, since the chain is ```MESSAGE.MESSAGE.MESSAGE.MESSAGE```.
   
   ```
   message A {
     B b = 1;
   }
   
   message B {
     D d = 1;
   }
   
   message D {
     E e = 1;
   }
   
   message E {
     int32 key = 1;
   }
   ```
   @baganokodo2022's argument is that the field_type based check lets users cut off recursion more quickly: with a complex nested schema, a recursive field_name may only repeat very deep in the tree, and we might hit an OOM before ever reaching it. The field_type based check finds the circular reference sooner.
   
   @baganokodo2022 please correct me if I'm wrong. 
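
The difference between the two checks can be sketched on a path of descriptor names. A minimal sketch, assuming the field_name check compares message full names while the field_type check only sees the type kind (`MESSAGE` for every nested message field); `fieldNameRecursion` and `fieldTypeRecursion` are hypothetical helpers, not code from this PR.

```scala
// Sketch contrasting the two recursion checks on a chain of nested message fields.
object RecursionCheckSketch {
  // field_name check: recursion only when the same message full name re-appears on the path.
  def fieldNameRecursion(namePath: Seq[String], next: String): Boolean =
    namePath.contains(next)

  // field_type check: every message field reports the same type name ("MESSAGE"),
  // so a deep-but-acyclic chain like A -> B -> D -> E still trips the check.
  def fieldTypeRecursion(typePath: Seq[String], maxDepth: Int = 3): Boolean =
    typePath.count(_ == "MESSAGE") >= maxDepth
}
```

On the `A -> B -> D -> E` example above, the name-based check finds no repeated name, while the type-based check fires after three nested message fields, which is exactly the false positive discussed.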



##########
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/SchemaConverters.scala:
##########
@@ -92,14 +109,38 @@ object SchemaConverters {
             MapType(keyType, valueType, valueContainsNull = 
false).defaultConcreteType,
             nullable = false))
       case MESSAGE =>
-        if (existingRecordNames.contains(fd.getFullName)) {
-          throw 
QueryCompilationErrors.foundRecursionInProtobufSchema(fd.toString())
+        // User can set circularReferenceDepth of 0 or 1 or 2.
+        // Going beyond 3 levels of recursion is not allowed.
+        if (protobufOptions.circularReferenceType.equals("FIELD_TYPE")) {
+          if (existingRecordTypes.contains(fd.getType.name()) &&
+            (protobufOptions.circularReferenceDepth < 0 ||
+              protobufOptions.circularReferenceDepth >= 3)) {
+            throw 
QueryCompilationErrors.foundRecursionInProtobufSchema(fd.toString())
+          } else if (existingRecordTypes.contains(fd.getType.name()) &&

Review Comment:
   @rangadi we have two maps with incremental counters, one for the field_name based check and one for the field_type based check.
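
A minimal sketch of that two-counter bookkeeping, assuming immutable maps keyed by `fd.getFullName` and `fd.getType.name()`; `visit` and `bump` are hypothetical helper names, not the PR's code.

```scala
// Sketch: bump both counters when descending into a nested message field.
object RecursionCounterSketch {
  private def bump(counts: Map[String, Int], key: String): Map[String, Int] =
    counts.updated(key, counts.getOrElse(key, 0) + 1)

  // Returns the updated (byName, byType) counters for the current path.
  def visit(byName: Map[String, Int], byType: Map[String, Int],
            fullName: String, typeName: String): (Map[String, Int], Map[String, Int]) =
    (bump(byName, fullName), bump(byType, typeName))
}
```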



##########
connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufFunctionsSuite.scala:
##########
@@ -26,11 +26,11 @@ import com.google.protobuf.{ByteString, DynamicMessage}
 import org.apache.spark.sql.{Column, QueryTest, Row}
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.functions.{lit, struct}
-import 
org.apache.spark.sql.protobuf.protos.SimpleMessageProtos.SimpleMessageRepeated
+import 
org.apache.spark.sql.protobuf.protos.SimpleMessageProtos.{EventRecursiveA, 
EventRecursiveB, OneOfEvent, OneOfEventWithRecursion, SimpleMessageRepeated}

Review Comment:
   @rangadi yes, 
   `Handle recursive fields in Protobuf schema, C->D->Array(C)` and
   `Handle recursive fields in Protobuf schema, A->B->A`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

