[GitHub] [spark] rangadi commented on a diff in pull request #38922: [SPARK-41396][SQL][PROTOBUF] OneOf field support and recursion checks
rangadi commented on code in PR #38922: URL: https://github.com/apache/spark/pull/38922#discussion_r1053683491

## connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/SchemaConverters.scala: ## @@ -40,19 +40,27 @@ object SchemaConverters { * * @since 3.4.0 */ - def toSqlType(descriptor: Descriptor): SchemaType = { -toSqlTypeHelper(descriptor) + def toSqlType( + descriptor: Descriptor, + protobufOptions: ProtobufOptions = ProtobufOptions(Map.empty)): SchemaType = { +toSqlTypeHelper(descriptor, protobufOptions) } - def toSqlTypeHelper(descriptor: Descriptor): SchemaType = ScalaReflectionLock.synchronized { + def toSqlTypeHelper( + descriptor: Descriptor, + protobufOptions: ProtobufOptions): SchemaType = ScalaReflectionLock.synchronized { SchemaType( - StructType(descriptor.getFields.asScala.flatMap(structFieldFor(_, Set.empty)).toArray), + StructType(descriptor.getFields.asScala.flatMap( +structFieldFor(_, + Map(descriptor.getFullName -> 1), + protobufOptions: ProtobufOptions)).toArray), nullable = true) } def structFieldFor( fd: FieldDescriptor, - existingRecordNames: Set[String]): Option[StructField] = { + existingRecordNames: Map[String, Int],

Review Comment: +1

## connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/SchemaConverters.scala: ## @@ -92,17 +108,35 @@ object SchemaConverters { MapType(keyType, valueType, valueContainsNull = false).defaultConcreteType, nullable = false)) case MESSAGE => -if (existingRecordNames.contains(fd.getFullName)) { +// If the `recursive.fields.max.depth` value is not specified, it will default to -1; +// recursive fields are not permitted. Setting it to 0 drops all recursive fields, +// 1 allows it to be recursed once, and 2 allows it to be recursed twice and so on. +// A value greater than 10 is not allowed, and if a protobuf record has more depth for +// recursive fields than the allowed value, it will be truncated and some fields may be +// discarded. +// The SQL schema for the protobuf message `message Person { string name = 1; Person bff = 2 }` +// will vary based on the value of "recursive.fields.max.depth": +// 0: struct<name: string> +// 1: struct<name: string, bff: struct<name: string>> +// 2: struct<name: string, bff: struct<name: string, bff: struct<name: string>>> ... +val recordName = fd.getMessageType.getFullName

Review Comment: Good catch. I think the previous code was incorrect. We need to verify whether the same Protobuf type was seen before in this DFS traversal. @SandishKumarHN what was the unit test that verified recursion?

## connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/SchemaConverters.scala: ## @@ -40,19 +40,27 @@ object SchemaConverters { * * @since 3.4.0 */ - def toSqlType(descriptor: Descriptor): SchemaType = { -toSqlTypeHelper(descriptor) + def toSqlType( + descriptor: Descriptor, + protobufOptions: ProtobufOptions = ProtobufOptions(Map.empty)): SchemaType = { +toSqlTypeHelper(descriptor, protobufOptions) } - def toSqlTypeHelper(descriptor: Descriptor): SchemaType = ScalaReflectionLock.synchronized { + def toSqlTypeHelper( + descriptor: Descriptor, + protobufOptions: ProtobufOptions): SchemaType = ScalaReflectionLock.synchronized {

Review Comment: Yeah, I just noticed. Not sure if we need it. @SandishKumarHN could we remove this in a follow-up?
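To make the depth-tracking traversal being reviewed here concrete: the `Map[String, Int]` replaces the old `Set[String]` so the converter can count how many times a message type already occurs on the current DFS path, rather than failing on the first repeat. Below is a minimal, self-contained sketch of that idea; `Scalar`, `Message`, and `maxDepth` are toy stand-ins, not the PR's actual types.

    // Toy model: a message type is identified by its full name, and `seen`
    // maps each full name to how many times it already occurs on the DFS path.
    object RecursionCheckSketch {
      sealed trait ProtoField
      case class Scalar(name: String) extends ProtoField
      case class Message(name: String, fullName: String,
          fields: () => Seq[ProtoField]) extends ProtoField

      def structFieldFor(fd: ProtoField, seen: Map[String, Int],
          maxDepth: Int): Option[String] = fd match {
        case Scalar(name) => Some(s"$name: scalar")
        case Message(name, fullName, fields) =>
          val depth = seen.getOrElse(fullName, 0)
          if (depth > 0 && maxDepth < 0) {
            // Recursion found and not permitted (the -1 default): error out.
            throw new IllegalArgumentException(s"Found recursion in: $fullName")
          } else if (maxDepth >= 0 && depth > maxDepth) {
            None // allowed depth exhausted: truncate this branch
          } else {
            val nested = fields().flatMap(
              structFieldFor(_, seen.updated(fullName, depth + 1), maxDepth))
            Some(s"$name: struct<${nested.mkString(", ")}>")
          }
      }

      // message Person { string name = 1; Person bff = 2 }
      lazy val person: Message = Message("person", "example.Person",
        () => Seq(Scalar("name"), Message("bff", "example.Person", person.fields)))

      def main(args: Array[String]): Unit = {
        println(structFieldFor(person, Map.empty, maxDepth = 1))
        // Some(person: struct<name: scalar, bff: struct<name: scalar>>)
      }
    }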
[GitHub] [spark] rangadi commented on a diff in pull request #38922: [SPARK-41396][SQL][PROTOBUF] OneOf field support and recursion checks
rangadi commented on code in PR #38922: URL: https://github.com/apache/spark/pull/38922#discussion_r1053681591

## connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufOptions.scala: ## @@ -38,6 +38,14 @@ private[sql] class ProtobufOptions( val parseMode: ParseMode = parameters.get("mode").map(ParseMode.fromString).getOrElse(FailFastMode) + + // Setting the `recursive.fields.max.depth` to 0 drops all recursive fields, + // 1 allows it to be recursed once, and 2 allows it to be recursed twice and so on. + // A value of `recursive.fields.max.depth` greater than 10 is not permitted. If it is not + // specified, the default value is -1; recursive fields are not permitted. If a protobuf + // record has more depth than the allowed value for recursive fields, it will be truncated + // and some fields may be discarded. + val recursiveFieldMaxDepth: Int = parameters.getOrElse("recursive.fields.max.depth", "-1").toInt

Review Comment: @cloud-fan this is in line with options for the Kafka source, e.g. the 'kafka.' prefix allows setting Kafka client configs. In addition, we will be passing more options, e.g. for schema registry auth configs. They will have a prefix like 'confluent.schemaregistry.[actual registry client conf]'.
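As a rough illustration of how an option like this is read and validated (a sketch only: the `-1` default and the 10-cap come from the comments above, while the helper name and the place where validation happens are assumptions):

    import scala.util.Try

    object ProtobufOptionSketch {
      // Hypothetical helper mirroring the
      // `parameters.getOrElse("recursive.fields.max.depth", "-1").toInt` pattern.
      def recursiveFieldMaxDepth(parameters: Map[String, String]): Int = {
        val raw = parameters.getOrElse("recursive.fields.max.depth", "-1")
        val depth = Try(raw.toInt).getOrElse(
          throw new IllegalArgumentException(
            s"recursive.fields.max.depth must be an integer, got: $raw"))
        // The review thread above caps this option at 10.
        require(depth <= 10, s"recursive.fields.max.depth must be <= 10, got: $depth")
        depth
      }

      def main(args: Array[String]): Unit = {
        println(recursiveFieldMaxDepth(Map.empty))                                // -1: recursion is an error
        println(recursiveFieldMaxDepth(Map("recursive.fields.max.depth" -> "2"))) // 2: two levels kept
      }
    }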
[GitHub] [spark] rangadi commented on a diff in pull request #38922: [SPARK-41396][SQL][PROTOBUF] OneOf field support and recursion checks
rangadi commented on code in PR #38922: URL: https://github.com/apache/spark/pull/38922#discussion_r1051293916

## connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufFunctionsSuite.scala: ## @@ -693,4 +693,435 @@ class ProtobufFunctionsSuite extends QueryTest with SharedSparkSession with Prot errorClass = "CANNOT_CONSTRUCT_PROTOBUF_DESCRIPTOR", parameters = Map("descFilePath" -> testFileDescriptor)) } + + test("Verify OneOf field between from_protobuf -> to_protobuf and struct -> from_protobuf") { +val descriptor = ProtobufUtils.buildDescriptor(testFileDesc, "OneOfEvent") +val oneOfEvent = OneOfEvent.newBuilder() + .setKey("key") + .setCol1(123) + .setCol3(109202L) + .setCol2("col2value") + .addCol4("col4value").build() + +val df = Seq(oneOfEvent.toByteArray).toDF("value") + +checkWithFileAndClassName("OneOfEvent") { + case (name, descFilePathOpt) => +val fromProtoDf = df.select( + from_protobuf_wrapper($"value", name, descFilePathOpt) as 'sample) +val toDf = fromProtoDf.select( + to_protobuf_wrapper($"sample", name, descFilePathOpt) as 'toProto) +val toFromDf = toDf.select( + from_protobuf_wrapper($"toProto", name, descFilePathOpt) as 'fromToProto) +checkAnswer(fromProtoDf, toFromDf) +val actualFieldNames = fromProtoDf.select("sample.*").schema.fields.toSeq.map(f => f.name) +descriptor.getFields.asScala.map(f => { + assert(actualFieldNames.contains(f.getName)) +}) + +val eventFromSpark = OneOfEvent.parseFrom( + toDf.select("toProto").take(1).toSeq(0).getAs[Array[Byte]](0)) +// OneOf field: the last set value (by order) will overwrite all previous ones. +assert(eventFromSpark.getCol2.equals("col2value")) +assert(eventFromSpark.getCol3 == 0) +val expectedFields = descriptor.getFields.asScala.map(f => f.getName) +eventFromSpark.getDescriptorForType.getFields.asScala.map(f => { + assert(expectedFields.contains(f.getName)) +}) + +val jsonSchema = + s""" + |{ + | "type" : "struct", + | "fields" : [ { + |"name" : "sample", + |"type" : { + | "type" : "struct", + | "fields" : [ { + |"name" : "key", + |"type" : "string", + |"nullable" : true + | }, { + |"name" : "col_1", + |"type" : "integer", + |"nullable" : true + | }, { + |"name" : "col_2", + |"type" : "string", + |"nullable" : true + | }, { + |"name" : "col_3", + |"type" : "long", + |"nullable" : true + | }, { + |"name" : "col_4", + |"type" : { + | "type" : "array", + | "elementType" : "string", + | "containsNull" : false + |}, + |"nullable" : false + | } ] + |}, + |"nullable" : true + | } ] + |} + |""".stripMargin +val schema = DataType.fromJson(jsonSchema).asInstanceOf[StructType] +val data = Seq(Row(Row("key", 123, "col2value", 109202L, Seq("col4value")))) +val dataDf = spark.createDataFrame(spark.sparkContext.parallelize(data), schema) +val dataDfToProto = dataDf.select(
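The two assertions on `eventFromSpark` follow from protobuf's oneof semantics: setting one member of a oneof clears the others. A minimal sketch using the same generated `OneOfEvent` builder calls that the test already uses:

    // col_2 and col_3 belong to the same oneof; the last setter call wins.
    val event = OneOfEvent.newBuilder()
      .setCol3(109202L)      // oneof now holds col_3
      .setCol2("col2value")  // overwrites it: oneof now holds col_2
      .build()
    assert(event.getCol2 == "col2value")
    assert(event.getCol3 == 0L) // cleared back to the proto default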
[GitHub] [spark] rangadi commented on a diff in pull request #38922: [SPARK-41396][SQL][PROTOBUF] OneOf field support and recursion checks
rangadi commented on code in PR #38922: URL: https://github.com/apache/spark/pull/38922#discussion_r1051292604

## connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufOptions.scala: ## @@ -38,6 +38,14 @@ private[sql] class ProtobufOptions( val parseMode: ParseMode = parameters.get("mode").map(ParseMode.fromString).getOrElse(FailFastMode) + + // Setting the `recursive.fields.max.depth` to 0 allows the field to be recursed once,

Review Comment: '0' disables recursion, right? Why once? This might be a difference in terminology. That's why giving a quick example is better. Could you add this example? Consider a simple recursive proto `message Person { string name = 1; Person bff = 2 }`.

> What would be the spark schema when recursion is 0, 1, and 2?

I think:
- 0: struct<name: string>
- 1: struct<name: string, bff: struct<name: string>>
- 2: struct<name: string, bff: struct<name: string, bff: struct<name: string>>>
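To see these shapes end to end, usage along the following lines would print them (a hedged sketch: `df`, the descriptor path, and the message name are hypothetical, and the options-taking overload of `from_protobuf` is the one this PR series is wiring up):

    // Assumes a SparkSession, `import spark.implicits._`, and a binary `value` column.
    import scala.collection.JavaConverters._
    import org.apache.spark.sql.protobuf.functions.from_protobuf

    val opts = Map("recursive.fields.max.depth" -> "1").asJava
    val parsed = df.select(
      from_protobuf($"value", "Person", descFilePath, opts).as("person"))
    parsed.printSchema()
    // max depth 0: person: struct<name: string>
    // max depth 1: person: struct<name: string, bff: struct<name: string>>
    // max depth 2: person: struct<name: string, bff: struct<name: string,
    //              bff: struct<name: string>>>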
[GitHub] [spark] rangadi commented on a diff in pull request #38922: [SPARK-41396][SQL][PROTOBUF] OneOf field support and recursion checks
rangadi commented on code in PR #38922: URL: https://github.com/apache/spark/pull/38922#discussion_r1049157233

## connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/SchemaConverters.scala: ## @@ -92,14 +106,26 @@ object SchemaConverters { MapType(keyType, valueType, valueContainsNull = false).defaultConcreteType, nullable = false)) case MESSAGE => -if (existingRecordNames.contains(fd.getFullName)) { +// Setting the circularReferenceDepth to 0 allows the field to be recursed once, setting +// it to 1 allows it to be recursed twice, and setting it to 2 allows it to be recursed +// thrice. circularReferenceDepth value greater than 2 is not allowed. If not +// specified, it will default to -1, which disables recursive fields. +val recordName = fd.getMessageType.getFullName +if (existingRecordNames.contains(recordName) &&

Review Comment: Scratch the above suggestion. Instead you could add an 'else' to what you have and remove the 'return'. That is simpler.
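Spelled out, that suggestion might look like the following shape (a sketch only, using the `recursiveFieldMaxDepth` rename from elsewhere in the thread; the nullability choice and the exact bookkeeping are assumptions, not the PR's final code):

    case MESSAGE =>
      val recordName = fd.getMessageType.getFullName
      // How many times this message type already appears on the current DFS path.
      val recursiveDepth = existingRecordNames.getOrElse(recordName, 0)
      if (recursiveDepth > 0 && protobufOptions.recursiveFieldMaxDepth < 0) {
        // Recursion found but not permitted: error out.
        throw QueryCompilationErrors.foundRecursionInProtobufSchema(fd.toString())
      } else if (protobufOptions.recursiveFieldMaxDepth >= 0 &&
          recursiveDepth > protobufOptions.recursiveFieldMaxDepth) {
        // Allowed depth exhausted: truncate this branch instead of returning early.
        Some(StructField(fd.getName, NullType, nullable = true))
      } else {
        val nested = fd.getMessageType.getFields.asScala
          .flatMap(structFieldFor(_,
            existingRecordNames.updated(recordName, recursiveDepth + 1),
            protobufOptions))
          .toSeq
        if (nested.isEmpty) None
        else Some(StructField(fd.getName, StructType(nested), nullable = false))
      }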
[GitHub] [spark] rangadi commented on a diff in pull request #38922: [SPARK-41396][SQL][PROTOBUF] OneOf field support and recursion checks
rangadi commented on code in PR #38922: URL: https://github.com/apache/spark/pull/38922#discussion_r1049126074

## connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufOptions.scala: ## @@ -38,6 +38,12 @@ private[sql] class ProtobufOptions( val parseMode: ParseMode = parameters.get("mode").map(ParseMode.fromString).getOrElse(FailFastMode) + + // Setting the circularReferenceDepth to 0 allows the field to be recursed once, setting + // it to 1 allows it to be recursed twice, and setting it to 2 allows it to be recursed + // thrice. circularReferenceDepth value greater than 2 is not allowed. If not + // specified, it will default to -1, which disables recursive fields.

Review Comment: '-1' implies recursive fields are not allowed ("disables" does not clearly imply that it will be an error).

## connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufOptions.scala: ## @@ -38,6 +38,12 @@ private[sql] class ProtobufOptions( val parseMode: ParseMode = parameters.get("mode").map(ParseMode.fromString).getOrElse(FailFastMode) + + // Setting the circularReferenceDepth to 0 allows the field to be recursed once, setting + // it to 1 allows it to be recursed twice, and setting it to 2 allows it to be recursed + // thrice. circularReferenceDepth value greater than 2 is not allowed. If not + // specified, it will default to -1, which disables recursive fields. + val circularReferenceDepth: Int = parameters.getOrElse("circularReferenceDepth", "-1").toInt

Review Comment: Suggestion for renaming this option: _"recursive.fields.max.depth"_. _circularReferenceDepth_ sounds very much like a code variable name.

## connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/SchemaConverters.scala: ## @@ -92,14 +106,26 @@ object SchemaConverters { MapType(keyType, valueType, valueContainsNull = false).defaultConcreteType, nullable = false)) case MESSAGE => -if (existingRecordNames.contains(fd.getFullName)) { +// Setting the circularReferenceDepth to 0 allows the field to be recursed once, setting +// it to 1 allows it to be recursed twice, and setting it to 2 allows it to be recursed +// thrice. circularReferenceDepth value greater than 2 is not allowed. If not +// specified, it will default to -1, which disables recursive fields. +val recordName = fd.getMessageType.getFullName +if (existingRecordNames.contains(recordName) && + protobufOptions.circularReferenceDepth < 0 ) { throw QueryCompilationErrors.foundRecursionInProtobufSchema(fd.toString()) +} else if (existingRecordNames.contains(recordName) && + existingRecordNames.getOrElse(recordName, 0) +> protobufOptions.circularReferenceDepth) { + return Some(StructField(fd.getName, NullType, nullable = false))

Review Comment: Why is nullable false?

## connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufOptions.scala: ## @@ -38,6 +38,12 @@ private[sql] class ProtobufOptions( val parseMode: ParseMode = parameters.get("mode").map(ParseMode.fromString).getOrElse(FailFastMode) + + // Setting the circularReferenceDepth to 0 allows the field to be recursed once, setting + // it to 1 allows it to be recursed twice, and setting it to 2 allows it to be recursed + // thrice. circularReferenceDepth value greater than 2 is not allowed. If not + // specified, it will default to -1, which disables recursive fields.

Review Comment: Also warn that if the protobuf record has more depth for recursive fields than allowed here, it will be truncated to the allowed depth. This implies some fields are discarded from the record. Could you add a simple example in the comment showing the resulting spark schema when this is set to '0' and '2'?

## connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/SchemaConverters.scala: ## @@ -92,14 +106,26 @@ object SchemaConverters { MapType(keyType, valueType, valueContainsNull = false).defaultConcreteType, nullable = false)) case MESSAGE => -if (existingRecordNames.contains(fd.getFullName)) { +// Setting the circularReferenceDepth to 0 allows the field to be recursed once, setting +// it to 1 allows it to be recursed twice, and setting it to 2 allows it to be recursed +// thrice. circularReferenceDepth value greater than 2 is not allowed. If not +// specified, it will default to -1, which disables recursive fields. +val recordName = fd.getMessageType.getFullName +if (existingRecordNames.contains(recordName) &&

Review Comment: Better to remove the 'return' statement. How about: `val recursiveDepth =`
[GitHub] [spark] rangadi commented on a diff in pull request #38922: [SPARK-41396][SQL][PROTOBUF] OneOf field support and recursion checks
rangadi commented on code in PR #38922: URL: https://github.com/apache/spark/pull/38922#discussion_r1044083377

## connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufOptions.scala: ## @@ -38,6 +38,12 @@ private[sql] class ProtobufOptions( val parseMode: ParseMode = parameters.get("mode").map(ParseMode.fromString).getOrElse(FailFastMode) + + val circularReferenceType: String = parameters.getOrElse("circularReferenceType", "FIELD_NAME")

Review Comment: :) yeah, field names should not matter at all. We can do a video chat to clarify all this.
[GitHub] [spark] rangadi commented on a diff in pull request #38922: [SPARK-41396][SQL][PROTOBUF] OneOf field support and recursion checks
rangadi commented on code in PR #38922: URL: https://github.com/apache/spark/pull/38922#discussion_r1044066697

## connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufOptions.scala: ## @@ -38,6 +38,12 @@ private[sql] class ProtobufOptions( val parseMode: ParseMode = parameters.get("mode").map(ParseMode.fromString).getOrElse(FailFastMode) + + val circularReferenceType: String = parameters.getOrElse("circularReferenceType", "FIELD_NAME")

Review Comment: Given this discussion, let's write down the functionality and examples before we implement, so that we are all on the same page.
[GitHub] [spark] rangadi commented on a diff in pull request #38922: [SPARK-41396][SQL][PROTOBUF] OneOf field support and recursion checks
rangadi commented on code in PR #38922: URL: https://github.com/apache/spark/pull/38922#discussion_r1044064987

## connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufOptions.scala: ## @@ -38,6 +38,12 @@ private[sql] class ProtobufOptions( val parseMode: ParseMode = parameters.get("mode").map(ParseMode.fromString).getOrElse(FailFastMode) + + val circularReferenceType: String = parameters.getOrElse("circularReferenceType", "FIELD_NAME")

Review Comment: > we would fail to detect the recursion for above because the thread would be

Why would we fail? Let's say the user does `from_protobuf(col, 'message_A')`; `A aa = 1` at line 5 would be treated as recursion. Why are field names relevant at all?
[GitHub] [spark] rangadi commented on a diff in pull request #38922: [SPARK-41396][SQL][PROTOBUF] OneOf field support and recursion checks
rangadi commented on code in PR #38922: URL: https://github.com/apache/spark/pull/38922#discussion_r1044065935

## connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufOptions.scala: ## @@ -38,6 +38,12 @@ private[sql] class ProtobufOptions( val parseMode: ParseMode = parameters.get("mode").map(ParseMode.fromString).getOrElse(FailFastMode) + + val circularReferenceType: String = parameters.getOrElse("circularReferenceType", "FIELD_NAME")

Review Comment: > thread would be `A.B.A.aa.D.d.A.aaa.E`

What is this thread?
[GitHub] [spark] rangadi commented on a diff in pull request #38922: [SPARK-41396][SQL][PROTOBUF] OneOf field support and recursion checks
rangadi commented on code in PR #38922: URL: https://github.com/apache/spark/pull/38922#discussion_r1044065444

## connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufOptions.scala: ## @@ -38,6 +38,12 @@ private[sql] class ProtobufOptions( val parseMode: ParseMode = parameters.get("mode").map(ParseMode.fromString).getOrElse(FailFastMode) + + val circularReferenceType: String = parameters.getOrElse("circularReferenceType", "FIELD_NAME")

Review Comment: Does the code in master fail to detect this recursion? In that case it would lead to an unbounded schema and fail.
[GitHub] [spark] rangadi commented on a diff in pull request #38922: [SPARK-41396][SQL][PROTOBUF] OneOf field support and recursion checks
rangadi commented on code in PR #38922: URL: https://github.com/apache/spark/pull/38922#discussion_r1043982706

## connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufOptions.scala: ## @@ -38,6 +38,12 @@ private[sql] class ProtobufOptions( val parseMode: ParseMode = parameters.get("mode").map(ParseMode.fromString).getOrElse(FailFastMode) + + val circularReferenceType: String = parameters.getOrElse("circularReferenceType", "FIELD_NAME")

Review Comment: I see only one type of recursion, the one based on protobuf message type, i.e. message A ends up including itself either directly or indirectly through nesting. This is the first example in @SandishKumarHN's message above. The second one does not have recursion. Looking for examples of other types.
[GitHub] [spark] rangadi commented on a diff in pull request #38922: [SPARK-41396][SQL][PROTOBUF] OneOf field support and recursion checks
rangadi commented on code in PR #38922: URL: https://github.com/apache/spark/pull/38922#discussion_r1043915051

## connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufOptions.scala: ## @@ -38,6 +38,12 @@ private[sql] class ProtobufOptions( val parseMode: ParseMode = parameters.get("mode").map(ParseMode.fromString).getOrElse(FailFastMode) + + val circularReferenceType: String = parameters.getOrElse("circularReferenceType", "FIELD_NAME")

Review Comment: @baganokodo2022 Could you translate that to actual protobufs to illustrate the problem? I still don't understand how that is related to 'type' vs 'name'. There is only one type of recursion. If the redundant data in the warehouse is a concern, customers can process with a smaller protobuf (say, with unneeded fields removed), or just drop them in Spark SQL.
[GitHub] [spark] rangadi commented on a diff in pull request #38922: [SPARK-41396][SQL][PROTOBUF] OneOf field support and recursion checks
rangadi commented on code in PR #38922: URL: https://github.com/apache/spark/pull/38922#discussion_r1043846427

## connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufOptions.scala: ## @@ -38,6 +38,12 @@ private[sql] class ProtobufOptions( val parseMode: ParseMode = parameters.get("mode").map(ParseMode.fromString).getOrElse(FailFastMode) + + val circularReferenceType: String = parameters.getOrElse("circularReferenceType", "FIELD_NAME")

Review Comment: > in the case of field_name recursive check it is A.B.C no recursion.

The first example is clearly recursion. What is 'C' here?

> but it will also throw an error for the below case with the field_type check. since it will be MESSAGE.MESSAGE.MESSAGE.MESSAGE

Why is this recursion?
[GitHub] [spark] rangadi commented on a diff in pull request #38922: [SPARK-41396][SQL][PROTOBUF] OneOf field support and recursion checks
rangadi commented on code in PR #38922: URL: https://github.com/apache/spark/pull/38922#discussion_r1043846985

## connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufOptions.scala: ## @@ -38,6 +38,12 @@ private[sql] class ProtobufOptions( val parseMode: ParseMode = parameters.get("mode").map(ParseMode.fromString).getOrElse(FailFastMode) + + val circularReferenceType: String = parameters.getOrElse("circularReferenceType", "FIELD_NAME")

Review Comment: Are our unit tests showing these cases?
[GitHub] [spark] rangadi commented on a diff in pull request #38922: [SPARK-41396][SQL][PROTOBUF] OneOf field support and recursion checks
rangadi commented on code in PR #38922: URL: https://github.com/apache/spark/pull/38922#discussion_r1043842836

## connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufFunctionsSuite.scala: ## @@ -26,11 +26,11 @@ import com.google.protobuf.{ByteString, DynamicMessage} import org.apache.spark.sql.{Column, QueryTest, Row} import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.functions.{lit, struct} -import org.apache.spark.sql.protobuf.protos.SimpleMessageProtos.SimpleMessageRepeated +import org.apache.spark.sql.protobuf.protos.SimpleMessageProtos.{EventRecursiveA, EventRecursiveB, OneOfEvent, OneOfEventWithRecursion, SimpleMessageRepeated}

Review Comment: Could we move that to different tests?
[GitHub] [spark] rangadi commented on a diff in pull request #38922: [SPARK-41396][SQL][PROTOBUF] OneOf field support and recursion checks
rangadi commented on code in PR #38922: URL: https://github.com/apache/spark/pull/38922#discussion_r1043726930

## connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/SchemaConverters.scala: ## @@ -92,14 +109,38 @@ object SchemaConverters { MapType(keyType, valueType, valueContainsNull = false).defaultConcreteType, nullable = false)) case MESSAGE => -if (existingRecordNames.contains(fd.getFullName)) { - throw QueryCompilationErrors.foundRecursionInProtobufSchema(fd.toString()) +// User can set circularReferenceDepth of 0 or 1 or 2. +// Going beyond 3 levels of recursion is not allowed. +if (protobufOptions.circularReferenceType.equals("FIELD_TYPE")) { + if (existingRecordTypes.contains(fd.getType.name()) && +(protobufOptions.circularReferenceDepth < 0 || + protobufOptions.circularReferenceDepth >= 3)) { +throw QueryCompilationErrors.foundRecursionInProtobufSchema(fd.toString()) + } else if (existingRecordTypes.contains(fd.getType.name()) &&

Review Comment: Name or full name? Also, what keeps track of the recursion depth?

## connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufOptions.scala: ## @@ -38,6 +38,12 @@ private[sql] class ProtobufOptions( val parseMode: ParseMode = parameters.get("mode").map(ParseMode.fromString).getOrElse(FailFastMode) + + val circularReferenceType: String = parameters.getOrElse("circularReferenceType", "FIELD_NAME")

Review Comment: @SandishKumarHN @baganokodo2022 moving the discussion here (for threading).

> Besides, can we also support a "CircularReferenceType" option with a enum value of [FIELD_NAME, FIELD_TYPE]. The reason is because navigation can go very deep before the same fully-qualified FIELD_NAME is encountered again. While FIELD_TYPE stops recursive navigation much faster. ...

I didn't quite follow the motivation here. Could you give concrete examples for the two different cases?

## connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/SchemaConverters.scala: ## @@ -92,14 +109,38 @@ object SchemaConverters { MapType(keyType, valueType, valueContainsNull = false).defaultConcreteType, nullable = false)) case MESSAGE => -if (existingRecordNames.contains(fd.getFullName)) { - throw QueryCompilationErrors.foundRecursionInProtobufSchema(fd.toString()) +// User can set circularReferenceDepth of 0 or 1 or 2. +// Going beyond 3 levels of recursion is not allowed.

Review Comment: Could you add a justification for this?

## connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufFunctionsSuite.scala: ## @@ -26,11 +26,11 @@ import com.google.protobuf.{ByteString, DynamicMessage} import org.apache.spark.sql.{Column, QueryTest, Row} import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.functions.{lit, struct} -import org.apache.spark.sql.protobuf.protos.SimpleMessageProtos.SimpleMessageRepeated +import org.apache.spark.sql.protobuf.protos.SimpleMessageProtos.{EventRecursiveA, EventRecursiveB, OneOfEvent, OneOfEventWithRecursion, SimpleMessageRepeated}

Review Comment: Are there tests for recursive fields?

## connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufFunctionsSuite.scala: ## @@ -693,4 +693,178 @@ class ProtobufFunctionsSuite extends QueryTest with SharedSparkSession with Prot errorClass = "CANNOT_CONSTRUCT_PROTOBUF_DESCRIPTOR", parameters = Map("descFilePath" -> testFileDescriptor)) } + + test("Unit test for Protobuf OneOf field") {

Review Comment: Add a short description of the test at the top. It improves readability. What is this verifying? Remove "Unit test for", this is already a unit test :).

## connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufFunctionsSuite.scala: ## @@ -693,4 +693,178 @@ class ProtobufFunctionsSuite extends QueryTest with SharedSparkSession with Prot errorClass = "CANNOT_CONSTRUCT_PROTOBUF_DESCRIPTOR", parameters = Map("descFilePath" -> testFileDescriptor)) } + + test("Unit test for Protobuf OneOf field") { +val descriptor = ProtobufUtils.buildDescriptor(testFileDesc, "OneOfEvent") +val oneOfEvent = OneOfEvent.newBuilder() + .setKey("key") + .setCol1(123) + .setCol3(109202L) + .setCol2("col2value") + .addCol4("col4value").build() + +val df = Seq(oneOfEvent.toByteArray).toDF("value") + +val fromProtoDf = df.select( + functions.from_protobuf($"value", "OneOfEvent", testFileDesc) as 'sample) +val toDf = fromProtoDf.select( + functions.to_protobuf($"sample", "OneOfEvent", testFileDesc) as 'toProto) +val toFromDf = toDf.select(
[GitHub] [spark] rangadi commented on a diff in pull request #38922: [SPARK-41396][SQL][PROTOBUF] OneOf field support and recursion checks
rangadi commented on code in PR #38922: URL: https://github.com/apache/spark/pull/38922#discussion_r1041541422

## connector/protobuf/src/test/resources/protobuf/functions_suite.proto: ## @@ -170,4 +170,41 @@ message timeStampMsg { message durationMsg { string key = 1; Duration duration = 2; -} \ No newline at end of file +} + +message OneOfEvent {

Review Comment: The combined one is fine, we could keep it. Better to have simpler separate tests as well.
[GitHub] [spark] rangadi commented on a diff in pull request #38922: [SPARK-41396][SQL][PROTOBUF] OneOf field support and recursion checks
rangadi commented on code in PR #38922: URL: https://github.com/apache/spark/pull/38922#discussion_r1041469181

## connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/ProtobufDeserializer.scala: ## @@ -157,6 +157,8 @@ private[sql] class ProtobufDeserializer( case (null, NullType) => (updater, ordinal, _) => updater.setNullAt(ordinal) + case (MESSAGE, NullType) => (updater, ordinal, _) => updater.setNullAt(ordinal)

Review Comment: What is this for? For handling limited recursion?

## connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufFunctionsSuite.scala: ## @@ -693,4 +721,45 @@ class ProtobufFunctionsSuite extends QueryTest with SharedSparkSession with Prot errorClass = "CANNOT_CONSTRUCT_PROTOBUF_DESCRIPTOR", parameters = Map("descFilePath" -> testFileDescriptor)) } + + test("Unit tests for OneOf field support and recursion checks") {

Review Comment: Let's separate these two into separate tests with separate protobuf messages.

## connector/protobuf/src/test/resources/protobuf/functions_suite.proto: ## @@ -170,4 +170,41 @@ message timeStampMsg { message durationMsg { string key = 1; Duration duration = 2; -} \ No newline at end of file +} + +message OneOfEvent {

Review Comment: Are you testing both OneOf and recursion in the same message? Could you split them into separate messages?

## connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/SchemaConverters.scala: ## @@ -92,9 +92,13 @@ object SchemaConverters { MapType(keyType, valueType, valueContainsNull = false).defaultConcreteType, nullable = false)) case MESSAGE => +// Stop recursion at the first level when a recursive field is encountered. +// TODO: The user should be given the option to set the recursion level to 1, 2, or 3

Review Comment: Are you planning to add selectable recursion depth here or in a follow-up?

## connector/protobuf/src/test/resources/protobuf/functions_suite.proto: ## @@ -170,4 +170,41 @@ message timeStampMsg { message durationMsg { string key = 1; Duration duration = 2; -} \ No newline at end of file +} + +message OneOfEvent { + string key = 1; + oneof payload {

Review Comment: What do one-of fields look like in the Spark schema? Could you give an example? I could not see the schema in the unit tests.
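For what it's worth, the OneOf test later added to ProtobufFunctionsSuite suggests the answer: oneof members are flattened into ordinary nullable sibling columns of the parent struct, with no tag column. A sketch of the expected `printSchema()` output for `OneOfEvent` (field names taken from that test's JSON schema; the exact tree, including which fields sit inside the oneof, is an assumption):

    // parsed.printSchema() for from_protobuf(..., "OneOfEvent", ...):
    // root
    //  |-- sample: struct (nullable = true)
    //  |    |-- key: string (nullable = true)
    //  |    |-- col_1: integer (nullable = true)   <- oneof member
    //  |    |-- col_2: string (nullable = true)    <- oneof member
    //  |    |-- col_3: long (nullable = true)      <- oneof member
    //  |    |-- col_4: array<string> (nullable = false)

On the to_protobuf side, when more than one of these columns is non-null, the test's comment indicates the last value set (by field order) overwrites the previous ones, mirroring the builder semantics sketched earlier.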
[GitHub] [spark] rangadi commented on a diff in pull request #38922: [SPARK-41396][SQL][PROTOBUF] OneOf field support and recursion checks
rangadi commented on code in PR #38922: URL: https://github.com/apache/spark/pull/38922#discussion_r1041467502

## connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/SchemaConverters.scala: ## @@ -92,9 +92,13 @@ object SchemaConverters { MapType(keyType, valueType, valueContainsNull = false).defaultConcreteType, nullable = false)) case MESSAGE => +// Stop recursion at the first level when a recursive field is encountered. +// TODO: The user should be given the option to set the recursion level to 1, 2, or 3

Review Comment: Yeah, I think it is useful. Users may not be able to remove recursive references, but might be willing to limit recursion. I think the default should be an error with a clear message about how users can set the configuration. Also, I don't think it should be a Spark config, but rather an `option` passed in.