diff --git a/docs/sql-migration-guide-upgrade.md b/docs/sql-migration-guide-upgrade.md
index 0fcdd420bcfe3..8cb7ee78b00d2 100644
--- a/docs/sql-migration-guide-upgrade.md
+++ b/docs/sql-migration-guide-upgrade.md
@@ -17,7 +17,7 @@ displayTitle: Spark SQL Upgrading Guide
 
   - Since Spark 3.0, the `from_json` functions supports two modes - `PERMISSIVE` and `FAILFAST`. The modes can be set via the `mode` option. The default mode became `PERMISSIVE`. In previous versions, behavior of `from_json` did not conform to either `PERMISSIVE` nor `FAILFAST`, especially in processing of malformed JSON records. For example, the JSON string `{"a" 1}` with the schema `a INT` is converted to `null` by previous versions but Spark 3.0 converts it to `Row(null)`.
 
-  - In Spark version 2.4 and earlier, the `from_json` function produces `null`s for JSON strings and JSON datasource skips the same independently of its mode if there is no valid root JSON token in its input (` ` for example). Since Spark 3.0, such input is treated as a bad record and handled according to specified mode. For example, in the `PERMISSIVE` mode the ` ` input is converted to `Row(null, null)` if specified schema is `key STRING, value INT`.
+  - In Spark version 2.4 and earlier, the `from_json` function produces `null`s for JSON strings without valid root JSON tokens (` ` for example). Since Spark 3.0, such input is treated as a bad record and handled according to the specified mode. For example, in the `PERMISSIVE` mode the ` ` input is converted to `Row(null, null)` if the specified schema is `key STRING, value INT`.
 
   - The `ADD JAR` command previously returned a result set with the single value 0. It now returns an empty result set.
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
index e0cab537ce1c6..27b6a63956198 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
@@ -583,7 +583,11 @@ case class JsonToStructs(
         (StructType(StructField("value", other) :: Nil), other)
     }
 
-    val rawParser = new JacksonParser(actualSchema, parsedOptions, allowArrayAsStructs = false)
+    val rawParser = new JacksonParser(
+      actualSchema,
+      parsedOptions,
+      allowArrayAsStructs = false,
+      skipInputWithoutTokens = false)
     val createParser = CreateJacksonParser.utf8String _
 
     new FailureSafeParser[UTF8String](
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
index 3f245e1400fa1..0f206f843cc6f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
@@ -40,7 +40,8 @@ import org.apache.spark.util.Utils
 class JacksonParser(
     schema: DataType,
     val options: JSONOptions,
-    allowArrayAsStructs: Boolean) extends Logging {
+    allowArrayAsStructs: Boolean,
+    skipInputWithoutTokens: Boolean) extends Logging {
 
   import JacksonUtils._
   import com.fasterxml.jackson.core.JsonToken._
@@ -399,6 +400,7 @@ class JacksonParser(
         // a null first token is equivalent to testing for input.trim.isEmpty
         // but it works on any token stream and not just strings
         parser.nextToken() match {
+          case null if skipInputWithoutTokens => Nil
           case null => throw new RuntimeException("Not found any JSON token")
           case _ => rootConverter.apply(parser) match {
             case null => throw new RuntimeException("Root converter returned null")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index ce8e4c8f5b82b..96b3897f1d038 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -455,7 +455,11 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
     val createParser = CreateJacksonParser.string _
 
     val parsed = jsonDataset.rdd.mapPartitions { iter =>
-      val rawParser = new JacksonParser(actualSchema, parsedOptions, allowArrayAsStructs = true)
+      val rawParser = new JacksonParser(
+        actualSchema,
+        parsedOptions,
+        allowArrayAsStructs = true,
+        skipInputWithoutTokens = true)
       val parser = new FailureSafeParser[String](
         input => rawParser.parse(input, createParser, UTF8String.fromString),
         parsedOptions.parseMode,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
index 40f55e7068010..49b6bc12ce18f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
@@ -125,7 +125,11 @@ class JsonFileFormat extends TextBasedFileFormat with DataSourceRegister {
     }
 
     (file: PartitionedFile) => {
-      val parser = new JacksonParser(actualSchema, parsedOptions, allowArrayAsStructs = true)
+      val parser = new JacksonParser(
+        actualSchema,
+        parsedOptions,
+        allowArrayAsStructs = true,
+        skipInputWithoutTokens = true)
       JsonDataSource(parsedOptions).readFile(
         broadcastedHadoopConf.value.value,
         file,
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index 8f575a371c98e..eb18bf8ece999 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -70,7 +70,11 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
 
     val dummyOption = new JSONOptions(options, SQLConf.get.sessionLocalTimeZone)
     val dummySchema = StructType(Seq.empty)
-    val parser = new JacksonParser(dummySchema, dummyOption, allowArrayAsStructs = true)
+    val parser = new JacksonParser(
+      dummySchema,
+      dummyOption,
+      allowArrayAsStructs = true,
+      skipInputWithoutTokens = true)
 
     Utils.tryWithResource(factory.createParser(writer.toString)) { jsonParser =>
       jsonParser.nextToken()
@@ -1126,7 +1130,6 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
         Row(null, null, null),
         Row(null, null, null),
         Row(null, null, null),
-        Row(null, null, null),
         Row("str_a_4", "str_b_4", "str_c_4"),
         Row(null, null, null))
     )
@@ -1148,7 +1151,6 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
     checkAnswer(
       jsonDF.select($"a", $"b", $"c", $"_unparsed"),
       Row(null, null, null, "{") ::
-        Row(null, null, null, "") ::
         Row(null, null, null, """{"a":1, b:2}""") ::
         Row(null, null, null, """{"a":{, b:3}""") ::
         Row("str_a_4", "str_b_4", "str_c_4", null) ::
@@ -1163,7 +1165,6 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
     checkAnswer(
       jsonDF.filter($"_unparsed".isNotNull).select($"_unparsed"),
       Row("{") ::
-        Row("") ::
         Row("""{"a":1, b:2}""") ::
         Row("""{"a":{, b:3}""") ::
         Row("]") :: Nil
@@ -1185,7 +1186,6 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
     checkAnswer(
       jsonDF.selectExpr("a", "b", "c", "_malformed"),
       Row(null, null, null, "{") ::
-        Row(null, null, null, "") ::
         Row(null, null, null, """{"a":1, b:2}""") ::
         Row(null, null, null, """{"a":{, b:3}""") ::
         Row("str_a_4", "str_b_4", "str_c_4", null) ::
@@ -2531,7 +2531,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
     }
 
     checkCount(2)
-    countForMalformedJSON(1, Seq(""))
+    countForMalformedJSON(0, Seq(""))
   }
 
   test("SPARK-25040: empty strings should be disallowed") {
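
For reference, a minimal sketch of the user-visible `from_json` change described in the migration-guide hunk above. This snippet is not part of the patch; it assumes a build containing this change and an existing SparkSession named `spark` (e.g. in spark-shell):

    // Sketch only (not from this patch): `from_json` on input without a root
    // JSON token, exercising skipInputWithoutTokens = false on the
    // JsonToStructs path.
    import org.apache.spark.sql.functions.from_json
    import org.apache.spark.sql.types._
    import spark.implicits._

    val schema = new StructType().add("key", StringType).add("value", IntegerType)

    // ` ` has no root JSON token: Spark 2.4 returned `null`, while the default
    // PERMISSIVE mode now yields Row(null, null) for this two-column schema.
    Seq(" ").toDF("raw")
      .select(from_json($"raw", schema).as("parsed"))
      .show(false)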
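The datasource path differs by design: with skipInputWithoutTokens = true, token-less lines are dropped outright instead of surfacing as malformed records, which is what the updated countForMalformedJSON(0, Seq("")) expectation in JsonSuite encodes. A sketch under the same assumptions as above:

    // Sketch only: the JSON datasource keeps skipping token-less input.
    import spark.implicits._

    val ds = Seq("""{"a": 1}""", "", """{"a": 2}""").toDS()

    // The empty line yields no row and no malformed record, so the result
    // has 2 rows rather than 3.
    val parsed = spark.read.schema("a INT").json(ds)
    assert(parsed.count() == 2)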