rangadi commented on code in PR #44643:
URL: https://github.com/apache/spark/pull/44643#discussion_r1457920251
##########
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/SchemaConverters.scala:
##########
@@ -212,11 +212,23 @@ object SchemaConverters extends Logging {
        ).toSeq
      fields match {
        case Nil =>
-          log.info(
-            s"Dropping ${fd.getFullName} as it does not have any fields left " +
-              "likely due to recursive depth limit."
-          )
-          None
+          if (protobufOptions.retainEmptyMessage) {
+            // Insert a dummy column to retain the empty message because
+            // spark doesn't allow empty struct type.
+            log.info(
+              s"Keep ${fd.getFullName} which is empty struct by inserting a dummy field" +
+                s" because retain.empty.message=true"
Review Comment:
`because ...` could be removed.
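For example, a minimal sketch keeping the rest of the message as-is:
```scala
log.info(s"Keep ${fd.getFullName} which is empty struct by inserting a dummy field")
```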
##########
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufOptions.scala:
##########
@@ -207,6 +207,21 @@ private[sql] class ProtobufOptions(
// nil => nil, Int32Value(0) => 0, Int32Value(100) => 100.
val unwrapWellKnownTypes: Boolean =
    parameters.getOrElse("unwrap.primitive.wrapper.types", false.toString).toBoolean
+
+  // Since Spark doesn't allow writing empty StructType, empty proto message type will be
Review Comment:
Could you format this better?
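For instance, something along these lines (the full comment is truncated in this diff, so the wording below is partly assumed; the `getOrElse` pattern just mirrors the neighboring options):
```scala
// Spark doesn't allow writing an empty StructType, so an empty proto message
// type would otherwise be dropped. Setting this to true retains such messages
// by inserting a dummy column.
val retainEmptyMessage: Boolean =
  parameters.getOrElse("retain.empty.message.types", false.toString).toBoolean
```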
##########
connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufFunctionsSuite.scala:
##########
@@ -1150,6 +1221,32 @@ class ProtobufFunctionsSuite extends QueryTest with SharedSparkSession with Prot
    }
  }
+  test("Retain empty recursive proto fields when retain.empty.message.types=true") {
+    // This verifies that an empty proto like 'message A { A a = 1}' can be retained by
+    // inserting a dummy field.
+
+    val emptyProtoSchema =
+      StructType(StructField("__dummy_field_in_empty_struct", StringType) :: Nil)
+    val expectedSchema = StructType(
Review Comment:
Could you add a comment here showing the expected schema? It is hard to see here.
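For example, a comment like this (DDL form derived from the `expectedSchema` built in this test):
```scala
// Expected schema (max depth 2):
// empty_proto struct<
//   recursive_field: struct<__dummy_field_in_empty_struct: string>,
//   recursive_array: array<struct<__dummy_field_in_empty_struct: string>>
// >
```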
##########
connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufFunctionsSuite.scala:
##########
@@ -1124,7 +1124,78 @@ class ProtobufFunctionsSuite extends QueryTest with SharedSparkSession with Prot
    }
  }
-  test("Corner case: empty recursive proto fields should be dropped") {
+  test("Retain empty proto fields when retain.empty.message.types=true") {
+    // When retain.empty.message.types=true, an empty proto like 'message A {}' can be retained as
+    // a field by inserting a dummy column as a sub-column.
+    val options = Map("recursive.fields.max.depth" -> "4", "retain.empty.message.types" -> "true")
+
+    // EmptyProto at the top level. It will be an empty struct.
+    checkWithFileAndClassName("EmptyProto") {
Review Comment:
Could you add a test with a recursive proto version of this test?
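Something like this, perhaps (sketch only; the message name `EmptyRecursiveProto` and the expected DDL are assumptions, since the truncated recursion determines the final shape):
```scala
checkWithFileAndClassName("EmptyRecursiveProto") {
  case (name, descFilePathOpt) =>
    val df = emptyBinaryDF.select(
      from_protobuf_wrapper($"binary", name, descFilePathOpt, options).as("empty_proto"))
    // The recursion is cut off at the depth limit, leaving only the dummy
    // field at the truncated leaves.
    assert(df.schema == structFromDDL(
      "empty_proto struct<recursive_field: struct<__dummy_field_in_empty_struct: string>>"))
}
```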
##########
connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufFunctionsSuite.scala:
##########
@@ -1124,7 +1124,78 @@ class ProtobufFunctionsSuite extends QueryTest with SharedSparkSession with Prot
    }
  }
-  test("Corner case: empty recursive proto fields should be dropped") {
+  test("Retain empty proto fields when retain.empty.message.types=true") {
+    // When retain.empty.message.types=true, an empty proto like 'message A {}' can be retained as
+    // a field by inserting a dummy column as a sub-column.
+    val options = Map("recursive.fields.max.depth" -> "4", "retain.empty.message.types" -> "true")
+
+    // EmptyProto at the top level. It will be an empty struct.
+    checkWithFileAndClassName("EmptyProto") {
+      case (name, descFilePathOpt) =>
+        val df = emptyBinaryDF.select(
+          from_protobuf_wrapper($"binary", name, descFilePathOpt, options).as("empty_proto")
+        )
+        // Top level empty message is retained without adding a dummy column to the schema.
+        assert(df.schema == structFromDDL("empty_proto struct<>"))
+    }
+
+    // EmptyProto at an inner level: because empty struct type is not allowed in Spark,
+    // a dummy column is inserted to retain the empty message.
+    checkWithFileAndClassName("EmptyProtoWrapper") {
+      case (name, descFilePathOpt) =>
+        val df = emptyBinaryDF.select(
+          from_protobuf_wrapper($"binary", name, descFilePathOpt, options).as("wrapper")
+        )
+        // Nested empty message is retained by adding a dummy column to the schema.
+        assert(df.schema == structFromDDL("wrapper struct" +
+          "<name: string, empty_proto struct<__dummy_field_in_empty_struct: string>>"))
+    }
+  }
+
+  test("Write empty proto to parquet when retain.empty.message.types=true") {
+    // When retain.empty.message.types=true, an empty proto like 'message A {}' can be retained
+    // as a field by inserting a dummy column as a sub-column; such a schema can be written to parquet.
+    val options = Map("recursive.fields.max.depth" -> "4", "retain.empty.message.types" -> "true")
+    withTempDir { file =>
+      val binaryDF = Seq(
+        EmptyRecursiveProtoWrapper.newBuilder.setName("my_name").build().toByteArray)
+        .toDF("binary")
+      checkWithFileAndClassName("EmptyProtoWrapper") {
+        case (name, descFilePathOpt) =>
+          val df = binaryDF.select(
+            from_protobuf_wrapper($"binary", name, descFilePathOpt, options).as("wrapper")
+          )
+          df.write.format("parquet").mode("overwrite").save(file.getAbsolutePath)
+      }
+      val resultDF = spark.read.format("parquet").load(file.getAbsolutePath)
+      assert(resultDF.schema == structFromDDL("wrapper struct" +
+        "<name: string, empty_proto struct<__dummy_field_in_empty_struct: string>>"
Review Comment:
Btw, since `"recursive.fields.max.depth"` is 4, there should be 4 levels of struct here, right?
##########
connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufFunctionsSuite.scala:
##########
@@ -1124,7 +1124,78 @@ class ProtobufFunctionsSuite extends QueryTest with SharedSparkSession with Prot
    }
  }
-  test("Corner case: empty recursive proto fields should be dropped") {
+  test("Retain empty proto fields when retain.empty.message.types=true") {
+    // When retain.empty.message.types=true, an empty proto like 'message A {}' can be retained as
+    // a field by inserting a dummy column as a sub-column.
+    val options = Map("recursive.fields.max.depth" -> "4", "retain.empty.message.types" -> "true")
Review Comment:
Remove `"recursive.fields.max.depth" -> "4"`? This test does not use recursive protobufs.
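i.e. just:
```scala
val options = Map("retain.empty.message.types" -> "true")
```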
##########
connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufFunctionsSuite.scala:
##########
@@ -1150,6 +1221,32 @@ class ProtobufFunctionsSuite extends QueryTest with SharedSparkSession with Prot
    }
  }
+  test("Retain empty recursive proto fields when retain.empty.message.types=true") {
+    // This verifies that an empty proto like 'message A { A a = 1}' can be retained by
+    // inserting a dummy field.
+
+    val emptyProtoSchema =
+      StructType(StructField("__dummy_field_in_empty_struct", StringType) :: Nil)
+    val expectedSchema = StructType(
+      StructField("empty_proto",
+        StructType(
+          StructField("recursive_field", emptyProtoSchema) ::
+            StructField("recursive_array", ArrayType(emptyProtoSchema, containsNull = false)) :: Nil
+        )
+      ) :: Nil
+    )
+
+    val options = Map("recursive.fields.max.depth" -> "2", "retain.empty.message.types" -> "true")
Review Comment:
Will the schema change if max depth is higher, say 3?
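If it does, I'd expect one more level of nesting before the dummy field is inserted, roughly (sketch; assumes each extra depth expands the recursion once more before truncation):
```scala
// Depth-2 shape reused as the inner level of the depth-3 schema.
val depth2Schema = StructType(
  StructField("recursive_field", emptyProtoSchema) ::
    StructField("recursive_array", ArrayType(emptyProtoSchema, containsNull = false)) :: Nil)
val expectedSchemaAtDepth3 = StructType(
  StructField("empty_proto", StructType(
    StructField("recursive_field", depth2Schema) ::
      StructField("recursive_array", ArrayType(depth2Schema, containsNull = false)) :: Nil)) :: Nil)
```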
##########
connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufFunctionsSuite.scala:
##########
@@ -1124,7 +1124,78 @@ class ProtobufFunctionsSuite extends QueryTest with SharedSparkSession with Prot
    }
  }
-  test("Corner case: empty recursive proto fields should be dropped") {
+  test("Retain empty proto fields when retain.empty.message.types=true") {
+    // When retain.empty.message.types=true, an empty proto like 'message A {}' can be retained as
+    // a field by inserting a dummy column as a sub-column.
+    val options = Map("recursive.fields.max.depth" -> "4", "retain.empty.message.types" -> "true")
+
+    // EmptyProto at the top level. It will be an empty struct.
+    checkWithFileAndClassName("EmptyProto") {
+      case (name, descFilePathOpt) =>
+        val df = emptyBinaryDF.select(
+          from_protobuf_wrapper($"binary", name, descFilePathOpt, options).as("empty_proto")
+        )
+        // Top level empty message is retained without adding a dummy column to the schema.
+        assert(df.schema == structFromDDL("empty_proto struct<>"))
+    }
+
+    // EmptyProto at an inner level: because empty struct type is not allowed in Spark,
+    // a dummy column is inserted to retain the empty message.
+    checkWithFileAndClassName("EmptyProtoWrapper") {
+      case (name, descFilePathOpt) =>
+        val df = emptyBinaryDF.select(
+          from_protobuf_wrapper($"binary", name, descFilePathOpt, options).as("wrapper")
+        )
+        // Nested empty message is retained by adding a dummy column to the schema.
+        assert(df.schema == structFromDDL("wrapper struct" +
+          "<name: string, empty_proto struct<__dummy_field_in_empty_struct: string>>"))
+    }
+  }
+
+  test("Write empty proto to parquet when retain.empty.message.types=true") {
+    // When retain.empty.message.types=true, an empty proto like 'message A {}' can be retained
+    // as a field by inserting a dummy column as a sub-column; such a schema can be written to parquet.
+    val options = Map("recursive.fields.max.depth" -> "4", "retain.empty.message.types" -> "true")
+    withTempDir { file =>
+      val binaryDF = Seq(
+        EmptyRecursiveProtoWrapper.newBuilder.setName("my_name").build().toByteArray)
+        .toDF("binary")
+      checkWithFileAndClassName("EmptyProtoWrapper") {
+        case (name, descFilePathOpt) =>
+          val df = binaryDF.select(
+            from_protobuf_wrapper($"binary", name, descFilePathOpt, options).as("wrapper")
+          )
+          df.write.format("parquet").mode("overwrite").save(file.getAbsolutePath)
+      }
+      val resultDF = spark.read.format("parquet").load(file.getAbsolutePath)
+      assert(resultDF.schema == structFromDDL("wrapper struct" +
+        "<name: string, empty_proto struct<__dummy_field_in_empty_struct: string>>"
+      ))
+      // The dummy column of the empty proto should have a null value.
+      checkAnswer(resultDF, Seq(Row(Row("my_name", null))))
+    }
+
+    // Top level message can't be empty, otherwise an AnalysisException will be thrown.
Review Comment:
Can you add a case where the top-level struct is empty and we try to store it (using `EmptyProto`)?
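Roughly like this (sketch; reuses the helpers from this suite and assumes the write fails with `AnalysisException`, per the comment above):
```scala
withTempDir { dir =>
  checkWithFileAndClassName("EmptyProto") {
    case (name, descFilePathOpt) =>
      val df = emptyBinaryDF.select(
        from_protobuf_wrapper($"binary", name, descFilePathOpt, options).as("empty_proto"))
      // Writing a top-level empty struct is expected to fail.
      intercept[AnalysisException] {
        df.write.format("parquet").mode("overwrite").save(dir.getAbsolutePath)
      }
  }
}
```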