[GitHub] [spark] rangadi commented on a diff in pull request #38344: [SPARK-40777][SQL][PROTOBUF] Protobuf import support and move error-classes.
rangadi commented on code in PR #38344: URL: https://github.com/apache/spark/pull/38344#discussion_r1010672856 ## connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufUtils.scala: ## @@ -178,46 +176,73 @@ private[sql] object ProtobufUtils extends Logging { } def buildDescriptor(descFilePath: String, messageName: String): Descriptor = { -val descriptor = parseFileDescriptor(descFilePath).getMessageTypes.asScala.find { desc => - desc.getName == messageName || desc.getFullName == messageName -} +val fileDescriptor = parseFileDescriptorSet(descFilePath) + .find(!_.getMessageTypes.asScala.find(desc => Review Comment: Oh, we need to find() invocations and check is made twice. How about this? [Optional to do, only a suggestion] // Find the first message descriptor that matches the name. val descriptorOpt = parseFileDescriptorSet(descFilePath) .flatMap { fileDesc => fileDesc.getMessageTypes.asScala.find { desc => desc.getName == messageName || desc.getFullName == messageName } }.headOption descriptoOpt match { case Some(d) => d case None => throw QueryCompilationErrors.unableToLocateProtobufMessageError(messageName) } -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] rangadi commented on a diff in pull request #38344: [SPARK-40777][SQL][PROTOBUF] Protobuf import support and move error-classes.
rangadi commented on code in PR #38344: URL: https://github.com/apache/spark/pull/38344#discussion_r1006324273 ## connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufUtils.scala: ## @@ -175,19 +175,26 @@ private[sql] object ProtobufUtils extends Logging { .asInstanceOf[Descriptor] } + // TODO: Revisit to ensure that messageName is searched through all imports Review Comment: What is missing? Looks fairly complete to me. Better to state the problem here. ## connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufUtils.scala: ## @@ -177,47 +175,75 @@ private[sql] object ProtobufUtils extends Logging { .asInstanceOf[Descriptor] } + // TODO: Revisit to ensure that messageName is searched through all imports def buildDescriptor(descFilePath: String, messageName: String): Descriptor = { -val descriptor = parseFileDescriptor(descFilePath).getMessageTypes.asScala.find { desc => - desc.getName == messageName || desc.getFullName == messageName +val descriptorList = parseFileDescriptor(descFilePath).map(fileDescriptor => { + fileDescriptor.getMessageTypes.asScala.find { desc => +desc.getName == messageName || desc.getFullName == messageName + } +}).filter(f => !f.isEmpty) + +if (descriptorList.isEmpty) { + throw QueryCompilationErrors.noProtobufMessageTypeReturnError(messageName) } -descriptor match { +descriptorList.last match { case Some(d) => d case None => -throw new RuntimeException(s"Unable to locate Message '$messageName' in Descriptor") +throw QueryCompilationErrors.unableToLocateProtobufMessageError(messageName) } } - private def parseFileDescriptor(descFilePath: String): Descriptors.FileDescriptor = { + private def parseFileDescriptor(descFilePath: String): List[Descriptors.FileDescriptor] = { Review Comment: Rename to `parseFileDescriptorSet` (otherwise it sounds like it is parsing just one file descriptor). 
## connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufUtils.scala: ## @@ -175,19 +175,26 @@ private[sql] object ProtobufUtils extends Logging { .asInstanceOf[Descriptor] } + // TODO: Revisit to ensure that messageName is searched through all imports def buildDescriptor(descFilePath: String, messageName: String): Descriptor = { -val descriptor = parseFileDescriptor(descFilePath).getMessageTypes.asScala.find { desc => - desc.getName == messageName || desc.getFullName == messageName +val descriptorList = parseFileDescriptor(descFilePath).map(fileDescriptor => { + fileDescriptor.getMessageTypes.asScala.find { desc => +desc.getName == messageName || desc.getFullName == messageName + } +}).filter(f => !f.isEmpty) + +if (descriptorList.isEmpty) { + throw QueryCompilationErrors.noProtobufMessageTypeReturnError(messageName) } -descriptor match { +descriptorList.last match { Review Comment: Could you add a comment on why we are picking the last one? Will be useful for future readers as well. ## connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufUtils.scala: ## @@ -177,47 +175,75 @@ private[sql] object ProtobufUtils extends Logging { .asInstanceOf[Descriptor] } + // TODO: Revisit to ensure that messageName is searched through all imports def buildDescriptor(descFilePath: String, messageName: String): Descriptor = { -val descriptor = parseFileDescriptor(descFilePath).getMessageTypes.asScala.find { desc => - desc.getName == messageName || desc.getFullName == messageName +val descriptorList = parseFileDescriptor(descFilePath).map(fileDescriptor => { Review Comment: Style: use `find()` rather than map().filter(). (you can use `findLast()` if there is a reason to use the last match). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] rangadi commented on a diff in pull request #38344: [SPARK-40777][SQL][PROTOBUF] Protobuf import support and move error-classes.
rangadi commented on code in PR #38344: URL: https://github.com/apache/spark/pull/38344#discussion_r1006201944 ## connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufUtils.scala: ## @@ -196,30 +194,52 @@ private[sql] object ProtobufUtils extends Logging { fileDescriptorSet = DescriptorProtos.FileDescriptorSet.parseFrom(dscFile) } catch { case ex: InvalidProtocolBufferException => -// TODO move all the exceptions to core/src/main/resources/error/error-classes.json -throw new RuntimeException("Error parsing descriptor byte[] into Descriptor object", ex) +throw QueryCompilationErrors.descrioptorParseError(ex) case ex: IOException => -throw new RuntimeException( - "Error reading Protobuf descriptor file at path: " + -descFilePath, - ex) +throw QueryCompilationErrors.cannotFindDescriptorFileError(descFilePath, ex) } - -val descriptorProto: DescriptorProtos.FileDescriptorProto = fileDescriptorSet.getFile(0) try { - val fileDescriptor: Descriptors.FileDescriptor = Descriptors.FileDescriptor.buildFrom( -descriptorProto, -new Array[Descriptors.FileDescriptor](0)) + val fileDescriptorProtoIndex = createDescriptorProtoMap(fileDescriptorSet) + val fileDescriptor: Descriptors.FileDescriptor = +buildFileDescriptor(fileDescriptorSet.getFileList.asScala.last, fileDescriptorProtoIndex) if (fileDescriptor.getMessageTypes().isEmpty()) { -throw new RuntimeException("No MessageTypes returned, " + fileDescriptor.getName()); +throw QueryCompilationErrors.noProtobufMessageTypeReturnError(fileDescriptor.getName()) } fileDescriptor } catch { case e: Descriptors.DescriptorValidationException => -throw new RuntimeException("Error constructing FileDescriptor", e) +throw QueryCompilationErrors.failedParsingDescriptorError(e) } } + /** + * Recursively constructs file descriptors for all dependencies for given + * FileDescriptorProto and return. 
+ */ + private def buildFileDescriptor( +fileDescriptorProto: FileDescriptorProto, +fileDescriptorProtoMap: Map[String, FileDescriptorProto]): Descriptors.FileDescriptor = { +val fileDescriptorList = fileDescriptorProto.getDependencyList().asScala.map { dependency => + fileDescriptorProtoMap.get(dependency) match { +case Some(dependencyProto) => + buildFileDescriptor(dependencyProto, fileDescriptorProtoMap) +case None => + throw QueryCompilationErrors.protobufDescriptorDependencyError(dependency) + } +} +Descriptors.FileDescriptor.buildFrom(fileDescriptorProto, fileDescriptorList.toArray) + } + + /** + * Returns a map from descriptor proto name as found inside the descriptors to protos. + */ + private def createDescriptorProtoMap( +fileDescriptorSet: FileDescriptorSet): Map[String, FileDescriptorProto] = { +fileDescriptorSet.getFileList().asScala.map { descriptorProto => + descriptorProto.getName() -> descriptorProto Review Comment: That is not what I am referring to. This is not really a problem since it won't even compile. Notice two files as input for `protoc` above. May be that will since `name` will include the path. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] rangadi commented on a diff in pull request #38344: [SPARK-40777][SQL][PROTOBUF] Protobuf import support and move error-classes.
rangadi commented on code in PR #38344: URL: https://github.com/apache/spark/pull/38344#discussion_r1006195517 ## connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufUtils.scala: ## @@ -196,30 +194,52 @@ private[sql] object ProtobufUtils extends Logging { fileDescriptorSet = DescriptorProtos.FileDescriptorSet.parseFrom(dscFile) } catch { case ex: InvalidProtocolBufferException => -// TODO move all the exceptions to core/src/main/resources/error/error-classes.json -throw new RuntimeException("Error parsing descriptor byte[] into Descriptor object", ex) +throw QueryCompilationErrors.descrioptorParseError(ex) case ex: IOException => -throw new RuntimeException( - "Error reading Protobuf descriptor file at path: " + -descFilePath, - ex) +throw QueryCompilationErrors.cannotFindDescriptorFileError(descFilePath, ex) } - -val descriptorProto: DescriptorProtos.FileDescriptorProto = fileDescriptorSet.getFile(0) try { - val fileDescriptor: Descriptors.FileDescriptor = Descriptors.FileDescriptor.buildFrom( -descriptorProto, -new Array[Descriptors.FileDescriptor](0)) + val fileDescriptorProtoIndex = createDescriptorProtoMap(fileDescriptorSet) + val fileDescriptor: Descriptors.FileDescriptor = +buildFileDescriptor(fileDescriptorSet.getFileList.asScala.last, fileDescriptorProtoIndex) if (fileDescriptor.getMessageTypes().isEmpty()) { -throw new RuntimeException("No MessageTypes returned, " + fileDescriptor.getName()); +throw QueryCompilationErrors.noProtobufMessageTypeReturnError(fileDescriptor.getName()) } fileDescriptor } catch { case e: Descriptors.DescriptorValidationException => -throw new RuntimeException("Error constructing FileDescriptor", e) +throw QueryCompilationErrors.failedParsingDescriptorError(e) } } + /** + * Recursively constructs file descriptors for all dependencies for given + * FileDescriptorProto and return. 
+ */ + private def buildFileDescriptor( +fileDescriptorProto: FileDescriptorProto, +fileDescriptorProtoMap: Map[String, FileDescriptorProto]): Descriptors.FileDescriptor = { +val fileDescriptorList = fileDescriptorProto.getDependencyList().asScala.map { dependency => + fileDescriptorProtoMap.get(dependency) match { +case Some(dependencyProto) => + buildFileDescriptor(dependencyProto, fileDescriptorProtoMap) +case None => + throw QueryCompilationErrors.protobufDescriptorDependencyError(dependency) + } +} +Descriptors.FileDescriptor.buildFrom(fileDescriptorProto, fileDescriptorList.toArray) + } + + /** + * Returns a map from descriptor proto name as found inside the descriptors to protos. + */ + private def createDescriptorProtoMap( +fileDescriptorSet: FileDescriptorSet): Map[String, FileDescriptorProto] = { +fileDescriptorSet.getFileList().asScala.map { descriptorProto => + descriptorProto.getName() -> descriptorProto Review Comment: The command is using only single proto file. Not the same as the example above, right? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] rangadi commented on a diff in pull request #38344: [SPARK-40777][SQL][PROTOBUF] Protobuf import support and move error-classes.
rangadi commented on code in PR #38344: URL: https://github.com/apache/spark/pull/38344#discussion_r1006192940 ## connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufUtils.scala: ## @@ -196,30 +194,52 @@ private[sql] object ProtobufUtils extends Logging { fileDescriptorSet = DescriptorProtos.FileDescriptorSet.parseFrom(dscFile) } catch { case ex: InvalidProtocolBufferException => -// TODO move all the exceptions to core/src/main/resources/error/error-classes.json -throw new RuntimeException("Error parsing descriptor byte[] into Descriptor object", ex) +throw QueryCompilationErrors.descrioptorParseError(ex) case ex: IOException => -throw new RuntimeException( - "Error reading Protobuf descriptor file at path: " + -descFilePath, - ex) +throw QueryCompilationErrors.cannotFindDescriptorFileError(descFilePath, ex) } - -val descriptorProto: DescriptorProtos.FileDescriptorProto = fileDescriptorSet.getFile(0) try { - val fileDescriptor: Descriptors.FileDescriptor = Descriptors.FileDescriptor.buildFrom( -descriptorProto, -new Array[Descriptors.FileDescriptor](0)) + val fileDescriptorProtoIndex = createDescriptorProtoMap(fileDescriptorSet) + val fileDescriptor: Descriptors.FileDescriptor = +buildFileDescriptor(fileDescriptorSet.getFileList.asScala.last, fileDescriptorProtoIndex) Review Comment: I think that is pretty restrictive. There will be cases where user wants to use the protobuf defined in imported files. E.g. they might have one proto file just to import bunch of other file that define actual protos used. What are we gaining by having restriction to allow messages only in the last listed file? In addition there could be be multiple top level files (i.e. `protoc` command is run with multiple files). I think fixing it properly would be the right thing. We can leave a TODO here, I can add that in a follow up. LMK. -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] rangadi commented on a diff in pull request #38344: [SPARK-40777][SQL][PROTOBUF] Protobuf import support and move error-classes.
rangadi commented on code in PR #38344: URL: https://github.com/apache/spark/pull/38344#discussion_r1006192940 ## connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufUtils.scala: ## @@ -196,30 +194,52 @@ private[sql] object ProtobufUtils extends Logging { fileDescriptorSet = DescriptorProtos.FileDescriptorSet.parseFrom(dscFile) } catch { case ex: InvalidProtocolBufferException => -// TODO move all the exceptions to core/src/main/resources/error/error-classes.json -throw new RuntimeException("Error parsing descriptor byte[] into Descriptor object", ex) +throw QueryCompilationErrors.descrioptorParseError(ex) case ex: IOException => -throw new RuntimeException( - "Error reading Protobuf descriptor file at path: " + -descFilePath, - ex) +throw QueryCompilationErrors.cannotFindDescriptorFileError(descFilePath, ex) } - -val descriptorProto: DescriptorProtos.FileDescriptorProto = fileDescriptorSet.getFile(0) try { - val fileDescriptor: Descriptors.FileDescriptor = Descriptors.FileDescriptor.buildFrom( -descriptorProto, -new Array[Descriptors.FileDescriptor](0)) + val fileDescriptorProtoIndex = createDescriptorProtoMap(fileDescriptorSet) + val fileDescriptor: Descriptors.FileDescriptor = +buildFileDescriptor(fileDescriptorSet.getFileList.asScala.last, fileDescriptorProtoIndex) Review Comment: I think that is pretty restrictive. There will be cases where user wants to use the protobuf defined in imported files. E.g. they might have one proto file jus tot import bunch of other file that define the protobufs. What are we gaining by having restriction to allow messages only in the last listed file? In addition there could be be multiple top level files (i.e. `protoc` command is run with multiple files). I think fixing it properly would be the right thing. We can leave a TODO here, I can add that in a follow up. LMK. -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] rangadi commented on a diff in pull request #38344: [SPARK-40777][SQL][PROTOBUF] Protobuf import support and move error-classes.
rangadi commented on code in PR #38344: URL: https://github.com/apache/spark/pull/38344#discussion_r1006192940 ## connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufUtils.scala: ## @@ -196,30 +194,52 @@ private[sql] object ProtobufUtils extends Logging { fileDescriptorSet = DescriptorProtos.FileDescriptorSet.parseFrom(dscFile) } catch { case ex: InvalidProtocolBufferException => -// TODO move all the exceptions to core/src/main/resources/error/error-classes.json -throw new RuntimeException("Error parsing descriptor byte[] into Descriptor object", ex) +throw QueryCompilationErrors.descrioptorParseError(ex) case ex: IOException => -throw new RuntimeException( - "Error reading Protobuf descriptor file at path: " + -descFilePath, - ex) +throw QueryCompilationErrors.cannotFindDescriptorFileError(descFilePath, ex) } - -val descriptorProto: DescriptorProtos.FileDescriptorProto = fileDescriptorSet.getFile(0) try { - val fileDescriptor: Descriptors.FileDescriptor = Descriptors.FileDescriptor.buildFrom( -descriptorProto, -new Array[Descriptors.FileDescriptor](0)) + val fileDescriptorProtoIndex = createDescriptorProtoMap(fileDescriptorSet) + val fileDescriptor: Descriptors.FileDescriptor = +buildFileDescriptor(fileDescriptorSet.getFileList.asScala.last, fileDescriptorProtoIndex) Review Comment: I think that is pretty restrictive. There will be cases where user wants to use the protobuf defined in imported files. E.g. they might have one proto file jus tot import bunch of other file that define the protobufs. What are we gaining by having restriction to allow messages only in the last listed file? In addition there could be be multiple top level files (i.e. `protoc` command is run with multiple files). I think fixing it properly would be the right thing. We can level a TODO here, I can add that in a follow up. LMK. -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] rangadi commented on a diff in pull request #38344: [SPARK-40777][SQL][PROTOBUF] Protobuf import support and move error-classes.
rangadi commented on code in PR #38344: URL: https://github.com/apache/spark/pull/38344#discussion_r1006150396 ## connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufUtils.scala: ## @@ -196,30 +194,52 @@ private[sql] object ProtobufUtils extends Logging { fileDescriptorSet = DescriptorProtos.FileDescriptorSet.parseFrom(dscFile) } catch { case ex: InvalidProtocolBufferException => -// TODO move all the exceptions to core/src/main/resources/error/error-classes.json -throw new RuntimeException("Error parsing descriptor byte[] into Descriptor object", ex) +throw QueryCompilationErrors.descrioptorParseError(ex) case ex: IOException => -throw new RuntimeException( - "Error reading Protobuf descriptor file at path: " + -descFilePath, - ex) +throw QueryCompilationErrors.cannotFindDescriptorFileError(descFilePath, ex) } - -val descriptorProto: DescriptorProtos.FileDescriptorProto = fileDescriptorSet.getFile(0) try { - val fileDescriptor: Descriptors.FileDescriptor = Descriptors.FileDescriptor.buildFrom( -descriptorProto, -new Array[Descriptors.FileDescriptor](0)) + val fileDescriptorProtoIndex = createDescriptorProtoMap(fileDescriptorSet) + val fileDescriptor: Descriptors.FileDescriptor = +buildFileDescriptor(fileDescriptorSet.getFileList.asScala.last, fileDescriptorProtoIndex) Review Comment: There is one more related issue here: We can find messages defined in this file, but not the message defined inside the imported file. I think it is better to fix all of this properly, since you are already working on this. I.e. we should be able to use any message defined in the descriptor file. Essentially we can build `List [ FileDescriptor ]` and search for user's message in all of those. We could give preference for `last` if in this search order. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] rangadi commented on a diff in pull request #38344: [SPARK-40777][SQL][PROTOBUF] Protobuf import support and move error-classes.
rangadi commented on code in PR #38344: URL: https://github.com/apache/spark/pull/38344#discussion_r1006135653 ## connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufUtils.scala: ## @@ -196,30 +194,52 @@ private[sql] object ProtobufUtils extends Logging { fileDescriptorSet = DescriptorProtos.FileDescriptorSet.parseFrom(dscFile) } catch { case ex: InvalidProtocolBufferException => -// TODO move all the exceptions to core/src/main/resources/error/error-classes.json -throw new RuntimeException("Error parsing descriptor byte[] into Descriptor object", ex) +throw QueryCompilationErrors.descrioptorParseError(ex) case ex: IOException => -throw new RuntimeException( - "Error reading Protobuf descriptor file at path: " + -descFilePath, - ex) +throw QueryCompilationErrors.cannotFindDescriptorFileError(descFilePath, ex) } - -val descriptorProto: DescriptorProtos.FileDescriptorProto = fileDescriptorSet.getFile(0) try { - val fileDescriptor: Descriptors.FileDescriptor = Descriptors.FileDescriptor.buildFrom( -descriptorProto, -new Array[Descriptors.FileDescriptor](0)) + val fileDescriptorProtoIndex = createDescriptorProtoMap(fileDescriptorSet) + val fileDescriptor: Descriptors.FileDescriptor = +buildFileDescriptor(fileDescriptorSet.getFileList.asScala.last, fileDescriptorProtoIndex) Review Comment: Could you add comment about why `last` is used here? What happens if we use `first`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] rangadi commented on a diff in pull request #38344: [SPARK-40777][SQL][PROTOBUF] Protobuf import support and move error-classes.
rangadi commented on code in PR #38344: URL: https://github.com/apache/spark/pull/38344#discussion_r1006128712 ## connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufUtils.scala: ## @@ -196,30 +194,52 @@ private[sql] object ProtobufUtils extends Logging { fileDescriptorSet = DescriptorProtos.FileDescriptorSet.parseFrom(dscFile) } catch { case ex: InvalidProtocolBufferException => -// TODO move all the exceptions to core/src/main/resources/error/error-classes.json -throw new RuntimeException("Error parsing descriptor byte[] into Descriptor object", ex) +throw QueryCompilationErrors.descrioptorParseError(ex) case ex: IOException => -throw new RuntimeException( - "Error reading Protobuf descriptor file at path: " + -descFilePath, - ex) +throw QueryCompilationErrors.cannotFindDescriptorFileError(descFilePath, ex) } - -val descriptorProto: DescriptorProtos.FileDescriptorProto = fileDescriptorSet.getFile(0) try { - val fileDescriptor: Descriptors.FileDescriptor = Descriptors.FileDescriptor.buildFrom( -descriptorProto, -new Array[Descriptors.FileDescriptor](0)) + val fileDescriptorProtoIndex = createDescriptorProtoMap(fileDescriptorSet) + val fileDescriptor: Descriptors.FileDescriptor = +buildFileDescriptor(fileDescriptorSet.getFileList.asScala.last, fileDescriptorProtoIndex) if (fileDescriptor.getMessageTypes().isEmpty()) { -throw new RuntimeException("No MessageTypes returned, " + fileDescriptor.getName()); +throw QueryCompilationErrors.noProtobufMessageTypeReturnError(fileDescriptor.getName()) } fileDescriptor } catch { case e: Descriptors.DescriptorValidationException => -throw new RuntimeException("Error constructing FileDescriptor", e) +throw QueryCompilationErrors.failedParsingDescriptorError(e) } } + /** + * Recursively constructs file descriptors for all dependencies for given + * FileDescriptorProto and return. 
+ */ + private def buildFileDescriptor( +fileDescriptorProto: FileDescriptorProto, +fileDescriptorProtoMap: Map[String, FileDescriptorProto]): Descriptors.FileDescriptor = { +val fileDescriptorList = fileDescriptorProto.getDependencyList().asScala.map { dependency => + fileDescriptorProtoMap.get(dependency) match { +case Some(dependencyProto) => + buildFileDescriptor(dependencyProto, fileDescriptorProtoMap) +case None => + throw QueryCompilationErrors.protobufDescriptorDependencyError(dependency) + } +} +Descriptors.FileDescriptor.buildFrom(fileDescriptorProto, fileDescriptorList.toArray) + } + + /** + * Returns a map from descriptor proto name as found inside the descriptors to protos. + */ + private def createDescriptorProtoMap( +fileDescriptorSet: FileDescriptorSet): Map[String, FileDescriptorProto] = { +fileDescriptorSet.getFileList().asScala.map { descriptorProto => + descriptorProto.getName() -> descriptorProto Review Comment: Btw, What happens if there are two different files with the same name? E.g. `protoc ... a/b/events.proto x/y/events.proto` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] rangadi commented on a diff in pull request #38344: [SPARK-40777][SQL][PROTOBUF] Protobuf import support and move error-classes.
rangadi commented on code in PR #38344: URL: https://github.com/apache/spark/pull/38344#discussion_r1004757693 ## connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufUtils.scala: ## @@ -196,28 +195,58 @@ private[sql] object ProtobufUtils extends Logging { fileDescriptorSet = DescriptorProtos.FileDescriptorSet.parseFrom(dscFile) } catch { case ex: InvalidProtocolBufferException => -// TODO move all the exceptions to core/src/main/resources/error/error-classes.json -throw new RuntimeException("Error parsing descriptor byte[] into Descriptor object", ex) +throw QueryCompilationErrors.descrioptorParseError(ex) case ex: IOException => -throw new RuntimeException( - "Error reading Protobuf descriptor file at path: " + -descFilePath, - ex) +throw QueryCompilationErrors.cannotFindDescriptorFileError(descFilePath, ex) } - -val descriptorProto: DescriptorProtos.FileDescriptorProto = fileDescriptorSet.getFile(0) try { - val fileDescriptor: Descriptors.FileDescriptor = Descriptors.FileDescriptor.buildFrom( -descriptorProto, -new Array[Descriptors.FileDescriptor](0)) + val fileDescriptorProtoIndex = createDescriptorProtoIndex(fileDescriptorSet) + val fileDescriptor: Descriptors.FileDescriptor = +buildFileDescriptor(fileDescriptorSet.getFileList.asScala.last, fileDescriptorProtoIndex) if (fileDescriptor.getMessageTypes().isEmpty()) { -throw new RuntimeException("No MessageTypes returned, " + fileDescriptor.getName()); +throw QueryCompilationErrors.noProtobufMessageTypeReturnError(fileDescriptor.getName()) } fileDescriptor } catch { case e: Descriptors.DescriptorValidationException => -throw new RuntimeException("Error constructing FileDescriptor", e) +throw QueryCompilationErrors.failedParsingDescriptorError(e) +} + } + + /** + * Recursively constructs file descriptors for all dependencies for given + * FileDescriptorProto and return. 
+ * @param descriptorProto + * @param descriptorProtoIndex + * @return Descriptors.FileDescriptor + */ + private def buildFileDescriptor( +fileDescriptorProto: FileDescriptorProto, +fileDescriptorProtoIndex: Map[String, FileDescriptorProto]): Descriptors.FileDescriptor = { +var fileDescriptorList = List[Descriptors.FileDescriptor]() +for (dependencyName <- fileDescriptorProto.getDependencyList().asScala) { + if (!fileDescriptorProtoIndex.contains(dependencyName)) { +throw QueryCompilationErrors.protobufDescriptorDependencyError(dependencyName) + } + val dependencyProto: FileDescriptorProto = fileDescriptorProtoIndex.get(dependencyName).get + fileDescriptorList = fileDescriptorList ++ List( +buildFileDescriptor(dependencyProto, fileDescriptorProtoIndex)) +} +Descriptors.FileDescriptor.buildFrom(fileDescriptorProto, fileDescriptorList.toArray) + } + + /** + * Returns a map from descriptor proto name as found inside the descriptors to protos. + * @param fileDescriptorSet Review Comment: Same for _param_ and _return_ here. 
## connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufUtils.scala: ## @@ -196,28 +195,58 @@ private[sql] object ProtobufUtils extends Logging { fileDescriptorSet = DescriptorProtos.FileDescriptorSet.parseFrom(dscFile) } catch { case ex: InvalidProtocolBufferException => -// TODO move all the exceptions to core/src/main/resources/error/error-classes.json -throw new RuntimeException("Error parsing descriptor byte[] into Descriptor object", ex) +throw QueryCompilationErrors.descrioptorParseError(ex) case ex: IOException => -throw new RuntimeException( - "Error reading Protobuf descriptor file at path: " + -descFilePath, - ex) +throw QueryCompilationErrors.cannotFindDescriptorFileError(descFilePath, ex) } - -val descriptorProto: DescriptorProtos.FileDescriptorProto = fileDescriptorSet.getFile(0) try { - val fileDescriptor: Descriptors.FileDescriptor = Descriptors.FileDescriptor.buildFrom( -descriptorProto, -new Array[Descriptors.FileDescriptor](0)) + val fileDescriptorProtoIndex = createDescriptorProtoIndex(fileDescriptorSet) + val fileDescriptor: Descriptors.FileDescriptor = +buildFileDescriptor(fileDescriptorSet.getFileList.asScala.last, fileDescriptorProtoIndex) if (fileDescriptor.getMessageTypes().isEmpty()) { -throw new RuntimeException("No MessageTypes returned, " + fileDescriptor.getName()); +throw QueryCompilationErrors.noProtobufMessageTypeReturnError(fileDescriptor.getName()) } fileDescriptor } catch { case e: Descriptors.DescriptorValidationException => -t
[GitHub] [spark] rangadi commented on a diff in pull request #38344: [SPARK-40777][SQL][PROTOBUF] Protobuf import support and move error-classes.
rangadi commented on code in PR #38344: URL: https://github.com/apache/spark/pull/38344#discussion_r1003729392

## connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufUtils.scala: ##
@@ -196,27 +194,31 @@ private[sql] object ProtobufUtils extends Logging {
       fileDescriptorSet = DescriptorProtos.FileDescriptorSet.parseFrom(dscFile)
     } catch {
       case ex: InvalidProtocolBufferException =>
-        // TODO move all the exceptions to core/src/main/resources/error/error-classes.json
-        throw new RuntimeException("Error parsing descriptor byte[] into Descriptor object", ex)
+        throw QueryCompilationErrors.descrioptorParseError(ex)
       case ex: IOException =>
-        throw new RuntimeException(
-          "Error reading Protobuf descriptor file at path: " +
-            descFilePath,
-          ex)
+        throw QueryCompilationErrors.cannotFindDescriptorFileError(descFilePath, ex)
     }
-    val descriptorProto: DescriptorProtos.FileDescriptorProto = fileDescriptorSet.getFile(0)
+    val descriptorProto: DescriptorProtos.FileDescriptorProto =

Review Comment: Could you add some brief comments here? Is the last file the right file?
## connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufUtils.scala: ##
@@ -196,27 +194,31 @@ private[sql] object ProtobufUtils extends Logging {
       fileDescriptorSet = DescriptorProtos.FileDescriptorSet.parseFrom(dscFile)
     } catch {
       case ex: InvalidProtocolBufferException =>
-        // TODO move all the exceptions to core/src/main/resources/error/error-classes.json
-        throw new RuntimeException("Error parsing descriptor byte[] into Descriptor object", ex)
+        throw QueryCompilationErrors.descrioptorParseError(ex)
       case ex: IOException =>
-        throw new RuntimeException(
-          "Error reading Protobuf descriptor file at path: " +
-            descFilePath,
-          ex)
+        throw QueryCompilationErrors.cannotFindDescriptorFileError(descFilePath, ex)
     }
-    val descriptorProto: DescriptorProtos.FileDescriptorProto = fileDescriptorSet.getFile(0)
+    val descriptorProto: DescriptorProtos.FileDescriptorProto =
+      fileDescriptorSet.getFileList.asScala.last
+
+    var fileDescriptorList = List[Descriptors.FileDescriptor]()

Review Comment: Is this the import file list? What happens when the imported file imports other files? i.e. A imports B and B imports C.

## connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufSerdeSuite.scala: ##
@@ -163,18 +163,22 @@ class ProtobufSerdeSuite extends SharedSparkSession {
       fieldMatchType: MatchType,
       expectedCauseMessage: String,
       catalystSchema: StructType = CATALYST_STRUCT): Unit = {
-    val e = intercept[IncompatibleSchemaException] {
+    val e = intercept[Exception] {
       serdeFactory.create(catalystSchema, protoSchema, fieldMatchType)
     }
     val expectMsg = serdeFactory match {
       case Deserializer =>
-        s"Cannot convert Protobuf type ${protoSchema.getName} to SQL type ${catalystSchema.sql}."
+        s"[PROTOBUF_TYPE_TO_CATALYST_TYPE_ERROR] Unable to convert" +
+          s" ${protoSchema.getName} of Protobuf to SQL type ${toSQLType(catalystSchema)}."
       case Serializer =>
-        s"Cannot convert SQL type ${catalystSchema.sql} to Protobuf type ${protoSchema.getName}."
+        s"[UNABLE_TO_CONVERT_TO_PROTOBUF_TYPE] Unable to convert SQL type" +
+          s" ${toSQLType(catalystSchema)} to Protobuf type ${protoSchema.getName}."
     }
     assert(e.getMessage === expectMsg)
-    assert(e.getCause.getMessage === expectedCauseMessage)
+    if (e.getCause != null) {
+      assert(e.getCause.getMessage === expectedCauseMessage)

Review Comment: What does it mean if `e.getCause` is null and `expectedCauseMessage` is not?

## connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/SchemaConverters.scala: ##
@@ -62,14 +63,18 @@ object SchemaConverters {
       case STRING => Some(StringType)
       case BYTE_STRING => Some(BinaryType)
       case ENUM => Some(StringType)
-      case MESSAGE if fd.getMessageType.getName == "Duration" =>
+      case MESSAGE
+          if fd.getMessageType.getName == "Duration" &&
+          fd.getMessageType.getFields.size() == 2 &&
+          fd.getMessageType.getFields.get(0).getName.equals("seconds") &&
+          fd.getMessageType.getFields.get(1).getName.equals("nanos") =>
         Some(DayTimeIntervalType.defaultConcreteType)
-      case MESSAGE if fd.getMessageType.getName == "Timestamp" =>
-        Some(TimestampType)
-        // FIXME: Is the above accurate? Users can have protos named "Timestamp" but are not
-        //        expected to be TimestampType in Spark. How about verifying fields?
-        //        Same for "Duration". Only the Timestamp & Duration protos defined in
-        //        google.p
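The SchemaConverters change above tightens the well-known-type match: a message named "Duration" only maps to an interval type if its field shape also matches. A minimal sketch of that idea, using hypothetical `Field`/`Message` case classes in place of protobuf's `FieldDescriptor`/`Descriptor`:

```scala
// Hypothetical model of the "is this really google.protobuf.Duration?" check:
// the name alone is not enough, since users can define their own Duration
// message, so the field shape is verified as well.
case class Field(name: String)
case class Message(name: String, fields: List[Field])

def isWellKnownDuration(msg: Message): Boolean =
  msg.name == "Duration" &&
    // Exactly the two fields "seconds" and "nanos", in that order.
    msg.fields.map(_.name) == List("seconds", "nanos")
```

The real check could go further and verify field types and the enclosing `google.protobuf` package, which is roughly what the FIXME in the removed "Timestamp" branch asks for.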
[GitHub] [spark] rangadi commented on a diff in pull request #38344: [SPARK-40777][SQL][PROTOBUF] Protobuf import support and move error-classes.
rangadi commented on code in PR #38344: URL: https://github.com/apache/spark/pull/38344#discussion_r1003664791

## connector/protobuf/pom.xml: ##
@@ -123,6 +123,7 @@
 com.google.protobuf:protoc:${protobuf.version}
 ${protobuf.version}
+ direct

Review Comment: Yep. That would be nice. That will mimic normal proto files organization.

--
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [spark] rangadi commented on a diff in pull request #38344: [SPARK-40777][SQL][PROTOBUF] Protobuf import support and move error-classes.
rangadi commented on code in PR #38344: URL: https://github.com/apache/spark/pull/38344#discussion_r1003651551

## connector/protobuf/pom.xml: ##
@@ -123,6 +123,7 @@
 com.google.protobuf:protoc:${protobuf.version}
 ${protobuf.version}
+ direct

Review Comment: I think it is better to have imports of our own proto files. Currently we don't have any. We can split some of them. I think I left a TODO about that in my PR. Many production proto files will have imports.
[GitHub] [spark] rangadi commented on a diff in pull request #38344: [SPARK-40777][SQL][PROTOBUF] Protobuf import support and move error-classes.
rangadi commented on code in PR #38344: URL: https://github.com/apache/spark/pull/38344#discussion_r1003630603

## connector/protobuf/pom.xml: ##
@@ -123,6 +123,7 @@
 com.google.protobuf:protoc:${protobuf.version}
 ${protobuf.version}
+ direct

Review Comment: That's because this removes the existing 'Timestamp' message: https://github.com/apache/spark/pull/38344/files#diff-97aac63266f3c60eef9bd8dd1b76be3a5bd77fe4d17fa6fa370f5e0d9428a0a9L172 We could undo that change.
[GitHub] [spark] rangadi commented on a diff in pull request #38344: [SPARK-40777][SQL][PROTOBUF] Protobuf import support and move error-classes.
rangadi commented on code in PR #38344: URL: https://github.com/apache/spark/pull/38344#discussion_r1003619801

## connector/protobuf/pom.xml: ##
@@ -123,6 +123,7 @@
 com.google.protobuf:protoc:${protobuf.version}
 ${protobuf.version}
+ direct

Review Comment: I see. What would fail if you remove this?
[GitHub] [spark] rangadi commented on a diff in pull request #38344: [SPARK-40777][SQL][PROTOBUF] Protobuf import support and move error-classes.
rangadi commented on code in PR #38344: URL: https://github.com/apache/spark/pull/38344#discussion_r1002893438

## connector/protobuf/pom.xml: ##
@@ -123,6 +123,7 @@
 com.google.protobuf:protoc:${protobuf.version}
 ${protobuf.version}
+ direct

Review Comment: Could you add a comment here? Is this for protos like `Timestamp`? Looks like our code handles any Timestamp defined similarly to com.google.protobuf.Timestamp. OK not to include these.