rangadi commented on code in PR #41192:
URL: https://github.com/apache/spark/pull/41192#discussion_r1204818591


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -1452,29 +1452,29 @@ class SparkConnectPlanner(val session: SparkSession) {
     def extractArgsOfProtobufFunction(
         functionName: String,
         argumentsCount: Int,
-        children: Seq[Expression]): (String, Option[String], Map[String, 
String]) = {
+        children: Seq[Expression]): (String, Option[Array[Byte]], Map[String, 
String]) = {
       val messageClassName = children(1) match {
         case Literal(s, StringType) if s != null => s.toString
         case other =>
           throw InvalidPlanInput(
             s"MessageClassName in $functionName should be a literal string, 
but got $other")
       }
-      val (descFilePathOpt, options) = if (argumentsCount == 2) {
+      val (binaryFileDescSetOpt, options) = if (argumentsCount == 2) {
         (None, Map.empty[String, String])
       } else if (argumentsCount == 3) {
         children(2) match {
-          case Literal(s, StringType) if s != null =>
-            (Some(s.toString), Map.empty[String, String])
+          case Literal(b, BinaryType) if b != null =>
+            (Some(b.asInstanceOf[Array[Byte]]), Map.empty[String, String])
           case UnresolvedFunction(Seq("map"), arguments, _, _, _) =>

Review Comment:
   Good catch, updated the exception description.



##########
connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala:
##########
@@ -2251,10 +2252,8 @@ class PlanGenerationTestSuite
   //  1. cd connector/connect/common/src/main/protobuf/spark/connect
   //  2. protoc --include_imports 
--descriptor_set_out=../../../../test/resources/protobuf-tests/common.desc 
common.proto
   // scalastyle:on line.size.limit
-  private val testDescFilePath: String = java.nio.file.Paths
-    .get("../", "common", "src", "test", "resources", "protobuf-tests")
-    .resolve("common.desc")
-    .toString
+  private val testDescFilePath: String = 
s"${IntegrationTestUtils.sparkHome}/connector/" +

Review Comment:
   I plan to do that for `connect/protobuf` and will do that here as well. I 
ran into some issues with SBT when I first tried to do that, so I need to dust 
off the old branch where I did this. 



##########
connector/connect/server/src/test/scala/org/apache/spark/sql/connect/ProtoToParsedPlanTestSuite.scala:
##########
@@ -195,7 +195,9 @@ class ProtoToParsedPlanTestSuite
   }
 
   private def removeMemoryAddress(expr: String): String = {
-    expr.replaceAll("@[0-9a-f]+,", ",")
+    expr
+      .replaceAll("@[0-9a-f]+,", ",")
+      .replaceAll("@[0-9a-f]+\\)", ")")

Review Comment:
   This method helps unit tests that compare plans in golden files by removing 
memory references (since they change on each run). It was removing only those 
followed by ',' ('@[ref],'); I updated it to also remove those followed by ')'. 
This PR replaced 'string' with a byte buffer, so the golden file also changed and 
now includes byte-array references. 
   
   See `from_protobuf_messageClassName_descFilePath_options.explain` file here: 
   
https://github.com/apache/spark/pull/41192/files#diff-94822b18acb053bbfba8c2dae0c1dd1e806001ca996585b775719212572d4762R1



##########
connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala:
##########
@@ -2251,10 +2252,8 @@ class PlanGenerationTestSuite
   //  1. cd connector/connect/common/src/main/protobuf/spark/connect
   //  2. protoc --include_imports 
--descriptor_set_out=../../../../test/resources/protobuf-tests/common.desc 
common.proto
   // scalastyle:on line.size.limit
-  private val testDescFilePath: String = java.nio.file.Paths
-    .get("../", "common", "src", "test", "resources", "protobuf-tests")
-    .resolve("common.desc")
-    .toString
+  private val testDescFilePath: String = 
s"${IntegrationTestUtils.sparkHome}/connector/" +
+    "connect/common/src/test/resources/protobuf-tests/common.desc"
 
   test("from_protobuf messageClassName") {

Review Comment:
   I was testing with maven. It worked ok. 
   Btw, do you know how we can get the assembly jar from `connect/protobuf` into 
`assembly/target/scala-2.12/`? Is the Avro jar included there?
   I had to take out my ClientE2E test because of this missing jar. That could 
likely help with 
[SPARK-43646](https://issues.apache.org/jira/browse/SPARK-43646). 



##########
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufUtils.scala:
##########
@@ -286,9 +289,9 @@ private[sql] object ProtobufUtils extends Logging {
   }
 
   /** Builds [[TypeRegistry]] with all the messages found in the descriptor 
file. */
-  private[protobuf] def buildTypeRegistry(descFilePath: String): TypeRegistry 
= {
+  private[protobuf] def buildTypeRegistry(descriptorBytes: Array[Byte]): 
TypeRegistry = {

Review Comment:
   Yep, fixed.



##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -1452,29 +1452,29 @@ class SparkConnectPlanner(val session: SparkSession) {
     def extractArgsOfProtobufFunction(
         functionName: String,
         argumentsCount: Int,
-        children: Seq[Expression]): (String, Option[String], Map[String, 
String]) = {
+        children: Seq[Expression]): (String, Option[Array[Byte]], Map[String, 
String]) = {
       val messageClassName = children(1) match {
         case Literal(s, StringType) if s != null => s.toString
         case other =>
           throw InvalidPlanInput(
             s"MessageClassName in $functionName should be a literal string, 
but got $other")
       }
-      val (descFilePathOpt, options) = if (argumentsCount == 2) {
+      val (binaryFileDescSetOpt, options) = if (argumentsCount == 2) {
         (None, Map.empty[String, String])
       } else if (argumentsCount == 3) {
         children(2) match {
-          case Literal(s, StringType) if s != null =>
-            (Some(s.toString), Map.empty[String, String])
+          case Literal(b, BinaryType) if b != null =>
+            (Some(b.asInstanceOf[Array[Byte]]), Map.empty[String, String])
           case UnresolvedFunction(Seq("map"), arguments, _, _, _) =>
             (None, ExprUtils.convertToMapData(CreateMap(arguments)))
           case other =>
             throw InvalidPlanInput(
               s"The valid type for the 3rd arg in $functionName is string or 
map, but got $other")

Review Comment:
   Done.



##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/protobuf/functions.scala:
##########
@@ -199,4 +301,16 @@ object functions {
       options: java.util.Map[String, String]): Column = {
     fnWithOptions("to_protobuf", options.asScala.iterator, data, 
lit(messageClassName))
   }
+
+  private def emptyOptions: java.util.Map[String, String] = 
Collections.emptyMap[String, String]()
+
+  private def readDescriptorFileContent(filePath: String): Array[Byte] = {
+    // This method is copied from 
org.apache.spark.sql.protobuf.util.ProtobufUtils

Review Comment:
   I meant the `readDescriptorFileContent()` method itself. I moved the comment 
above the method signature to avoid the confusion.



##########
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufUtils.scala:
##########
@@ -223,28 +226,28 @@ private[sql] object ProtobufUtils extends Logging {
     }
   }
 
-  private def parseFileDescriptorSet(descFilePath: String): 
List[Descriptors.FileDescriptor] = {
-    var fileDescriptorSet: DescriptorProtos.FileDescriptorSet = null
+  def readDescriptorFileContent(filePath: String): Array[Byte] = {

Review Comment:
   This function is specific to Protobuf because of the exception it throws. I 
copied it into the Protobuf functions in Connect. It is a small method, so we can 
refactor it later when common utils are shared. I think it is ok for now. 



##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/protobuf/functions.scala:
##########
@@ -199,4 +301,16 @@ object functions {
       options: java.util.Map[String, String]): Column = {
     fnWithOptions("to_protobuf", options.asScala.iterator, data, 
lit(messageClassName))
   }
+
+  private def emptyOptions: java.util.Map[String, String] = 
Collections.emptyMap[String, String]()
+
+  private def readDescriptorFileContent(filePath: String): Array[Byte] = {
+    // This method is copied from 
org.apache.spark.sql.protobuf.util.ProtobufUtils
+    try {
+      FileUtils.readFileToByteArray(new File(filePath))
+    } catch {
+      case NonFatal(ex) =>
+        throw QueryCompilationErrors.cannotFindDescriptorFileError(filePath, 
ex)

Review Comment:
   Do you mean we should throw different error than 
'cannotFindDescriptorFileError'? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to