rangadi commented on code in PR #38286:
URL: https://github.com/apache/spark/pull/38286#discussion_r997323389
##########
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/ProtobufDataToCatalyst.scala:
##########
@@ -108,18 +112,16 @@ private[protobuf] case class ProtobufDataToCatalyst(
val binary = input.asInstanceOf[Array[Byte]]
try {
result = DynamicMessage.parseFrom(messageDescriptor, binary)
- val unknownFields = result.getUnknownFields
- if (!unknownFields.asMap().isEmpty) {
- unknownFields.asMap().keySet().asScala.map { number =>
- {
- if (fieldsNumbers.contains(number)) {
- return handleException(
- new Throwable(s"Type mismatch encountered for field:" +
- s" ${messageDescriptor.getFields.get(number)}"))
- }
- }
- }
+
+
result.getUnknownFields.asMap().keySet().asScala.find(fieldsNumbers.contains(_))
match {
Review Comment:
no functional change. Simplifies the code and adds a comment about the error
check.
##########
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufUtils.scala:
##########
@@ -132,23 +133,63 @@ private[sql] object ProtobufUtils extends Logging {
}
}
- def buildDescriptor(descFilePath: String, messageName: String): Descriptor =
{
- val fileDescriptor: Descriptors.FileDescriptor =
parseFileDescriptor(descFilePath)
- var result: Descriptors.Descriptor = null;
+ /**
+ * Builds Protobuf message descriptor either from the Java class or from
serialized descriptor
+ * read from the file.
+ * @param messageName
+ * Protobuf message name or Java class name.
+ * @param descFilePathOpt
+ * When the file name set, the descriptor and it's dependencies are read
from the file. Other
+ * the `messageName` is treated as Java class name.
+ * @return
+ */
+ def buildDescriptor(messageName: String, descFilePathOpt: Option[String]):
Descriptor = {
+ descFilePathOpt match {
+ case Some(filePath) => buildDescriptor(descFilePath = filePath,
messageName)
+ case None => buildDescriptorFromJavaClass(messageName)
+ }
+ }
- for (descriptor <- fileDescriptor.getMessageTypes.asScala) {
- if (descriptor.getName().equals(messageName)) {
- result = descriptor
- }
+ /**
+ * Loads the given protobuf class and returns Protobuf descriptor for it.
+ */
+ def buildDescriptorFromJavaClass(protobufClassName: String): Descriptor = {
+ val protobufClass = try {
+ Utils.classForName(protobufClassName)
+ } catch {
+ case _: ClassNotFoundException =>
+ val hasDots = protobufClassName.contains(".")
+ throw new IllegalArgumentException(
+ s"Could not load Protobuf class with name '$protobufClassName'" +
+ (if (hasDots) "" else ". Ensure the class name includes package
prefix.")
+ )
+ }
+
+ if (!classOf[Message].isAssignableFrom(protobufClass)) {
+ throw new IllegalArgumentException(s"$protobufClassName is not a
Protobuf message type")
+ // TODO: Need to support V2. This might work with V2 classes too.
+ }
+
+ // Extract the descriptor from Protobuf message.
+ protobufClass
+ .getDeclaredMethod("getDescriptor")
+ .invoke(null)
+ .asInstanceOf[Descriptor]
+ }
+
+ def buildDescriptor(descFilePath: String, messageName: String): Descriptor =
{
+ val descriptor =
parseFileDescriptor(descFilePath).getMessageTypes.asScala.find { desc =>
+ desc.getName == messageName || desc.getFullName == messageName
Review Comment:
For descriptor files, checks both simple name as well as full name including
package name. We were checking just the simple name. It might conflict in
larger protobuf repos.
##########
connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufFunctionsSuite.scala:
##########
@@ -56,44 +91,45 @@ class ProtobufFunctionsSuite extends QueryTest with
SharedSparkSession with Seri
lit(1202.00).cast(org.apache.spark.sql.types.FloatType).as("float_value"),
lit(true).as("bool_value"),
lit("0".getBytes).as("bytes_value")).as("SimpleMessage"))
- val protoStructDF = df.select(
- functions.to_protobuf($"SimpleMessage", testFileDesc,
"SimpleMessage").as("proto"))
- val actualDf = protoStructDF.select(
- functions.from_protobuf($"proto", testFileDesc,
"SimpleMessage").as("proto.*"))
- checkAnswer(actualDf, df)
+
+ checkWithFileAndClassName("SimpleMessage") {
+ case (name, descFilePathOpt) =>
+ val protoStructDF = df.select(
+ to_protobuf_wrapper($"SimpleMessage", name,
descFilePathOpt).as("proto"))
+ val actualDf = protoStructDF.select(
+ from_protobuf_wrapper($"proto", name, descFilePathOpt).as("proto.*"))
+ checkAnswer(actualDf, df)
+ }
}
test("roundtrip in from_protobuf and to_protobuf - Repeated") {
- val descriptor = ProtobufUtils.buildDescriptor(testFileDesc,
"SimpleMessageRepeated")
- val dynamicMessage = DynamicMessage
- .newBuilder(descriptor)
- .setField(descriptor.findFieldByName("key"), "key")
- .setField(descriptor.findFieldByName("value"), "value")
- .addRepeatedField(descriptor.findFieldByName("rbool_value"), false)
- .addRepeatedField(descriptor.findFieldByName("rbool_value"), true)
- .addRepeatedField(descriptor.findFieldByName("rdouble_value"),
1092092.654d)
- .addRepeatedField(descriptor.findFieldByName("rdouble_value"),
1092093.654d)
- .addRepeatedField(descriptor.findFieldByName("rfloat_value"), 10903.0f)
- .addRepeatedField(descriptor.findFieldByName("rfloat_value"), 10902.0f)
- .addRepeatedField(
- descriptor.findFieldByName("rnested_enum"),
-
descriptor.findEnumTypeByName("NestedEnum").findValueByName("ESTED_NOTHING"))
- .addRepeatedField(
- descriptor.findFieldByName("rnested_enum"),
-
descriptor.findEnumTypeByName("NestedEnum").findValueByName("NESTED_FIRST"))
+ val protoMessage = SimpleMessageRepeated
Review Comment:
Using Java class rather than genetic descriptor to prepare protobuf bytes.
Could do this for other tests too.
##########
connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufCatalystDataConversionSuite.scala:
##########
@@ -99,26 +115,32 @@ class ProtobufCatalystDataConversionSuite
StructType(StructField("bytes_type", BinaryType, nullable = true) :: Nil),
StructType(StructField("string_type", StringType, nullable = true) :: Nil))
- private val catalystTypesToProtoMessages: Map[DataType, String] = Map(
- IntegerType -> "IntegerMsg",
- DoubleType -> "DoubleMsg",
- FloatType -> "FloatMsg",
- BinaryType -> "BytesMsg",
- StringType -> "StringMsg")
+ private val catalystTypesToProtoMessages: Map[DataType, (String, Any)] = Map(
+ IntegerType -> ("IntegerMsg", 0),
+ DoubleType -> ("DoubleMsg", 0.0d),
+ FloatType -> ("FloatMsg", 0.0f),
+ BinaryType -> ("BytesMsg", ByteString.empty().toByteArray),
+ StringType -> ("StringMsg", ""))
testingTypes.foreach { dt =>
val seed = 1 + scala.util.Random.nextInt((1024 - 1) + 1)
- val filePath = testFile("protobuf/catalyst_types.desc").replace("file:/",
"/")
test(s"single $dt with seed $seed") {
+
+ val (messageName, defaultValue) =
catalystTypesToProtoMessages(dt.fields(0).dataType)
+
val rand = new scala.util.Random(seed)
- val data = RandomDataGenerator.forType(dt, rand = rand).get.apply()
+ val generator = RandomDataGenerator.forType(dt, rand = rand).get
+ var data = generator()
+ while (data.asInstanceOf[Row].get(0) == defaultValue) // Do not use
default values, since
Review Comment:
This test would be flaky without this.
##########
connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufFunctionsSuite.scala:
##########
@@ -120,13 +156,17 @@ class ProtobufFunctionsSuite extends QueryTest with
SharedSparkSession with Seri
.build()
val df = Seq(dynamicMessage.toByteArray).toDF("value")
- val fromProtoDF = df.select(
- functions.from_protobuf($"value", testFileDesc,
"RepeatedMessage").as("value_from"))
- val toProtoDF = fromProtoDF.select(
- functions.to_protobuf($"value_from", testFileDesc,
"RepeatedMessage").as("value_to"))
- val toFromProtoDF = toProtoDF.select(
- functions.from_protobuf($"value_to", testFileDesc,
"RepeatedMessage").as("value_to_from"))
- checkAnswer(fromProtoDF.select($"value_from.*"),
toFromProtoDF.select($"value_to_from.*"))
+
+ checkWithFileAndClassName("RepeatedMessage") {
Review Comment:
This runs the block twice: once with descriptor file and another time with
java class.
##########
connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufFunctionsSuite.scala:
##########
@@ -411,17 +470,17 @@ class ProtobufFunctionsSuite extends QueryTest with
SharedSparkSession with Seri
val df = Seq(oldProducerMessage.toByteArray).toDF("oldProducerData")
val fromProtoDf = df.select(
functions
- .from_protobuf($"oldProducerData", testFileDesc, "newConsumer")
+ .from_protobuf($"oldProducerData", "newConsumer", testFileDesc)
Review Comment:
These are related to change in order of args in API.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]