charlesy6 commented on code in PR #44643:
URL: https://github.com/apache/spark/pull/44643#discussion_r1458381280
##########
connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufFunctionsSuite.scala:
##########
@@ -1124,7 +1124,78 @@ class ProtobufFunctionsSuite extends QueryTest with SharedSparkSession with Prot
}
}
- test("Corner case: empty recursive proto fields should be dropped") {
+ test("Retain empty proto fields when retain.empty.message.types=true") {
+ // When retain.empty.message.types=true, empty proto like 'message A {}'
can be retained as
+ // a field by inserting a dummy column as sub column.
+ val options = Map("recursive.fields.max.depth" -> "4",
"retain.empty.message.types" -> "true")
+
+ // EmptyProto at the top level. It will be an empty struct.
+ checkWithFileAndClassName("EmptyProto") {
+ case (name, descFilePathOpt) =>
+ val df = emptyBinaryDF.select(
+ from_protobuf_wrapper($"binary", name, descFilePathOpt,
options).as("empty_proto")
+ )
+ // Top level empty message is retained without adding dummy column to
the schema.
+ assert(df.schema == structFromDDL("empty_proto struct<>"))
+ }
+
+ // EmptyProto at inner level, because empty struct type is not allowed in
Spark.,
+ // a dummy column is inserted to retain the empty message.
+ checkWithFileAndClassName("EmptyProtoWrapper") {
+ case (name, descFilePathOpt) =>
+ val df = emptyBinaryDF.select(
+ from_protobuf_wrapper($"binary", name, descFilePathOpt,
options).as("wrapper")
+ )
+ // Nested empty message is retained by adding dummy column to the
schema.
+ assert(df.schema == structFromDDL("wrapper struct" +
+ "<name: string, empty_proto struct<__dummy_field_in_empty_struct:
string>>"))
+ }
+ }
+
+ test("Write empty proto to parquet when retain.empty.message.types=true") {
+ // When retain.empty.message.types=true, empty proto like 'message A {}'
can be retained
+ // as a field by inserting a dummy column as sub column, such schema can
be written to parquet.
+ val options = Map("recursive.fields.max.depth" -> "4",
"retain.empty.message.types" -> "true")
+ withTempDir { file =>
+ val binaryDF = Seq(
+
EmptyRecursiveProtoWrapper.newBuilder.setName("my_name").build().toByteArray)
+ .toDF("binary")
+ checkWithFileAndClassName("EmptyProtoWrapper") {
+ case (name, descFilePathOpt) =>
+ val df = binaryDF.select(
+ from_protobuf_wrapper($"binary", name, descFilePathOpt,
options).as("wrapper")
+ )
+
df.write.format("parquet").mode("overwrite").save(file.getAbsolutePath)
+ }
+ val resultDF = spark.read.format("parquet").load(file.getAbsolutePath)
+ assert(resultDF.schema == structFromDDL("wrapper struct" +
+ "<name: string, empty_proto struct<__dummy_field_in_empty_struct:
string>>"
+ ))
+ // The dummy column of empty proto should have null value.
+ checkAnswer(resultDF, Seq(Row(Row("my_name", null))))
+ }
+
+ // Top level message can't be empty, otherwise AnalysisException will be
thrown.
Review Comment:
> Test added.
👍
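
For context, a minimal sketch of how the option under test could be exercised through the public
from_protobuf function (rather than the suite's from_protobuf_wrapper helper). The message name,
descriptor path, and input DataFrame below are placeholders for illustration, not taken from the PR:

    // Hypothetical usage sketch; "EmptyProtoWrapper" and the descriptor path are placeholders.
    import scala.collection.JavaConverters._
    import org.apache.spark.sql.functions.col
    import org.apache.spark.sql.protobuf.functions.from_protobuf

    // from_protobuf takes the options as a java.util.Map, hence .asJava.
    val options = Map(
      "recursive.fields.max.depth" -> "4",     // bound unrolling of recursive fields
      "retain.empty.message.types" -> "true"   // keep fields of empty message types
    ).asJava

    // binaryDF is assumed to hold serialized protobuf bytes in a 'binary' column.
    val parsed = binaryDF.select(
      from_protobuf(col("binary"), "EmptyProtoWrapper", "/path/to/descriptor.desc", options)
        .as("wrapper")
    )
    // With the option enabled, a nested empty message should survive as
    // struct<__dummy_field_in_empty_struct: string> instead of being dropped.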
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]