cloud-fan commented on a change in pull request #33212:
URL: https://github.com/apache/spark/pull/33212#discussion_r668068075
##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
##########
@@ -418,6 +426,19 @@ class JacksonParser(
      }
    }
+    // When the input schema is set to `nullable = false`, make sure the field is not null.
+    var index = 0
+    while (badRecordException.isEmpty && !skipRow && index < schema.length) {
+      if (!schema(index).nullable && row.isNullAt(index)) {
+        throw new IllegalSchemaArgumentException(
+          s"Null value found when parsing non-nullable field ${schema(index).name}.")
+      }
+      if (!checkedIndexSet.contains(index)) {
+        skipRow = structFilters.skipRow(row, index)
Review comment:
This was not done before your PR? A missing field is always filled with a null value, and we can use it to filter rows. This doesn't seem to be related to this PR.
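For context, here is a minimal standalone sketch of the nullability check the hunk adds, run directly against a Catalyst `GenericInternalRow`. The schema, row values, and the plain `IllegalArgumentException` are illustrative stand-ins (the PR uses `IllegalSchemaArgumentException`), not the PR's exact code:

```scala
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

object NullabilityCheckSketch {
  def main(args: Array[String]): Unit = {
    // c2 is declared non-nullable, but the parsed row carries null for it.
    val schema = StructType(Seq(
      StructField("c1", IntegerType, nullable = true),
      StructField("c2", IntegerType, nullable = false)))
    val row = new GenericInternalRow(Array[Any](1, null))

    // Same shape as the loop above: walk the schema and reject nulls
    // that appear in non-nullable fields.
    var index = 0
    while (index < schema.length) {
      if (!schema(index).nullable && row.isNullAt(index)) {
        throw new IllegalArgumentException(
          s"Null value found when parsing non-nullable field ${schema(index).name}.")
      }
      index += 1
    }
  }
}
```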
##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala
##########
@@ -29,11 +29,30 @@ class FailureSafeParser[IN](
    schema: StructType,
    columnNameOfCorruptRecord: String) {
+  disableNotNullableForPermissiveMode
   private val corruptFieldIndex = schema.getFieldIndex(columnNameOfCorruptRecord)
   private val actualSchema = StructType(schema.filterNot(_.name == columnNameOfCorruptRecord))
   private val resultRow = new GenericInternalRow(schema.length)
   private val nullResult = new GenericInternalRow(schema.length)
+  // Since PERMISSIVE mode should not fail at runtime, fail here if the mode is PERMISSIVE
+  // and the schema contains non-nullable fields.
+  private def disableNotNullableForPermissiveMode: Unit = {
Review comment:
nit: this method has a side effect, so it's better to define it with parentheses.
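A minimal sketch of the convention behind this nit, with hypothetical class and field names: in Scala, a side-effecting method (here, one that can throw) is declared and invoked with empty parentheses, so call sites signal that something happens:

```scala
// Hypothetical stand-in for FailureSafeParser's validation; not the PR's code.
class ParserSketch(permissive: Boolean, hasNonNullableFields: Boolean) {
  // Declared with (): callers can see this does more than return a value.
  private def disableNotNullableForPermissiveMode(): Unit = {
    if (permissive && hasNonNullableFields) {
      throw new IllegalArgumentException(
        "PERMISSIVE mode only works with nullable fields.")
    }
  }

  // Invoked with () as well, during construction.
  disableNotNullableForPermissiveMode()
}
```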
##########
File path: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
##########
@@ -2924,6 +2924,55 @@ abstract class JsonSuite
}
}
}
+
+  test("SPARK-35912: nullability with different parse mode -- struct") {
+    // JSON field is missing.
+    val missingFieldInput = """{"c1": 1}"""
+    // JSON field is null.
+    val nullValueInput = """{"c1": 1, "c2": null}"""
+
+    val load = (mode: String, schema: StructType, inputJson: String) => {
+      val json = spark.createDataset(
+        spark.sparkContext.parallelize(inputJson :: Nil))(Encoders.STRING)
+      spark.read
+        .option("mode", mode)
+        .schema(schema)
+        .json(json)
+    }
+
+    Seq(true, false).foreach { nullable =>
+      val schema = StructType(Seq(
+        StructField("c1", IntegerType, nullable = true),
+        StructField("c2", IntegerType, nullable = nullable)))
+
+      Seq(missingFieldInput, nullValueInput).foreach { jsonString =>
+        if (nullable) {
+          checkAnswer(load("DROPMALFORMED", schema, jsonString), Row(1, null) :: Nil)
+          checkAnswer(load("FAILFAST", schema, jsonString), Row(1, null) :: Nil)
+          checkAnswer(load("PERMISSIVE", schema, jsonString), Row(1, null) :: Nil)
+        } else {
+          checkAnswer(load("DROPMALFORMED", schema, jsonString), Seq.empty)
+
+          val exceptionMsg1 = intercept[SparkException] {
+            load("FAILFAST", schema, jsonString).collect()
+          }.getMessage
+          val expectedMsg1 = if (jsonString == missingFieldInput) {
+            "field c2 is not nullable but it's missing in one record."
+          } else {
+            "field c2 is not nullable but the parsed value is null."
+          }
+          assert(exceptionMsg1.contains(expectedMsg1))
+
+          val exceptionMsg2 = intercept[SparkException] {
+            load("PERMISSIVE", schema, jsonString).collect()
+          }.getMessage
+          val expectedMsg2 =
+            "field c2 is not nullable but the not nullable field is not allowed in PERMISSIVE mode."
Review comment:
nit: `Field c2 is not nullable but PERMISSIVE mode only works with nullable fields.`
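To make the expected behavior concrete, here is a hedged spark-shell sketch of the PERMISSIVE-mode failure this test exercises, assuming the PR's validation is in place (the error text follows the suggested wording and is illustrative, not an actual Spark message):

```scala
import org.apache.spark.sql.Encoders
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

// `spark` is the SparkSession provided by spark-shell.
val schema = StructType(Seq(
  StructField("c1", IntegerType, nullable = true),
  StructField("c2", IntegerType, nullable = false)))
val json = spark.createDataset(Seq("""{"c1": 1}"""))(Encoders.STRING)

// With the PR's check in place, this is expected to fail up front rather
// than silently producing a null in the non-nullable column, e.g.:
// "Field c2 is not nullable but PERMISSIVE mode only works with nullable fields."
spark.read.option("mode", "PERMISSIVE").schema(schema).json(json).show()
```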