rangadi commented on code in PR #38344: URL: https://github.com/apache/spark/pull/38344#discussion_r1004757693
########## connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufUtils.scala: ########## @@ -196,28 +195,58 @@ private[sql] object ProtobufUtils extends Logging { fileDescriptorSet = DescriptorProtos.FileDescriptorSet.parseFrom(dscFile) } catch { case ex: InvalidProtocolBufferException => - // TODO move all the exceptions to core/src/main/resources/error/error-classes.json - throw new RuntimeException("Error parsing descriptor byte[] into Descriptor object", ex) + throw QueryCompilationErrors.descrioptorParseError(ex) case ex: IOException => - throw new RuntimeException( - "Error reading Protobuf descriptor file at path: " + - descFilePath, - ex) + throw QueryCompilationErrors.cannotFindDescriptorFileError(descFilePath, ex) } - - val descriptorProto: DescriptorProtos.FileDescriptorProto = fileDescriptorSet.getFile(0) try { - val fileDescriptor: Descriptors.FileDescriptor = Descriptors.FileDescriptor.buildFrom( - descriptorProto, - new Array[Descriptors.FileDescriptor](0)) + val fileDescriptorProtoIndex = createDescriptorProtoIndex(fileDescriptorSet) + val fileDescriptor: Descriptors.FileDescriptor = + buildFileDescriptor(fileDescriptorSet.getFileList.asScala.last, fileDescriptorProtoIndex) if (fileDescriptor.getMessageTypes().isEmpty()) { - throw new RuntimeException("No MessageTypes returned, " + fileDescriptor.getName()); + throw QueryCompilationErrors.noProtobufMessageTypeReturnError(fileDescriptor.getName()) } fileDescriptor } catch { case e: Descriptors.DescriptorValidationException => - throw new RuntimeException("Error constructing FileDescriptor", e) + throw QueryCompilationErrors.failedParsingDescriptorError(e) + } + } + + /** + * Recursively constructs file descriptors for all dependencies for given + * FileDescriptorProto and return. 
+ * @param descriptorProto + * @param descriptorProtoIndex + * @return Descriptors.FileDescriptor + */ + private def buildFileDescriptor( + fileDescriptorProto: FileDescriptorProto, + fileDescriptorProtoIndex: Map[String, FileDescriptorProto]): Descriptors.FileDescriptor = { + var fileDescriptorList = List[Descriptors.FileDescriptor]() + for (dependencyName <- fileDescriptorProto.getDependencyList().asScala) { + if (!fileDescriptorProtoIndex.contains(dependencyName)) { + throw QueryCompilationErrors.protobufDescriptorDependencyError(dependencyName) + } + val dependencyProto: FileDescriptorProto = fileDescriptorProtoIndex.get(dependencyName).get + fileDescriptorList = fileDescriptorList ++ List( + buildFileDescriptor(dependencyProto, fileDescriptorProtoIndex)) + } + Descriptors.FileDescriptor.buildFrom(fileDescriptorProto, fileDescriptorList.toArray) + } + + /** + * Returns a map from descriptor proto name as found inside the descriptors to protos. + * @param fileDescriptorSet Review Comment: Same for _param_ and _return_ here. 
########## connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufUtils.scala: ########## @@ -196,28 +195,58 @@ private[sql] object ProtobufUtils extends Logging { fileDescriptorSet = DescriptorProtos.FileDescriptorSet.parseFrom(dscFile) } catch { case ex: InvalidProtocolBufferException => - // TODO move all the exceptions to core/src/main/resources/error/error-classes.json - throw new RuntimeException("Error parsing descriptor byte[] into Descriptor object", ex) + throw QueryCompilationErrors.descrioptorParseError(ex) case ex: IOException => - throw new RuntimeException( - "Error reading Protobuf descriptor file at path: " + - descFilePath, - ex) + throw QueryCompilationErrors.cannotFindDescriptorFileError(descFilePath, ex) } - - val descriptorProto: DescriptorProtos.FileDescriptorProto = fileDescriptorSet.getFile(0) try { - val fileDescriptor: Descriptors.FileDescriptor = Descriptors.FileDescriptor.buildFrom( - descriptorProto, - new Array[Descriptors.FileDescriptor](0)) + val fileDescriptorProtoIndex = createDescriptorProtoIndex(fileDescriptorSet) + val fileDescriptor: Descriptors.FileDescriptor = + buildFileDescriptor(fileDescriptorSet.getFileList.asScala.last, fileDescriptorProtoIndex) if (fileDescriptor.getMessageTypes().isEmpty()) { - throw new RuntimeException("No MessageTypes returned, " + fileDescriptor.getName()); + throw QueryCompilationErrors.noProtobufMessageTypeReturnError(fileDescriptor.getName()) } fileDescriptor } catch { case e: Descriptors.DescriptorValidationException => - throw new RuntimeException("Error constructing FileDescriptor", e) + throw QueryCompilationErrors.failedParsingDescriptorError(e) + } + } + + /** + * Recursively constructs file descriptors for all dependencies for given + * FileDescriptorProto and return. 
+ * @param descriptorProto + * @param descriptorProtoIndex + * @return Descriptors.FileDescriptor + */ + private def buildFileDescriptor( + fileDescriptorProto: FileDescriptorProto, + fileDescriptorProtoIndex: Map[String, FileDescriptorProto]): Descriptors.FileDescriptor = { + var fileDescriptorList = List[Descriptors.FileDescriptor]() + for (dependencyName <- fileDescriptorProto.getDependencyList().asScala) { + if (!fileDescriptorProtoIndex.contains(dependencyName)) { + throw QueryCompilationErrors.protobufDescriptorDependencyError(dependencyName) + } + val dependencyProto: FileDescriptorProto = fileDescriptorProtoIndex.get(dependencyName).get + fileDescriptorList = fileDescriptorList ++ List( + buildFileDescriptor(dependencyProto, fileDescriptorProtoIndex)) + } + Descriptors.FileDescriptor.buildFrom(fileDescriptorProto, fileDescriptorList.toArray) + } + + /** + * Returns a map from descriptor proto name as found inside the descriptors to protos. + * @param fileDescriptorSet + * @return Map[String, FileDescriptorProto] + */ + private def createDescriptorProtoIndex( + fileDescriptorSet: FileDescriptorSet): Map[String, FileDescriptorProto] = { + var resultBuilder = Map[String, FileDescriptorProto]() Review Comment: style-nit: Similarly here: fileDescriptorSet.getFileList().asScala.map { descriptorProto => descriptorProto.getName() -> descriptorProto }.toMap ########## connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufUtils.scala: ########## @@ -196,28 +195,58 @@ private[sql] object ProtobufUtils extends Logging { fileDescriptorSet = DescriptorProtos.FileDescriptorSet.parseFrom(dscFile) } catch { case ex: InvalidProtocolBufferException => - // TODO move all the exceptions to core/src/main/resources/error/error-classes.json - throw new RuntimeException("Error parsing descriptor byte[] into Descriptor object", ex) + throw QueryCompilationErrors.descrioptorParseError(ex) case ex: IOException => - throw new RuntimeException( - "Error reading Protobuf 
descriptor file at path: " + - descFilePath, - ex) + throw QueryCompilationErrors.cannotFindDescriptorFileError(descFilePath, ex) } - - val descriptorProto: DescriptorProtos.FileDescriptorProto = fileDescriptorSet.getFile(0) try { - val fileDescriptor: Descriptors.FileDescriptor = Descriptors.FileDescriptor.buildFrom( - descriptorProto, - new Array[Descriptors.FileDescriptor](0)) + val fileDescriptorProtoIndex = createDescriptorProtoIndex(fileDescriptorSet) + val fileDescriptor: Descriptors.FileDescriptor = + buildFileDescriptor(fileDescriptorSet.getFileList.asScala.last, fileDescriptorProtoIndex) if (fileDescriptor.getMessageTypes().isEmpty()) { - throw new RuntimeException("No MessageTypes returned, " + fileDescriptor.getName()); + throw QueryCompilationErrors.noProtobufMessageTypeReturnError(fileDescriptor.getName()) } fileDescriptor } catch { case e: Descriptors.DescriptorValidationException => - throw new RuntimeException("Error constructing FileDescriptor", e) + throw QueryCompilationErrors.failedParsingDescriptorError(e) + } + } + + /** + * Recursively constructs file descriptors for all dependencies for given + * FileDescriptorProto and return. 
+ * @param descriptorProto + * @param descriptorProtoIndex + * @return Descriptors.FileDescriptor + */ + private def buildFileDescriptor( + fileDescriptorProto: FileDescriptorProto, + fileDescriptorProtoIndex: Map[String, FileDescriptorProto]): Descriptors.FileDescriptor = { + var fileDescriptorList = List[Descriptors.FileDescriptor]() Review Comment: style nit: More scala idiomatic: val fileDescriptorList = fileDescriptorProto.getDependencyList().asScala.map { dependency => fileDescriptorProtoIndex.get(dependency) match { case Some(dependencyProto) => buildFileDescriptor(dependencyProto, fileDescriptorProtoIndex) case None => throw QueryCompilationErrors.protobufDescriptorDependencyError(dependency) } } ########## connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufUtils.scala: ########## @@ -196,28 +195,58 @@ private[sql] object ProtobufUtils extends Logging { fileDescriptorSet = DescriptorProtos.FileDescriptorSet.parseFrom(dscFile) } catch { case ex: InvalidProtocolBufferException => - // TODO move all the exceptions to core/src/main/resources/error/error-classes.json - throw new RuntimeException("Error parsing descriptor byte[] into Descriptor object", ex) + throw QueryCompilationErrors.descrioptorParseError(ex) case ex: IOException => - throw new RuntimeException( - "Error reading Protobuf descriptor file at path: " + - descFilePath, - ex) + throw QueryCompilationErrors.cannotFindDescriptorFileError(descFilePath, ex) } - - val descriptorProto: DescriptorProtos.FileDescriptorProto = fileDescriptorSet.getFile(0) try { - val fileDescriptor: Descriptors.FileDescriptor = Descriptors.FileDescriptor.buildFrom( - descriptorProto, - new Array[Descriptors.FileDescriptor](0)) + val fileDescriptorProtoIndex = createDescriptorProtoIndex(fileDescriptorSet) + val fileDescriptor: Descriptors.FileDescriptor = + buildFileDescriptor(fileDescriptorSet.getFileList.asScala.last, fileDescriptorProtoIndex) if (fileDescriptor.getMessageTypes().isEmpty()) { - 
throw new RuntimeException("No MessageTypes returned, " + fileDescriptor.getName()); + throw QueryCompilationErrors.noProtobufMessageTypeReturnError(fileDescriptor.getName()) } fileDescriptor } catch { case e: Descriptors.DescriptorValidationException => - throw new RuntimeException("Error constructing FileDescriptor", e) + throw QueryCompilationErrors.failedParsingDescriptorError(e) + } + } + + /** + * Recursively constructs file descriptors for all dependencies for given + * FileDescriptorProto and return. + * @param descriptorProto + * @param descriptorProtoIndex + * @return Descriptors.FileDescriptor + */ + private def buildFileDescriptor( + fileDescriptorProto: FileDescriptorProto, + fileDescriptorProtoIndex: Map[String, FileDescriptorProto]): Descriptors.FileDescriptor = { Review Comment: minor: Replace 'Index' with 'Map'? ########## connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufUtils.scala: ########## @@ -196,28 +195,58 @@ private[sql] object ProtobufUtils extends Logging { fileDescriptorSet = DescriptorProtos.FileDescriptorSet.parseFrom(dscFile) } catch { case ex: InvalidProtocolBufferException => - // TODO move all the exceptions to core/src/main/resources/error/error-classes.json - throw new RuntimeException("Error parsing descriptor byte[] into Descriptor object", ex) + throw QueryCompilationErrors.descrioptorParseError(ex) case ex: IOException => - throw new RuntimeException( - "Error reading Protobuf descriptor file at path: " + - descFilePath, - ex) + throw QueryCompilationErrors.cannotFindDescriptorFileError(descFilePath, ex) } - - val descriptorProto: DescriptorProtos.FileDescriptorProto = fileDescriptorSet.getFile(0) try { - val fileDescriptor: Descriptors.FileDescriptor = Descriptors.FileDescriptor.buildFrom( - descriptorProto, - new Array[Descriptors.FileDescriptor](0)) + val fileDescriptorProtoIndex = createDescriptorProtoIndex(fileDescriptorSet) + val fileDescriptor: Descriptors.FileDescriptor = + 
buildFileDescriptor(fileDescriptorSet.getFileList.asScala.last, fileDescriptorProtoIndex) if (fileDescriptor.getMessageTypes().isEmpty()) { - throw new RuntimeException("No MessageTypes returned, " + fileDescriptor.getName()); + throw QueryCompilationErrors.noProtobufMessageTypeReturnError(fileDescriptor.getName()) } fileDescriptor } catch { case e: Descriptors.DescriptorValidationException => - throw new RuntimeException("Error constructing FileDescriptor", e) + throw QueryCompilationErrors.failedParsingDescriptorError(e) + } + } + + /** + * Recursively constructs file descriptors for all dependencies for given + * FileDescriptorProto and return. + * @param descriptorProto Review Comment: nit: We can remove the `@param` tags here if we are not adding any description for them (given these are internal APIs). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org