rangadi commented on code in PR #41192:
URL: https://github.com/apache/spark/pull/41192#discussion_r1205797845
##########
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/CatalystDataToProtobuf.scala:
##########
@@ -26,14 +26,17 @@ import org.apache.spark.sql.types.{BinaryType, DataType}
private[sql] case class CatalystDataToProtobuf(
child: Expression,
messageName: String,
- descFilePath: Option[String] = None,
+ binaryFileDescriptorSet: Option[Array[Byte]] = None,
options: Map[String, String] = Map.empty)
extends UnaryExpression {
+ // TODO(SPARK-43578): binaryFileDescriptorSet could be very large in some
cases. It is better
Review Comment:
Maybe not. It doesn't save much, if anything at all (we still need to transfer the
buffer to the server).
We can treat those two separately, even if we want to handle this on the Connect side.
##########
connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala:
##########
@@ -2251,10 +2252,8 @@ class PlanGenerationTestSuite
// 1. cd connector/connect/common/src/main/protobuf/spark/connect
// 2. protoc --include_imports
--descriptor_set_out=../../../../test/resources/protobuf-tests/common.desc
common.proto
// scalastyle:on line.size.limit
- private val testDescFilePath: String = java.nio.file.Paths
- .get("../", "common", "src", "test", "resources", "protobuf-tests")
- .resolve("common.desc")
- .toString
+ private val testDescFilePath: String =
s"${IntegrationTestUtils.sparkHome}/connector/" +
+ "connect/common/src/test/resources/protobuf-tests/common.desc"
test("from_protobuf messageClassName") {
Review Comment:
We can update the assembly dependency in a separate PR, right? The Protobuf
package is needed by other Spark applications too, not just the client. I will
take a look at the specific test failure after this.
##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/protobuf/functions.scala:
##########
@@ -199,4 +301,16 @@ object functions {
options: java.util.Map[String, String]): Column = {
fnWithOptions("to_protobuf", options.asScala.iterator, data,
lit(messageClassName))
}
+
+ private def emptyOptions: java.util.Map[String, String] =
Collections.emptyMap[String, String]()
+
+ private def readDescriptorFileContent(filePath: String): Array[Byte] = {
+ // This method is copied from
org.apache.spark.sql.protobuf.util.ProtobufUtils
+ try {
+ FileUtils.readFileToByteArray(new File(filePath))
+ } catch {
+ case NonFatal(ex) =>
+ throw QueryCompilationErrors.cannotFindDescriptorFileError(filePath,
ex)
Review Comment:
This is how it was before this PR. I have separated FileNotFound from other
errors; PTAL.
Note that we would need to create a new structured error for each case if we want
to be very specific.
##########
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufUtils.scala:
##########
@@ -145,9 +147,10 @@ private[sql] object ProtobufUtils extends Logging {
* the `messageName` is treated as Java class name.
* @return
*/
- def buildDescriptor(messageName: String, descFilePathOpt: Option[String]):
Descriptor = {
- descFilePathOpt match {
- case Some(filePath) => buildDescriptor(descFilePath = filePath,
messageName)
+ def buildDescriptor(messageName: String, binaryFileDescriptorSet:
Option[Array[Byte]])
Review Comment:
Updated.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]