[GitHub] spark pull request #22938: [SPARK-25935][SQL] Prevent null rows from JSON pa...
Github user MaxGekk commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22938#discussion_r236049216

    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala ---
    @@ -1892,7 +1898,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
             .text(path)
           val jsonDF = spark.read.option("multiLine", true).option("mode", "PERMISSIVE").json(path)
    -      assert(jsonDF.count() === corruptRecordCount)
    +      assert(jsonDF.count() === corruptRecordCount + 1) // null row for empty file
    --- End diff --

    @cloud-fan @HyukjinKwon Here is a PR https://github.com/apache/spark/pull/23130 which does this.

---
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22938: [SPARK-25935][SQL] Prevent null rows from JSON pa...
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22938#discussion_r235584943

    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala ---
    @@ -1892,7 +1898,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
             .text(path)
           val jsonDF = spark.read.option("multiLine", true).option("mode", "PERMISSIVE").json(path)
    -      assert(jsonDF.count() === corruptRecordCount)
    +      assert(jsonDF.count() === corruptRecordCount + 1) // null row for empty file
    --- End diff --

    Shall we skip empty files for all the file-based data sources?
[GitHub] spark pull request #22938: [SPARK-25935][SQL] Prevent null rows from JSON pa...
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22938#discussion_r235583851

    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala ---
    @@ -240,16 +240,6 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
           Seq(Row("1"), Row("2")))
       }
    -  test("SPARK-11226 Skip empty line in json file") {
    --- End diff --

    Where is it moved to then? Does that mean we don't have a regression test for SPARK-11226 anymore?
[GitHub] spark pull request #22938: [SPARK-25935][SQL] Prevent null rows from JSON pa...
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22938#discussion_r235583559

    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala ---
    @@ -1892,7 +1898,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
             .text(path)
           val jsonDF = spark.read.option("multiLine", true).option("mode", "PERMISSIVE").json(path)
    -      assert(jsonDF.count() === corruptRecordCount)
    +      assert(jsonDF.count() === corruptRecordCount + 1) // null row for empty file
    --- End diff --

    If that's true, we should not do this. Empty files can currently be generated in many cases, and the behaviour is not well defined. If we rely on this behaviour, it will cause weird behaviours or bugs that are hard to fix.
[GitHub] spark pull request #22938: [SPARK-25935][SQL] Prevent null rows from JSON pa...
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22938#discussion_r235583349

    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala ---
    @@ -1892,7 +1898,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
             .text(path)
           val jsonDF = spark.read.option("multiLine", true).option("mode", "PERMISSIVE").json(path)
    -      assert(jsonDF.count() === corruptRecordCount)
    +      assert(jsonDF.count() === corruptRecordCount + 1) // null row for empty file
    --- End diff --

    Wait, does this mean that it reads an empty record from an empty file after this change?
[GitHub] spark pull request #22938: [SPARK-25935][SQL] Prevent null rows from JSON pa...
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22938#discussion_r235583315

    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala ---
    @@ -1905,7 +1911,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
             F.count($"dummy").as("valid"),
             F.count($"_corrupt_record").as("corrupt"),
             F.count("*").as("count"))
    -      checkAnswer(counts, Row(1, 4, 6))
    +      checkAnswer(counts, Row(1, 5, 7)) // null row for empty file
    --- End diff --

    Wait, does this mean that it reads an empty record from an empty file after this change?
[GitHub] spark pull request #22938: [SPARK-25935][SQL] Prevent null rows from JSON pa...
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/22938
[GitHub] spark pull request #22938: [SPARK-25935][SQL] Prevent null rows from JSON pa...
Github user MaxGekk commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22938#discussion_r232605204

    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala ---
    @@ -1813,6 +1817,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
           val path = dir.getCanonicalPath
           primitiveFieldAndType
             .toDF("value")
    +        .repartition(1)
    --- End diff --

    As far as I remember, I added `repartition(1)` here and in other places to eliminate empty files. Such empty files are produced by empty partitions. We could probably avoid writing empty files, at least for text-based datasources, but in any case let's look at `TextOutputWriter`, for example. It creates an output stream for a file in its constructor: https://github.com/apache/spark/blob/46110a589f4e91cd7605c5a2c34c3db6b2635830/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala#L151 and closes the empty file in https://github.com/apache/spark/blob/46110a589f4e91cd7605c5a2c34c3db6b2635830/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala#L162 . So even if we didn't write anything to the file, an empty file is created.

    On the read side, when the `Jackson` parser tries to read the empty file, it cannot detect any JSON tokens at the root level and returns `null` from `nextToken()`, for which I throw a bad record exception for now -> `Row(...)` in `PERMISSIVE` mode.
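The write-side point above (a writer that opens its stream eagerly leaves an empty file behind for an empty partition) can be reproduced without Spark. The following is only a minimal sketch using `java.nio`; `EmptyFileSketch` and `writePartition` are hypothetical names for illustration and are not part of `TextOutputWriter`.

```scala
import java.nio.file.{Files, Path}

object EmptyFileSketch {
  // Mimics a writer that opens its output stream up front and is closed
  // afterwards, whether or not any record was written (an assumption made
  // for illustration; this is not Spark's actual writer code).
  def writePartition(target: Path, records: Seq[String]): Long = {
    val out = Files.newOutputStream(target) // the file is created here
    try records.foreach(r => out.write((r + "\n").getBytes("UTF-8")))
    finally out.close()
    Files.size(target) // size in bytes of what ended up on disk
  }
}
```

Calling `writePartition` with `Seq.empty` still leaves a zero-byte file on disk, which is the kind of file the reader later has to interpret.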
[GitHub] spark pull request #22938: [SPARK-25935][SQL] Prevent null rows from JSON pa...
Github user MaxGekk commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22938#discussion_r232594164

    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala ---
    @@ -1115,6 +1115,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
               Row(null, null, null),
               Row(null, null, null),
               Row(null, null, null),
    +          Row(null, null, null),
    --- End diff --

    > so for json data source, previous behavior is, we would skip the row even if it's in PERMISSIVE mode.

    Yes, we skipped such rows whenever the `Jackson` parser wasn't able to find any root tokens. So the category covered more than just empty strings and gaps.

    > Shall we clearly mention it in the migration guide?

    Sure.
[GitHub] spark pull request #22938: [SPARK-25935][SQL] Prevent null rows from JSON pa...
Github user MaxGekk commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22938#discussion_r232589526

    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala ---
    @@ -550,15 +550,23 @@ case class JsonToStructs(
           s"Input schema ${nullableSchema.catalogString} must be a struct, an array or a map.")
       }
    -  // This converts parsed rows to the desired output by the given schema.
       @transient
    -  lazy val converter = nullableSchema match {
    -    case _: StructType =>
    -      (rows: Iterator[InternalRow]) => if (rows.hasNext) rows.next() else null
    -    case _: ArrayType =>
    -      (rows: Iterator[InternalRow]) => if (rows.hasNext) rows.next().getArray(0) else null
    -    case _: MapType =>
    -      (rows: Iterator[InternalRow]) => if (rows.hasNext) rows.next().getMap(0) else null
    +  private lazy val castRow = nullableSchema match {
    +    case _: StructType => (row: InternalRow) => row
    +    case _: ArrayType => (row: InternalRow) => row.getArray(0)
    +    case _: MapType => (row: InternalRow) => row.getMap(0)
    +  }
    +
    +  // This converts parsed rows to the desired output by the given schema.
    +  private def convertRow(rows: Iterator[InternalRow]) = {
    +    if (rows.hasNext) {
    +      val result = rows.next()
    +      // JSON's parser produces one record only.
    +      assert(!rows.hasNext)
    +      castRow(result)
    +    } else {
    +      throw new IllegalArgumentException("Expected one row from JSON parser.")
    --- End diff --

    Right, it must not happen.
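To make the "exactly one record" contract in the diff above easier to follow, here is a Spark-free sketch of the same control flow. It is generic over the row type, and `ConvertRowSketch` and the `cast` parameter are hypothetical stand-ins for `JsonToStructs` and `castRow`, so treat it as an illustration rather than the PR's code.

```scala
object ConvertRowSketch {
  // Takes the single parsed record and applies `cast` to it.
  // An empty iterator is treated as a programming error upstream,
  // not as bad user input, mirroring the branch discussed above.
  def convertRow[A, B](rows: Iterator[A])(cast: A => B): B = {
    if (rows.hasNext) {
      val result = rows.next()
      // Assumption carried over from the diff: the parser yields one record only.
      assert(!rows.hasNext, "expected exactly one record")
      cast(result)
    } else {
      throw new IllegalArgumentException("Expected one row from JSON parser.")
    }
  }
}
```

With this shape, a caller never sees `null` from the conversion step itself: it gets either a converted value or an exception that points at a bug.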
[GitHub] spark pull request #22938: [SPARK-25935][SQL] Prevent null rows from JSON pa...
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22938#discussion_r232550860

    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala ---
    @@ -1813,6 +1817,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
           val path = dir.getCanonicalPath
           primitiveFieldAndType
             .toDF("value")
    +        .repartition(1)
    --- End diff --

    Why is the `repartition` required?
[GitHub] spark pull request #22938: [SPARK-25935][SQL] Prevent null rows from JSON pa...
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22938#discussion_r232550733

    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala ---
    @@ -1115,6 +1115,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
               Row(null, null, null),
               Row(null, null, null),
               Row(null, null, null),
    +          Row(null, null, null),
    --- End diff --

    So for the JSON data source, the previous behavior is that we would skip the row even if it's in PERMISSIVE mode. Shall we clearly mention it in the migration guide?
[GitHub] spark pull request #22938: [SPARK-25935][SQL] Prevent null rows from JSON pa...
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22938#discussion_r232550502

    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala ---
    @@ -550,15 +550,23 @@ case class JsonToStructs(
           s"Input schema ${nullableSchema.catalogString} must be a struct, an array or a map.")
       }
    -  // This converts parsed rows to the desired output by the given schema.
       @transient
    -  lazy val converter = nullableSchema match {
    -    case _: StructType =>
    -      (rows: Iterator[InternalRow]) => if (rows.hasNext) rows.next() else null
    -    case _: ArrayType =>
    -      (rows: Iterator[InternalRow]) => if (rows.hasNext) rows.next().getArray(0) else null
    -    case _: MapType =>
    -      (rows: Iterator[InternalRow]) => if (rows.hasNext) rows.next().getMap(0) else null
    +  private lazy val castRow = nullableSchema match {
    +    case _: StructType => (row: InternalRow) => row
    +    case _: ArrayType => (row: InternalRow) => row.getArray(0)
    +    case _: MapType => (row: InternalRow) => row.getMap(0)
    +  }
    +
    +  // This converts parsed rows to the desired output by the given schema.
    +  private def convertRow(rows: Iterator[InternalRow]) = {
    +    if (rows.hasNext) {
    +      val result = rows.next()
    +      // JSON's parser produces one record only.
    +      assert(!rows.hasNext)
    +      castRow(result)
    +    } else {
    +      throw new IllegalArgumentException("Expected one row from JSON parser.")
    --- End diff --

    This can only happen when we have a bug, right?
[GitHub] spark pull request #22938: [SPARK-25935][SQL] Prevent null rows from JSON pa...
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22938#discussion_r232550186

    --- Diff: docs/sql-migration-guide-upgrade.md ---
    @@ -15,6 +15,8 @@ displayTitle: Spark SQL Upgrading Guide
       - Since Spark 3.0, the `from_json` functions supports two modes - `PERMISSIVE` and `FAILFAST`. The modes can be set via the `mode` option. The default mode became `PERMISSIVE`. In previous versions, behavior of `from_json` did not conform to either `PERMISSIVE` nor `FAILFAST`, especially in processing of malformed JSON records. For example, the JSON string `{"a" 1}` with the schema `a INT` is converted to `null` by previous versions but Spark 3.0 converts it to `Row(null)`.
    +  - In Spark version 2.4 and earlier, JSON data source and the `from_json` function produced `null`s if there is no valid root JSON token in its input (` ` for example). Since Spark 3.0, such input is treated as a bad record and handled according to specified mode. For example, in the `PERMISSIVE` mode the ` ` input is converted to `Row(null, null)` if specified schema is `key STRING, value INT`.
    --- End diff --

    > In Spark version 2.4 and earlier, JSON data source and the `from_json` function produced `null`s

    Shall we update this? According to what you said, the JSON data source can't produce null.
[GitHub] spark pull request #22938: [SPARK-25935][SQL] Prevent null rows from JSON pa...
Github user MaxGekk commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22938#discussion_r232494623

    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala ---
    @@ -550,15 +550,33 @@ case class JsonToStructs(
           s"Input schema ${nullableSchema.catalogString} must be a struct, an array or a map.")
       }
    -  // This converts parsed rows to the desired output by the given schema.
       @transient
    -  lazy val converter = nullableSchema match {
    -    case _: StructType =>
    -      (rows: Iterator[InternalRow]) => if (rows.hasNext) rows.next() else null
    -    case _: ArrayType =>
    -      (rows: Iterator[InternalRow]) => if (rows.hasNext) rows.next().getArray(0) else null
    -    case _: MapType =>
    -      (rows: Iterator[InternalRow]) => if (rows.hasNext) rows.next().getMap(0) else null
    +  private lazy val castRow = nullableSchema match {
    +    case _: StructType => (row: InternalRow) => row
    +    case _: ArrayType => (row: InternalRow) =>
    +      if (row.isNullAt(0)) {
    +        new GenericArrayData(Array())
    --- End diff --

    I have discarded the recent changes.
[GitHub] spark pull request #22938: [SPARK-25935][SQL] Prevent null rows from JSON pa...
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22938#discussion_r232484880

    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala ---
    @@ -550,15 +550,33 @@ case class JsonToStructs(
           s"Input schema ${nullableSchema.catalogString} must be a struct, an array or a map.")
       }
    -  // This converts parsed rows to the desired output by the given schema.
       @transient
    -  lazy val converter = nullableSchema match {
    -    case _: StructType =>
    -      (rows: Iterator[InternalRow]) => if (rows.hasNext) rows.next() else null
    -    case _: ArrayType =>
    -      (rows: Iterator[InternalRow]) => if (rows.hasNext) rows.next().getArray(0) else null
    -    case _: MapType =>
    -      (rows: Iterator[InternalRow]) => if (rows.hasNext) rows.next().getMap(0) else null
    +  private lazy val castRow = nullableSchema match {
    +    case _: StructType => (row: InternalRow) => row
    +    case _: ArrayType => (row: InternalRow) =>
    +      if (row.isNullAt(0)) {
    +        new GenericArrayData(Array())
    --- End diff --

    I think it's okay to return `null` for map and array.
[GitHub] spark pull request #22938: [SPARK-25935][SQL] Prevent null rows from JSON pa...
Github user MaxGekk commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22938#discussion_r232446781

    --- Diff: docs/sql-migration-guide-upgrade.md ---
    @@ -15,6 +15,8 @@ displayTitle: Spark SQL Upgrading Guide
       - Since Spark 3.0, the `from_json` functions supports two modes - `PERMISSIVE` and `FAILFAST`. The modes can be set via the `mode` option. The default mode became `PERMISSIVE`. In previous versions, behavior of `from_json` did not conform to either `PERMISSIVE` nor `FAILFAST`, especially in processing of malformed JSON records. For example, the JSON string `{"a" 1}` with the schema `a INT` is converted to `null` by previous versions but Spark 3.0 converts it to `Row(null)`.
    +  - In Spark version 2.4 and earlier, JSON data source and the `from_json` function produced `null`s if there is no valid root JSON token in its input (` ` for example). Since Spark 3.0, such input is treated as a bad record and handled according to specified mode. For example, in the `PERMISSIVE` mode the ` ` input is converted to `Row(null, null)` if specified schema is `key STRING, value INT`.
    --- End diff --

    When we use the data source, we can specify the schema only as a `StructType`. In that case, we get a `Seq[InternalRow]` or `Nil` from `JacksonParser`, which is `flatMap`ped, or a `BadRecordException`, which is converted to an `Iterator[InternalRow]`. It seems there is no way to get `null` rows. The difference between the JSON datasource and the JSON functions is that the latter don't (and cannot) do flattening. So the `Nil` case has to be handled specially (this PR addresses that case).
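The flattening argument above can be illustrated with plain collections. This is a toy model only: `parse` is a hypothetical stand-in for a parser that returns zero or more records per input, and plain strings replace `InternalRow`.

```scala
object FlattenSketch {
  // Hypothetical parser result: zero or more records per input string.
  def parse(input: String): Seq[String] =
    if (input.trim.isEmpty) Nil else Seq(input.trim)

  // Data-source style: results are flattened, so Nil contributes no rows.
  def readAll(inputs: Seq[String]): Seq[String] = inputs.flatMap(parse)

  // Function style: exactly one output per input, so the Nil case
  // needs an explicit decision (modelled here as None).
  def convertEach(inputs: Seq[String]): Seq[Option[String]] =
    inputs.map(in => parse(in).headOption)
}
```

In the flattened case an input without tokens simply vanishes from the output, whereas a per-row function has to return something for it, which is where the `null` in `from_json` came from.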
[GitHub] spark pull request #22938: [SPARK-25935][SQL] Prevent null rows from JSON pa...
Github user MaxGekk commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22938#discussion_r232446534

    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala ---
    @@ -550,15 +550,33 @@ case class JsonToStructs(
           s"Input schema ${nullableSchema.catalogString} must be a struct, an array or a map.")
       }
    -  // This converts parsed rows to the desired output by the given schema.
       @transient
    -  lazy val converter = nullableSchema match {
    -    case _: StructType =>
    -      (rows: Iterator[InternalRow]) => if (rows.hasNext) rows.next() else null
    -    case _: ArrayType =>
    -      (rows: Iterator[InternalRow]) => if (rows.hasNext) rows.next().getArray(0) else null
    -    case _: MapType =>
    -      (rows: Iterator[InternalRow]) => if (rows.hasNext) rows.next().getMap(0) else null
    +  private lazy val castRow = nullableSchema match {
    +    case _: StructType => (row: InternalRow) => row
    +    case _: ArrayType => (row: InternalRow) =>
    +      if (row.isNullAt(0)) {
    +        new GenericArrayData(Array())
    --- End diff --

    I could revert the recent commits and prepare a separate PR for the behaviour change. WDYT?
[GitHub] spark pull request #22938: [SPARK-25935][SQL] Prevent null rows from JSON pa...
Github user MaxGekk commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22938#discussion_r232386932

    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala ---
    @@ -550,15 +550,33 @@ case class JsonToStructs(
           s"Input schema ${nullableSchema.catalogString} must be a struct, an array or a map.")
       }
    -  // This converts parsed rows to the desired output by the given schema.
       @transient
    -  lazy val converter = nullableSchema match {
    -    case _: StructType =>
    -      (rows: Iterator[InternalRow]) => if (rows.hasNext) rows.next() else null
    -    case _: ArrayType =>
    -      (rows: Iterator[InternalRow]) => if (rows.hasNext) rows.next().getArray(0) else null
    -    case _: MapType =>
    -      (rows: Iterator[InternalRow]) => if (rows.hasNext) rows.next().getMap(0) else null
    +  private lazy val castRow = nullableSchema match {
    +    case _: StructType => (row: InternalRow) => row
    +    case _: ArrayType => (row: InternalRow) =>
    +      if (row.isNullAt(0)) {
    +        new GenericArrayData(Array())
    --- End diff --

    > but we need more discussion about this behavior.

    @cloud-fan Should I send an email to the dev list, or can we discuss this here?
[GitHub] spark pull request #22938: [SPARK-25935][SQL] Prevent null rows from JSON pa...
Github user MaxGekk commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22938#discussion_r231783277

    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala ---
    @@ -550,15 +550,33 @@ case class JsonToStructs(
           s"Input schema ${nullableSchema.catalogString} must be a struct, an array or a map.")
       }
    -  // This converts parsed rows to the desired output by the given schema.
       @transient
    -  lazy val converter = nullableSchema match {
    -    case _: StructType =>
    -      (rows: Iterator[InternalRow]) => if (rows.hasNext) rows.next() else null
    -    case _: ArrayType =>
    -      (rows: Iterator[InternalRow]) => if (rows.hasNext) rows.next().getArray(0) else null
    -    case _: MapType =>
    -      (rows: Iterator[InternalRow]) => if (rows.hasNext) rows.next().getMap(0) else null
    +  private lazy val castRow = nullableSchema match {
    +    case _: StructType => (row: InternalRow) => row
    +    case _: ArrayType => (row: InternalRow) =>
    +      if (row.isNullAt(0)) {
    +        new GenericArrayData(Array())
    --- End diff --

    I also wondered which is better to return here: `null` or an empty `Array`/`MapData`. In the case of `StructType` we return a `Row` in the `PERMISSIVE` mode. For consistency, should we return an empty array/map in this mode too? Maybe we could consider a special mode in which we return `null` for a bad record? That is easy to do now since we use `FailureSafeParser`.
[GitHub] spark pull request #22938: [SPARK-25935][SQL] Prevent null rows from JSON pa...
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22938#discussion_r231762733

    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala ---
    @@ -550,15 +550,33 @@ case class JsonToStructs(
           s"Input schema ${nullableSchema.catalogString} must be a struct, an array or a map.")
       }
    -  // This converts parsed rows to the desired output by the given schema.
       @transient
    -  lazy val converter = nullableSchema match {
    -    case _: StructType =>
    -      (rows: Iterator[InternalRow]) => if (rows.hasNext) rows.next() else null
    -    case _: ArrayType =>
    -      (rows: Iterator[InternalRow]) => if (rows.hasNext) rows.next().getArray(0) else null
    -    case _: MapType =>
    -      (rows: Iterator[InternalRow]) => if (rows.hasNext) rows.next().getMap(0) else null
    +  private lazy val castRow = nullableSchema match {
    +    case _: StructType => (row: InternalRow) => row
    +    case _: ArrayType => (row: InternalRow) =>
    +      if (row.isNullAt(0)) {
    +        new GenericArrayData(Array())
    --- End diff --

    I think this is where `from_json` differs from the JSON data source. A data source must produce data as rows, while `from_json` can return an array or a map.

    I think the previous behavior also makes sense. For array/map, we don't have the corrupted column, and returning null is reasonable. Actually I prefer null over an empty array/map, but we need more discussion about this behavior.
[GitHub] spark pull request #22938: [SPARK-25935][SQL] Prevent null rows from JSON pa...
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22938#discussion_r231745125

    --- Diff: docs/sql-migration-guide-upgrade.md ---
    @@ -15,6 +15,8 @@ displayTitle: Spark SQL Upgrading Guide
       - Since Spark 3.0, the `from_json` functions supports two modes - `PERMISSIVE` and `FAILFAST`. The modes can be set via the `mode` option. The default mode became `PERMISSIVE`. In previous versions, behavior of `from_json` did not conform to either `PERMISSIVE` nor `FAILFAST`, especially in processing of malformed JSON records. For example, the JSON string `{"a" 1}` with the schema `a INT` is converted to `null` by previous versions but Spark 3.0 converts it to `Row(null)`.
    +  - In Spark version 2.4 and earlier, JSON data source and the `from_json` function produced `null`s if there is no valid root JSON token in its input (` ` for example). Since Spark 3.0, such input is treated as a bad record and handled according to specified mode. For example, in the `PERMISSIVE` mode the ` ` input is converted to `Row(null, null)` if specified schema is `key STRING, value INT`.
    --- End diff --

    Just out of curiosity, how can the JSON data source return null rows?
[GitHub] spark pull request #22938: [SPARK-25935][SQL] Prevent null rows from JSON pa...
Github user MaxGekk commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22938#discussion_r231251408

    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala ---
    @@ -550,15 +550,23 @@ case class JsonToStructs(
           s"Input schema ${nullableSchema.catalogString} must be a struct, an array or a map.")
       }
    +  private val castRow = nullableSchema match {
    --- End diff --

    I agree, it makes sense.
[GitHub] spark pull request #22938: [SPARK-25935][SQL] Prevent null rows from JSON pa...
Github user attilapiros commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22938#discussion_r231189880

    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala ---
    @@ -550,15 +550,23 @@ case class JsonToStructs(
           s"Input schema ${nullableSchema.catalogString} must be a struct, an array or a map.")
       }
    +  private val castRow = nullableSchema match {
    --- End diff --

    Sorry, at first I only looked at `converter`, which can now be a `def` without `@transient`; for `castRow`, however, `@transient` and `lazy` should be added.
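The `@transient lazy val` versus `def` distinction raised above can be tried out with plain Java serialization. The sketch below is an illustration under assumptions: `Expr` is a hypothetical stand-in for a serializable expression, `roundTrip` is a helper invented here, and rows are modelled as `Vector[Any]`.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

object TransientLazySketch {
  // Hypothetical stand-in for an expression that gets serialized and shipped.
  final class Expr(val kind: String) extends Serializable {
    // Not written out with the object; rebuilt on first use after deserialization.
    @transient private lazy val castRow: Vector[Any] => Any = kind match {
      case "struct" => (row: Vector[Any]) => row
      case _        => (row: Vector[Any]) => row(0)
    }

    // A plain method stores nothing, so there is nothing to mark @transient.
    def convertRow(rows: Iterator[Vector[Any]]): Any = castRow(rows.next())
  }

  // Serializes and deserializes a value in memory.
  def roundTrip[T <: java.io.Serializable](value: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(value)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }
}
```

The design point is that the cached function depends only on `kind`, so it is cheap to rebuild after deserialization and does not need to travel with the object.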
[GitHub] spark pull request #22938: [SPARK-25935][SQL] Prevent null rows from JSON pa...
Github user attilapiros commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22938#discussion_r231156386

    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala ---
    @@ -550,15 +550,23 @@ case class JsonToStructs(
           s"Input schema ${nullableSchema.catalogString} must be a struct, an array or a map.")
       }
    +  private val castRow = nullableSchema match {
    +    case _: StructType => (row: InternalRow) => row
    +    case _: ArrayType => (row: InternalRow) => row.getArray(0)
    +    case _: MapType => (row: InternalRow) => row.getMap(0)
    +  }
    +
       // This converts parsed rows to the desired output by the given schema.
       @transient
    -  lazy val converter = nullableSchema match {
    -    case _: StructType =>
    -      (rows: Iterator[InternalRow]) => if (rows.hasNext) rows.next() else null
    -    case _: ArrayType =>
    -      (rows: Iterator[InternalRow]) => if (rows.hasNext) rows.next().getArray(0) else null
    -    case _: MapType =>
    -      (rows: Iterator[InternalRow]) => if (rows.hasNext) rows.next().getMap(0) else null
    +  lazy val converter = (rows: Iterator[InternalRow]) => {
    --- End diff --

    I think the earlier `lazy val` made sense, as it stored a function depending on the `nullableSchema`, but now it can be a simple `def`.
[GitHub] spark pull request #22938: [SPARK-25935][SQL] Prevent null rows from JSON pa...
Github user MaxGekk commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22938#discussion_r230586281

    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala ---
    @@ -240,16 +240,6 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
           Seq(Row("1"), Row("2")))
       }
    -  test("SPARK-11226 Skip empty line in json file") {
    --- End diff --

    I removed the test because it is no longer relevant to the default mode, `PERMISSIVE`. Also, `SQLQuerySuite` is not the right place for it.
[GitHub] spark pull request #22938: [SPARK-25935][SQL] Prevent null rows from JSON pa...
Github user MaxGekk commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22938#discussion_r230585549

    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala ---
    @@ -552,13 +552,19 @@ case class JsonToStructs(
       // This converts parsed rows to the desired output by the given schema.
       @transient
    -  lazy val converter = nullableSchema match {
    -    case _: StructType =>
    -      (rows: Iterator[InternalRow]) => if (rows.hasNext) rows.next() else null
    -    case _: ArrayType =>
    -      (rows: Iterator[InternalRow]) => if (rows.hasNext) rows.next().getArray(0) else null
    -    case _: MapType =>
    -      (rows: Iterator[InternalRow]) => if (rows.hasNext) rows.next().getMap(0) else null
    +  lazy val converter = (rows: Iterator[InternalRow]) => {
    +    if (rows.hasNext) {
    +      val result = rows.next()
    +      // JSON's parser produces one record only.
    +      assert(!rows.hasNext)
    +      nullableSchema match {
    +        case _: StructType => result
    --- End diff --

    I don't see any visible overhead from this in the profiler, but I will change it since it is easy to do.
[GitHub] spark pull request #22938: [SPARK-25935][SQL] Prevent null rows from JSON pa...
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22938#discussion_r230581932

    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala ---
    @@ -552,13 +552,19 @@ case class JsonToStructs(
       // This converts parsed rows to the desired output by the given schema.
       @transient
    -  lazy val converter = nullableSchema match {
    -    case _: StructType =>
    -      (rows: Iterator[InternalRow]) => if (rows.hasNext) rows.next() else null
    -    case _: ArrayType =>
    -      (rows: Iterator[InternalRow]) => if (rows.hasNext) rows.next().getArray(0) else null
    -    case _: MapType =>
    -      (rows: Iterator[InternalRow]) => if (rows.hasNext) rows.next().getMap(0) else null
    +  lazy val converter = (rows: Iterator[InternalRow]) => {
    +    if (rows.hasNext) {
    +      val result = rows.next()
    +      // JSON's parser produces one record only.
    +      assert(!rows.hasNext)
    +      nullableSchema match {
    +        case _: StructType => result
    --- End diff --

    @MaxGekk, this looks like it is going to type-dispatch here per record.
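The per-record type dispatch concern above comes down to where the `match` on the schema is evaluated. A small Spark-free sketch of the two shapes follows; the `Schema` hierarchy and `ToyRow` are hypothetical stand-ins for Spark's `StructType`/`ArrayType`/`MapType` and `InternalRow`.

```scala
object DispatchSketch {
  // Toy stand-ins for the schema types and the row type (assumptions for this sketch).
  sealed trait Schema
  case object StructSchema extends Schema
  case object ArraySchema extends Schema
  case object MapSchema extends Schema

  final case class ToyRow(values: Vector[Any])

  // Per-record dispatch: the match on `schema` runs for every row.
  def convertPerRecord(schema: Schema, row: ToyRow): Any = schema match {
    case StructSchema            => row
    case ArraySchema | MapSchema => row.values(0)
  }

  // Hoisted dispatch: the match runs once and returns a function
  // that is then applied to every row.
  def makeCast(schema: Schema): ToyRow => Any = schema match {
    case StructSchema            => (row: ToyRow) => row
    case ArraySchema | MapSchema => (row: ToyRow) => row.values(0)
  }
}
```

Both shapes return the same values; the hoisted form just pays for the schema inspection once, which is the shape `castRow` takes in the later revisions of the diff.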
[GitHub] spark pull request #22938: [SPARK-25935][SQL] Prevent null rows from JSON pa...
GitHub user MaxGekk opened a pull request:

    https://github.com/apache/spark/pull/22938

    [SPARK-25935][SQL] Prevent null rows from JSON parser

    ## What changes were proposed in this pull request?

    An input without valid JSON tokens on the root level will be treated as a bad record, and handled according to `mode`. Previously such input was converted to `null`. After the changes, the input is converted to a row with `null`s in the `PERMISSIVE` mode according to the schema. This allows removing the code in the `from_json` function that can produce `null` as a result row.

    ## How was this patch tested?

    It was tested by existing test suites. I had to modify some of them (`JsonSuite`, for example) because previously bad input was just silently ignored. Now such input is handled according to the specified `mode`.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/MaxGekk/spark-1 json-nulls

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/22938.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #22938

commit 31cb534d22cbab31fad5fc44115e67ef973420ea
Author: Maxim Gekk
Date: 2018-11-03T21:24:01Z

    Eliminate producing nulls by JSON parser

commit 0589d9195ef396b2a94bb2dfdc3000ffc8eb
Author: Maxim Gekk
Date: 2018-11-04T08:00:23Z

    Updating the migration guide