rangadi commented on code in PR #40141:
URL: https://github.com/apache/spark/pull/40141#discussion_r1115442173
##########
connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufFunctionsSuite.scala:
##########
@@ -1101,166 +939,61 @@ class ProtobufFunctionsSuite extends QueryTest with
SharedSparkSession with Prot
val optionsZero = new java.util.HashMap[String, String]()
optionsZero.put("recursive.fields.max.depth", "1")
- val schemaZero = DataType.fromJson(
- s"""{
- | "type" : "struct",
- | "fields" : [ {
- | "name" : "sample",
- | "type" : {
- | "type" : "struct",
- | "fields" : [ {
- | "name" : "name",
- | "type" : "string",
- | "nullable" : true
- | }, {
- | "name" : "bff",
- | "type" : "void",
- | "nullable" : true
- | } ]
- | },
- | "nullable" : true
- | } ]
- |}""".stripMargin).asInstanceOf[StructType]
- val expectedDfZero = spark.createDataFrame(
- spark.sparkContext.parallelize(Seq(Row(Row("person0", null)))),
schemaZero)
-
- testFromProtobufWithOptions(df, expectedDfZero, optionsZero, "EventPerson")
-
- val optionsOne = new java.util.HashMap[String, String]()
- optionsOne.put("recursive.fields.max.depth", "2")
- val schemaOne = DataType.fromJson(
- s"""{
- | "type" : "struct",
- | "fields" : [ {
- | "name" : "sample",
- | "type" : {
- | "type" : "struct",
- | "fields" : [ {
- | "name" : "name",
- | "type" : "string",
- | "nullable" : true
- | }, {
- | "name" : "bff",
- | "type" : {
- | "type" : "struct",
- | "fields" : [ {
- | "name" : "name",
- | "type" : "string",
- | "nullable" : true
- | }, {
- | "name" : "bff",
- | "type" : "void",
- | "nullable" : true
- | } ]
- | },
- | "nullable" : true
- | } ]
- | },
- | "nullable" : true
- | } ]
- |}""".stripMargin).asInstanceOf[StructType]
+ val schemaOne = structFromDDL(
Review Comment:
As part of this PR, many long JSON schemas were replaced with DDL, which is a
lot more concise.
##########
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/ProtobufDeserializer.scala:
##########
@@ -157,9 +157,6 @@ private[sql] class ProtobufDeserializer(
(protoType.getJavaType, catalystType) match {
case (null, NullType) => (updater, ordinal, _) =>
updater.setNullAt(ordinal)
- // It is possible that this will result in data being dropped, This is
intentional,
- // to catch recursive fields and drop them as necessary.
- case (MESSAGE, NullType) => (updater, ordinal, _) =>
updater.setNullAt(ordinal)
Review Comment:
`NullType` is no longer expected.
##########
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/SchemaConverters.scala:
##########
@@ -59,6 +58,8 @@ object SchemaConverters {
// existingRecordNames: Map[String, Int] used to track the depth of
recursive fields and to
// ensure that the conversion of the protobuf message to a Spark SQL
StructType object does not
// exceed the maximum recursive depth specified by the
recursiveFieldMaxDepth option.
+ // A return of None implies the field has reached the maximum allowed
recursive depth and
Review Comment:
This file has the main fixes. `None` is returned when the recursion depth is
reached, and the field is then dropped by the caller (i.e. the outer struct).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]