[GitHub] [spark] rangadi commented on a diff in pull request #38344: [SPARK-40777][SQL][PROTOBUF] Protobuf import support and move error-classes.

2022-11-01 Thread GitBox


rangadi commented on code in PR #38344:
URL: https://github.com/apache/spark/pull/38344#discussion_r1010672856


##
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufUtils.scala:
##
@@ -178,46 +176,73 @@ private[sql] object ProtobufUtils extends Logging {
   }
 
   def buildDescriptor(descFilePath: String, messageName: String): Descriptor = {
-    val descriptor = parseFileDescriptor(descFilePath).getMessageTypes.asScala.find { desc =>
-      desc.getName == messageName || desc.getFullName == messageName
-    }
+    val fileDescriptor = parseFileDescriptorSet(descFilePath)
+      .find(!_.getMessageTypes.asScala.find(desc =>

Review Comment:
   Oh, we end up with two find() invocations, and the check is made twice. How about this? [Optional, only a suggestion]
   
   // Find the first message descriptor that matches the name.
   val descriptorOpt = parseFileDescriptorSet(descFilePath)
     .flatMap { fileDesc =>
       fileDesc.getMessageTypes.asScala.find { desc =>
         desc.getName == messageName || desc.getFullName == messageName
       }
     }.headOption
   
   descriptorOpt match {
     case Some(d) => d
     case None => throw QueryCompilationErrors.unableToLocateProtobufMessageError(messageName)
   }
 
   
   
   






[GitHub] [spark] rangadi commented on a diff in pull request #38344: [SPARK-40777][SQL][PROTOBUF] Protobuf import support and move error-classes.

2022-10-31 Thread GitBox


rangadi commented on code in PR #38344:
URL: https://github.com/apache/spark/pull/38344#discussion_r1006324273


##
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufUtils.scala:
##
@@ -175,19 +175,26 @@ private[sql] object ProtobufUtils extends Logging {
       .asInstanceOf[Descriptor]
   }
 
+  // TODO: Revisit to ensure that messageName is searched through all imports

Review Comment:
   What is missing? Looks fairly complete to me.
   Better to state the problem here.



##
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufUtils.scala:
##
@@ -177,47 +175,75 @@ private[sql] object ProtobufUtils extends Logging {
       .asInstanceOf[Descriptor]
   }
 
+  // TODO: Revisit to ensure that messageName is searched through all imports
   def buildDescriptor(descFilePath: String, messageName: String): Descriptor = {
-    val descriptor = parseFileDescriptor(descFilePath).getMessageTypes.asScala.find { desc =>
-      desc.getName == messageName || desc.getFullName == messageName
+    val descriptorList = parseFileDescriptor(descFilePath).map(fileDescriptor => {
+      fileDescriptor.getMessageTypes.asScala.find { desc =>
+        desc.getName == messageName || desc.getFullName == messageName
+      }
+    }).filter(f => !f.isEmpty)
+
+    if (descriptorList.isEmpty) {
+      throw QueryCompilationErrors.noProtobufMessageTypeReturnError(messageName)
     }
 
-    descriptor match {
+    descriptorList.last match {
       case Some(d) => d
       case None =>
-        throw new RuntimeException(s"Unable to locate Message '$messageName' in Descriptor")
+        throw QueryCompilationErrors.unableToLocateProtobufMessageError(messageName)
     }
   }
 
-  private def parseFileDescriptor(descFilePath: String): Descriptors.FileDescriptor = {
+  private def parseFileDescriptor(descFilePath: String): List[Descriptors.FileDescriptor] = {

Review Comment:
   Rename to `parseFileDescriptorSet` (otherwise it sounds like it is parsing just one file descriptor).



##
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufUtils.scala:
##
@@ -175,19 +175,26 @@ private[sql] object ProtobufUtils extends Logging {
       .asInstanceOf[Descriptor]
   }
 
+  // TODO: Revisit to ensure that messageName is searched through all imports
   def buildDescriptor(descFilePath: String, messageName: String): Descriptor = {
-    val descriptor = parseFileDescriptor(descFilePath).getMessageTypes.asScala.find { desc =>
-      desc.getName == messageName || desc.getFullName == messageName
+    val descriptorList = parseFileDescriptor(descFilePath).map(fileDescriptor => {
+      fileDescriptor.getMessageTypes.asScala.find { desc =>
+        desc.getName == messageName || desc.getFullName == messageName
+      }
+    }).filter(f => !f.isEmpty)
+
+    if (descriptorList.isEmpty) {
+      throw QueryCompilationErrors.noProtobufMessageTypeReturnError(messageName)
     }
 
-    descriptor match {
+    descriptorList.last match {

Review Comment:
   Could you add a comment on why we are picking the last one? Will be useful for future readers as well.



##
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufUtils.scala:
##
@@ -177,47 +175,75 @@ private[sql] object ProtobufUtils extends Logging {
       .asInstanceOf[Descriptor]
   }
 
+  // TODO: Revisit to ensure that messageName is searched through all imports
   def buildDescriptor(descFilePath: String, messageName: String): Descriptor = {
-    val descriptor = parseFileDescriptor(descFilePath).getMessageTypes.asScala.find { desc =>
-      desc.getName == messageName || desc.getFullName == messageName
+    val descriptorList = parseFileDescriptor(descFilePath).map(fileDescriptor => {

Review Comment:
   Style: use `find()` rather than map().filter(). 
   
   (you can use `findLast()` if there is a reason to use the last match). 
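   A toy sketch of the style point (illustrative values, not the PR's code): find() short-circuits at the first match, while map().filter() touches every element and builds an intermediate collection.
   
   // Sketch: both find "b.proto", but find() stops early.
   val names = List("a.proto", "b.proto", "c.proto")
   val viaMapFilter = names.map(n => if (n.startsWith("b")) Some(n) else None)
     .filter(_.nonEmpty).headOption.flatten
   val viaFind = names.find(_.startsWith("b"))
   assert(viaMapFilter == viaFind) // both are Some("b.proto")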
   






[GitHub] [spark] rangadi commented on a diff in pull request #38344: [SPARK-40777][SQL][PROTOBUF] Protobuf import support and move error-classes.

2022-10-26 Thread GitBox


rangadi commented on code in PR #38344:
URL: https://github.com/apache/spark/pull/38344#discussion_r1006201944


##
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufUtils.scala:
##
@@ -196,30 +194,52 @@ private[sql] object ProtobufUtils extends Logging {
      fileDescriptorSet = DescriptorProtos.FileDescriptorSet.parseFrom(dscFile)
    } catch {
      case ex: InvalidProtocolBufferException =>
-        // TODO move all the exceptions to core/src/main/resources/error/error-classes.json
-        throw new RuntimeException("Error parsing descriptor byte[] into Descriptor object", ex)
+        throw QueryCompilationErrors.descrioptorParseError(ex)
      case ex: IOException =>
-        throw new RuntimeException(
-          "Error reading Protobuf descriptor file at path: " + descFilePath,
-          ex)
+        throw QueryCompilationErrors.cannotFindDescriptorFileError(descFilePath, ex)
    }
-
-    val descriptorProto: DescriptorProtos.FileDescriptorProto = fileDescriptorSet.getFile(0)
    try {
-      val fileDescriptor: Descriptors.FileDescriptor = Descriptors.FileDescriptor.buildFrom(
-        descriptorProto,
-        new Array[Descriptors.FileDescriptor](0))
+      val fileDescriptorProtoIndex = createDescriptorProtoMap(fileDescriptorSet)
+      val fileDescriptor: Descriptors.FileDescriptor =
+        buildFileDescriptor(fileDescriptorSet.getFileList.asScala.last, fileDescriptorProtoIndex)
      if (fileDescriptor.getMessageTypes().isEmpty()) {
-        throw new RuntimeException("No MessageTypes returned, " + fileDescriptor.getName());
+        throw QueryCompilationErrors.noProtobufMessageTypeReturnError(fileDescriptor.getName())
      }
      fileDescriptor
    } catch {
      case e: Descriptors.DescriptorValidationException =>
-        throw new RuntimeException("Error constructing FileDescriptor", e)
+        throw QueryCompilationErrors.failedParsingDescriptorError(e)
    }
  }
 
+  /**
+   * Recursively constructs file descriptors for all dependencies for given
+   * FileDescriptorProto and return.
+   */
+  private def buildFileDescriptor(
+      fileDescriptorProto: FileDescriptorProto,
+      fileDescriptorProtoMap: Map[String, FileDescriptorProto]): Descriptors.FileDescriptor = {
+    val fileDescriptorList = fileDescriptorProto.getDependencyList().asScala.map { dependency =>
+      fileDescriptorProtoMap.get(dependency) match {
+        case Some(dependencyProto) =>
+          buildFileDescriptor(dependencyProto, fileDescriptorProtoMap)
+        case None =>
+          throw QueryCompilationErrors.protobufDescriptorDependencyError(dependency)
+      }
+    }
+    Descriptors.FileDescriptor.buildFrom(fileDescriptorProto, fileDescriptorList.toArray)
+  }
+
+  /**
+   * Returns a map from descriptor proto name as found inside the descriptors to protos.
+   */
+  private def createDescriptorProtoMap(
+      fileDescriptorSet: FileDescriptorSet): Map[String, FileDescriptorProto] = {
+    fileDescriptorSet.getFileList().asScala.map { descriptorProto =>
+      descriptorProto.getName() -> descriptorProto

Review Comment:
   That is not what I am referring to. This is not really a problem, since it won't even compile.
   
   Notice the two files given as input to `protoc` above. Maybe that will work, since `name` will include the path.
   






[GitHub] [spark] rangadi commented on a diff in pull request #38344: [SPARK-40777][SQL][PROTOBUF] Protobuf import support and move error-classes.

2022-10-26 Thread GitBox


rangadi commented on code in PR #38344:
URL: https://github.com/apache/spark/pull/38344#discussion_r1006195517


##
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufUtils.scala:
##
@@ -196,30 +194,52 @@ private[sql] object ProtobufUtils extends Logging {
      fileDescriptorSet = DescriptorProtos.FileDescriptorSet.parseFrom(dscFile)
    } catch {
      case ex: InvalidProtocolBufferException =>
-        // TODO move all the exceptions to core/src/main/resources/error/error-classes.json
-        throw new RuntimeException("Error parsing descriptor byte[] into Descriptor object", ex)
+        throw QueryCompilationErrors.descrioptorParseError(ex)
      case ex: IOException =>
-        throw new RuntimeException(
-          "Error reading Protobuf descriptor file at path: " + descFilePath,
-          ex)
+        throw QueryCompilationErrors.cannotFindDescriptorFileError(descFilePath, ex)
    }
-
-    val descriptorProto: DescriptorProtos.FileDescriptorProto = fileDescriptorSet.getFile(0)
    try {
-      val fileDescriptor: Descriptors.FileDescriptor = Descriptors.FileDescriptor.buildFrom(
-        descriptorProto,
-        new Array[Descriptors.FileDescriptor](0))
+      val fileDescriptorProtoIndex = createDescriptorProtoMap(fileDescriptorSet)
+      val fileDescriptor: Descriptors.FileDescriptor =
+        buildFileDescriptor(fileDescriptorSet.getFileList.asScala.last, fileDescriptorProtoIndex)
      if (fileDescriptor.getMessageTypes().isEmpty()) {
-        throw new RuntimeException("No MessageTypes returned, " + fileDescriptor.getName());
+        throw QueryCompilationErrors.noProtobufMessageTypeReturnError(fileDescriptor.getName())
      }
      fileDescriptor
    } catch {
      case e: Descriptors.DescriptorValidationException =>
-        throw new RuntimeException("Error constructing FileDescriptor", e)
+        throw QueryCompilationErrors.failedParsingDescriptorError(e)
    }
  }
 
+  /**
+   * Recursively constructs file descriptors for all dependencies for given
+   * FileDescriptorProto and return.
+   */
+  private def buildFileDescriptor(
+      fileDescriptorProto: FileDescriptorProto,
+      fileDescriptorProtoMap: Map[String, FileDescriptorProto]): Descriptors.FileDescriptor = {
+    val fileDescriptorList = fileDescriptorProto.getDependencyList().asScala.map { dependency =>
+      fileDescriptorProtoMap.get(dependency) match {
+        case Some(dependencyProto) =>
+          buildFileDescriptor(dependencyProto, fileDescriptorProtoMap)
+        case None =>
+          throw QueryCompilationErrors.protobufDescriptorDependencyError(dependency)
+      }
+    }
+    Descriptors.FileDescriptor.buildFrom(fileDescriptorProto, fileDescriptorList.toArray)
+  }
+
+  /**
+   * Returns a map from descriptor proto name as found inside the descriptors to protos.
+   */
+  private def createDescriptorProtoMap(
+      fileDescriptorSet: FileDescriptorSet): Map[String, FileDescriptorProto] = {
+    fileDescriptorSet.getFileList().asScala.map { descriptorProto =>
+      descriptorProto.getName() -> descriptorProto

Review Comment:
   The command is using only a single proto file. Not the same as the example above, right?






[GitHub] [spark] rangadi commented on a diff in pull request #38344: [SPARK-40777][SQL][PROTOBUF] Protobuf import support and move error-classes.

2022-10-26 Thread GitBox


rangadi commented on code in PR #38344:
URL: https://github.com/apache/spark/pull/38344#discussion_r1006192940


##
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufUtils.scala:
##
@@ -196,30 +194,52 @@ private[sql] object ProtobufUtils extends Logging {
      fileDescriptorSet = DescriptorProtos.FileDescriptorSet.parseFrom(dscFile)
    } catch {
      case ex: InvalidProtocolBufferException =>
-        // TODO move all the exceptions to core/src/main/resources/error/error-classes.json
-        throw new RuntimeException("Error parsing descriptor byte[] into Descriptor object", ex)
+        throw QueryCompilationErrors.descrioptorParseError(ex)
      case ex: IOException =>
-        throw new RuntimeException(
-          "Error reading Protobuf descriptor file at path: " + descFilePath,
-          ex)
+        throw QueryCompilationErrors.cannotFindDescriptorFileError(descFilePath, ex)
    }
-
-    val descriptorProto: DescriptorProtos.FileDescriptorProto = fileDescriptorSet.getFile(0)
    try {
-      val fileDescriptor: Descriptors.FileDescriptor = Descriptors.FileDescriptor.buildFrom(
-        descriptorProto,
-        new Array[Descriptors.FileDescriptor](0))
+      val fileDescriptorProtoIndex = createDescriptorProtoMap(fileDescriptorSet)
+      val fileDescriptor: Descriptors.FileDescriptor =
+        buildFileDescriptor(fileDescriptorSet.getFileList.asScala.last, fileDescriptorProtoIndex)

Review Comment:
   I think that is pretty restrictive. There will be cases where the user wants to use a protobuf defined in imported files. E.g. they might have one proto file just to import a bunch of other files that define the actual protos used.
   
   What are we gaining by restricting messages to the last listed file? In addition, there could be multiple top-level files (i.e. the `protoc` command is run with multiple files).
   
   I think fixing it properly would be the right thing. We can leave a TODO here; I can add that in a follow-up. LMK.






[GitHub] [spark] rangadi commented on a diff in pull request #38344: [SPARK-40777][SQL][PROTOBUF] Protobuf import support and move error-classes.

2022-10-26 Thread GitBox


rangadi commented on code in PR #38344:
URL: https://github.com/apache/spark/pull/38344#discussion_r1006150396


##
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufUtils.scala:
##
@@ -196,30 +194,52 @@ private[sql] object ProtobufUtils extends Logging {
      fileDescriptorSet = DescriptorProtos.FileDescriptorSet.parseFrom(dscFile)
    } catch {
      case ex: InvalidProtocolBufferException =>
-        // TODO move all the exceptions to core/src/main/resources/error/error-classes.json
-        throw new RuntimeException("Error parsing descriptor byte[] into Descriptor object", ex)
+        throw QueryCompilationErrors.descrioptorParseError(ex)
      case ex: IOException =>
-        throw new RuntimeException(
-          "Error reading Protobuf descriptor file at path: " + descFilePath,
-          ex)
+        throw QueryCompilationErrors.cannotFindDescriptorFileError(descFilePath, ex)
    }
-
-    val descriptorProto: DescriptorProtos.FileDescriptorProto = fileDescriptorSet.getFile(0)
    try {
-      val fileDescriptor: Descriptors.FileDescriptor = Descriptors.FileDescriptor.buildFrom(
-        descriptorProto,
-        new Array[Descriptors.FileDescriptor](0))
+      val fileDescriptorProtoIndex = createDescriptorProtoMap(fileDescriptorSet)
+      val fileDescriptor: Descriptors.FileDescriptor =
+        buildFileDescriptor(fileDescriptorSet.getFileList.asScala.last, fileDescriptorProtoIndex)

Review Comment:
   There is one more related issue here: we can find messages defined in this file, but not messages defined inside the imported files.
   
   I think it is better to fix all of this properly, since you are already working on this.
   I.e. we should be able to use any message defined in the descriptor file.
   Essentially, we can build a `List[FileDescriptor]` and search for the user's message in all of those. We could give preference to the `last` one in this search order.
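   A minimal sketch of that search (illustrative names, not the PR's final code), assuming the parsed descriptor set is available as a List[FileDescriptor]:
   
   import scala.collection.JavaConverters._
   import com.google.protobuf.Descriptors
   
   // Sketch: search every FileDescriptor for the message, preferring the last match.
   def findMessage(
       fileDescriptors: List[Descriptors.FileDescriptor],
       messageName: String): Option[Descriptors.Descriptor] = {
     fileDescriptors.flatMap { fileDesc =>
       fileDesc.getMessageTypes.asScala.find { desc =>
         desc.getName == messageName || desc.getFullName == messageName
       }
     }.lastOption // give preference to the last listed file
   }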
   






[GitHub] [spark] rangadi commented on a diff in pull request #38344: [SPARK-40777][SQL][PROTOBUF] Protobuf import support and move error-classes.

2022-10-26 Thread GitBox


rangadi commented on code in PR #38344:
URL: https://github.com/apache/spark/pull/38344#discussion_r1006135653


##
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufUtils.scala:
##
@@ -196,30 +194,52 @@ private[sql] object ProtobufUtils extends Logging {
      fileDescriptorSet = DescriptorProtos.FileDescriptorSet.parseFrom(dscFile)
    } catch {
      case ex: InvalidProtocolBufferException =>
-        // TODO move all the exceptions to core/src/main/resources/error/error-classes.json
-        throw new RuntimeException("Error parsing descriptor byte[] into Descriptor object", ex)
+        throw QueryCompilationErrors.descrioptorParseError(ex)
      case ex: IOException =>
-        throw new RuntimeException(
-          "Error reading Protobuf descriptor file at path: " + descFilePath,
-          ex)
+        throw QueryCompilationErrors.cannotFindDescriptorFileError(descFilePath, ex)
    }
-
-    val descriptorProto: DescriptorProtos.FileDescriptorProto = fileDescriptorSet.getFile(0)
    try {
-      val fileDescriptor: Descriptors.FileDescriptor = Descriptors.FileDescriptor.buildFrom(
-        descriptorProto,
-        new Array[Descriptors.FileDescriptor](0))
+      val fileDescriptorProtoIndex = createDescriptorProtoMap(fileDescriptorSet)
+      val fileDescriptor: Descriptors.FileDescriptor =
+        buildFileDescriptor(fileDescriptorSet.getFileList.asScala.last, fileDescriptorProtoIndex)

Review Comment:
   Could you add a comment about why `last` is used here? What happens if we use `first`?






[GitHub] [spark] rangadi commented on a diff in pull request #38344: [SPARK-40777][SQL][PROTOBUF] Protobuf import support and move error-classes.

2022-10-26 Thread GitBox


rangadi commented on code in PR #38344:
URL: https://github.com/apache/spark/pull/38344#discussion_r1006128712


##
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufUtils.scala:
##
@@ -196,30 +194,52 @@ private[sql] object ProtobufUtils extends Logging {
      fileDescriptorSet = DescriptorProtos.FileDescriptorSet.parseFrom(dscFile)
    } catch {
      case ex: InvalidProtocolBufferException =>
-        // TODO move all the exceptions to core/src/main/resources/error/error-classes.json
-        throw new RuntimeException("Error parsing descriptor byte[] into Descriptor object", ex)
+        throw QueryCompilationErrors.descrioptorParseError(ex)
      case ex: IOException =>
-        throw new RuntimeException(
-          "Error reading Protobuf descriptor file at path: " + descFilePath,
-          ex)
+        throw QueryCompilationErrors.cannotFindDescriptorFileError(descFilePath, ex)
    }
-
-    val descriptorProto: DescriptorProtos.FileDescriptorProto = fileDescriptorSet.getFile(0)
    try {
-      val fileDescriptor: Descriptors.FileDescriptor = Descriptors.FileDescriptor.buildFrom(
-        descriptorProto,
-        new Array[Descriptors.FileDescriptor](0))
+      val fileDescriptorProtoIndex = createDescriptorProtoMap(fileDescriptorSet)
+      val fileDescriptor: Descriptors.FileDescriptor =
+        buildFileDescriptor(fileDescriptorSet.getFileList.asScala.last, fileDescriptorProtoIndex)
      if (fileDescriptor.getMessageTypes().isEmpty()) {
-        throw new RuntimeException("No MessageTypes returned, " + fileDescriptor.getName());
+        throw QueryCompilationErrors.noProtobufMessageTypeReturnError(fileDescriptor.getName())
      }
      fileDescriptor
    } catch {
      case e: Descriptors.DescriptorValidationException =>
-        throw new RuntimeException("Error constructing FileDescriptor", e)
+        throw QueryCompilationErrors.failedParsingDescriptorError(e)
    }
  }
 
+  /**
+   * Recursively constructs file descriptors for all dependencies for given
+   * FileDescriptorProto and return.
+   */
+  private def buildFileDescriptor(
+      fileDescriptorProto: FileDescriptorProto,
+      fileDescriptorProtoMap: Map[String, FileDescriptorProto]): Descriptors.FileDescriptor = {
+    val fileDescriptorList = fileDescriptorProto.getDependencyList().asScala.map { dependency =>
+      fileDescriptorProtoMap.get(dependency) match {
+        case Some(dependencyProto) =>
+          buildFileDescriptor(dependencyProto, fileDescriptorProtoMap)
+        case None =>
+          throw QueryCompilationErrors.protobufDescriptorDependencyError(dependency)
+      }
+    }
+    Descriptors.FileDescriptor.buildFrom(fileDescriptorProto, fileDescriptorList.toArray)
+  }
+
+  /**
+   * Returns a map from descriptor proto name as found inside the descriptors to protos.
+   */
+  private def createDescriptorProtoMap(
+      fileDescriptorSet: FileDescriptorSet): Map[String, FileDescriptorProto] = {
+    fileDescriptorSet.getFileList().asScala.map { descriptorProto =>
+      descriptorProto.getName() -> descriptorProto

Review Comment:
   Btw, what happens if there are two different files with the same name? E.g.
   `protoc ... a/b/events.proto x/y/events.proto`
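   (For reference: if two entries did end up with the same key, Scala's toMap keeps the last one and silently drops the earlier one. A toy illustration with hypothetical values:)
   
   // Sketch: duplicate keys in toMap resolve to the last value.
   val protosByName = List(
     "events.proto" -> "a/b version",
     "events.proto" -> "x/y version").toMap
   println(protosByName("events.proto")) // prints "x/y version"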
   






[GitHub] [spark] rangadi commented on a diff in pull request #38344: [SPARK-40777][SQL][PROTOBUF] Protobuf import support and move error-classes.

2022-10-25 Thread GitBox


rangadi commented on code in PR #38344:
URL: https://github.com/apache/spark/pull/38344#discussion_r1004757693


##
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufUtils.scala:
##
@@ -196,28 +195,58 @@ private[sql] object ProtobufUtils extends Logging {
      fileDescriptorSet = DescriptorProtos.FileDescriptorSet.parseFrom(dscFile)
    } catch {
      case ex: InvalidProtocolBufferException =>
-        // TODO move all the exceptions to core/src/main/resources/error/error-classes.json
-        throw new RuntimeException("Error parsing descriptor byte[] into Descriptor object", ex)
+        throw QueryCompilationErrors.descrioptorParseError(ex)
      case ex: IOException =>
-        throw new RuntimeException(
-          "Error reading Protobuf descriptor file at path: " + descFilePath,
-          ex)
+        throw QueryCompilationErrors.cannotFindDescriptorFileError(descFilePath, ex)
    }
-
-    val descriptorProto: DescriptorProtos.FileDescriptorProto = fileDescriptorSet.getFile(0)
    try {
-      val fileDescriptor: Descriptors.FileDescriptor = Descriptors.FileDescriptor.buildFrom(
-        descriptorProto,
-        new Array[Descriptors.FileDescriptor](0))
+      val fileDescriptorProtoIndex = createDescriptorProtoIndex(fileDescriptorSet)
+      val fileDescriptor: Descriptors.FileDescriptor =
+        buildFileDescriptor(fileDescriptorSet.getFileList.asScala.last, fileDescriptorProtoIndex)
      if (fileDescriptor.getMessageTypes().isEmpty()) {
-        throw new RuntimeException("No MessageTypes returned, " + fileDescriptor.getName());
+        throw QueryCompilationErrors.noProtobufMessageTypeReturnError(fileDescriptor.getName())
      }
      fileDescriptor
    } catch {
      case e: Descriptors.DescriptorValidationException =>
-        throw new RuntimeException("Error constructing FileDescriptor", e)
+        throw QueryCompilationErrors.failedParsingDescriptorError(e)
+    }
+  }
+
+  /**
+   * Recursively constructs file descriptors for all dependencies for given
+   * FileDescriptorProto and return.
+   * @param descriptorProto
+   * @param descriptorProtoIndex
+   * @return Descriptors.FileDescriptor
+   */
+  private def buildFileDescriptor(
+      fileDescriptorProto: FileDescriptorProto,
+      fileDescriptorProtoIndex: Map[String, FileDescriptorProto]): Descriptors.FileDescriptor = {
+    var fileDescriptorList = List[Descriptors.FileDescriptor]()
+    for (dependencyName <- fileDescriptorProto.getDependencyList().asScala) {
+      if (!fileDescriptorProtoIndex.contains(dependencyName)) {
+        throw QueryCompilationErrors.protobufDescriptorDependencyError(dependencyName)
+      }
+      val dependencyProto: FileDescriptorProto = fileDescriptorProtoIndex.get(dependencyName).get
+      fileDescriptorList = fileDescriptorList ++ List(
+        buildFileDescriptor(dependencyProto, fileDescriptorProtoIndex))
+    }
+    Descriptors.FileDescriptor.buildFrom(fileDescriptorProto, fileDescriptorList.toArray)
+  }
+
+  /**
+   * Returns a map from descriptor proto name as found inside the descriptors to protos.
+   * @param fileDescriptorSet

Review Comment:
   Same for _param_ and _return_ here.



##
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufUtils.scala:
##
@@ -196,28 +195,58 @@ private[sql] object ProtobufUtils extends Logging {
      fileDescriptorSet = DescriptorProtos.FileDescriptorSet.parseFrom(dscFile)
    } catch {
      case ex: InvalidProtocolBufferException =>
-        // TODO move all the exceptions to core/src/main/resources/error/error-classes.json
-        throw new RuntimeException("Error parsing descriptor byte[] into Descriptor object", ex)
+        throw QueryCompilationErrors.descrioptorParseError(ex)
      case ex: IOException =>
-        throw new RuntimeException(
-          "Error reading Protobuf descriptor file at path: " + descFilePath,
-          ex)
+        throw QueryCompilationErrors.cannotFindDescriptorFileError(descFilePath, ex)
    }
-
-    val descriptorProto: DescriptorProtos.FileDescriptorProto = fileDescriptorSet.getFile(0)
    try {
-      val fileDescriptor: Descriptors.FileDescriptor = Descriptors.FileDescriptor.buildFrom(
-        descriptorProto,
-        new Array[Descriptors.FileDescriptor](0))
+      val fileDescriptorProtoIndex = createDescriptorProtoIndex(fileDescriptorSet)
+      val fileDescriptor: Descriptors.FileDescriptor =
+        buildFileDescriptor(fileDescriptorSet.getFileList.asScala.last, fileDescriptorProtoIndex)
      if (fileDescriptor.getMessageTypes().isEmpty()) {
-        throw new RuntimeException("No MessageTypes returned, " + fileDescriptor.getName());
+        throw QueryCompilationErrors.noProtobufMessageTypeReturnError(fileDescriptor.getName())
      }
      fileDescriptor
    } catch {
      case e: Descriptors.DescriptorValidationException =>
-t

[GitHub] [spark] rangadi commented on a diff in pull request #38344: [SPARK-40777][SQL][PROTOBUF] Protobuf import support and move error-classes.

2022-10-24 Thread GitBox


rangadi commented on code in PR #38344:
URL: https://github.com/apache/spark/pull/38344#discussion_r1003729392


##
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufUtils.scala:
##
@@ -196,27 +194,31 @@ private[sql] object ProtobufUtils extends Logging {
      fileDescriptorSet = DescriptorProtos.FileDescriptorSet.parseFrom(dscFile)
    } catch {
      case ex: InvalidProtocolBufferException =>
-        // TODO move all the exceptions to core/src/main/resources/error/error-classes.json
-        throw new RuntimeException("Error parsing descriptor byte[] into Descriptor object", ex)
+        throw QueryCompilationErrors.descrioptorParseError(ex)
      case ex: IOException =>
-        throw new RuntimeException(
-          "Error reading Protobuf descriptor file at path: " + descFilePath,
-          ex)
+        throw QueryCompilationErrors.cannotFindDescriptorFileError(descFilePath, ex)
    }
 
-    val descriptorProto: DescriptorProtos.FileDescriptorProto = fileDescriptorSet.getFile(0)
+    val descriptorProto: DescriptorProtos.FileDescriptorProto =

Review Comment:
   Could you add some brief comments here?
   Is the last file the right file?



##
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufUtils.scala:
##
@@ -196,27 +194,31 @@ private[sql] object ProtobufUtils extends Logging {
      fileDescriptorSet = DescriptorProtos.FileDescriptorSet.parseFrom(dscFile)
    } catch {
      case ex: InvalidProtocolBufferException =>
-        // TODO move all the exceptions to core/src/main/resources/error/error-classes.json
-        throw new RuntimeException("Error parsing descriptor byte[] into Descriptor object", ex)
+        throw QueryCompilationErrors.descrioptorParseError(ex)
      case ex: IOException =>
-        throw new RuntimeException(
-          "Error reading Protobuf descriptor file at path: " + descFilePath,
-          ex)
+        throw QueryCompilationErrors.cannotFindDescriptorFileError(descFilePath, ex)
    }
 
-    val descriptorProto: DescriptorProtos.FileDescriptorProto = fileDescriptorSet.getFile(0)
+    val descriptorProto: DescriptorProtos.FileDescriptorProto =
+      fileDescriptorSet.getFileList.asScala.last
+
+    var fileDescriptorList = List[Descriptors.FileDescriptor]()

Review Comment:
   Is this the import file list? What happens when the imported file imports other files? I.e. A imports B and B imports C.
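   A rough sketch of why recursing over the proto map covers the transitive case (illustrative only; the PR's later revisions add a buildFileDescriptor along these lines):
   
   import scala.collection.JavaConverters._
   import com.google.protobuf.DescriptorProtos.FileDescriptorProto
   import com.google.protobuf.Descriptors
   
   // Sketch: resolving A -> B -> C by recursing through the dependency names.
   // Assumes every dependency is present in the map.
   def resolve(
       proto: FileDescriptorProto,
       protosByName: Map[String, FileDescriptorProto]): Descriptors.FileDescriptor = {
     val deps = proto.getDependencyList.asScala.map { depName =>
       resolve(protosByName(depName), protosByName) // recurse into B, then into C
     }
     Descriptors.FileDescriptor.buildFrom(proto, deps.toArray)
   }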



##
connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufSerdeSuite.scala:
##
@@ -163,18 +163,22 @@ class ProtobufSerdeSuite extends SharedSparkSession {
      fieldMatchType: MatchType,
      expectedCauseMessage: String,
      catalystSchema: StructType = CATALYST_STRUCT): Unit = {
-    val e = intercept[IncompatibleSchemaException] {
+    val e = intercept[Exception] {
      serdeFactory.create(catalystSchema, protoSchema, fieldMatchType)
    }
    val expectMsg = serdeFactory match {
      case Deserializer =>
-        s"Cannot convert Protobuf type ${protoSchema.getName} to SQL type ${catalystSchema.sql}."
+        s"[PROTOBUF_TYPE_TO_CATALYST_TYPE_ERROR] Unable to convert" +
+          s" ${protoSchema.getName} of Protobuf to SQL type ${toSQLType(catalystSchema)}."
      case Serializer =>
-        s"Cannot convert SQL type ${catalystSchema.sql} to Protobuf type ${protoSchema.getName}."
+        s"[UNABLE_TO_CONVERT_TO_PROTOBUF_TYPE] Unable to convert SQL type" +
+          s" ${toSQLType(catalystSchema)} to Protobuf type ${protoSchema.getName}."
    }
 
    assert(e.getMessage === expectMsg)
-    assert(e.getCause.getMessage === expectedCauseMessage)
+    if (e.getCause != null) {
+      assert(e.getCause.getMessage === expectedCauseMessage)

Review Comment:
   What does it mean if `e.getCause` is null and `expectedCauseMessage` is not?



##
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/SchemaConverters.scala:
##
@@ -62,14 +63,18 @@ object SchemaConverters {
      case STRING => Some(StringType)
      case BYTE_STRING => Some(BinaryType)
      case ENUM => Some(StringType)
-      case MESSAGE if fd.getMessageType.getName == "Duration" =>
+      case MESSAGE
+          if fd.getMessageType.getName == "Duration" &&
+          fd.getMessageType.getFields.size() == 2 &&
+          fd.getMessageType.getFields.get(0).getName.equals("seconds") &&
+          fd.getMessageType.getFields.get(1).getName.equals("nanos") =>
        Some(DayTimeIntervalType.defaultConcreteType)
-      case MESSAGE if fd.getMessageType.getName == "Timestamp" =>
-        Some(TimestampType)
-        // FIXME: Is the above accurate? Users can have protos named "Timestamp" but are not
-        //        expected to be TimestampType in Spark. How about verifying fields?
-        //        Same for "Duration". Only the Timestamp & Duration protos defined in
-        //        google.p

[GitHub] [spark] rangadi commented on a diff in pull request #38344: [SPARK-40777][SQL][PROTOBUF] Protobuf import support and move error-classes.

2022-10-24 Thread GitBox


rangadi commented on code in PR #38344:
URL: https://github.com/apache/spark/pull/38344#discussion_r1003664791


##
connector/protobuf/pom.xml:
##
@@ -123,6 +123,7 @@
 
   
com.google.protobuf:protoc:${protobuf.version}
   ${protobuf.version}
+  direct

Review Comment:
   Yep, that would be nice. That will mimic how proto files are normally organized.






[GitHub] [spark] rangadi commented on a diff in pull request #38344: [SPARK-40777][SQL][PROTOBUF] Protobuf import support and move error-classes.

2022-10-24 Thread GitBox


rangadi commented on code in PR #38344:
URL: https://github.com/apache/spark/pull/38344#discussion_r1003651551


##
connector/protobuf/pom.xml:
##
@@ -123,6 +123,7 @@
 
   
com.google.protobuf:protoc:${protobuf.version}
   ${protobuf.version}
+  direct

Review Comment:
   I think it is better to have imports in our own proto files; currently we don't have any. We can split some of them. I think I left a TODO about that in my PR. Many production proto files will have imports.
   






[GitHub] [spark] rangadi commented on a diff in pull request #38344: [SPARK-40777][SQL][PROTOBUF] Protobuf import support and move error-classes.

2022-10-24 Thread GitBox


rangadi commented on code in PR #38344:
URL: https://github.com/apache/spark/pull/38344#discussion_r1003630603


##
connector/protobuf/pom.xml:
##
@@ -123,6 +123,7 @@
 
   
com.google.protobuf:protoc:${protobuf.version}
   ${protobuf.version}
+  direct

Review Comment:
   That's because this removes the existing 'Timestamp' message:
   https://github.com/apache/spark/pull/38344/files#diff-97aac63266f3c60eef9bd8dd1b76be3a5bd77fe4d17fa6fa370f5e0d9428a0a9L172
   
   We could undo that change. 






[GitHub] [spark] rangadi commented on a diff in pull request #38344: [SPARK-40777][SQL][PROTOBUF] Protobuf import support and move error-classes.

2022-10-24 Thread GitBox


rangadi commented on code in PR #38344:
URL: https://github.com/apache/spark/pull/38344#discussion_r1003619801


##
connector/protobuf/pom.xml:
##
@@ -123,6 +123,7 @@
 
   
com.google.protobuf:protoc:${protobuf.version}
   ${protobuf.version}
+  direct

Review Comment:
   I see. What would fail if you remove this?






[GitHub] [spark] rangadi commented on a diff in pull request #38344: [SPARK-40777][SQL][PROTOBUF] Protobuf import support and move error-classes.

2022-10-23 Thread GitBox


rangadi commented on code in PR #38344:
URL: https://github.com/apache/spark/pull/38344#discussion_r1002893438


##
connector/protobuf/pom.xml:
##
@@ -123,6 +123,7 @@
 
   
com.google.protobuf:protoc:${protobuf.version}
   ${protobuf.version}
+  direct

Review Comment:
   Could you add a comment here?
   Is this for protos like `Timestamp`? Looks like our code handles any Timestamp defined similarly to com.google.protobuf.Timestamp. OK not to include these.
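   (A sketch of the field-verified matching this alludes to, mirroring the Duration check quoted earlier in this thread; an assumption, not the PR's final code:)
   
   import com.google.protobuf.Descriptors.FieldDescriptor
   
   // Sketch: accept a message as google.protobuf.Timestamp only if its shape matches.
   def looksLikeWellKnownTimestamp(fd: FieldDescriptor): Boolean = {
     val msg = fd.getMessageType
     msg.getName == "Timestamp" &&
       msg.getFields.size() == 2 &&
       msg.getFields.get(0).getName == "seconds" &&
       msg.getFields.get(1).getName == "nanos"
   }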


