rangadi commented on code in PR #38344:
URL: https://github.com/apache/spark/pull/38344#discussion_r1004757693


##########
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufUtils.scala:
##########
@@ -196,28 +195,58 @@ private[sql] object ProtobufUtils extends Logging {
      fileDescriptorSet = DescriptorProtos.FileDescriptorSet.parseFrom(dscFile)
    } catch {
      case ex: InvalidProtocolBufferException =>
-        // TODO move all the exceptions to core/src/main/resources/error/error-classes.json
-        throw new RuntimeException("Error parsing descriptor byte[] into Descriptor object", ex)
+        throw QueryCompilationErrors.descrioptorParseError(ex)
      case ex: IOException =>
-        throw new RuntimeException(
-          "Error reading Protobuf descriptor file at path: " +
-            descFilePath,
-          ex)
+        throw QueryCompilationErrors.cannotFindDescriptorFileError(descFilePath, ex)
    }
-
-    val descriptorProto: DescriptorProtos.FileDescriptorProto = fileDescriptorSet.getFile(0)
    try {
-      val fileDescriptor: Descriptors.FileDescriptor = Descriptors.FileDescriptor.buildFrom(
-        descriptorProto,
-        new Array[Descriptors.FileDescriptor](0))
+      val fileDescriptorProtoIndex = createDescriptorProtoIndex(fileDescriptorSet)
+      val fileDescriptor: Descriptors.FileDescriptor =
+        buildFileDescriptor(fileDescriptorSet.getFileList.asScala.last, fileDescriptorProtoIndex)
      if (fileDescriptor.getMessageTypes().isEmpty()) {
-        throw new RuntimeException("No MessageTypes returned, " + fileDescriptor.getName());
+        throw QueryCompilationErrors.noProtobufMessageTypeReturnError(fileDescriptor.getName())
      }
      fileDescriptor
    } catch {
      case e: Descriptors.DescriptorValidationException =>
-        throw new RuntimeException("Error constructing FileDescriptor", e)
+        throw QueryCompilationErrors.failedParsingDescriptorError(e)
+    }
+  }
+
+  /**
+   * Recursively constructs file descriptors for all dependencies for given
+   * FileDescriptorProto and return.
+   * @param descriptorProto
+   * @param descriptorProtoIndex
+   * @return Descriptors.FileDescriptor
+   */
+  private def buildFileDescriptor(
+    fileDescriptorProto: FileDescriptorProto,
+    fileDescriptorProtoIndex: Map[String, FileDescriptorProto]): Descriptors.FileDescriptor = {
+    var fileDescriptorList = List[Descriptors.FileDescriptor]()
+    for (dependencyName <- fileDescriptorProto.getDependencyList().asScala) {
+      if (!fileDescriptorProtoIndex.contains(dependencyName)) {
+        throw QueryCompilationErrors.protobufDescriptorDependencyError(dependencyName)
+      }
+      val dependencyProto: FileDescriptorProto = fileDescriptorProtoIndex.get(dependencyName).get
+      fileDescriptorList = fileDescriptorList ++ List(
+        buildFileDescriptor(dependencyProto, fileDescriptorProtoIndex))
+    }
+    Descriptors.FileDescriptor.buildFrom(fileDescriptorProto, fileDescriptorList.toArray)
+  }
+
+  /**
+   * Returns a map from descriptor proto name as found inside the descriptors to protos.
+   * @param fileDescriptorSet

Review Comment:
   Same for _param_ and _return_ here.
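   For illustration, the trimmed Scaladoc the comment is asking for might look
   like this (a sketch; either drop the bare tags or give them real
   descriptions):
   
       /**
        * Returns a map from descriptor proto name as found inside the descriptors to protos.
        */
       private def createDescriptorProtoIndex(
         fileDescriptorSet: FileDescriptorSet): Map[String, FileDescriptorProto] = ...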



##########
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufUtils.scala:
##########
@@ -196,28 +195,58 @@ private[sql] object ProtobufUtils extends Logging {
      fileDescriptorSet = DescriptorProtos.FileDescriptorSet.parseFrom(dscFile)
    } catch {
      case ex: InvalidProtocolBufferException =>
-        // TODO move all the exceptions to core/src/main/resources/error/error-classes.json
-        throw new RuntimeException("Error parsing descriptor byte[] into Descriptor object", ex)
+        throw QueryCompilationErrors.descrioptorParseError(ex)
      case ex: IOException =>
-        throw new RuntimeException(
-          "Error reading Protobuf descriptor file at path: " +
-            descFilePath,
-          ex)
+        throw QueryCompilationErrors.cannotFindDescriptorFileError(descFilePath, ex)
    }
-
-    val descriptorProto: DescriptorProtos.FileDescriptorProto = fileDescriptorSet.getFile(0)
    try {
-      val fileDescriptor: Descriptors.FileDescriptor = Descriptors.FileDescriptor.buildFrom(
-        descriptorProto,
-        new Array[Descriptors.FileDescriptor](0))
+      val fileDescriptorProtoIndex = createDescriptorProtoIndex(fileDescriptorSet)
+      val fileDescriptor: Descriptors.FileDescriptor =
+        buildFileDescriptor(fileDescriptorSet.getFileList.asScala.last, fileDescriptorProtoIndex)
      if (fileDescriptor.getMessageTypes().isEmpty()) {
-        throw new RuntimeException("No MessageTypes returned, " + fileDescriptor.getName());
+        throw QueryCompilationErrors.noProtobufMessageTypeReturnError(fileDescriptor.getName())
      }
      fileDescriptor
    } catch {
      case e: Descriptors.DescriptorValidationException =>
-        throw new RuntimeException("Error constructing FileDescriptor", e)
+        throw QueryCompilationErrors.failedParsingDescriptorError(e)
+    }
+  }
+
+  /**
+   * Recursively constructs file descriptors for all dependencies for given
+   * FileDescriptorProto and return.
+   * @param descriptorProto
+   * @param descriptorProtoIndex
+   * @return Descriptors.FileDescriptor
+   */
+  private def buildFileDescriptor(
+    fileDescriptorProto: FileDescriptorProto,
+    fileDescriptorProtoIndex: Map[String, FileDescriptorProto]): Descriptors.FileDescriptor = {
+    var fileDescriptorList = List[Descriptors.FileDescriptor]()
+    for (dependencyName <- fileDescriptorProto.getDependencyList().asScala) {
+      if (!fileDescriptorProtoIndex.contains(dependencyName)) {
+        throw QueryCompilationErrors.protobufDescriptorDependencyError(dependencyName)
+      }
+      val dependencyProto: FileDescriptorProto = fileDescriptorProtoIndex.get(dependencyName).get
+      fileDescriptorList = fileDescriptorList ++ List(
+        buildFileDescriptor(dependencyProto, fileDescriptorProtoIndex))
+    }
+    Descriptors.FileDescriptor.buildFrom(fileDescriptorProto, fileDescriptorList.toArray)
+  }
+
+  /**
+   * Returns a map from descriptor proto name as found inside the descriptors to protos.
+   * @param fileDescriptorSet
+   * @return Map[String, FileDescriptorProto]
+   */
+  private def createDescriptorProtoIndex(
+    fileDescriptorSet: FileDescriptorSet): Map[String, FileDescriptorProto] = {
+    var resultBuilder = Map[String, FileDescriptorProto]()

Review Comment:
   style-nit: Similarly here:
   
        fileDescriptorSet.getFileList().asScala.map { descriptorProto =>
          descriptorProto.getName() -> descriptorProto
        }.toMap
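   
   For context, a self-contained version of that idiom against protobuf-java (a
   sketch; the import assumes Scala 2.13, on 2.12 it would be
   scala.collection.JavaConverters._):
   
       import scala.jdk.CollectionConverters._
       import com.google.protobuf.DescriptorProtos.{FileDescriptorProto, FileDescriptorSet}
   
       // Index every FileDescriptorProto in the set by its file name in a
       // single expression, with no mutable accumulator.
       def createDescriptorProtoIndex(
           fileDescriptorSet: FileDescriptorSet): Map[String, FileDescriptorProto] = {
         fileDescriptorSet.getFileList.asScala.map { descriptorProto =>
           descriptorProto.getName -> descriptorProto
         }.toMap
       }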
   



##########
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufUtils.scala:
##########
@@ -196,28 +195,58 @@ private[sql] object ProtobufUtils extends Logging {
      fileDescriptorSet = DescriptorProtos.FileDescriptorSet.parseFrom(dscFile)
    } catch {
      case ex: InvalidProtocolBufferException =>
-        // TODO move all the exceptions to core/src/main/resources/error/error-classes.json
-        throw new RuntimeException("Error parsing descriptor byte[] into Descriptor object", ex)
+        throw QueryCompilationErrors.descrioptorParseError(ex)
      case ex: IOException =>
-        throw new RuntimeException(
-          "Error reading Protobuf descriptor file at path: " +
-            descFilePath,
-          ex)
+        throw QueryCompilationErrors.cannotFindDescriptorFileError(descFilePath, ex)
    }
-
-    val descriptorProto: DescriptorProtos.FileDescriptorProto = fileDescriptorSet.getFile(0)
    try {
-      val fileDescriptor: Descriptors.FileDescriptor = Descriptors.FileDescriptor.buildFrom(
-        descriptorProto,
-        new Array[Descriptors.FileDescriptor](0))
+      val fileDescriptorProtoIndex = createDescriptorProtoIndex(fileDescriptorSet)
+      val fileDescriptor: Descriptors.FileDescriptor =
+        buildFileDescriptor(fileDescriptorSet.getFileList.asScala.last, fileDescriptorProtoIndex)
      if (fileDescriptor.getMessageTypes().isEmpty()) {
-        throw new RuntimeException("No MessageTypes returned, " + fileDescriptor.getName());
+        throw QueryCompilationErrors.noProtobufMessageTypeReturnError(fileDescriptor.getName())
      }
      fileDescriptor
    } catch {
      case e: Descriptors.DescriptorValidationException =>
-        throw new RuntimeException("Error constructing FileDescriptor", e)
+        throw QueryCompilationErrors.failedParsingDescriptorError(e)
+    }
+  }
+
+  /**
+   * Recursively constructs file descriptors for all dependencies for given
+   * FileDescriptorProto and return.
+   * @param descriptorProto
+   * @param descriptorProtoIndex
+   * @return Descriptors.FileDescriptor
+   */
+  private def buildFileDescriptor(
+    fileDescriptorProto: FileDescriptorProto,
+    fileDescriptorProtoIndex: Map[String, FileDescriptorProto]): Descriptors.FileDescriptor = {
+    var fileDescriptorList = List[Descriptors.FileDescriptor]()

Review Comment:
   style nit: More Scala-idiomatic:
   
       val fileDescriptorList = fileDescriptorProto.getDependencyList().asScala.map { dependency =>
         fileDescriptorProtoIndex.get(dependency) match {
           case Some(dependencyProto) =>
             buildFileDescriptor(dependencyProto, fileDescriptorProtoIndex)
           case None =>
             throw QueryCompilationErrors.protobufDescriptorDependencyError(dependency)
         }
       }
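   Filling that sketch out into a standalone, compilable form (a hypothetical
   version; QueryCompilationErrors is Spark-internal, so a plain exception
   stands in for it here):
   
       import scala.jdk.CollectionConverters._
       import com.google.protobuf.DescriptorProtos.FileDescriptorProto
       import com.google.protobuf.Descriptors
   
       // Resolve each dependency through the index (failing fast on a missing
       // entry), build its FileDescriptor recursively, then build this file's
       // descriptor on top of the resolved dependencies.
       def buildFileDescriptor(
           fileDescriptorProto: FileDescriptorProto,
           protoIndex: Map[String, FileDescriptorProto]): Descriptors.FileDescriptor = {
         val dependencies = fileDescriptorProto.getDependencyList.asScala.map { dependency =>
           protoIndex.get(dependency) match {
             case Some(dependencyProto) => buildFileDescriptor(dependencyProto, protoIndex)
             case None => throw new IllegalArgumentException(s"Dependency not found: $dependency")
           }
         }
         Descriptors.FileDescriptor.buildFrom(fileDescriptorProto, dependencies.toArray)
       }
   
   Note that, like the loop it replaces, this rebuilds shared transitive
   dependencies once per referencing file; a memoizing cache would avoid that
   for large descriptor sets.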



##########
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufUtils.scala:
##########
@@ -196,28 +195,58 @@ private[sql] object ProtobufUtils extends Logging {
      fileDescriptorSet = DescriptorProtos.FileDescriptorSet.parseFrom(dscFile)
    } catch {
      case ex: InvalidProtocolBufferException =>
-        // TODO move all the exceptions to core/src/main/resources/error/error-classes.json
-        throw new RuntimeException("Error parsing descriptor byte[] into Descriptor object", ex)
+        throw QueryCompilationErrors.descrioptorParseError(ex)
      case ex: IOException =>
-        throw new RuntimeException(
-          "Error reading Protobuf descriptor file at path: " +
-            descFilePath,
-          ex)
+        throw QueryCompilationErrors.cannotFindDescriptorFileError(descFilePath, ex)
    }
-
-    val descriptorProto: DescriptorProtos.FileDescriptorProto = fileDescriptorSet.getFile(0)
    try {
-      val fileDescriptor: Descriptors.FileDescriptor = Descriptors.FileDescriptor.buildFrom(
-        descriptorProto,
-        new Array[Descriptors.FileDescriptor](0))
+      val fileDescriptorProtoIndex = createDescriptorProtoIndex(fileDescriptorSet)
+      val fileDescriptor: Descriptors.FileDescriptor =
+        buildFileDescriptor(fileDescriptorSet.getFileList.asScala.last, fileDescriptorProtoIndex)
      if (fileDescriptor.getMessageTypes().isEmpty()) {
-        throw new RuntimeException("No MessageTypes returned, " + fileDescriptor.getName());
+        throw QueryCompilationErrors.noProtobufMessageTypeReturnError(fileDescriptor.getName())
      }
      fileDescriptor
    } catch {
      case e: Descriptors.DescriptorValidationException =>
-        throw new RuntimeException("Error constructing FileDescriptor", e)
+        throw QueryCompilationErrors.failedParsingDescriptorError(e)
+    }
+  }
+
+  /**
+   * Recursively constructs file descriptors for all dependencies for given
+   * FileDescriptorProto and return.
+   * @param descriptorProto
+   * @param descriptorProtoIndex
+   * @return Descriptors.FileDescriptor
+   */
+  private def buildFileDescriptor(
+    fileDescriptorProto: FileDescriptorProto,
+    fileDescriptorProtoIndex: Map[String, FileDescriptorProto]): Descriptors.FileDescriptor = {

Review Comment:
   minor: Replace 'Index' with 'Map'?



##########
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufUtils.scala:
##########
@@ -196,28 +195,58 @@ private[sql] object ProtobufUtils extends Logging {
      fileDescriptorSet = DescriptorProtos.FileDescriptorSet.parseFrom(dscFile)
    } catch {
      case ex: InvalidProtocolBufferException =>
-        // TODO move all the exceptions to core/src/main/resources/error/error-classes.json
-        throw new RuntimeException("Error parsing descriptor byte[] into Descriptor object", ex)
+        throw QueryCompilationErrors.descrioptorParseError(ex)
      case ex: IOException =>
-        throw new RuntimeException(
-          "Error reading Protobuf descriptor file at path: " +
-            descFilePath,
-          ex)
+        throw QueryCompilationErrors.cannotFindDescriptorFileError(descFilePath, ex)
    }
-
-    val descriptorProto: DescriptorProtos.FileDescriptorProto = fileDescriptorSet.getFile(0)
    try {
-      val fileDescriptor: Descriptors.FileDescriptor = Descriptors.FileDescriptor.buildFrom(
-        descriptorProto,
-        new Array[Descriptors.FileDescriptor](0))
+      val fileDescriptorProtoIndex = createDescriptorProtoIndex(fileDescriptorSet)
+      val fileDescriptor: Descriptors.FileDescriptor =
+        buildFileDescriptor(fileDescriptorSet.getFileList.asScala.last, fileDescriptorProtoIndex)
      if (fileDescriptor.getMessageTypes().isEmpty()) {
-        throw new RuntimeException("No MessageTypes returned, " + fileDescriptor.getName());
+        throw QueryCompilationErrors.noProtobufMessageTypeReturnError(fileDescriptor.getName())
      }
      fileDescriptor
    } catch {
      case e: Descriptors.DescriptorValidationException =>
-        throw new RuntimeException("Error constructing FileDescriptor", e)
+        throw QueryCompilationErrors.failedParsingDescriptorError(e)
+    }
+  }
+
+  /**
+   * Recursively constructs file descriptors for all dependencies for given
+   * FileDescriptorProto and return.
+   * @param descriptorProto

Review Comment:
   nit: Can remove parameters here if we are not adding any description (given these are internal APIs).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

