rangadi commented on code in PR #41192:
URL: https://github.com/apache/spark/pull/41192#discussion_r1204818591
##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -1452,29 +1452,29 @@ class SparkConnectPlanner(val session: SparkSession) {
def extractArgsOfProtobufFunction(
functionName: String,
argumentsCount: Int,
- children: Seq[Expression]): (String, Option[String], Map[String,
String]) = {
+ children: Seq[Expression]): (String, Option[Array[Byte]], Map[String,
String]) = {
val messageClassName = children(1) match {
case Literal(s, StringType) if s != null => s.toString
case other =>
throw InvalidPlanInput(
s"MessageClassName in $functionName should be a literal string,
but got $other")
}
- val (descFilePathOpt, options) = if (argumentsCount == 2) {
+ val (binaryFileDescSetOpt, options) = if (argumentsCount == 2) {
(None, Map.empty[String, String])
} else if (argumentsCount == 3) {
children(2) match {
- case Literal(s, StringType) if s != null =>
- (Some(s.toString), Map.empty[String, String])
+ case Literal(b, BinaryType) if b != null =>
+ (Some(b.asInstanceOf[Array[Byte]]), Map.empty[String, String])
case UnresolvedFunction(Seq("map"), arguments, _, _, _) =>
Review Comment:
Good catch, updated the exception description.
##########
connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala:
##########
@@ -2251,10 +2252,8 @@ class PlanGenerationTestSuite
// 1. cd connector/connect/common/src/main/protobuf/spark/connect
// 2. protoc --include_imports
--descriptor_set_out=../../../../test/resources/protobuf-tests/common.desc
common.proto
// scalastyle:on line.size.limit
- private val testDescFilePath: String = java.nio.file.Paths
- .get("../", "common", "src", "test", "resources", "protobuf-tests")
- .resolve("common.desc")
- .toString
+ private val testDescFilePath: String =
s"${IntegrationTestUtils.sparkHome}/connector/" +
Review Comment:
I have plans to do that for `connect/protobuf` and will do that here as well. I
had run into some issues with SBT when I first tried to do that. I need to dust
off the old branch where I did this.
##########
connector/connect/server/src/test/scala/org/apache/spark/sql/connect/ProtoToParsedPlanTestSuite.scala:
##########
@@ -195,7 +195,9 @@ class ProtoToParsedPlanTestSuite
}
private def removeMemoryAddress(expr: String): String = {
- expr.replaceAll("@[0-9a-f]+,", ",")
+ expr
+ .replaceAll("@[0-9a-f]+,", ",")
+ .replaceAll("@[0-9a-f]+\\)", ")")
Review Comment:
This method helps unit tests that compare plans in golden files by removing
memory references (since they change each time). It was removing only those
followed by ',' ('@[ref],'); I updated it to also remove those followed by ')'.
This PR replaced 'string' with a byte-buffer, and the golden file also
changed to include byte-array references.
See `from_protobuf_messageClassName_descFilePath_options.explain` file here:
https://github.com/apache/spark/pull/41192/files#diff-94822b18acb053bbfba8c2dae0c1dd1e806001ca996585b775719212572d4762R1
##########
connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala:
##########
@@ -2251,10 +2252,8 @@ class PlanGenerationTestSuite
// 1. cd connector/connect/common/src/main/protobuf/spark/connect
// 2. protoc --include_imports
--descriptor_set_out=../../../../test/resources/protobuf-tests/common.desc
common.proto
// scalastyle:on line.size.limit
- private val testDescFilePath: String = java.nio.file.Paths
- .get("../", "common", "src", "test", "resources", "protobuf-tests")
- .resolve("common.desc")
- .toString
+ private val testDescFilePath: String =
s"${IntegrationTestUtils.sparkHome}/connector/" +
+ "connect/common/src/test/resources/protobuf-tests/common.desc"
test("from_protobuf messageClassName") {
Review Comment:
I was testing with Maven. It worked OK.
Btw, do you know how we can get the assembly jar from `connect/protobuf` into
`assembly/target/scala-2.12/`? Is the Avro jar included there?
I had to take out my ClientE2E test because of this missing jar. Solving this
would likely also help with
[SPARK-43646](https://issues.apache.org/jira/browse/SPARK-43646).
##########
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufUtils.scala:
##########
@@ -286,9 +289,9 @@ private[sql] object ProtobufUtils extends Logging {
}
/** Builds [[TypeRegistry]] with all the messages found in the descriptor
file. */
- private[protobuf] def buildTypeRegistry(descFilePath: String): TypeRegistry
= {
+ private[protobuf] def buildTypeRegistry(descriptorBytes: Array[Byte]):
TypeRegistry = {
Review Comment:
Yep, fixed.
##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -1452,29 +1452,29 @@ class SparkConnectPlanner(val session: SparkSession) {
def extractArgsOfProtobufFunction(
functionName: String,
argumentsCount: Int,
- children: Seq[Expression]): (String, Option[String], Map[String,
String]) = {
+ children: Seq[Expression]): (String, Option[Array[Byte]], Map[String,
String]) = {
val messageClassName = children(1) match {
case Literal(s, StringType) if s != null => s.toString
case other =>
throw InvalidPlanInput(
s"MessageClassName in $functionName should be a literal string,
but got $other")
}
- val (descFilePathOpt, options) = if (argumentsCount == 2) {
+ val (binaryFileDescSetOpt, options) = if (argumentsCount == 2) {
(None, Map.empty[String, String])
} else if (argumentsCount == 3) {
children(2) match {
- case Literal(s, StringType) if s != null =>
- (Some(s.toString), Map.empty[String, String])
+ case Literal(b, BinaryType) if b != null =>
+ (Some(b.asInstanceOf[Array[Byte]]), Map.empty[String, String])
case UnresolvedFunction(Seq("map"), arguments, _, _, _) =>
(None, ExprUtils.convertToMapData(CreateMap(arguments)))
case other =>
throw InvalidPlanInput(
s"The valid type for the 3rd arg in $functionName is string or
map, but got $other")
Review Comment:
Done.
##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/protobuf/functions.scala:
##########
@@ -199,4 +301,16 @@ object functions {
options: java.util.Map[String, String]): Column = {
fnWithOptions("to_protobuf", options.asScala.iterator, data,
lit(messageClassName))
}
+
+ private def emptyOptions: java.util.Map[String, String] =
Collections.emptyMap[String, String]()
+
+ private def readDescriptorFileContent(filePath: String): Array[Byte] = {
+ // This method is copied from
org.apache.spark.sql.protobuf.util.ProtobufUtils
Review Comment:
I meant the `readDescriptorFileContent()` method itself. I moved the comment
above the method signature to avoid the confusion.
##########
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufUtils.scala:
##########
@@ -223,28 +226,28 @@ private[sql] object ProtobufUtils extends Logging {
}
}
- private def parseFileDescriptorSet(descFilePath: String):
List[Descriptors.FileDescriptor] = {
- var fileDescriptorSet: DescriptorProtos.FileDescriptorSet = null
+ def readDescriptorFileContent(filePath: String): Array[Byte] = {
Review Comment:
This function is specific to Protobuf because of the exception it throws. I
copied it into the Protobuf functions in Connect. It is a small method, so we can
refactor it later when common utils are shared. I think it is OK for now.
##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/protobuf/functions.scala:
##########
@@ -199,4 +301,16 @@ object functions {
options: java.util.Map[String, String]): Column = {
fnWithOptions("to_protobuf", options.asScala.iterator, data,
lit(messageClassName))
}
+
+ private def emptyOptions: java.util.Map[String, String] =
Collections.emptyMap[String, String]()
+
+ private def readDescriptorFileContent(filePath: String): Array[Byte] = {
+ // This method is copied from
org.apache.spark.sql.protobuf.util.ProtobufUtils
+ try {
+ FileUtils.readFileToByteArray(new File(filePath))
+ } catch {
+ case NonFatal(ex) =>
+ throw QueryCompilationErrors.cannotFindDescriptorFileError(filePath,
ex)
Review Comment:
Do you mean we should throw different error than
'cannotFindDescriptorFileError'?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]