This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 05df8c472b4 [SPARK-43312][PROTOBUF] Option to convert Any fields into JSON
05df8c472b4 is described below

commit 05df8c472b47f6c29c2d936499b5bf2f0fbae99f
Author: Raghu Angadi <raghu.ang...@databricks.com>
AuthorDate: Thu May 4 16:57:13 2023 -0700

    [SPARK-43312][PROTOBUF] Option to convert Any fields into JSON
    
    ### What changes were proposed in this pull request?
    
    This adds an option to convert Protobuf 'Any' fields to JSON. At runtime, such 'Any' fields
    can contain arbitrary Protobuf messages serialized as binary data.

    By default, when this option is not enabled, such a field behaves like a normal Protobuf
    message with two fields (`STRUCT<type_url: STRING, value: BINARY>`). The binary `value` field
    is not interpreted, which might not be convenient in practice.

    One option would be to deserialize it into the actual Protobuf message and convert it to a
    Spark STRUCT. But this is not feasible, since the schema for `from_protobuf()` is needed at
    query compile time and cannot change at run time.

    Another option is to parse the binary data and deserialize the Protobuf message into a JSON
    string, which is much more readable than the binary data. This configuration option enables
    converting Any fields to JSON. The example below clarifies this further.
    
     Consider two Protobuf types defined as follows:
    ```
       message ProtoWithAny {
          string event_name = 1;
          google.protobuf.Any details = 2;
       }
    
       message Person {
         string name = 1;
         int32 id = 2;
       }
    ```
    With this option enabled, the schema for `from_protobuf("col", messageName = "ProtoWithAny")`
    would be: `STRUCT<event_name: STRING, details: STRING>`.
    At run time, if the `details` field contains a `Person` Protobuf message, the returned value
    looks like this:

        ('click', '{"@type":"type.googleapis.com/...Person","name":"Mario","id":100}')
    
    Requirements:
     - The definitions for all the possible Protobuf types that are used in Any fields should be
       available in the Protobuf descriptor file passed to `from_protobuf()`. If a Protobuf type
       is not found, it results in an error for that record.
     - This feature is supported with Java classes as well, but only the Protobuf types defined
       in the same `proto` file as the primary Java class may be visible.
       E.g. if `ProtoWithAny` and `Person` in the above example are in different proto files,
       the definition for `Person` may not be found.
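
    If needed, the resulting JSON string column can then be processed with Spark's generic JSON
    functions. A hypothetical follow-up to the sketch above, using the built-in
    `get_json_object`:
    ```
    import org.apache.spark.sql.functions.{col, get_json_object}

    // Pull a single value out of the JSON produced for the 'details' Any field.
    parsed.select(
      col("proto.event_name"),
      get_json_object(col("proto.details"), "$.name").as("person_name"))
    ```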
    
    ### Why are the changes needed?
    
    Improves handling of Any fields.
    
    ### Does this PR introduce _any_ user-facing change?
    No. The default behavior is not changed.
    
    ### How was this patch tested?
    - Unit tests
    
    Closes #40983 from rangadi/protobuf-any.
    
    Authored-by: Raghu Angadi <raghu.ang...@databricks.com>
    Signed-off-by: Gengliang Wang <gengli...@apache.org>
---
 connector/protobuf/pom.xml                         |   7 ++
 .../sql/protobuf/ProtobufDataToCatalyst.scala      |  17 ++-
 .../spark/sql/protobuf/ProtobufDeserializer.scala  |  24 +++-
 .../spark/sql/protobuf/utils/ProtobufOptions.scala |  80 ++++++++++--
 .../spark/sql/protobuf/utils/ProtobufUtils.scala   |  17 +++
 .../sql/protobuf/utils/SchemaConverters.scala      |   3 +
 .../test/resources/protobuf/functions_suite.desc   | Bin 10635 -> 11087 bytes
 .../test/resources/protobuf/functions_suite.proto  |  12 ++
 .../sql/protobuf/ProtobufFunctionsSuite.scala      | 135 ++++++++++++++++++++-
 9 files changed, 278 insertions(+), 17 deletions(-)

diff --git a/connector/protobuf/pom.xml b/connector/protobuf/pom.xml
index e5e1fe342a1..6feef54ce71 100644
--- a/connector/protobuf/pom.xml
+++ b/connector/protobuf/pom.xml
@@ -82,6 +82,12 @@
       <version>${protobuf.version}</version>
       <scope>compile</scope>
     </dependency>
+    <dependency>
+      <groupId>com.google.protobuf</groupId>
+      <artifactId>protobuf-java-util</artifactId>
+      <version>${protobuf.version}</version>
+      <scope>compile</scope>
+    </dependency>
   </dependencies>
   <build>
     <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
@@ -145,6 +151,7 @@
                     <include>src/test/resources/protobuf</include>
                   </inputDirectories>
                   <addSources>test</addSources>
+                  <includeStdTypes>true</includeStdTypes>
                 </configuration>
               </execution>
             </executions>
diff --git a/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/ProtobufDataToCatalyst.scala b/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/ProtobufDataToCatalyst.scala
index cf6114f2f6c..ae2bd9fc4a7 100644
--- a/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/ProtobufDataToCatalyst.scala
+++ b/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/ProtobufDataToCatalyst.scala
@@ -20,8 +20,8 @@ import scala.collection.JavaConverters._
 import scala.util.control.NonFatal
 
 import com.google.protobuf.DynamicMessage
+import com.google.protobuf.TypeRegistry
 
-import org.apache.spark.sql.catalyst.NoopFilters
 import org.apache.spark.sql.catalyst.expressions.{ExpectsInputTypes, Expression, SpecificInternalRow, UnaryExpression}
 import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, CodeGenerator, ExprCode}
 import org.apache.spark.sql.catalyst.util.{FailFastMode, ParseMode, PermissiveMode}
@@ -64,12 +64,21 @@ private[protobuf] case class ProtobufDataToCatalyst(
   @transient private lazy val fieldsNumbers =
     messageDescriptor.getFields.asScala.map(f => f.getNumber).toSet
 
-  @transient private lazy val deserializer =
+  @transient private lazy val deserializer = {
+    val typeRegistry = descFilePath match {
+      case Some(path) if protobufOptions.convertAnyFieldsToJson =>
+        ProtobufUtils.buildTypeRegistry(path) // This loads all the messages in the file.
+      case None if protobufOptions.convertAnyFieldsToJson =>
+        ProtobufUtils.buildTypeRegistry(messageDescriptor) // Loads only connected messages.
+      case _ => TypeRegistry.getEmptyTypeRegistry // Default. Json conversion is not enabled.
+    }
     new ProtobufDeserializer(
       messageDescriptor,
       dataType,
-      new NoopFilters,
-      protobufOptions.emitDefaultValues)
+      typeRegistry = typeRegistry,
+      emitDefaultValues = protobufOptions.emitDefaultValues
+    )
+  }
 
   @transient private var result: DynamicMessage = _
 
diff --git a/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/ProtobufDeserializer.scala b/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/ProtobufDeserializer.scala
index c3bc46c62ea..d1464163568 100644
--- a/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/ProtobufDeserializer.scala
+++ b/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/ProtobufDeserializer.scala
@@ -21,6 +21,8 @@ import java.util.concurrent.TimeUnit
 import com.google.protobuf.{ByteString, DynamicMessage, Message}
 import com.google.protobuf.Descriptors._
 import com.google.protobuf.Descriptors.FieldDescriptor.JavaType._
+import com.google.protobuf.TypeRegistry
+import com.google.protobuf.util.JsonFormat
 
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.{InternalRow, NoopFilters, StructFilters}
@@ -36,11 +38,14 @@ import org.apache.spark.unsafe.types.UTF8String
 private[sql] class ProtobufDeserializer(
     rootDescriptor: Descriptor,
     rootCatalystType: DataType,
-    filters: StructFilters,
+    filters: StructFilters = new NoopFilters,
+    typeRegistry: TypeRegistry = TypeRegistry.getEmptyTypeRegistry,
     emitDefaultValues: Boolean = false) {
 
   def this(rootDescriptor: Descriptor, rootCatalystType: DataType) = {
-    this(rootDescriptor, rootCatalystType, new NoopFilters, false)
+    this(
+      rootDescriptor, rootCatalystType, new NoopFilters, TypeRegistry.getEmptyTypeRegistry, false
+    )
   }
 
   private val converter: Any => Option[InternalRow] =
@@ -71,6 +76,14 @@ private[sql] class ProtobufDeserializer(
 
   def deserialize(data: Message): Option[InternalRow] = converter(data)
 
+  // JsonFormatter used to convert Any fields (if the option is enabled).
+  // This keeps original field names and does not include any extra whitespace in JSON.
+  // If the runtime type for Any field is not found in the registry, it throws an exception.
+  private val jsonPrinter = JsonFormat.printer
+    .omittingInsignificantWhitespace()
+    .preservingProtoFieldNames()
+    .usingTypeRegistry(typeRegistry)
+
   private def newArrayWriter(
       protoField: FieldDescriptor,
       protoPath: Seq[String],
@@ -225,6 +238,13 @@ private[sql] class ProtobufDeserializer(
           val micros = DateTimeUtils.millisToMicros(seconds * 1000)
           updater.setLong(ordinal, micros + TimeUnit.NANOSECONDS.toMicros(nanoSeconds))
 
+      case (MESSAGE, StringType)
+          if protoType.getMessageType.getFullName == "google.protobuf.Any" =>
+        (updater, ordinal, value) =>
+          // Convert 'Any' protobuf message to JSON string.
+          val jsonStr = jsonPrinter.print(value.asInstanceOf[DynamicMessage])
+          updater.set(ordinal, UTF8String.fromString(jsonStr))
+
       case (MESSAGE, st: StructType) =>
         val writeRecord = getRecordWriter(
           protoType.getMessageType,
diff --git a/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufOptions.scala b/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufOptions.scala
index 0af8c50dc5a..04436346dec 100644
--- a/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufOptions.scala
+++ b/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufOptions.scala
@@ -32,6 +32,8 @@ private[sql] class ProtobufOptions(
     extends FileSourceOptions(parameters)
     with Logging {
 
+  import ProtobufOptions._
+
   def this(parameters: Map[String, String], conf: Configuration) = {
     this(CaseInsensitiveMap(parameters), conf)
   }
@@ -39,15 +41,72 @@ private[sql] class ProtobufOptions(
   val parseMode: ParseMode =
     parameters.get("mode").map(ParseMode.fromString).getOrElse(FailFastMode)
 
-  // Setting the `recursive.fields.max.depth` to 1 allows it to be recurse once,
-  // and 2 allows it to be recursed twice and so on. A value of `recursive.fields.max.depth`
-  // greater than 10 is not permitted. If it is not  specified, the default value is -1;
-  // A value of 0 or below disallows any recursive fields. If a protobuf
-  // record has more depth than the allowed value for recursive fields, it will be truncated
-  // and corresponding fields are ignored (dropped).
+  /**
+   * Adds support for recursive fields. If this option is not specified, recursive fields are
+   * not permitted. Setting it to 0 drops the recursive fields, 1 allows it to be recursed once,
+   * and 2 allows it to be recursed twice and so on, up to 10. Values larger than 10 are not
+   * allowed in order to avoid inadvertently creating very large schemas. If a Protobuf message
+   * has depth beyond this limit, the Spark struct returned is truncated after the recursion limit.
+   *
+   * Examples. Consider a Protobuf with a recursive field:
+   *   `message Person { string name = 1; Person friend = 2; }`
+   * The following lists the schema with different values for this setting.
+   *  1:  `struct<name: string>`
+   *  2:  `struct<name: string, friend: struct<name: string>>`
+   *  3:  `struct<name: string, friend: struct<name: string, friend: struct<name: string>>>`
+   * and so on.
+   */
   val recursiveFieldMaxDepth: Int = parameters.getOrElse("recursive.fields.max.depth", "-1").toInt
 
-  // Whether to render fields with zero values when deserializing Protobufs to a Spark struct.
+  /**
+   * This option ("convert.any.fields.to.json") enables converting Protobuf 'Any' fields to JSON.
+   * At runtime, such 'Any' fields can contain arbitrary Protobuf messages as binary data.
+   *
+   * By default, when this option is not enabled, such a field behaves like a normal Protobuf
+   * message with two fields (`STRUCT<type_url: STRING, value: BINARY>`). The binary `value`
+   * field is not interpreted, and might not be convenient to work with in practice.
+   *
+   * One option would be to deserialize it into the actual Protobuf message and convert it to a
+   * Spark STRUCT. But this is not feasible since the schema for `from_protobuf()` is needed at
+   * query compile time and cannot change at runtime.
+   *
+   * Another option is to parse the binary data and deserialize the Protobuf message into a JSON
+   * string, which is much more readable than the binary data. This configuration option enables
+   * converting Any fields to JSON. The example below clarifies this further.
+   *
+   *  Consider two Protobuf types defined as follows:
+   *    message ProtoWithAny {
+   *       string event_name = 1;
+   *       google.protobuf.Any details = 2;
+   *    }
+   *
+   *    message Person {
+   *      string name = 1;
+   *      int32 id = 2;
+   *    }
+   *
+   * With this option enabled, the schema for `from_protobuf("col", messageName = "ProtoWithAny")`
+   * would be: `STRUCT<event_name: STRING, details: STRING>`.
+   * At run time, if the `details` field contains a `Person` Protobuf message, the returned value
+   * looks like this:
+   *   ('click', '{"@type":"type.googleapis.com/...Person","name":"Mario","id":100}')
+   *
+   * Requirements:
+   *  - The definitions for all the possible Protobuf types that are used in Any fields should be
+   *    available in the Protobuf descriptor file passed to `from_protobuf()`. If a Protobuf type
+   *    is not found, it results in an error for that record.
+   *  - This feature is supported with Java classes as well, but only the Protobuf types defined
+   *    in the same `proto` file as the primary Java class may be visible.
+   *    E.g. if `ProtoWithAny` and `Person` in the above example are in different proto files,
+   *    the definition for `Person` may not be found.
+   *
+   * This feature should be enabled carefully. JSON conversion and processing are inefficient.
+   * In addition, schema safety is reduced, making downstream processing error-prone.
+   */
+  val convertAnyFieldsToJson: Boolean =
+    parameters.getOrElse(CONVERT_ANY_FIELDS_TO_JSON_CONFIG, "false").toBoolean
+
+  // Whether to render fields with zero values when deserializing Protobuf to a Spark struct.
   // When a field is empty in the serialized Protobuf, this library will deserialize them as
   // null by default. However, this flag can control whether to render the type-specific zero value.
   // This operates similarly to `includingDefaultValues` in protobuf-java-util's JsonFormat, or
@@ -65,15 +124,16 @@ private[sql] class ProtobufOptions(
   // ```
   //
   // And we have a proto constructed like:
-  // `Person(age=0, middle_name="")
+  // `Person(age=0, middle_name="")`
   //
-  // The result after calling from_protobuf without this flag set would be:
+  // The result after calling from_protobuf() without this flag set would be:
   // `{"name": null, "age": null, "middle_name": "", "salary": null}`
   // (age is null because zero-value singular fields are not in the wire format in proto3).
   //
   //
   // With this flag it would be:
   // `{"name": "", "age": 0, "middle_name": "", "salary": null}`
+  // ("salary" remains null, since it is declared explicitly as an optional field)
   //
   // Ref: https://protobuf.dev/programming-guides/proto3/#default for information about
   //      type-specific defaults.
@@ -90,4 +150,6 @@ private[sql] object ProtobufOptions {
       .getOrElse(new Configuration())
     new ProtobufOptions(CaseInsensitiveMap(parameters), hadoopConf)
   }
+
+  val CONVERT_ANY_FIELDS_TO_JSON_CONFIG = "convert.any.fields.to.json"
 }
diff --git a/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufUtils.scala b/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufUtils.scala
index bf207d6068f..5688a483da0 100644
--- a/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufUtils.scala
+++ b/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufUtils.scala
@@ -25,6 +25,7 @@ import scala.collection.JavaConverters._
 import com.google.protobuf.{DescriptorProtos, Descriptors, InvalidProtocolBufferException, Message}
 import com.google.protobuf.DescriptorProtos.{FileDescriptorProto, FileDescriptorSet}
 import com.google.protobuf.Descriptors.{Descriptor, FieldDescriptor}
+import com.google.protobuf.TypeRegistry
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.errors.QueryCompilationErrors
@@ -283,4 +284,20 @@ private[sql] object ProtobufUtils extends Logging {
     case Seq() => "top-level record"
     case n => s"field '${n.mkString(".")}'"
   }
+
+  /** Builds [[TypeRegistry]] with all the messages found in the descriptor file. */
+  private[protobuf] def buildTypeRegistry(descFilePath: String): TypeRegistry = {
+    val registryBuilder = TypeRegistry.newBuilder()
+    for (fileDesc <- parseFileDescriptorSet(descFilePath)) {
+      registryBuilder.add(fileDesc.getMessageTypes)
+    }
+    registryBuilder.build()
+  }
+
+  /** Builds [[TypeRegistry]] with the descriptor and the others from the same proto file. */
+  private [protobuf] def buildTypeRegistry(descriptor: Descriptor): TypeRegistry = {
+    TypeRegistry.newBuilder()
+      .add(descriptor) // This adds any other descriptors in the associated proto file.
+      .build()
+  }
 }
diff --git a/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/SchemaConverters.scala b/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/SchemaConverters.scala
index e277f2999e4..b2ed7d64f36 100644
--- a/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/SchemaConverters.scala
+++ b/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/SchemaConverters.scala
@@ -88,6 +88,9 @@ object SchemaConverters extends Logging {
           fd.getMessageType.getFields.get(0).getName.equals("seconds") &&
           fd.getMessageType.getFields.get(1).getName.equals("nanos")) =>
         Some(TimestampType)
+      case MESSAGE if protobufOptions.convertAnyFieldsToJson &&
+            fd.getMessageType.getFullName == "google.protobuf.Any" =>
+        Some(StringType) // Any protobuf will be parsed and converted to json string.
       case MESSAGE if fd.isRepeated && fd.getMessageType.getOptions.hasMapEntry =>
         var keyType: Option[DataType] = None
         var valueType: Option[DataType] = None
diff --git a/connector/protobuf/src/test/resources/protobuf/functions_suite.desc b/connector/protobuf/src/test/resources/protobuf/functions_suite.desc
index 80b78cd75f6..215dc05707f 100644
Binary files a/connector/protobuf/src/test/resources/protobuf/functions_suite.desc and b/connector/protobuf/src/test/resources/protobuf/functions_suite.desc differ
diff --git a/connector/protobuf/src/test/resources/protobuf/functions_suite.proto b/connector/protobuf/src/test/resources/protobuf/functions_suite.proto
index 2e9add4987c..a9d8d626481 100644
--- a/connector/protobuf/src/test/resources/protobuf/functions_suite.proto
+++ b/connector/protobuf/src/test/resources/protobuf/functions_suite.proto
@@ -26,6 +26,7 @@ package org.apache.spark.sql.protobuf.protos;
 import "timestamp.proto";
 import "duration.proto";
 import "basicmessage.proto";
+import "google/protobuf/any.proto";
 
 option java_outer_classname = "SimpleMessageProtos";
 
@@ -284,6 +285,17 @@ message Status {
   Status status = 3;
 }
 
+// Messages for testing Any fields
+message ProtoWithAny {
+  string event_name = 1;
+  google.protobuf.Any details = 3;
+}
+
+message ProtoWithAnyArray { // A container of Any fields
+  string description = 1;
+  repeated google.protobuf.Any items = 2;
+}
+
 // Contains a representative sample of all types, using the groupings defined
 // here: https://protobuf.dev/programming-guides/field_presence/#presence-in-proto3-apis
 message Proto3AllTypes {
diff --git a/connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufFunctionsSuite.scala b/connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufFunctionsSuite.scala
index 3105d9dc8b5..c47cd882af9 100644
--- a/connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufFunctionsSuite.scala
+++ b/connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufFunctionsSuite.scala
@@ -21,13 +21,16 @@ import java.time.Duration
 
 import scala.collection.JavaConverters._
 
-import com.google.protobuf.{ByteString, DynamicMessage}
+import com.google.protobuf.{Any => AnyProto, ByteString, DynamicMessage}
+import org.json4s.StringInput
+import org.json4s.jackson.JsonMethods
 
 import org.apache.spark.sql.{AnalysisException, Column, DataFrame, QueryTest, Row}
 import org.apache.spark.sql.functions.{lit, struct, typedLit}
 import org.apache.spark.sql.protobuf.protos.Proto2Messages.Proto2AllTypes
 import org.apache.spark.sql.protobuf.protos.SimpleMessageProtos._
 import org.apache.spark.sql.protobuf.protos.SimpleMessageProtos.SimpleMessageRepeated.NestedEnum
+import org.apache.spark.sql.protobuf.utils.ProtobufOptions
 import org.apache.spark.sql.protobuf.utils.ProtobufUtils
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types._
@@ -40,7 +43,7 @@ class ProtobufFunctionsSuite extends QueryTest with SharedSparkSession with Prot
   val testFileDesc = testFile("functions_suite.desc", "protobuf/functions_suite.desc")
   private val javaClassNamePrefix = "org.apache.spark.sql.protobuf.protos.SimpleMessageProtos$"
 
-  val proto2FileDesc = testFile("proto2_messages.desc", "protobuf/proto_messages.desc")
+  val proto2FileDesc = testFile("proto2_messages.desc", "protobuf/proto2_messages.desc")
   private val proto2JavaClassNamePrefix = "org.apache.spark.sql.protobuf.protos.Proto2Messages$"
 
   private def emptyBinaryDF = Seq(Array[Byte]()).toDF("binary")
@@ -1120,6 +1123,134 @@ class ProtobufFunctionsSuite extends QueryTest with SharedSparkSession with Prot
     }
   }
 
+  test("Converting Any fields to JSON") {
+    // Verifies schema and deserialization when 'convert.any.fields.to.json' is set.
+    checkWithFileAndClassName("ProtoWithAny") {
+      case (name, descFilePathOpt) =>
+
+        // proto: 'message { string event_name = 1; google.protobuf.Any details = 2 }'
+
+        val simpleProto = SimpleMessage // Json: {"id":10,"string_value":"galaxy"}
+          .newBuilder()
+          .setId(10)
+          .setStringValue("galaxy")
+          .build()
+
+        val protoWithAnyBytes = ProtoWithAny
+          .newBuilder()
+          .setEventName("click")
+          .setDetails(AnyProto.pack(simpleProto))
+          .build()
+          .toByteArray
+
+        val inputDF = Seq(protoWithAnyBytes).toDF("binary")
+
+        // Check schema with default options where Any field not converted to json.
+        val df = inputDF.select(
+          from_protobuf_wrapper($"binary", name, descFilePathOpt).as("proto")
+        )
+        // Default behavior: 'details' is a struct with 'type_url' and binary 'value'.
+        assert(df.schema.toDDL ==
+          "proto STRUCT<event_name: STRING, details: STRUCT<type_url: STRING, value: BINARY>>"
+        )
+
+        // Enable option to convert to json.
+        val options = Map(ProtobufOptions.CONVERT_ANY_FIELDS_TO_JSON_CONFIG -> "true")
+        val dfJson = inputDF.select(
+          from_protobuf_wrapper($"binary", name, descFilePathOpt, options).as("proto")
+        )
+        // Now 'details' should be a string.
+        assert(dfJson.schema.toDDL == "proto STRUCT<event_name: STRING, details: STRING>")
+
+        // Verify Json value for details
+
+        val row = dfJson.collect()(0).getStruct(0)
+
+        val expectedJson = """{"@type":""" + // The json includes "@type" field as well.
+            """"type.googleapis.com/org.apache.spark.sql.protobuf.protos.SimpleMessage",""" +
+            """"id":"10","string_value":"galaxy"}"""
+
+        assert(row.getString(0) == "click")
+        assert(row.getString(1) == expectedJson)
+    }
+  }
+
+  test("Converting nested Any fields to JSON") {
+    // This is a more involved version of the previous test with nested Any field inside an array.
+
+    // Takes json string and return a json with all the extra whitespace removed.
+    def compactJson(json: String): String = {
+      val jsonValue = JsonMethods.parse(StringInput(json))
+      JsonMethods.compact(jsonValue)
+    }
+
+    checkWithFileAndClassName("ProtoWithAnyArray") { case (name, descFilePathOpt) =>
+
+      // proto: message { string description = 1; repeated google.protobuf.Any items = 2;
+
+      // Use two different types of protos for 'items'. One with an Any field, and one without.
+
+      val simpleProto = SimpleMessage.newBuilder() // Json: {"id":10,"string_value":"galaxy"}
+        .setId(10)
+        .setStringValue("galaxy")
+        .build()
+
+      val protoWithAny = ProtoWithAny.newBuilder()
+        .setEventName("click")
+        .setDetails(AnyProto.pack(simpleProto))
+        .build()
+
+      val protoWithAnyArrayBytes = ProtoWithAnyArray.newBuilder()
+        .setDescription("nested any demo")
+        .addItems(AnyProto.pack(simpleProto)) // A simple proto
+        .addItems(AnyProto.pack(protoWithAny)) // A proto with any field inside it.
+        .build()
+        .toByteArray
+
+      val inputDF = Seq(protoWithAnyArrayBytes).toDF("binary")
+
+      // check default schema
+      val df = inputDF.select(
+        from_protobuf_wrapper($"binary", name, descFilePathOpt).as("proto")
+      )
+      // Default behavior: 'details' is a struct with 'type_url' and binary 'value'.
+      assert(df.schema.toDDL == "proto STRUCT<description: STRING, " +
+        "items: ARRAY<STRUCT<type_url: STRING, value: BINARY>>>"
+      )
+
+      // String for items with 'convert.to.json' option enabled.
+      val options = Map(ProtobufOptions.CONVERT_ANY_FIELDS_TO_JSON_CONFIG -> "true")
+      val dfJson = inputDF.select(from_protobuf_wrapper(
+        $"binary", name, descFilePathOpt, options).as("proto")
+      )
+      // Now 'details' should be a string.
+      assert(dfJson.schema.toDDL == "proto STRUCT<description: STRING, items: ARRAY<STRING>>")
+
+      val row = dfJson.collect()(0).getStruct(0)
+      val items = row.getList[String](1)
+
+      assert(row.getString(0) == "nested any demo")
+      assert(items.get(0) == compactJson(
+        """
+          | {
+          |   "@type":"type.googleapis.com/org.apache.spark.sql.protobuf.protos.SimpleMessage",
+          |   "id":"10",
+          |   "string_value":"galaxy"
+          | }""".stripMargin))
+      assert(items.get(1) == compactJson(
+        """
+          | {
+          |   "@type":"type.googleapis.com/org.apache.spark.sql.protobuf.protos.ProtoWithAny",
+          |   "event_name":"click",
+          |   "details": {
+          |     "@type":"type.googleapis.com/org.apache.spark.sql.protobuf.protos.SimpleMessage",
+          |     "id":"10",
+          |     "string_value":"galaxy"
+          |   }
+          | }""".stripMargin))
+    }
+  }
+
   test("test explicitly set zero values - proto3") {
     // All fields explicitly zero. Message, map, repeated, and oneof fields
     // are left unset, as null is their zero value.


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
