zikangh commented on code in PR #53828:
URL: https://github.com/apache/spark/pull/53828#discussion_r2734711554


##########
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/SchemaConverters.scala:
##########
@@ -42,20 +42,23 @@ object SchemaConverters extends Logging {
    */
   private[protobuf] def toSqlType(
       descriptor: Descriptor,
+      fullNamesToExtensions: Map[String, Seq[FieldDescriptor]] = Map.empty,

Review Comment:
   What about calling this `messageNameToExtensions` like you did in 
ProtobufUtils? 



##########
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufUtils.scala:
##########
@@ -232,38 +263,49 @@ private[sql] object ProtobufUtils extends Logging {
         throw QueryCompilationErrors.descriptorParseError(ex)
     }
     val fileDescriptorProtoIndex = createDescriptorProtoMap(fileDescriptorSet)
+
+    // Mutated across invocations of buildFileDescriptor.
+    val builtDescriptors = mutable.Map[String, Descriptors.FileDescriptor]()
     val fileDescriptorList: List[Descriptors.FileDescriptor] =
-      fileDescriptorSet.getFileList.asScala.map( fileDescriptorProto =>
-        buildFileDescriptor(fileDescriptorProto, fileDescriptorProtoIndex)
-      ).toList
+      fileDescriptorSet.getFileList.asScala.map { fileDescriptorProto =>
+        buildFileDescriptor(fileDescriptorProto, fileDescriptorProtoIndex, 
builtDescriptors)
+      }.distinctBy(_.getFullName).toList
     fileDescriptorList
   }
 
   /**
-   * Recursively constructs file descriptors for all dependencies for given
-   * FileDescriptorProto and return.
+   * Recursively constructs file descriptors for all dependencies for given 
FileDescriptorProto
+   * and return.
    */
   private def buildFileDescriptor(
-    fileDescriptorProto: FileDescriptorProto,
-    fileDescriptorProtoMap: Map[String, FileDescriptorProto]): 
Descriptors.FileDescriptor = {
-    val fileDescriptorList = 
fileDescriptorProto.getDependencyList().asScala.map { dependency =>
-      fileDescriptorProtoMap.get(dependency) match {
-        case Some(dependencyProto) =>
-          if (dependencyProto.getName == "google/protobuf/any.proto"
-            && dependencyProto.getPackage == "google.protobuf") {
-            // For Any, use the descriptor already included as part of the 
Java dependency.
-            // Without this, JsonFormat used for converting Any fields fails 
when
-            // an Any field in input is set to `Any.getDefaultInstance()`.
-            com.google.protobuf.AnyProto.getDescriptor
-            // Should we do the same for timestamp.proto and empty.proto?
-          } else {
-            buildFileDescriptor(dependencyProto, fileDescriptorProtoMap)
-          }
-        case None =>
-          throw 
QueryCompilationErrors.protobufDescriptorDependencyError(dependency)
-      }
-    }
-    Descriptors.FileDescriptor.buildFrom(fileDescriptorProto, 
fileDescriptorList.toArray)
+      fileDescriptorProto: FileDescriptorProto,
+      fileDescriptorProtoMap: Map[String, FileDescriptorProto],
+      builtDescriptors: mutable.Map[String, Descriptors.FileDescriptor])
+      : Descriptors.FileDescriptor = {
+    // Storing references to constructed descriptors is crucial because 
descriptors are compared

Review Comment:
   This also fixes the cross-file issue for regular fields, right? Nice! Let's 
add a test case for that. I would also call this out in the PR description. 



##########
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufUtils.scala:
##########
@@ -296,9 +338,59 @@ private[sql] object ProtobufUtils extends Logging {
   }
 
   /** Builds [[TypeRegistry]] with the descriptor and the others from the same 
proto file. */
-  private [protobuf] def buildTypeRegistry(descriptor: Descriptor): 
TypeRegistry = {
-    TypeRegistry.newBuilder()
+  private[protobuf] def buildTypeRegistry(descriptor: Descriptor): 
TypeRegistry = {
+    TypeRegistry
+      .newBuilder()
       .add(descriptor) // This adds any other descriptors in the associated 
proto file.
       .build()
   }
+
+  /**
+   * Builds an ExtensionRegistry containing all extensions found in the 
FileDescriptorSet. This

Review Comment:
   ... and messageNameToExtensions



##########
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufUtils.scala:
##########
@@ -208,21 +254,6 @@ private[sql] object ProtobufUtils extends Logging {
       .asInstanceOf[Descriptor]
   }
 
-  def buildDescriptor(binaryFileDescriptorSet: Array[Byte], messageName: 
String): Descriptor = {

Review Comment:
   Is it possible to keep this helper method? It's hard to figure out what 
changed.
   (I see that we have two `buildDescriptor` methods, which is confusing; maybe 
we can rename this one.) 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to