rangadi commented on code in PR #44643:
URL: https://github.com/apache/spark/pull/44643#discussion_r1468253272
##########
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufOptions.scala:
##########
@@ -207,6 +207,23 @@ private[sql] class ProtobufOptions(
// nil => nil, Int32Value(0) => 0, Int32Value(100) => 100.
val unwrapWellKnownTypes: Boolean =
parameters.getOrElse("unwrap.primitive.wrapper.types", false.toString).toBoolean
+
+ // Since Spark doesn't allow writing an empty StructType, an empty proto message type
+ // will be dropped by default. Setting this option to true inserts a dummy column into
+ // the empty proto message so that the empty message is retained.
+ // For example, an empty message is used as a field in another message:
+ //
+ // ```
+ // message A {}
+ // message B { A a = 1; string name = 2; }
+ // ```
+ //
+ // By default, in the Spark schema field a will be dropped, which results in the schema
+ // b struct<name: string>
+ // If retain.empty.message.types=true, field a will be retained by inserting a dummy column:
+ // b struct<name: string, a struct<__dummy_field_in_empty_struct: string>>
Review Comment:
`a` would be first and `name` would be second, right?
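If the converter preserves proto field-declaration order, the documented schema would look roughly like the sketch below. This is only an illustration (the field order and the dummy-field name are taken from the comment above, not verified against the PR):

```scala
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Hypothetical Spark schema for message B if fields follow proto declaration
// order: `a` (field number 1) comes before `name` (field number 2).
val expectedB = StructType(
  StructField("a",
    StructType(StructField("__dummy_field_in_empty_struct", StringType) :: Nil)) ::
  StructField("name", StringType) :: Nil)
```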
##########
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/SchemaConverters.scala:
##########
@@ -230,4 +235,11 @@ object SchemaConverters extends Logging {
case dt => StructField(fd.getName, dt, nullable = !fd.isRequired)
}
}
+
+ // Insert a dummy column to retain the empty message because
+ // Spark doesn't allow an empty struct type.
+ private def convertEmptyProtoToStructWithDummyField(desc: String): StructType = {
+ log.info(s"Keeping $desc, which is an empty struct, by inserting a dummy field.")
+ StructType(StructField("__dummy_field_in_empty_struct", StringType) :: Nil)
Review Comment:
minor: Shall we add "nullable = true" though that is the default?
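For reference, a minimal sketch of the dummy-field struct with the nullability written out; `nullable = true` is already the `StructField` default, so this only makes the intent explicit:

```scala
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Same single-field struct, with the default nullability made explicit.
StructType(StructField("__dummy_field_in_empty_struct", StringType, nullable = true) :: Nil)
```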
##########
connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufFunctionsSuite.scala:
##########
@@ -1136,7 +1208,8 @@ class ProtobufFunctionsSuite extends QueryTest with SharedSparkSession with Prot
val df = emptyBinaryDF.select(
from_protobuf_wrapper($"binary", name, descFilePathOpt,
options).as("empty_proto")
)
- assert(df.schema == structFromDDL("empty_proto struct<>"))
+ assert(df.schema ==
+ structFromDDL("empty_proto struct<>"))
Review Comment:
I thought we would not have empty struct after this PR.
##########
connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufFunctionsSuite.scala:
##########
@@ -1150,6 +1221,32 @@ class ProtobufFunctionsSuite extends QueryTest with SharedSparkSession with Prot
}
}
+ test("Retain empty recursive proto fields when
retain.empty.message.types=true") {
+ // This verifies that a empty proto like 'message A { A a = 1}' can be
retained by
+ // inserting a dummy field.
+
+ val emptyProtoSchema =
+ StructType(StructField("__dummy_field_in_empty_struct", StringType) :: Nil)
+ val expectedSchema = StructType(
Review Comment:
Could you add comment to show this schema? It is hard to see.
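One possible shape for such a comment, sketched here assuming the DDL rendering below matches the StructType the test builds (note that plain DDL cannot express the array's containsNull = false):

```scala
// Roughly, with recursive.fields.max.depth = 2 the expected schema is:
//   empty_proto: struct<
//     recursive_field: struct<__dummy_field_in_empty_struct: string>,
//     recursive_array: array<struct<__dummy_field_in_empty_struct: string>>
//   >
```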
##########
connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufFunctionsSuite.scala:
##########
@@ -1150,6 +1221,32 @@ class ProtobufFunctionsSuite extends QueryTest with SharedSparkSession with Prot
}
}
+ test("Retain empty recursive proto fields when
retain.empty.message.types=true") {
+ // This verifies that a empty proto like 'message A { A a = 1}' can be
retained by
+ // inserting a dummy field.
+
+ val emptyProtoSchema =
+ StructType(StructField("__dummy_field_in_empty_struct", StringType) :: Nil)
+ val expectedSchema = StructType(
+ StructField("empty_proto",
+ StructType(
+ StructField("recursive_field", emptyProtoSchema) ::
+ StructField("recursive_array", ArrayType(emptyProtoSchema,
containsNull = false)
+ ) :: Nil
+ )
+ ) :: Nil
+ )
+
+ val options = Map("recursive.fields.max.depth" -> "2", "retain.empty.message.types" -> "true")
Review Comment:
Could you change it to 3 or add a case with 3? I just want to see how the schema changes. Please add a comment to make it easier to see the schema.
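To make the ask concrete, here is a hedged guess at the depth-3 expectation. It assumes the recursion simply nests one more level before being truncated to the dummy struct; the actual behaviour would need to be confirmed against the PR:

```scala
import org.apache.spark.sql.types._

// Innermost level: recursion truncated, retained only as the dummy struct.
val emptyProtoSchema =
  StructType(StructField("__dummy_field_in_empty_struct", StringType) :: Nil)

// Intermediate level: one more round of recursive_field / recursive_array.
val oneLevel = StructType(
  StructField("recursive_field", emptyProtoSchema) ::
  StructField("recursive_array", ArrayType(emptyProtoSchema, containsNull = false)) :: Nil)

// Hypothetical expectation for recursive.fields.max.depth = 3.
val expectedSchemaDepth3 = StructType(
  StructField("empty_proto", StructType(
    StructField("recursive_field", oneLevel) ::
    StructField("recursive_array", ArrayType(oneLevel, containsNull = false)) :: Nil)) :: Nil)
```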
##########
connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufFunctionsSuite.scala:
##########
@@ -1124,7 +1124,78 @@ class ProtobufFunctionsSuite extends QueryTest with SharedSparkSession with Prot
}
}
- test("Corner case: empty recursive proto fields should be dropped") {
+ test("Retain empty proto fields when retain.empty.message.types=true") {
+ // When retain.empty.message.types=true, an empty proto like 'message A {}' can be retained
+ // as a field by inserting a dummy column as a sub-column.
+ val options = Map("recursive.fields.max.depth" -> "4", "retain.empty.message.types" -> "true")
+
+ // EmptyProto at the top level. It will be an empty struct.
+ checkWithFileAndClassName("EmptyProto") {
Review Comment:
Is `EmptyRecursiveProto` used in this test?
- If not, let's remove this recursive setting.
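If only `EmptyProto` is exercised in that test, the option map could presumably be trimmed to something like the hypothetical sketch below (names taken from the snippet above):

```scala
// Hypothetical: no recursion limit needed for a non-recursive message.
val options = Map("retain.empty.message.types" -> "true")
```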