rangadi commented on code in PR #41192:
URL: https://github.com/apache/spark/pull/41192#discussion_r1205797845


##########
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/CatalystDataToProtobuf.scala:
##########
@@ -26,14 +26,17 @@ import org.apache.spark.sql.types.{BinaryType, DataType}
 private[sql] case class CatalystDataToProtobuf(
     child: Expression,
     messageName: String,
-    descFilePath: Option[String] = None,
+    binaryFileDescriptorSet: Option[Array[Byte]] = None,
     options: Map[String, String] = Map.empty)
     extends UnaryExpression {
 
+  // TODO(SPARK-43578): binaryFileDescriptorSet could be very large in some 
cases. It is better

Review Comment:
   Maybe not. It doesn't save much, if anything at all (we need to transfer the 
buffer to the server). 
   We can treat those two separately even if we want to handle it on the Connect side. 



##########
connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala:
##########
@@ -2251,10 +2252,8 @@ class PlanGenerationTestSuite
   //  1. cd connector/connect/common/src/main/protobuf/spark/connect
   //  2. protoc --include_imports 
--descriptor_set_out=../../../../test/resources/protobuf-tests/common.desc 
common.proto
   // scalastyle:on line.size.limit
-  private val testDescFilePath: String = java.nio.file.Paths
-    .get("../", "common", "src", "test", "resources", "protobuf-tests")
-    .resolve("common.desc")
-    .toString
+  private val testDescFilePath: String = 
s"${IntegrationTestUtils.sparkHome}/connector/" +
+    "connect/common/src/test/resources/protobuf-tests/common.desc"
 
   test("from_protobuf messageClassName") {

Review Comment:
   We can update the assembly dependency, right (in a separate PR)? The Protobuf 
package is needed for other Spark applications too, not just the client. I will 
take a look at the specific test failure after this. 



##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/protobuf/functions.scala:
##########
@@ -199,4 +301,16 @@ object functions {
       options: java.util.Map[String, String]): Column = {
     fnWithOptions("to_protobuf", options.asScala.iterator, data, 
lit(messageClassName))
   }
+
+  private def emptyOptions: java.util.Map[String, String] = 
Collections.emptyMap[String, String]()
+
+  private def readDescriptorFileContent(filePath: String): Array[Byte] = {
+    // This method is copied from 
org.apache.spark.sql.protobuf.util.ProtobufUtils
+    try {
+      FileUtils.readFileToByteArray(new File(filePath))
+    } catch {
+      case NonFatal(ex) =>
+        throw QueryCompilationErrors.cannotFindDescriptorFileError(filePath, 
ex)

Review Comment:
   This was how it was before this PR. I separated FileNotFound from other 
errors, PTAL.
   Note that we need to create a new structured error for each case if we want 
to be very specific. 



##########
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufUtils.scala:
##########
@@ -145,9 +147,10 @@ private[sql] object ProtobufUtils extends Logging {
    *  the `messageName` is treated as Java class name.
    * @return
    */
-  def buildDescriptor(messageName: String, descFilePathOpt: Option[String]): 
Descriptor = {
-    descFilePathOpt match {
-      case Some(filePath) => buildDescriptor(descFilePath = filePath, 
messageName)
+  def buildDescriptor(messageName: String, binaryFileDescriptorSet: 
Option[Array[Byte]])

Review Comment:
   Updated. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to