LuciferYang commented on code in PR #41192:
URL: https://github.com/apache/spark/pull/41192#discussion_r1199973589


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -1452,29 +1452,29 @@ class SparkConnectPlanner(val session: SparkSession) {
     def extractArgsOfProtobufFunction(
         functionName: String,
         argumentsCount: Int,
-        children: Seq[Expression]): (String, Option[String], Map[String, 
String]) = {
+        children: Seq[Expression]): (String, Option[Array[Byte]], Map[String, 
String]) = {
       val messageClassName = children(1) match {
         case Literal(s, StringType) if s != null => s.toString
         case other =>
           throw InvalidPlanInput(
             s"MessageClassName in $functionName should be a literal string, 
but got $other")
       }
-      val (descFilePathOpt, options) = if (argumentsCount == 2) {
+      val (binaryFileDescSetOpt, options) = if (argumentsCount == 2) {
         (None, Map.empty[String, String])
       } else if (argumentsCount == 3) {
         children(2) match {
-          case Literal(s, StringType) if s != null =>
-            (Some(s.toString), Map.empty[String, String])
+          case Literal(b, BinaryType) if b != null =>
+            (Some(b.asInstanceOf[Array[Byte]]), Map.empty[String, String])
           case UnresolvedFunction(Seq("map"), arguments, _, _, _) =>

Review Comment:
   I think we should update this error message to reflect the rename from
`descFilePath` to `binaryFileDescSetOpt` and the change in parameter type.
   
   



##########
connector/connect/server/src/test/scala/org/apache/spark/sql/connect/ProtoToParsedPlanTestSuite.scala:
##########
@@ -195,7 +195,9 @@ class ProtoToParsedPlanTestSuite
   }
 
   private def removeMemoryAddress(expr: String): String = {
-    expr.replaceAll("@[0-9a-f]+,", ",")
+    expr
+      .replaceAll("@[0-9a-f]+,", ",")
+      .replaceAll("@[0-9a-f]+\\)", ")")

Review Comment:
   What is the reason for this change?
   
   



##########
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufUtils.scala:
##########
@@ -286,9 +289,9 @@ private[sql] object ProtobufUtils extends Logging {
   }
 
   /** Builds [[TypeRegistry]] with all the messages found in the descriptor 
file. */
-  private[protobuf] def buildTypeRegistry(descFilePath: String): TypeRegistry 
= {
+  private[protobuf] def buildTypeRegistry(descriptorBytes: Array[Byte]): 
TypeRegistry = {

Review Comment:
   Should the Scaladoc for this function also be updated? It still refers to "the descriptor file", but the function now takes descriptor bytes.
   
   



##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -1452,29 +1452,29 @@ class SparkConnectPlanner(val session: SparkSession) {
     def extractArgsOfProtobufFunction(
         functionName: String,
         argumentsCount: Int,
-        children: Seq[Expression]): (String, Option[String], Map[String, 
String]) = {
+        children: Seq[Expression]): (String, Option[Array[Byte]], Map[String, 
String]) = {
       val messageClassName = children(1) match {
         case Literal(s, StringType) if s != null => s.toString
         case other =>
           throw InvalidPlanInput(
             s"MessageClassName in $functionName should be a literal string, 
but got $other")
       }
-      val (descFilePathOpt, options) = if (argumentsCount == 2) {
+      val (binaryFileDescSetOpt, options) = if (argumentsCount == 2) {
         (None, Map.empty[String, String])
       } else if (argumentsCount == 3) {
         children(2) match {
-          case Literal(s, StringType) if s != null =>
-            (Some(s.toString), Map.empty[String, String])
+          case Literal(b, BinaryType) if b != null =>
+            (Some(b.asInstanceOf[Array[Byte]]), Map.empty[String, String])
           case UnresolvedFunction(Seq("map"), arguments, _, _, _) =>
             (None, ExprUtils.convertToMapData(CreateMap(arguments)))
           case other =>
             throw InvalidPlanInput(
               s"The valid type for the 3rd arg in $functionName is string or 
map, but got $other")

Review Comment:
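   Same as above: this message still says the valid type for the 3rd argument is "string or map", but the accepted types are now binary or map.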



##########
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufUtils.scala:
##########
@@ -145,9 +147,10 @@ private[sql] object ProtobufUtils extends Logging {
    *  the `messageName` is treated as Java class name.
    * @return
    */
-  def buildDescriptor(messageName: String, descFilePathOpt: Option[String]): 
Descriptor = {
-    descFilePathOpt match {
-      case Some(filePath) => buildDescriptor(descFilePath = filePath, 
messageName)
+  def buildDescriptor(messageName: String, binaryFileDescriptorSet: 
Option[Array[Byte]])

Review Comment:
   Please update the Scaladoc of this function to reflect the parameter changes.
   
   



##########
connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala:
##########
@@ -2251,10 +2252,8 @@ class PlanGenerationTestSuite
   //  1. cd connector/connect/common/src/main/protobuf/spark/connect
   //  2. protoc --include_imports 
--descriptor_set_out=../../../../test/resources/protobuf-tests/common.desc 
common.proto
   // scalastyle:on line.size.limit
-  private val testDescFilePath: String = java.nio.file.Paths
-    .get("../", "common", "src", "test", "resources", "protobuf-tests")
-    .resolve("common.desc")
-    .toString
+  private val testDescFilePath: String = 
s"${IntegrationTestUtils.sparkHome}/connector/" +

Review Comment:
   Maybe not in this PR, but could we generate the `.desc` file for the test and remove 
the dependency on the external file `common.desc`?



##########
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/CatalystDataToProtobuf.scala:
##########
@@ -26,14 +26,17 @@ import org.apache.spark.sql.types.{BinaryType, DataType}
 private[sql] case class CatalystDataToProtobuf(
     child: Expression,
     messageName: String,
-    descFilePath: Option[String] = None,
+    binaryFileDescriptorSet: Option[Array[Byte]] = None,
     options: Map[String, String] = Map.empty)
     extends UnaryExpression {
 
+  // TODO(SPARK-43578): binaryFileDescriptorSet could be very large in some 
cases. It is better

Review Comment:
   Will the `connect` code path also be changed as described in this TODO?
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to