zikangh commented on code in PR #53828:
URL: https://github.com/apache/spark/pull/53828#discussion_r2734711554
##########
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/SchemaConverters.scala:
##########
@@ -42,20 +42,23 @@ object SchemaConverters extends Logging {
*/
private[protobuf] def toSqlType(
descriptor: Descriptor,
+ fullNamesToExtensions: Map[String, Seq[FieldDescriptor]] = Map.empty,
Review Comment:
What about calling this `messageNameToExtensions` like you did in
ProtobufUtils?
##########
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufUtils.scala:
##########
@@ -232,38 +263,49 @@ private[sql] object ProtobufUtils extends Logging {
throw QueryCompilationErrors.descriptorParseError(ex)
}
val fileDescriptorProtoIndex = createDescriptorProtoMap(fileDescriptorSet)
+
+ // Mutated across invocations of buildFileDescriptor.
+ val builtDescriptors = mutable.Map[String, Descriptors.FileDescriptor]()
val fileDescriptorList: List[Descriptors.FileDescriptor] =
- fileDescriptorSet.getFileList.asScala.map( fileDescriptorProto =>
- buildFileDescriptor(fileDescriptorProto, fileDescriptorProtoIndex)
- ).toList
+ fileDescriptorSet.getFileList.asScala.map { fileDescriptorProto =>
+ buildFileDescriptor(fileDescriptorProto, fileDescriptorProtoIndex,
builtDescriptors)
+ }.distinctBy(_.getFullName).toList
fileDescriptorList
}
/**
- * Recursively constructs file descriptors for all dependencies for given
- * FileDescriptorProto and return.
+ * Recursively constructs file descriptors for all dependencies for given
FileDescriptorProto
+ * and return.
*/
private def buildFileDescriptor(
- fileDescriptorProto: FileDescriptorProto,
- fileDescriptorProtoMap: Map[String, FileDescriptorProto]):
Descriptors.FileDescriptor = {
- val fileDescriptorList =
fileDescriptorProto.getDependencyList().asScala.map { dependency =>
- fileDescriptorProtoMap.get(dependency) match {
- case Some(dependencyProto) =>
- if (dependencyProto.getName == "google/protobuf/any.proto"
- && dependencyProto.getPackage == "google.protobuf") {
- // For Any, use the descriptor already included as part of the
Java dependency.
- // Without this, JsonFormat used for converting Any fields fails
when
- // an Any field in input is set to `Any.getDefaultInstance()`.
- com.google.protobuf.AnyProto.getDescriptor
- // Should we do the same for timestamp.proto and empty.proto?
- } else {
- buildFileDescriptor(dependencyProto, fileDescriptorProtoMap)
- }
- case None =>
- throw
QueryCompilationErrors.protobufDescriptorDependencyError(dependency)
- }
- }
- Descriptors.FileDescriptor.buildFrom(fileDescriptorProto,
fileDescriptorList.toArray)
+ fileDescriptorProto: FileDescriptorProto,
+ fileDescriptorProtoMap: Map[String, FileDescriptorProto],
+ builtDescriptors: mutable.Map[String, Descriptors.FileDescriptor])
+ : Descriptors.FileDescriptor = {
+ // Storing references to constructed descriptors is crucial because
descriptors are compared
Review Comment:
This also fixes the cross-file issue for regular fields, right? Nice! Let's
add a test case for that. I would also call this out in the PR description.
##########
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufUtils.scala:
##########
@@ -296,9 +338,59 @@ private[sql] object ProtobufUtils extends Logging {
}
/** Builds [[TypeRegistry]] with the descriptor and the others from the same
proto file. */
- private [protobuf] def buildTypeRegistry(descriptor: Descriptor):
TypeRegistry = {
- TypeRegistry.newBuilder()
+ private[protobuf] def buildTypeRegistry(descriptor: Descriptor):
TypeRegistry = {
+ TypeRegistry
+ .newBuilder()
.add(descriptor) // This adds any other descriptors in the associated
proto file.
.build()
}
+
+ /**
+ * Builds an ExtensionRegistry containing all extensions found in the
FileDescriptorSet. This
Review Comment:
... and `messageNameToExtensions` (the doc comment should mention that too).
##########
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufUtils.scala:
##########
@@ -208,21 +254,6 @@ private[sql] object ProtobufUtils extends Logging {
.asInstanceOf[Descriptor]
}
- def buildDescriptor(binaryFileDescriptorSet: Array[Byte], messageName:
String): Descriptor = {
Review Comment:
Is it possible to keep this helper method? Removing it makes it hard to
figure out what changed.
(I see that we have two `buildDescriptor` methods, which is confusing; maybe
we can rename this one.)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]