This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 05df8c472b4 [SPARK-43312][PROTOBUF] Option to convert Any fields into JSON
05df8c472b4 is described below

commit 05df8c472b47f6c29c2d936499b5bf2f0fbae99f
Author: Raghu Angadi <raghu.ang...@databricks.com>
AuthorDate: Thu May 4 16:57:13 2023 -0700

    [SPARK-43312][PROTOBUF] Option to convert Any fields into JSON

### What changes were proposed in this pull request?
This adds an option to convert Protobuf 'Any' fields to JSON. At runtime, such 'Any' fields can contain an arbitrary Protobuf message serialized as binary data.

By default, when this option is not enabled, such a field behaves like a normal Protobuf message with two fields (`STRUCT<type_url: STRING, value: BINARY>`). The binary `value` field is not interpreted, which might not be convenient in practice.

One option is to deserialize it into the actual Protobuf message and convert it to a Spark STRUCT, but this is not feasible since the schema for `from_protobuf()` is needed at query compile time and cannot change at run time.

Another option is to parse the binary and deserialize the Protobuf message into a JSON string, which is a lot more readable than the binary data. This configuration option enables converting Any fields to JSON. The example below clarifies further.

Consider two Protobuf types defined as follows:
```
message ProtoWithAny {
   string event_name = 1;
   google.protobuf.Any details = 2;
}

message Person {
  string name = 1;
  int32 id = 2;
}
```
With this option enabled, the schema for `from_protobuf("col", messageName = "ProtoWithAny")` would be: `STRUCT<event_name: STRING, details: STRING>`.
At run time, if the `details` field contains a `Person` Protobuf message, the returned value looks like this:
('click', '{"@type":"type.googleapis.com/...Person","name":"Mario","id":100}')

Requirements:
- The definitions for all the possible Protobuf types that are used in Any fields should be available in the Protobuf descriptor file passed to `from_protobuf()`. If any Protobuf is not found, it will result in an error for that record.
- This feature is supported with Java classes as well, but only the Protobuf types defined in the same `proto` file as the primary Java class might be visible. E.g. if `ProtoWithAny` and `Person` in the above example are in different proto files, the definition for `Person` may not be found.

### Why are the changes needed?
Improves handling of Any fields.

### Does this PR introduce _any_ user-facing change?
No. The default behavior is not changed.

### How was this patch tested?
- Unit tests

Closes #40983 from rangadi/protobuf-any.
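For illustration, a minimal usage sketch (not part of this commit): it assumes a SparkSession named `spark`, a DataFrame `df` whose 'binary' column holds serialized `ProtoWithAny` messages, and a hypothetical descriptor file path.

```scala
import scala.collection.JavaConverters._

import org.apache.spark.sql.functions.get_json_object
import org.apache.spark.sql.protobuf.functions.from_protobuf

import spark.implicits._

// Enable JSON conversion for google.protobuf.Any fields.
val options = Map("convert.any.fields.to.json" -> "true").asJava

// The descriptor file path below is hypothetical.
val parsed = df.select(
  from_protobuf($"binary", "ProtoWithAny", "/path/to/functions_suite.desc", options)
    .as("proto"))

// Resulting schema: proto STRUCT<event_name: STRING, details: STRING>.
// The JSON string in 'details' can then be queried with Spark's JSON functions:
parsed.select(get_json_object($"proto.details", "$.name").as("person_name")).show()
```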
Authored-by: Raghu Angadi <raghu.ang...@databricks.com> Signed-off-by: Gengliang Wang <gengli...@apache.org> --- connector/protobuf/pom.xml | 7 ++ .../sql/protobuf/ProtobufDataToCatalyst.scala | 17 ++- .../spark/sql/protobuf/ProtobufDeserializer.scala | 24 +++- .../spark/sql/protobuf/utils/ProtobufOptions.scala | 80 ++++++++++-- .../spark/sql/protobuf/utils/ProtobufUtils.scala | 17 +++ .../sql/protobuf/utils/SchemaConverters.scala | 3 + .../test/resources/protobuf/functions_suite.desc | Bin 10635 -> 11087 bytes .../test/resources/protobuf/functions_suite.proto | 12 ++ .../sql/protobuf/ProtobufFunctionsSuite.scala | 135 ++++++++++++++++++++- 9 files changed, 278 insertions(+), 17 deletions(-) diff --git a/connector/protobuf/pom.xml b/connector/protobuf/pom.xml index e5e1fe342a1..6feef54ce71 100644 --- a/connector/protobuf/pom.xml +++ b/connector/protobuf/pom.xml @@ -82,6 +82,12 @@ <version>${protobuf.version}</version> <scope>compile</scope> </dependency> + <dependency> + <groupId>com.google.protobuf</groupId> + <artifactId>protobuf-java-util</artifactId> + <version>${protobuf.version}</version> + <scope>compile</scope> + </dependency> </dependencies> <build> <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory> @@ -145,6 +151,7 @@ <include>src/test/resources/protobuf</include> </inputDirectories> <addSources>test</addSources> + <includeStdTypes>true</includeStdTypes> </configuration> </execution> </executions> diff --git a/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/ProtobufDataToCatalyst.scala b/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/ProtobufDataToCatalyst.scala index cf6114f2f6c..ae2bd9fc4a7 100644 --- a/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/ProtobufDataToCatalyst.scala +++ b/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/ProtobufDataToCatalyst.scala @@ -20,8 +20,8 @@ import scala.collection.JavaConverters._ import scala.util.control.NonFatal import com.google.protobuf.DynamicMessage +import com.google.protobuf.TypeRegistry -import org.apache.spark.sql.catalyst.NoopFilters import org.apache.spark.sql.catalyst.expressions.{ExpectsInputTypes, Expression, SpecificInternalRow, UnaryExpression} import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, CodeGenerator, ExprCode} import org.apache.spark.sql.catalyst.util.{FailFastMode, ParseMode, PermissiveMode} @@ -64,12 +64,21 @@ private[protobuf] case class ProtobufDataToCatalyst( @transient private lazy val fieldsNumbers = messageDescriptor.getFields.asScala.map(f => f.getNumber).toSet - @transient private lazy val deserializer = + @transient private lazy val deserializer = { + val typeRegistry = descFilePath match { + case Some(path) if protobufOptions.convertAnyFieldsToJson => + ProtobufUtils.buildTypeRegistry(path) // This loads all the messages in the file. + case None if protobufOptions.convertAnyFieldsToJson => + ProtobufUtils.buildTypeRegistry(messageDescriptor) // Loads only connected messages. + case _ => TypeRegistry.getEmptyTypeRegistry // Default. Json conversion is not enabled. 
+ } new ProtobufDeserializer( messageDescriptor, dataType, - new NoopFilters, - protobufOptions.emitDefaultValues) + typeRegistry = typeRegistry, + emitDefaultValues = protobufOptions.emitDefaultValues + ) + } @transient private var result: DynamicMessage = _ diff --git a/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/ProtobufDeserializer.scala b/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/ProtobufDeserializer.scala index c3bc46c62ea..d1464163568 100644 --- a/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/ProtobufDeserializer.scala +++ b/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/ProtobufDeserializer.scala @@ -21,6 +21,8 @@ import java.util.concurrent.TimeUnit import com.google.protobuf.{ByteString, DynamicMessage, Message} import com.google.protobuf.Descriptors._ import com.google.protobuf.Descriptors.FieldDescriptor.JavaType._ +import com.google.protobuf.TypeRegistry +import com.google.protobuf.util.JsonFormat import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.{InternalRow, NoopFilters, StructFilters} @@ -36,11 +38,14 @@ import org.apache.spark.unsafe.types.UTF8String private[sql] class ProtobufDeserializer( rootDescriptor: Descriptor, rootCatalystType: DataType, - filters: StructFilters, + filters: StructFilters = new NoopFilters, + typeRegistry: TypeRegistry = TypeRegistry.getEmptyTypeRegistry, emitDefaultValues: Boolean = false) { def this(rootDescriptor: Descriptor, rootCatalystType: DataType) = { - this(rootDescriptor, rootCatalystType, new NoopFilters, false) + this( + rootDescriptor, rootCatalystType, new NoopFilters, TypeRegistry.getEmptyTypeRegistry, false + ) } private val converter: Any => Option[InternalRow] = @@ -71,6 +76,14 @@ private[sql] class ProtobufDeserializer( def deserialize(data: Message): Option[InternalRow] = converter(data) + // JsonFormatter used to convert Any fields (if the option is enabled). + // This keeps original field names and does not include any extra whitespace in JSON. + // If the runtime type for Any field is not found in the registry, it throws an exception. + private val jsonPrinter = JsonFormat.printer + .omittingInsignificantWhitespace() + .preservingProtoFieldNames() + .usingTypeRegistry(typeRegistry) + private def newArrayWriter( protoField: FieldDescriptor, protoPath: Seq[String], @@ -225,6 +238,13 @@ private[sql] class ProtobufDeserializer( val micros = DateTimeUtils.millisToMicros(seconds * 1000) updater.setLong(ordinal, micros + TimeUnit.NANOSECONDS.toMicros(nanoSeconds)) + case (MESSAGE, StringType) + if protoType.getMessageType.getFullName == "google.protobuf.Any" => + (updater, ordinal, value) => + // Convert 'Any' protobuf message to JSON string. 
+ val jsonStr = jsonPrinter.print(value.asInstanceOf[DynamicMessage]) + updater.set(ordinal, UTF8String.fromString(jsonStr)) + case (MESSAGE, st: StructType) => val writeRecord = getRecordWriter( protoType.getMessageType, diff --git a/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufOptions.scala b/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufOptions.scala index 0af8c50dc5a..04436346dec 100644 --- a/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufOptions.scala +++ b/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufOptions.scala @@ -32,6 +32,8 @@ private[sql] class ProtobufOptions( extends FileSourceOptions(parameters) with Logging { + import ProtobufOptions._ + def this(parameters: Map[String, String], conf: Configuration) = { this(CaseInsensitiveMap(parameters), conf) } @@ -39,15 +41,72 @@ private[sql] class ProtobufOptions( val parseMode: ParseMode = parameters.get("mode").map(ParseMode.fromString).getOrElse(FailFastMode) - // Setting the `recursive.fields.max.depth` to 1 allows it to be recurse once, - and 2 allows it to be recursed twice and so on. A value of `recursive.fields.max.depth` - greater than 10 is not permitted. If it is not specified, the default value is -1; - A value of 0 or below disallows any recursive fields. If a protobuf - record has more depth than the allowed value for recursive fields, it will be truncated - and corresponding fields are ignored (dropped). + /** + * Adds support for recursive fields. If this option is not specified, recursive fields are + * not permitted. Setting it to 0 drops the recursive fields, 1 allows it to be recursed once, + * and 2 allows it to be recursed twice and so on, up to 10. Values larger than 10 are not + * allowed in order to avoid inadvertently creating very large schemas. If a Protobuf message + * has depth beyond this limit, the Spark struct returned is truncated after the recursion limit. + * + * Examples. Consider a Protobuf with a recursive field: + * `message Person { string name = 1; Person friend = 2; }` + * The following lists the schema with different values for this setting. + * 1: `struct<name: string>` + * 2: `struct<name: string, friend: struct<name: string>>` + * 3: `struct<name: string, friend: struct<name: string, friend: struct<name: string>>>` + * and so on. + */ val recursiveFieldMaxDepth: Int = parameters.getOrElse("recursive.fields.max.depth", "-1").toInt - // Whether to render fields with zero values when deserializing Protobufs to a Spark struct. + /** + * This option ("convert.any.fields.to.json") enables converting Protobuf 'Any' fields to JSON. + * At runtime, such 'Any' fields can contain arbitrary Protobuf messages as binary data. + * + * By default, when this option is not enabled, such a field behaves like a normal Protobuf + * message with two fields (`STRUCT<type_url: STRING, value: BINARY>`). The binary `value` + * field is not interpreted. The binary data might not be convenient to work with in practice. + * + * One option is to deserialize it into the actual Protobuf message and convert it to a Spark + * STRUCT. But this is not feasible since the schema for `from_protobuf()` is needed at query + * compile time and cannot change at runtime. + * + * Another option is to parse the binary and deserialize the Protobuf message into a JSON + * string. This is a lot more readable than the binary data.
This configuration option enables + * converting Any fields to JSON. The example below clarifies further. + * + * Consider two Protobuf types defined as follows: + * message ProtoWithAny { + * string event_name = 1; + * google.protobuf.Any details = 2; + * } + * + * message Person { + * string name = 1; + * int32 id = 2; + * } + * + * With this option enabled, the schema for `from_protobuf("col", messageName = "ProtoWithAny")` + * would be: `STRUCT<event_name: STRING, details: STRING>`. + * At run time, if the `details` field contains a `Person` Protobuf message, the returned value + * looks like this: + * ('click', '{"@type":"type.googleapis.com/...Person","name":"Mario","id":100}') + * + * Requirements: + * - The definitions for all the possible Protobuf types that are used in Any fields should be + * available in the Protobuf descriptor file passed to `from_protobuf()`. If any Protobuf + * is not found, it will result in an error for that record. + * - This feature is supported with Java classes as well, but only the Protobuf types defined + * in the same `proto` file as the primary Java class might be visible. + * E.g. if `ProtoWithAny` and `Person` in the above example are in different proto files, + * the definition for `Person` may not be found. + * + * This feature should be enabled carefully. JSON conversion and processing are inefficient. + * In addition, schema safety is reduced, making downstream processing error-prone. + */ + val convertAnyFieldsToJson: Boolean = + parameters.getOrElse(CONVERT_ANY_FIELDS_TO_JSON_CONFIG, "false").toBoolean + + // Whether to render fields with zero values when deserializing Protobuf to a Spark struct. // When a field is empty in the serialized Protobuf, this library will deserialize them as // null by default. However, this flag can control whether to render the type-specific zero value. // This operates similarly to `includingDefaultValues` in protobuf-java-util's JsonFormat, or // ``` // // And we have a proto constructed like: - // `Person(age=0, middle_name="") + // `Person(age=0, middle_name="")` // - // The result after calling from_protobuf without this flag set would be: + // The result after calling from_protobuf() without this flag set would be: // `{"name": null, "age": null, "middle_name": "", "salary": null}` // (age is null because zero-value singular fields are not in the wire format in proto3). // // // With this flag it would be: // `{"name": "", "age": 0, "middle_name": "", "salary": null}` + // ("salary" remains null, since it is declared explicitly as an optional field) // // Ref: https://protobuf.dev/programming-guides/proto3/#default for information about // type-specific defaults.
@@ -90,4 +150,6 @@ private[sql] object ProtobufOptions { .getOrElse(new Configuration()) new ProtobufOptions(CaseInsensitiveMap(parameters), hadoopConf) } + + val CONVERT_ANY_FIELDS_TO_JSON_CONFIG = "convert.any.fields.to.json" } diff --git a/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufUtils.scala b/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufUtils.scala index bf207d6068f..5688a483da0 100644 --- a/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufUtils.scala +++ b/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufUtils.scala @@ -25,6 +25,7 @@ import scala.collection.JavaConverters._ import com.google.protobuf.{DescriptorProtos, Descriptors, InvalidProtocolBufferException, Message} import com.google.protobuf.DescriptorProtos.{FileDescriptorProto, FileDescriptorSet} import com.google.protobuf.Descriptors.{Descriptor, FieldDescriptor} +import com.google.protobuf.TypeRegistry import org.apache.spark.internal.Logging import org.apache.spark.sql.errors.QueryCompilationErrors @@ -283,4 +284,20 @@ private[sql] object ProtobufUtils extends Logging { case Seq() => "top-level record" case n => s"field '${n.mkString(".")}'" } + + /** Builds [[TypeRegistry]] with all the messages found in the descriptor file. */ + private[protobuf] def buildTypeRegistry(descFilePath: String): TypeRegistry = { + val registryBuilder = TypeRegistry.newBuilder() + for (fileDesc <- parseFileDescriptorSet(descFilePath)) { + registryBuilder.add(fileDesc.getMessageTypes) + } + registryBuilder.build() + } + + /** Builds [[TypeRegistry]] with the descriptor and the others from the same proto file. */ + private [protobuf] def buildTypeRegistry(descriptor: Descriptor): TypeRegistry = { + TypeRegistry.newBuilder() + .add(descriptor) // This adds any other descriptors in the associated proto file. + .build() + } } diff --git a/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/SchemaConverters.scala b/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/SchemaConverters.scala index e277f2999e4..b2ed7d64f36 100644 --- a/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/SchemaConverters.scala +++ b/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/SchemaConverters.scala @@ -88,6 +88,9 @@ object SchemaConverters extends Logging { fd.getMessageType.getFields.get(0).getName.equals("seconds") && fd.getMessageType.getFields.get(1).getName.equals("nanos")) => Some(TimestampType) + case MESSAGE if protobufOptions.convertAnyFieldsToJson && + fd.getMessageType.getFullName == "google.protobuf.Any" => + Some(StringType) // Any protobuf will be parsed and converted to json string. 
case MESSAGE if fd.isRepeated && fd.getMessageType.getOptions.hasMapEntry => var keyType: Option[DataType] = None var valueType: Option[DataType] = None diff --git a/connector/protobuf/src/test/resources/protobuf/functions_suite.desc b/connector/protobuf/src/test/resources/protobuf/functions_suite.desc index 80b78cd75f6..215dc05707f 100644 Binary files a/connector/protobuf/src/test/resources/protobuf/functions_suite.desc and b/connector/protobuf/src/test/resources/protobuf/functions_suite.desc differ diff --git a/connector/protobuf/src/test/resources/protobuf/functions_suite.proto b/connector/protobuf/src/test/resources/protobuf/functions_suite.proto index 2e9add4987c..a9d8d626481 100644 --- a/connector/protobuf/src/test/resources/protobuf/functions_suite.proto +++ b/connector/protobuf/src/test/resources/protobuf/functions_suite.proto @@ -26,6 +26,7 @@ package org.apache.spark.sql.protobuf.protos; import "timestamp.proto"; import "duration.proto"; import "basicmessage.proto"; +import "google/protobuf/any.proto"; option java_outer_classname = "SimpleMessageProtos"; @@ -284,6 +285,17 @@ message Status { Status status = 3; } +// Messages for testing Any fields +message ProtoWithAny { + string event_name = 1; + google.protobuf.Any details = 3; +} + +message ProtoWithAnyArray { // A container of Any fields + string description = 1; + repeated google.protobuf.Any items = 2; +} + // Contains a representative sample of all types, using the groupings defined // here: https://protobuf.dev/programming-guides/field_presence/#presence-in-proto3-apis message Proto3AllTypes { diff --git a/connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufFunctionsSuite.scala b/connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufFunctionsSuite.scala index 3105d9dc8b5..c47cd882af9 100644 --- a/connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufFunctionsSuite.scala +++ b/connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufFunctionsSuite.scala @@ -21,13 +21,16 @@ import java.time.Duration import scala.collection.JavaConverters._ -import com.google.protobuf.{ByteString, DynamicMessage} +import com.google.protobuf.{Any => AnyProto, ByteString, DynamicMessage} +import org.json4s.StringInput +import org.json4s.jackson.JsonMethods import org.apache.spark.sql.{AnalysisException, Column, DataFrame, QueryTest, Row} import org.apache.spark.sql.functions.{lit, struct, typedLit} import org.apache.spark.sql.protobuf.protos.Proto2Messages.Proto2AllTypes import org.apache.spark.sql.protobuf.protos.SimpleMessageProtos._ import org.apache.spark.sql.protobuf.protos.SimpleMessageProtos.SimpleMessageRepeated.NestedEnum +import org.apache.spark.sql.protobuf.utils.ProtobufOptions import org.apache.spark.sql.protobuf.utils.ProtobufUtils import org.apache.spark.sql.test.SharedSparkSession import org.apache.spark.sql.types._ @@ -40,7 +43,7 @@ class ProtobufFunctionsSuite extends QueryTest with SharedSparkSession with Prot val testFileDesc = testFile("functions_suite.desc", "protobuf/functions_suite.desc") private val javaClassNamePrefix = "org.apache.spark.sql.protobuf.protos.SimpleMessageProtos$" - val proto2FileDesc = testFile("proto2_messages.desc", "protobuf/proto_messages.desc") + val proto2FileDesc = testFile("proto2_messages.desc", "protobuf/proto2_messages.desc") private val proto2JavaClassNamePrefix = "org.apache.spark.sql.protobuf.protos.Proto2Messages$" private def emptyBinaryDF = Seq(Array[Byte]()).toDF("binary") @@ -1120,6 +1123,134 @@ class 
ProtobufFunctionsSuite extends QueryTest with SharedSparkSession with Prot } } + test("Converting Any fields to JSON") { + // Verifies schema and deserialization when 'convert.any.fields.to.json' is set. + checkWithFileAndClassName("ProtoWithAny") { + case (name, descFilePathOpt) => + + // proto: 'message { string event_name = 1; google.protobuf.Any details = 3 }' + + val simpleProto = SimpleMessage // JSON: {"id":10,"string_value":"galaxy"} + .newBuilder() + .setId(10) + .setStringValue("galaxy") + .build() + + val protoWithAnyBytes = ProtoWithAny + .newBuilder() + .setEventName("click") + .setDetails(AnyProto.pack(simpleProto)) + .build() + .toByteArray + + val inputDF = Seq(protoWithAnyBytes).toDF("binary") + + // Check schema with default options, where the Any field is not converted to JSON. + val df = inputDF.select( + from_protobuf_wrapper($"binary", name, descFilePathOpt).as("proto") + ) + // Default behavior: 'details' is a struct with 'type_url' and binary 'value'. + assert(df.schema.toDDL == + "proto STRUCT<event_name: STRING, details: STRUCT<type_url: STRING, value: BINARY>>" + ) + + // Enable the option to convert to JSON. + val options = Map(ProtobufOptions.CONVERT_ANY_FIELDS_TO_JSON_CONFIG -> "true") + val dfJson = inputDF.select( + from_protobuf_wrapper($"binary", name, descFilePathOpt, options).as("proto") + ) + // Now 'details' should be a string. + assert(dfJson.schema.toDDL == "proto STRUCT<event_name: STRING, details: STRING>") + + // Verify the JSON value for 'details'. + + val row = dfJson.collect()(0).getStruct(0) + + val expectedJson = """{"@type":""" + // The JSON includes the "@type" field as well. + """"type.googleapis.com/org.apache.spark.sql.protobuf.protos.SimpleMessage",""" + + """"id":"10","string_value":"galaxy"}""" + + assert(row.getString(0) == "click") + assert(row.getString(1) == expectedJson) + } + } + + test("Converting nested Any fields to JSON") { + // This is a more involved version of the previous test, with a nested Any field inside an array. + + // Takes a JSON string and returns it with all the extra whitespace removed. + def compactJson(json: String): String = { + val jsonValue = JsonMethods.parse(StringInput(json)) + JsonMethods.compact(jsonValue) + } + + checkWithFileAndClassName("ProtoWithAnyArray") { case (name, descFilePathOpt) => + + // proto: 'message { string description = 1; repeated google.protobuf.Any items = 2; }' + + // Use two different types of protos for 'items': one with an Any field, and one without. + + val simpleProto = SimpleMessage.newBuilder() // JSON: {"id":10,"string_value":"galaxy"} + .setId(10) + .setStringValue("galaxy") + .build() + + val protoWithAny = ProtoWithAny.newBuilder() + .setEventName("click") + .setDetails(AnyProto.pack(simpleProto)) + .build() + + val protoWithAnyArrayBytes = ProtoWithAnyArray.newBuilder() + .setDescription("nested any demo") + .addItems(AnyProto.pack(simpleProto)) // A simple proto. + .addItems(AnyProto.pack(protoWithAny)) // A proto with an Any field inside it. + .build() + .toByteArray + + val inputDF = Seq(protoWithAnyArrayBytes).toDF("binary") + + // Check default schema. + val df = inputDF.select( + from_protobuf_wrapper($"binary", name, descFilePathOpt).as("proto") + ) + // Default behavior: each 'items' element is a struct with 'type_url' and binary 'value'. + assert(df.schema.toDDL == "proto STRUCT<description: STRING, " + + "items: ARRAY<STRUCT<type_url: STRING, value: BINARY>>>" + ) + + // 'items' elements become JSON strings with the 'convert.any.fields.to.json' option enabled.
+ val options = Map(ProtobufOptions.CONVERT_ANY_FIELDS_TO_JSON_CONFIG -> "true") + val dfJson = inputDF.select(from_protobuf_wrapper( + $"binary", name, descFilePathOpt, options).as("proto") + ) + // Now each 'items' element should be a string. + assert(dfJson.schema.toDDL == "proto STRUCT<description: STRING, items: ARRAY<STRING>>") + + val row = dfJson.collect()(0).getStruct(0) + val items = row.getList[String](1) + + assert(row.getString(0) == "nested any demo") + assert(items.get(0) == compactJson( + """ + | { + | "@type":"type.googleapis.com/org.apache.spark.sql.protobuf.protos.SimpleMessage", + | "id":"10", + | "string_value":"galaxy" + | }""".stripMargin)) + assert(items.get(1) == compactJson( + """ + | { + | "@type":"type.googleapis.com/org.apache.spark.sql.protobuf.protos.ProtoWithAny", + | "event_name":"click", + | "details": { + | "@type":"type.googleapis.com/org.apache.spark.sql.protobuf.protos.SimpleMessage", + | "id":"10", + | "string_value":"galaxy" + | } + | }""".stripMargin)) + } + } + test("test explicitly set zero values - proto3") { // All fields explicitly zero. Message, map, repeated, and oneof fields // are left unset, as null is their zero value.
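For background, the conversion in this patch relies on protobuf-java-util's `JsonFormat` resolving the packed type against a `TypeRegistry`. A standalone sketch of that mechanism, using the `Person` message from the example above as a hypothetical generated Java class:

```scala
import com.google.protobuf.{Any => AnyProto, TypeRegistry}
import com.google.protobuf.util.JsonFormat

// Pack a message into google.protobuf.Any: this stores a type_url plus the
// serialized bytes; the payload is otherwise opaque.
val person = Person.newBuilder().setName("Mario").setId(100).build()
val any: AnyProto = AnyProto.pack(person)

// The printer can only expand the payload if the runtime type is registered;
// otherwise printing the Any throws InvalidProtocolBufferException.
val registry = TypeRegistry.newBuilder().add(Person.getDescriptor).build()

val printer = JsonFormat.printer()
  .omittingInsignificantWhitespace()
  .preservingProtoFieldNames()
  .usingTypeRegistry(registry)

// Prints: {"@type":"type.googleapis.com/<package>.Person","name":"Mario","id":100}
println(printer.print(any))
```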