rangadi commented on code in PR #38286:
URL: https://github.com/apache/spark/pull/38286#discussion_r997323389


##########
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/ProtobufDataToCatalyst.scala:
##########
@@ -108,18 +112,16 @@ private[protobuf] case class ProtobufDataToCatalyst(
     val binary = input.asInstanceOf[Array[Byte]]
     try {
       result = DynamicMessage.parseFrom(messageDescriptor, binary)
-      val unknownFields = result.getUnknownFields
-      if (!unknownFields.asMap().isEmpty) {
-        unknownFields.asMap().keySet().asScala.map { number =>
-          {
-            if (fieldsNumbers.contains(number)) {
-              return handleException(
-                new Throwable(s"Type mismatch encountered for field:" +
-                  s" ${messageDescriptor.getFields.get(number)}"))
-            }
-          }
-        }
+
+      
result.getUnknownFields.asMap().keySet().asScala.find(fieldsNumbers.contains(_))
 match {

Review Comment:
   No functional change. This simplifies the code and adds a comment about the error
check.



##########
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufUtils.scala:
##########
@@ -132,23 +133,63 @@ private[sql] object ProtobufUtils extends Logging {
     }
   }
 
-  def buildDescriptor(descFilePath: String, messageName: String): Descriptor = 
{
-    val fileDescriptor: Descriptors.FileDescriptor = 
parseFileDescriptor(descFilePath)
-    var result: Descriptors.Descriptor = null;
+  /**
+   * Builds Protobuf message descriptor either from the Java class or from 
serialized descriptor
+   * read from the file.
+   * @param messageName
+   *  Protobuf message name or Java class name.
+   * @param descFilePathOpt
+   *  When the file name set, the descriptor and it's dependencies are read 
from the file. Other
+   *  the `messageName` is treated as Java class name.
+   * @return
+   */
+  def buildDescriptor(messageName: String, descFilePathOpt: Option[String]): 
Descriptor = {
+    descFilePathOpt match {
+      case Some(filePath) => buildDescriptor(descFilePath = filePath, 
messageName)
+      case None => buildDescriptorFromJavaClass(messageName)
+    }
+  }
 
-    for (descriptor <- fileDescriptor.getMessageTypes.asScala) {
-      if (descriptor.getName().equals(messageName)) {
-        result = descriptor
-      }
+  /**
+   *  Loads the given protobuf class and returns Protobuf descriptor for it.
+   */
+  def buildDescriptorFromJavaClass(protobufClassName: String): Descriptor = {
+    val protobufClass = try {
+      Utils.classForName(protobufClassName)
+    } catch {
+      case _: ClassNotFoundException =>
+        val hasDots = protobufClassName.contains(".")
+        throw new IllegalArgumentException(
+          s"Could not load Protobuf class with name '$protobufClassName'" +
+          (if (hasDots) "" else ". Ensure the class name includes package 
prefix.")
+        )
+    }
+
+    if (!classOf[Message].isAssignableFrom(protobufClass)) {
+      throw new IllegalArgumentException(s"$protobufClassName is not a 
Protobuf message type")
+      // TODO: Need to support V2. This might work with V2 classes too.
+    }
+
+    // Extract the descriptor from Protobuf message.
+    protobufClass
+      .getDeclaredMethod("getDescriptor")
+      .invoke(null)
+      .asInstanceOf[Descriptor]
+  }
+
+  def buildDescriptor(descFilePath: String, messageName: String): Descriptor = 
{
+    val descriptor = 
parseFileDescriptor(descFilePath).getMessageTypes.asScala.find { desc =>
+      desc.getName == messageName || desc.getFullName == messageName

Review Comment:
   For descriptor files, this checks both the simple name and the full name (including the
package name). Previously we checked only the simple name, which might conflict in
larger protobuf repos.



##########
connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufFunctionsSuite.scala:
##########
@@ -56,44 +91,45 @@ class ProtobufFunctionsSuite extends QueryTest with 
SharedSparkSession with Seri
         
lit(1202.00).cast(org.apache.spark.sql.types.FloatType).as("float_value"),
         lit(true).as("bool_value"),
         lit("0".getBytes).as("bytes_value")).as("SimpleMessage"))
-    val protoStructDF = df.select(
-      functions.to_protobuf($"SimpleMessage", testFileDesc, 
"SimpleMessage").as("proto"))
-    val actualDf = protoStructDF.select(
-      functions.from_protobuf($"proto", testFileDesc, 
"SimpleMessage").as("proto.*"))
-    checkAnswer(actualDf, df)
+
+    checkWithFileAndClassName("SimpleMessage") {
+      case (name, descFilePathOpt) =>
+        val protoStructDF = df.select(
+          to_protobuf_wrapper($"SimpleMessage", name, 
descFilePathOpt).as("proto"))
+        val actualDf = protoStructDF.select(
+          from_protobuf_wrapper($"proto", name, descFilePathOpt).as("proto.*"))
+        checkAnswer(actualDf, df)
+    }
   }
 
   test("roundtrip in from_protobuf and to_protobuf - Repeated") {
-    val descriptor = ProtobufUtils.buildDescriptor(testFileDesc, 
"SimpleMessageRepeated")
 
-    val dynamicMessage = DynamicMessage
-      .newBuilder(descriptor)
-      .setField(descriptor.findFieldByName("key"), "key")
-      .setField(descriptor.findFieldByName("value"), "value")
-      .addRepeatedField(descriptor.findFieldByName("rbool_value"), false)
-      .addRepeatedField(descriptor.findFieldByName("rbool_value"), true)
-      .addRepeatedField(descriptor.findFieldByName("rdouble_value"), 
1092092.654d)
-      .addRepeatedField(descriptor.findFieldByName("rdouble_value"), 
1092093.654d)
-      .addRepeatedField(descriptor.findFieldByName("rfloat_value"), 10903.0f)
-      .addRepeatedField(descriptor.findFieldByName("rfloat_value"), 10902.0f)
-      .addRepeatedField(
-        descriptor.findFieldByName("rnested_enum"),
-        
descriptor.findEnumTypeByName("NestedEnum").findValueByName("ESTED_NOTHING"))
-      .addRepeatedField(
-        descriptor.findFieldByName("rnested_enum"),
-        
descriptor.findEnumTypeByName("NestedEnum").findValueByName("NESTED_FIRST"))
+    val protoMessage = SimpleMessageRepeated

Review Comment:
   Using the Java class rather than a generic descriptor to prepare the protobuf bytes.
We could do this for other tests too.



##########
connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufCatalystDataConversionSuite.scala:
##########
@@ -99,26 +115,32 @@ class ProtobufCatalystDataConversionSuite
     StructType(StructField("bytes_type", BinaryType, nullable = true) :: Nil),
     StructType(StructField("string_type", StringType, nullable = true) :: Nil))
 
-  private val catalystTypesToProtoMessages: Map[DataType, String] = Map(
-    IntegerType -> "IntegerMsg",
-    DoubleType -> "DoubleMsg",
-    FloatType -> "FloatMsg",
-    BinaryType -> "BytesMsg",
-    StringType -> "StringMsg")
+  private val catalystTypesToProtoMessages: Map[DataType, (String, Any)] = Map(
+    IntegerType -> ("IntegerMsg", 0),
+    DoubleType -> ("DoubleMsg", 0.0d),
+    FloatType -> ("FloatMsg", 0.0f),
+    BinaryType -> ("BytesMsg", ByteString.empty().toByteArray),
+    StringType -> ("StringMsg", ""))
 
   testingTypes.foreach { dt =>
     val seed = 1 + scala.util.Random.nextInt((1024 - 1) + 1)
-    val filePath = testFile("protobuf/catalyst_types.desc").replace("file:/", 
"/")
     test(s"single $dt with seed $seed") {
+
+      val (messageName, defaultValue) = 
catalystTypesToProtoMessages(dt.fields(0).dataType)
+
       val rand = new scala.util.Random(seed)
-      val data = RandomDataGenerator.forType(dt, rand = rand).get.apply()
+      val generator = RandomDataGenerator.forType(dt, rand = rand).get
+      var data = generator()
+      while (data.asInstanceOf[Row].get(0) == defaultValue) // Do not use 
default values, since

Review Comment:
   This test would be flaky without this. 



##########
connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufFunctionsSuite.scala:
##########
@@ -120,13 +156,17 @@ class ProtobufFunctionsSuite extends QueryTest with 
SharedSparkSession with Seri
       .build()
 
     val df = Seq(dynamicMessage.toByteArray).toDF("value")
-    val fromProtoDF = df.select(
-      functions.from_protobuf($"value", testFileDesc, 
"RepeatedMessage").as("value_from"))
-    val toProtoDF = fromProtoDF.select(
-      functions.to_protobuf($"value_from", testFileDesc, 
"RepeatedMessage").as("value_to"))
-    val toFromProtoDF = toProtoDF.select(
-      functions.from_protobuf($"value_to", testFileDesc, 
"RepeatedMessage").as("value_to_from"))
-    checkAnswer(fromProtoDF.select($"value_from.*"), 
toFromProtoDF.select($"value_to_from.*"))
+
+    checkWithFileAndClassName("RepeatedMessage") {

Review Comment:
   This runs the block twice: once with the descriptor file and once with the
Java class.



##########
connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufFunctionsSuite.scala:
##########
@@ -411,17 +470,17 @@ class ProtobufFunctionsSuite extends QueryTest with 
SharedSparkSession with Seri
     val df = Seq(oldProducerMessage.toByteArray).toDF("oldProducerData")
     val fromProtoDf = df.select(
       functions
-        .from_protobuf($"oldProducerData", testFileDesc, "newConsumer")
+        .from_protobuf($"oldProducerData", "newConsumer", testFileDesc)

Review Comment:
   These are related to the change in the order of args in the API.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to