cloud-fan commented on a change in pull request #33212:
URL: https://github.com/apache/spark/pull/33212#discussion_r668068075



##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
##########
@@ -418,6 +426,19 @@ class JacksonParser(
       }
     }
 
+    // When the input schema is setting to `nullable = false`, make sure the 
field is not null.
+    var index = 0
+    while (badRecordException.isEmpty && !skipRow && index < schema.length) {
+      if (!schema(index).nullable && row.isNullAt(index)) {
+        throw new IllegalSchemaArgumentException(
+          s"the null value found when parsing non-nullable field 
${schema(index).name}.")
+      }
+      if (!checkedIndexSet.contains(index)) {
+        skipRow = structFilters.skipRow(row, index)

Review comment:
       Was this not done before your PR? The missing field is always filled 
with a null value, and we can use it to filter rows. This doesn't seem to be 
related to this PR.

##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala
##########
@@ -29,11 +29,30 @@ class FailureSafeParser[IN](
     schema: StructType,
     columnNameOfCorruptRecord: String) {
 
+  disableNotNullableForPermissiveMode
   private val corruptFieldIndex = 
schema.getFieldIndex(columnNameOfCorruptRecord)
   private val actualSchema = StructType(schema.filterNot(_.name == 
columnNameOfCorruptRecord))
   private val resultRow = new GenericInternalRow(schema.length)
   private val nullResult = new GenericInternalRow(schema.length)
 
+  // As PERMISSIVE mode should not fail at runtime, so fail if the mode is 
PERMISSIVE and schema
+  // contains non-nullable fields.
+  private def disableNotNullableForPermissiveMode: Unit = {

Review comment:
       nit: this method has a side effect, so it's better to define it with 
parentheses.

##########
File path: 
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
##########
@@ -2924,6 +2924,55 @@ abstract class JsonSuite
       }
     }
   }
+
+  test("SPARK-35912: nullability with different parse mode -- struct") {
+    // JSON field is missing.
+    val missingFieldInput = """{"c1": 1}"""
+    // JSON filed is null.
+    val nullValueInput = """{"c1": 1, "c2": null}"""
+
+    val load = (mode: String, schema: StructType, inputJson: String) => {
+      val json = spark.createDataset(
+        spark.sparkContext.parallelize(inputJson :: Nil))(Encoders.STRING)
+      spark.read
+        .option("mode", mode)
+        .schema(schema)
+        .json(json)
+    }
+
+    Seq(true, false).foreach { nullable =>
+      val schema = StructType(Seq(
+        StructField("c1", IntegerType, nullable = true),
+        StructField("c2", IntegerType, nullable = nullable)))
+
+      Seq(missingFieldInput, nullValueInput).foreach { jsonString =>
+        if (nullable) {
+          checkAnswer(load("DROPMALFORMED", schema, jsonString), Row(1, null) 
:: Nil)
+          checkAnswer(load("FAILFAST", schema, jsonString), Row(1, null) :: 
Nil)
+          checkAnswer(load("PERMISSIVE", schema, jsonString), Row(1, null) :: 
Nil)
+        } else {
+          checkAnswer(load("DROPMALFORMED", schema, jsonString), Seq.empty)
+
+          val exceptionMsg1 = intercept[SparkException] {
+            load("FAILFAST", schema, jsonString).collect
+          }.getMessage
+          val expectedMsg1 = if (jsonString == missingFieldInput) {
+            "field c2 is not nullable but it's missing in one record."
+          } else {
+            s"field c2 is not nullable but the parsed value is null."
+          }
+          assert(exceptionMsg1.contains(expectedMsg1))
+
+          val exceptionMsg2 = intercept[SparkException] {
+            load("PERMISSIVE", schema, jsonString).collect
+          }
+          val expectedMsg2 =
+            "field c2 is not nullable but the not nullable field is not 
allowed in PERMISSIVE mode."

Review comment:
       nit: `Field c2 is not nullable but PERMISSIVE mode only works with 
nullable fields.`




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to