rangadi commented on code in PR #38344:
URL: https://github.com/apache/spark/pull/38344#discussion_r1006128712


##########
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufUtils.scala:
##########
@@ -196,30 +194,52 @@ private[sql] object ProtobufUtils extends Logging {
       fileDescriptorSet = DescriptorProtos.FileDescriptorSet.parseFrom(dscFile)
     } catch {
       case ex: InvalidProtocolBufferException =>
-        // TODO move all the exceptions to core/src/main/resources/error/error-classes.json
-        throw new RuntimeException("Error parsing descriptor byte[] into Descriptor object", ex)
+        throw QueryCompilationErrors.descrioptorParseError(ex)
       case ex: IOException =>
-        throw new RuntimeException(
-          "Error reading Protobuf descriptor file at path: " +
-            descFilePath,
-          ex)
+        throw QueryCompilationErrors.cannotFindDescriptorFileError(descFilePath, ex)
     }
-
-    val descriptorProto: DescriptorProtos.FileDescriptorProto = fileDescriptorSet.getFile(0)
     try {
-      val fileDescriptor: Descriptors.FileDescriptor = Descriptors.FileDescriptor.buildFrom(
-        descriptorProto,
-        new Array[Descriptors.FileDescriptor](0))
+      val fileDescriptorProtoIndex = createDescriptorProtoMap(fileDescriptorSet)
+      val fileDescriptor: Descriptors.FileDescriptor =
+        buildFileDescriptor(fileDescriptorSet.getFileList.asScala.last, fileDescriptorProtoIndex)
       if (fileDescriptor.getMessageTypes().isEmpty()) {
-        throw new RuntimeException("No MessageTypes returned, " + 
fileDescriptor.getName());
+        throw 
QueryCompilationErrors.noProtobufMessageTypeReturnError(fileDescriptor.getName())
       }
       fileDescriptor
     } catch {
       case e: Descriptors.DescriptorValidationException =>
-        throw new RuntimeException("Error constructing FileDescriptor", e)
+        throw QueryCompilationErrors.failedParsingDescriptorError(e)
     }
   }
 
+  /**
+   * Recursively constructs file descriptors for all dependencies of the given
+   * FileDescriptorProto and returns it.
+   */
+  private def buildFileDescriptor(
+    fileDescriptorProto: FileDescriptorProto,
+    fileDescriptorProtoMap: Map[String, FileDescriptorProto]): Descriptors.FileDescriptor = {
+    val fileDescriptorList = fileDescriptorProto.getDependencyList().asScala.map { dependency =>
+      fileDescriptorProtoMap.get(dependency) match {
+        case Some(dependencyProto) =>
+          buildFileDescriptor(dependencyProto, fileDescriptorProtoMap)
+        case None =>
+          throw QueryCompilationErrors.protobufDescriptorDependencyError(dependency)
+      }
+    }
+    Descriptors.FileDescriptor.buildFrom(fileDescriptorProto, fileDescriptorList.toArray)
+  }
+
+  /**
+   * Returns a map from descriptor proto name as found inside the descriptors to protos.
+   */
+  private def createDescriptorProtoMap(
+    fileDescriptorSet: FileDescriptorSet): Map[String, FileDescriptorProto] = {
+    fileDescriptorSet.getFileList().asScala.map { descriptorProto =>
+      descriptorProto.getName() -> descriptorProto

Review Comment:
   Btw, what happens if there are two different files with the same name? E.g.
   `protoc ... a/b/events.proto x/y/events.proto`
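   
   A minimal sketch of why that case matters (hypothetical values, not code from this PR): the index built in `createDescriptorProtoMap` is keyed by `FileDescriptorProto.getName()`, and assuming the truncated snippet above finishes with `.toMap`, a Scala `Map` built from a sequence of pairs keeps only the last value for a repeated key. So if the descriptor set ever contains two file protos that report the same name, one of them would silently shadow the other:
   
   ```scala
   // Hypothetical stand-in values; only the duplicate-key behavior is the point.
   val protos = Seq(
     "events.proto" -> "file descriptor proto from a/b/events.proto",
     "events.proto" -> "file descriptor proto from x/y/events.proto")
   
   // toMap keeps the last binding for a repeated key, so the first file is lost.
   val index: Map[String, String] = protos.toMap
   assert(index("events.proto") == "file descriptor proto from x/y/events.proto")
   ```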
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
