This is an automated email from the ASF dual-hosted git repository. kabhwan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 3f7994217d5a [SPARK-46736][PROTOBUF] retain empty message field in protobuf connector 3f7994217d5a is described below commit 3f7994217d5a8d2816165459c1ce10d9b31bc7fd Author: Chaoqin Li <chaoqin...@databricks.com> AuthorDate: Tue Jan 30 11:02:35 2024 +0900 [SPARK-46736][PROTOBUF] retain empty message field in protobuf connector ### What changes were proposed in this pull request? Since Spark doesn't allow an empty StructType, an empty proto message type used as a field will be dropped by default. Introduce an option to allow retaining an empty message field by inserting a dummy column. ### Why are the changes needed? In protobuf, it is common to have an empty message type without any fields as a placeholder; in some cases people may not want to drop these empty message fields. ### Does this PR introduce _any_ user-facing change? Yes. The default behavior is still to drop an empty message field. The new option will enable customers to keep the empty message field, though they will observe a dummy column. ### How was this patch tested? Unit test and integration test. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #44643 from chaoqin-li1123/empty_proto. 
Authored-by: Chaoqin Li <chaoqin...@databricks.com> Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com> --- .../spark/sql/protobuf/utils/ProtobufOptions.scala | 17 +++ .../sql/protobuf/utils/SchemaConverters.scala | 34 ++++-- .../test/resources/protobuf/functions_suite.proto | 9 ++ .../sql/protobuf/ProtobufFunctionsSuite.scala | 123 ++++++++++++++++++++- 4 files changed, 171 insertions(+), 12 deletions(-) diff --git a/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufOptions.scala b/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufOptions.scala index 5f8c42df365a..6644bce98293 100644 --- a/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufOptions.scala +++ b/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufOptions.scala @@ -207,6 +207,23 @@ private[sql] class ProtobufOptions( // nil => nil, Int32Value(0) => 0, Int32Value(100) => 100. val unwrapWellKnownTypes: Boolean = parameters.getOrElse("unwrap.primitive.wrapper.types", false.toString).toBoolean + + // Since Spark doesn't allow writing empty StructType, empty proto message type will be + // dropped by default. Setting this option to true will insert a dummy column to empty proto + // message so that the empty message will be retained. + // For example, an empty message is used as field in another message: + // + // ``` + // message A {} + // message B {A a = 1, string name = 2} + // ``` + // + // By default, in the spark schema field a will be dropped, which result in schema + // b struct<name: string> + // If retain.empty.message.types=true, field a will be retained by inserting a dummy column. 
+ // b struct<a struct<__dummy_field_in_empty_struct: string>, name: string> + val retainEmptyMessage: Boolean = + parameters.getOrElse("retain.empty.message.types", false.toString).toBoolean } private[sql] object ProtobufOptions { diff --git a/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/SchemaConverters.scala b/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/SchemaConverters.scala index b35aa153aaa1..feb5aed03451 100644 --- a/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/SchemaConverters.scala +++ b/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/SchemaConverters.scala @@ -51,12 +51,13 @@ object SchemaConverters extends Logging { def toSqlTypeHelper( descriptor: Descriptor, protobufOptions: ProtobufOptions): SchemaType = { - SchemaType( - StructType(descriptor.getFields.asScala.flatMap( - structFieldFor(_, - Map(descriptor.getFullName -> 1), - protobufOptions: ProtobufOptions)).toArray), - nullable = true) + val fields = descriptor.getFields.asScala.flatMap( + structFieldFor(_, + Map(descriptor.getFullName -> 1), + protobufOptions: ProtobufOptions)).toSeq + if (fields.isEmpty && protobufOptions.retainEmptyMessage) { + SchemaType(convertEmptyProtoToStructWithDummyField(descriptor.getFullName), nullable = true) + } else SchemaType(StructType(fields), nullable = true) } // existingRecordNames: Map[String, Int] used to track the depth of recursive fields and to @@ -212,11 +213,15 @@ object SchemaConverters extends Logging { ).toSeq fields match { case Nil => - log.info( - s"Dropping ${fd.getFullName} as it does not have any fields left " + - "likely due to recursive depth limit." - ) - None + if (protobufOptions.retainEmptyMessage) { + Some(convertEmptyProtoToStructWithDummyField(fd.getFullName)) + } else { + log.info( + s"Dropping ${fd.getFullName} as it does not have any fields left " + + "likely due to recursive depth limit." 
+ ) + None + } case fds => Some(StructType(fds)) } } @@ -230,4 +235,11 @@ object SchemaConverters extends Logging { case dt => StructField(fd.getName, dt, nullable = !fd.isRequired) } } + + // Insert a dummy column to retain the empty message because + // spark doesn't allow empty struct type. + private def convertEmptyProtoToStructWithDummyField(desc: String): StructType = { + log.info(s"Keep $desc which is empty struct by inserting a dummy field.") + StructType(StructField("__dummy_field_in_empty_struct", StringType) :: Nil) + } } diff --git a/connector/protobuf/src/test/resources/protobuf/functions_suite.proto b/connector/protobuf/src/test/resources/protobuf/functions_suite.proto index a643e91158eb..4cae7f9abf28 100644 --- a/connector/protobuf/src/test/resources/protobuf/functions_suite.proto +++ b/connector/protobuf/src/test/resources/protobuf/functions_suite.proto @@ -269,6 +269,15 @@ message EventRecursiveB { OneOfEventWithRecursion recursiveOneOffInB = 3; } +message EmptyProto { + // This is an empty proto +} + +message EmptyProtoWrapper { + string name = 1; + EmptyProto empty_proto = 2; +} + message EmptyRecursiveProto { // This is a recursive proto with no fields. Used to test edge. Catalyst schema for this // should be "nothing" (i.e. completely dropped) irrespective of recursive limit. 
diff --git a/connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufFunctionsSuite.scala b/connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufFunctionsSuite.scala index 5e9e737151fe..fb8a68f1812b 100644 --- a/connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufFunctionsSuite.scala +++ b/connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufFunctionsSuite.scala @@ -1124,7 +1124,78 @@ class ProtobufFunctionsSuite extends QueryTest with SharedSparkSession with Prot } } - test("Corner case: empty recursive proto fields should be dropped") { + test("Retain empty proto fields when retain.empty.message.types=true") { + // When retain.empty.message.types=true, empty proto like 'message A {}' can be retained as + // a field by inserting a dummy column as sub column. + val options = Map("retain.empty.message.types" -> "true") + + // EmptyProto at the top level. It will be an empty struct. + checkWithFileAndClassName("EmptyProto") { + case (name, descFilePathOpt) => + val df = emptyBinaryDF.select( + from_protobuf_wrapper($"binary", name, descFilePathOpt, options).as("empty_proto") + ) + // Top level empty message is retained by adding dummy column to the schema. + assert(df.schema == + structFromDDL("empty_proto struct<__dummy_field_in_empty_struct: string>")) + } + + // Inner level empty message is retained by adding dummy column to the schema. + checkWithFileAndClassName("EmptyProtoWrapper") { + case (name, descFilePathOpt) => + val df = emptyBinaryDF.select( + from_protobuf_wrapper($"binary", name, descFilePathOpt, options).as("wrapper") + ) + // Nested empty message is retained by adding dummy column to the schema. 
+ assert(df.schema == structFromDDL("wrapper struct" + + "<name: string, empty_proto struct<__dummy_field_in_empty_struct: string>>")) + } + } + + test("Write empty proto to parquet when retain.empty.message.types=true") { + // When retain.empty.message.types=true, empty proto like 'message A {}' can be retained + // as a field by inserting a dummy column as sub column, such schema can be written to parquet. + val options = Map("retain.empty.message.types" -> "true") + withTempDir { file => + val binaryDF = Seq( + EmptyProtoWrapper.newBuilder.setName("my_name").build().toByteArray) + .toDF("binary") + checkWithFileAndClassName("EmptyProtoWrapper") { + case (name, descFilePathOpt) => + val df = binaryDF.select( + from_protobuf_wrapper($"binary", name, descFilePathOpt, options).as("wrapper") + ) + df.write.format("parquet").mode("overwrite").save(file.getAbsolutePath) + } + val resultDF = spark.read.format("parquet").load(file.getAbsolutePath) + assert(resultDF.schema == structFromDDL("wrapper struct" + + "<name: string, empty_proto struct<__dummy_field_in_empty_struct: string>>" + )) + // The dummy column of empty proto should have null value. + checkAnswer(resultDF, Seq(Row(Row("my_name", null)))) + } + + // When top level message is empty, write to parquet. + withTempDir { file => + val binaryDF = Seq( + EmptyProto.newBuilder.build().toByteArray) + .toDF("binary") + checkWithFileAndClassName("EmptyProto") { + case (name, descFilePathOpt) => + val df = binaryDF.select( + from_protobuf_wrapper($"binary", name, descFilePathOpt, options).as("empty_proto") + ) + df.write.format("parquet").mode("overwrite").save(file.getAbsolutePath) + } + val resultDF = spark.read.format("parquet").load(file.getAbsolutePath) + assert(resultDF.schema == + structFromDDL("empty_proto struct<__dummy_field_in_empty_struct: string>")) + // The dummy column of empty proto should have null value. 
+ checkAnswer(resultDF, Seq(Row(Row(null)))) + } + } + + test("Corner case: empty recursive proto fields should be dropped by default") { // This verifies that a empty proto like 'message A { A a = 1}' are completely dropped // irrespective of max depth setting. @@ -1150,6 +1221,56 @@ class ProtobufFunctionsSuite extends QueryTest with SharedSparkSession with Prot } } + test("Retain empty recursive proto fields when retain.empty.message.types=true") { + // This verifies that a empty proto like 'message A { A a = 1}' can be retained by + // inserting a dummy field. + + val structWithDummyColumn = + StructType(StructField("__dummy_field_in_empty_struct", StringType) :: Nil) + val structWithRecursiveDepthEquals2 = StructType( + StructField("recursive_field", structWithDummyColumn) + :: StructField("recursive_array", ArrayType(structWithDummyColumn, containsNull = false)) + :: Nil) + /* + The code below construct the expected schema with recursive depth set to 3. + Note: If recursive depth change, the resulting schema of empty recursive proto will change. 
+ root + |-- empty_proto: struct (nullable = true) + | |-- recursive_field: struct (nullable = true) + | | |-- recursive_field: struct (nullable = true) + | | | |-- __dummy_field_in_empty_struct: string (nullable = true) + | | |-- recursive_array: array (nullable = true) + | | | |-- element: struct (containsNull = false) + | | | | |-- __dummy_field_in_empty_struct: string (nullable = true) + | |-- recursive_array: array (nullable = true) + | | |-- element: struct (containsNull = false) + | | | |-- recursive_field: struct (nullable = true) + | | | | |-- __dummy_field_in_empty_struct: string (nullable = true) + | | | |-- recursive_array: array (nullable = true) + | | | | |-- element: struct (containsNull = false) + | | | | | |-- __dummy_field_in_empty_struct: string (nullable = true) + */ + val structWithRecursiveDepthEquals3 = StructType( + StructField("empty_proto", + StructType( + StructField("recursive_field", structWithRecursiveDepthEquals2) :: + StructField("recursive_array", + ArrayType(structWithRecursiveDepthEquals2, containsNull = false) + ) :: Nil + ) + ) :: Nil + ) + + val options = Map("recursive.fields.max.depth" -> "3", "retain.empty.message.types" -> "true") + checkWithFileAndClassName("EmptyRecursiveProto") { + case (name, descFilePathOpt) => + val df = emptyBinaryDF.select( + from_protobuf_wrapper($"binary", name, descFilePathOpt, options).as("empty_proto") + ) + assert(df.schema == structWithRecursiveDepthEquals3) + } + } + test("Converting Any fields to JSON") { // Verifies schema and deserialization when 'convert.any.fields.to.json' is set. checkWithFileAndClassName("ProtoWithAny") { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org