rangadi commented on code in PR #44643:
URL: https://github.com/apache/spark/pull/44643#discussion_r1470375085
##########
connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufFunctionsSuite.scala:
##########
@@ -1124,7 +1124,79 @@ class ProtobufFunctionsSuite extends QueryTest with
SharedSparkSession with Prot
}
}
- test("Corner case: empty recursive proto fields should be dropped") {
+ test("Retain empty proto fields when retain.empty.message.types=true") {
+ // When retain.empty.message.types=true, empty proto like 'message A {}'
can be retained as
+ // a field by inserting a dummy column as sub column.
+ val options = Map("retain.empty.message.types" -> "true")
+
+ // EmptyProto at the top level. It will be an empty struct.
+ checkWithFileAndClassName("EmptyProto") {
+ case (name, descFilePathOpt) =>
+ val df = emptyBinaryDF.select(
+ from_protobuf_wrapper($"binary", name, descFilePathOpt,
options).as("empty_proto")
+ )
+ // Top level empty message is retained without adding dummy column to
the schema.
Review Comment:
Fix this comment: the top level also has a dummy column.
##########
connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufFunctionsSuite.scala:
##########
@@ -1124,7 +1124,79 @@ class ProtobufFunctionsSuite extends QueryTest with
SharedSparkSession with Prot
}
}
- test("Corner case: empty recursive proto fields should be dropped") {
+ test("Retain empty proto fields when retain.empty.message.types=true") {
+ // When retain.empty.message.types=true, empty proto like 'message A {}'
can be retained as
+ // a field by inserting a dummy column as sub column.
+ val options = Map("retain.empty.message.types" -> "true")
+
+ // EmptyProto at the top level. It will be an empty struct.
+ checkWithFileAndClassName("EmptyProto") {
+ case (name, descFilePathOpt) =>
+ val df = emptyBinaryDF.select(
+ from_protobuf_wrapper($"binary", name, descFilePathOpt,
options).as("empty_proto")
+ )
+ // Top level empty message is retained without adding dummy column to
the schema.
+ assert(df.schema ==
+ structFromDDL("empty_proto struct<__dummy_field_in_empty_struct:
string>"))
+ }
+
+ // EmptyProto at inner level, because empty struct type is not allowed in
Spark.,
Review Comment:
Fix this comment.
##########
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufOptions.scala:
##########
@@ -207,6 +207,23 @@ private[sql] class ProtobufOptions(
// nil => nil, Int32Value(0) => 0, Int32Value(100) => 100.
val unwrapWellKnownTypes: Boolean =
parameters.getOrElse("unwrap.primitive.wrapper.types",
false.toString).toBoolean
+
+ // Since Spark doesn't allow writing empty StructType, empty proto message
type will be
+ // dropped by default. Setting this option to true will insert a dummy
column to empty proto
+ // message so that the empty message will be retained.
+ // For example, an empty message is used as field in another message:
+ //
+ // ```
+ // message A {}
+ // Message B {A a = 1, string name = 2}
Review Comment:
Use `message` with a lowercase `m`.
##########
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/SchemaConverters.scala:
##########
@@ -230,4 +235,11 @@ object SchemaConverters extends Logging {
case dt => StructField(fd.getName, dt, nullable = !fd.isRequired)
}
}
+
+ // Insert a dummy column to retain the empty message because
+ // spark doesn't allow empty struct type.
+ private def convertEmptyProtoToStructWithDummyField(desc: String):
StructType = {
+ log.info(s"Keep $desc which is empty struct by inserting a dummy field.")
+ StructType(StructField("__dummy_field_in_empty_struct", StringType) :: Nil)
Review Comment:
I see. We have set that in other areas. You can resolve this comment.
##########
connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufFunctionsSuite.scala:
##########
@@ -1136,7 +1208,8 @@ class ProtobufFunctionsSuite extends QueryTest with
SharedSparkSession with Prot
val df = emptyBinaryDF.select(
from_protobuf_wrapper($"binary", name, descFilePathOpt,
options).as("empty_proto")
)
- assert(df.schema == structFromDDL("empty_proto struct<>"))
+ assert(df.schema ==
+ structFromDDL("empty_proto struct<>"))
Review Comment:
Could you remove this diff? It looks like a spurious change.
##########
connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufFunctionsSuite.scala:
##########
@@ -1150,6 +1223,56 @@ class ProtobufFunctionsSuite extends QueryTest with
SharedSparkSession with Prot
}
}
+ test("Retain empty recursive proto fields when
retain.empty.message.types=true") {
+  // This verifies that an empty proto like 'message A { A a = 1}' can be
retained by
+ // inserting a dummy field.
+
+ val structWithDummyColumn =
+ StructType(StructField("__dummy_field_in_empty_struct", StringType) ::
Nil)
+ val structWithRecursiveDepthEquals2 = StructType(
+ StructField("recursive_field", structWithDummyColumn)
+ :: StructField("recursive_array", ArrayType(structWithDummyColumn,
containsNull = false))
+ :: Nil)
+ /*
+  The code below constructs the expected schema with recursive depth set to
3.
Review Comment:
Thanks for adding this comment.
##########
connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufFunctionsSuite.scala:
##########
@@ -1124,7 +1124,79 @@ class ProtobufFunctionsSuite extends QueryTest with
SharedSparkSession with Prot
}
}
- test("Corner case: empty recursive proto fields should be dropped") {
+ test("Retain empty proto fields when retain.empty.message.types=true") {
+ // When retain.empty.message.types=true, empty proto like 'message A {}'
can be retained as
+ // a field by inserting a dummy column as sub column.
+ val options = Map("retain.empty.message.types" -> "true")
+
+ // EmptyProto at the top level. It will be an empty struct.
+ checkWithFileAndClassName("EmptyProto") {
+ case (name, descFilePathOpt) =>
+ val df = emptyBinaryDF.select(
+ from_protobuf_wrapper($"binary", name, descFilePathOpt,
options).as("empty_proto")
+ )
+ // Top level empty message is retained without adding dummy column to
the schema.
+ assert(df.schema ==
+ structFromDDL("empty_proto struct<__dummy_field_in_empty_struct:
string>"))
+ }
+
+ // EmptyProto at inner level, because empty struct type is not allowed in
Spark.,
+ // a dummy column is inserted to retain the empty message.
+ checkWithFileAndClassName("EmptyProtoWrapper") {
+ case (name, descFilePathOpt) =>
+ val df = emptyBinaryDF.select(
+ from_protobuf_wrapper($"binary", name, descFilePathOpt,
options).as("wrapper")
+ )
+ // Nested empty message is retained by adding dummy column to the
schema.
+ assert(df.schema == structFromDDL("wrapper struct" +
+ "<name: string, empty_proto struct<__dummy_field_in_empty_struct:
string>>"))
+ }
+ }
+
+ test("Write empty proto to parquet when retain.empty.message.types=true") {
+ // When retain.empty.message.types=true, empty proto like 'message A {}'
can be retained
+ // as a field by inserting a dummy column as sub column, such schema can
be written to parquet.
+ val options = Map("recursive.fields.max.depth" -> "4",
"retain.empty.message.types" -> "true")
Review Comment:
Remove the 'recursive' setting; it is not used here.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]