[GitHub] spark pull request #22237: [SPARK-25243][SQL] Use FailureSafeParser in from_...
Github user MaxGekk commented on a diff in the pull request:

https://github.com/apache/spark/pull/22237#discussion_r234792965

    --- Diff: R/pkg/tests/fulltests/test_sparkSQL.R ---
    @@ -1694,7 +1694,7 @@ test_that("column functions", {
       df <- as.DataFrame(list(list("col" = "{\"date\":\"21/10/2014\"}")))
       schema2 <- structType(structField("date", "date"))
       s <- collect(select(df, from_json(df$col, schema2)))
    -  expect_equal(s[[1]][[1]], NA)
    +  expect_equal(s[[1]][[1]]$date, NA)
    --- End diff --

Do you mean this particular line or in general? This line was changed because in the `PERMISSIVE` mode we usually return a `Row` with null in the fields we weren't able to parse, instead of just `null` for the whole row. In general, the goal is to fully support the `PERMISSIVE` mode, with the only exception being inputs in which the Jackson parser cannot detect any JSON tokens at the root level. We recently switched `from_json` to `FailureSafeParser`, with `PERMISSIVE` as the default mode, in #22237. Previously, unlike the JSON datasource, `from_json` didn't support any modes.

---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
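A toy model in plain Scala can make the difference concrete. It is not Spark's API: rows are modelled as `Seq[Any]`, and `parseDate` is a hypothetical stand-in that only accepts ISO `yyyy-MM-dd` strings. The old behaviour nulls out the whole row when a field fails to parse, while `PERMISSIVE` keeps the row and nulls only the failing field, which is why the R test now inspects `$date`.

```scala
import java.time.LocalDate
import scala.util.Try

// Toy model, not Spark's API: a "row" is a Seq of nullable field values.
type Row = Seq[Any]

// Hypothetical field parser: accepts only ISO dates such as "2014-10-21".
def parseDate(s: String): Option[LocalDate] = Try(LocalDate.parse(s)).toOption

// Old behaviour for schema `date DATE`: a field failure nulls the whole row.
def oldFromJson(date: String): Row =
  parseDate(date).map[Row](d => Seq(d)).orNull

// PERMISSIVE behaviour: keep the row and null out only the field that failed.
def permissiveFromJson(date: String): Row =
  Seq(parseDate(date).orNull)
```

With the input from the R test, `"21/10/2014"`, the old model returns `null` while the permissive one returns a one-field row holding `null`.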
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/22237#discussion_r234789275

    --- Diff: R/pkg/tests/fulltests/test_sparkSQL.R ---
    @@ -1694,7 +1694,7 @@ test_that("column functions", {
       df <- as.DataFrame(list(list("col" = "{\"date\":\"21/10/2014\"}")))
       schema2 <- structType(structField("date", "date"))
       s <- collect(select(df, from_json(df$col, schema2)))
    -  expect_equal(s[[1]][[1]], NA)
    +  expect_equal(s[[1]][[1]]$date, NA)
    --- End diff --

What is the reason we made this change?

---
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/22237

---
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22237#discussion_r224069683

    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala ---
    @@ -15,50 +15,57 @@
      * limitations under the License.
      */
    -package org.apache.spark.sql.execution.datasources
    +package org.apache.spark.sql.catalyst.util
     import org.apache.spark.SparkException
     import org.apache.spark.sql.catalyst.InternalRow
     import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
    -import org.apache.spark.sql.catalyst.util._
    -import org.apache.spark.sql.internal.SQLConf
    -import org.apache.spark.sql.types.StructType
    +import org.apache.spark.sql.types.{DataType, StructType}
     import org.apache.spark.unsafe.types.UTF8String
     class FailureSafeParser[IN](
    --- End diff --

It's not about the real business logic, but an idea to make the code simpler.

```
// Wrap a non-struct schema (array, map) in a one-field struct named "value".
val parserSchema = nullableSchema match {
  case s: StructType => s
  case other => new StructType().add("value", other)
}
new FailureSafeParser[UTF8String](
  input => rawParser.parse(input, createParser, identity[UTF8String]),
  mode,
  parserSchema,
  parsedOptions.columnNameOfCorruptRecord,
  parsedOptions.multiLine)
```

Then we don't need to change `FailureSafeParser`.

---
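A minimal sketch of the suggested wrapping, using a hypothetical toy schema ADT (`DType`, `StructT`, `Field` and so on) rather than Spark's real `DataType` classes: structs pass through unchanged and anything else becomes a one-field struct named `value`, so the failure-safe parser only ever has to handle structs.

```scala
// Hypothetical toy schema ADT; it only mimics the shape of Spark's DataType tree.
sealed trait DType
case object IntT extends DType
final case class ArrayT(element: DType) extends DType
final case class MapT(key: DType, value: DType) extends DType
final case class Field(name: String, dataType: DType)
final case class StructT(fields: Seq[Field]) extends DType

// Structs pass through unchanged; any other schema is wrapped in a one-field
// struct named "value", so the failure-safe parser only ever sees structs.
def parserSchema(schema: DType): StructT = schema match {
  case s: StructT => s
  case other      => StructT(Seq(Field("value", other)))
}
```

The design choice is to keep `FailureSafeParser` struct-only and push the array/map special case to the caller, which then has to unwrap field 0 of each result row.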
Github user MaxGekk commented on a diff in the pull request:

https://github.com/apache/spark/pull/22237#discussion_r224044435

    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala ---
    @@ -15,50 +15,57 @@
      * limitations under the License.
      */
    -package org.apache.spark.sql.execution.datasources
    +package org.apache.spark.sql.catalyst.util
     import org.apache.spark.SparkException
     import org.apache.spark.sql.catalyst.InternalRow
     import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
    -import org.apache.spark.sql.catalyst.util._
    -import org.apache.spark.sql.internal.SQLConf
    -import org.apache.spark.sql.types.StructType
    +import org.apache.spark.sql.types.{DataType, StructType}
     import org.apache.spark.unsafe.types.UTF8String
     class FailureSafeParser[IN](
    --- End diff --

Frankly speaking, I don't fully understand the idea. Let's look at an example. Suppose we need to parse JSON arrays (one array per row) like:

```
[1, 2, 3]
[4, 5]
```

and a user provided the schema `ArrayType(IntegerType, true)`. So, you propose to wrap the array in `StructType(Seq(StructField(ArrayType(IntegerType, ...`, right? And to use the code inside `JacksonParser` which we have disabled by `allowArrayAsStructs` for now?

---
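The example above can be walked through with a toy in plain Scala. The mini-parser `parseIntArray` is hypothetical and only understands flat integer arrays; the point is the shape of the idea: each input line is parsed as if the schema were a one-field struct wrapping `ArrayType(IntegerType)`, and the array is then read back out of field 0.

```scala
import scala.util.Try

// Toy row type (an assumption of this sketch, not Spark's InternalRow).
type Row = Seq[Any]

// Hypothetical mini-parser for a flat JSON array of ints such as "[1, 2, 3]".
// Returns None for anything it does not recognise.
def parseIntArray(json: String): Option[Seq[Int]] = {
  val t = json.trim
  if (!t.startsWith("[") || !t.endsWith("]")) None
  else {
    val body = t.substring(1, t.length - 1).trim
    if (body.isEmpty) Some(Seq.empty)
    else Try(body.split(",").toSeq.map(_.trim.toInt)).toOption
  }
}

// Parse "through" a one-field wrapper struct: the array lands in field 0 ...
def parseWrapped(json: String): Option[Row] = parseIntArray(json).map(arr => Seq(arr))

// ... and the caller unwraps field 0 to get the array value back.
def fromJsonArray(json: String): Option[Seq[Int]] =
  parseWrapped(json).map(_.head.asInstanceOf[Seq[Int]])
```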
Github user MaxGekk commented on a diff in the pull request:

https://github.com/apache/spark/pull/22237#discussion_r224042803

    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala ---
    @@ -554,18 +554,30 @@ case class JsonToStructs(
       @transient lazy val converter = nullableSchema match {
         case _: StructType =>
    -      (rows: Seq[InternalRow]) => if (rows.length == 1) rows.head else null
    +      (rows: Iterator[InternalRow]) => if (rows.hasNext) rows.next() else null
    --- End diff --

For now the case is more specific: we return null if the Jackson parser doesn't find any token in the input. I'm not sure that more detailed information about the underlying problem would help users much.

---
Github user MaxGekk commented on a diff in the pull request:

https://github.com/apache/spark/pull/22237#discussion_r224042251

    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala ---
    @@ -554,18 +554,30 @@ case class JsonToStructs(
       @transient lazy val converter = nullableSchema match {
         case _: StructType =>
    -      (rows: Seq[InternalRow]) => if (rows.length == 1) rows.head else null
    +      (rows: Iterator[InternalRow]) => if (rows.hasNext) rows.next() else null
    --- End diff --

> we don't have to do it in this PR, but it would be great to document when this expression will return null ...

We already state in the docs for `from_json()`: `Returns null, in the case of an unparseable string.`

---
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22237#discussion_r223912044

    --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala ---
    @@ -450,21 +456,24 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper with
       test("from_json - input=array of single object, schema=struct, output=single row") {
         val input = """[{"a": 1}]"""
         val schema = StructType(StructField("a", IntegerType) :: Nil)
    -    val output = InternalRow(1)
    +    val output = InternalRow(null)
         checkEvaluation(JsonToStructs(schema, Map.empty, Literal(input), gmtId), output)
       }
    -  test("from_json - input=array, schema=struct, output=null") {
    +  test("from_json - input=array, schema=struct, output=single row") {
         val input = """[{"a": 1}, {"a": 2}]"""
    -    val schema = StructType(StructField("a", IntegerType) :: Nil)
    -    val output = null
    -    checkEvaluation(JsonToStructs(schema, Map.empty, Literal(input), gmtId), output)
    +    val corrupted = "corrupted"
    +    val schema = new StructType().add("a", IntegerType).add(corrupted, StringType)
    +    StructType(StructField("a", IntegerType) :: Nil)
    +    val output = InternalRow(null, UTF8String.fromString(input))
    +    val options = Map("columnNameOfCorruptRecord" -> corrupted)
    +    checkEvaluation(JsonToStructs(schema, options, Literal(input), gmtId), output)
       }
       test("from_json - input=empty array, schema=struct, output=null") {
    --- End diff --

`output=single row`

---
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22237#discussion_r223911893

    --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala ---
    @@ -450,21 +456,24 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper with
       test("from_json - input=array of single object, schema=struct, output=single row") {
         val input = """[{"a": 1}]"""
         val schema = StructType(StructField("a", IntegerType) :: Nil)
    -    val output = InternalRow(1)
    +    val output = InternalRow(null)
         checkEvaluation(JsonToStructs(schema, Map.empty, Literal(input), gmtId), output)
       }
    -  test("from_json - input=array, schema=struct, output=null") {
    +  test("from_json - input=array, schema=struct, output=single row") {
         val input = """[{"a": 1}, {"a": 2}]"""
    -    val schema = StructType(StructField("a", IntegerType) :: Nil)
    -    val output = null
    -    checkEvaluation(JsonToStructs(schema, Map.empty, Literal(input), gmtId), output)
    +    val corrupted = "corrupted"
    +    val schema = new StructType().add("a", IntegerType).add(corrupted, StringType)
    +    StructType(StructField("a", IntegerType) :: Nil)
    --- End diff --

unnecessary line?

---
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22237#discussion_r223911795

    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala ---
    @@ -554,18 +554,30 @@ case class JsonToStructs(
       @transient lazy val converter = nullableSchema match {
         case _: StructType =>
    -      (rows: Seq[InternalRow]) => if (rows.length == 1) rows.head else null
    +      (rows: Iterator[InternalRow]) => if (rows.hasNext) rows.next() else null
    --- End diff --

... and with some tests to verify it.

---
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22237#discussion_r223911576

    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala ---
    @@ -554,18 +554,30 @@ case class JsonToStructs(
       @transient lazy val converter = nullableSchema match {
         case _: StructType =>
    -      (rows: Seq[InternalRow]) => if (rows.length == 1) rows.head else null
    +      (rows: Iterator[InternalRow]) => if (rows.hasNext) rows.next() else null
    --- End diff --

We don't have to do it in this PR, but it would be great to document when this expression will return null, in the class doc of `JsonToStructs`.

---
Github user MaxGekk commented on a diff in the pull request:

https://github.com/apache/spark/pull/22237#discussion_r223852220

    --- Diff: docs/sql-programming-guide.md ---
    @@ -1890,6 +1890,10 @@ working with timestamps in `pandas_udf`s to get the best performance, see
     # Migration Guide
    +## Upgrading From Spark SQL 2.4 to 3.0
    +
    --- End diff --

updated

---
Github user MaxGekk commented on a diff in the pull request:

https://github.com/apache/spark/pull/22237#discussion_r223849217

    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala ---
    @@ -554,18 +554,30 @@ case class JsonToStructs(
       @transient lazy val converter = nullableSchema match {
         case _: StructType =>
    -      (rows: Seq[InternalRow]) => if (rows.length == 1) rows.head else null
    +      (rows: Iterator[InternalRow]) => if (rows.hasNext) rows.next() else null
    --- End diff --

Actually, I was wrong: when there are no input tokens, we return an empty iterator and, as a consequence, `null`. See https://github.com/apache/spark/blob/a8a1ac01c4732f8a738b973c8486514cd88bf99b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala#L398

We could throw `BadRecordException` instead of returning `Nil`, but this would change the behavior of the JSON/CSV datasources.

---
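The converter from the diff can be restated over a toy row type (`Seq[Any]`, not Spark's `InternalRow`) to see the consequence described above: when the parser yields an empty iterator, as it does for input with no JSON tokens, the converter returns `null`; otherwise it keeps only the first row.

```scala
// Toy row type standing in for Spark's InternalRow (an assumption of this sketch).
type Row = Seq[Any]

// Restatement of the new converter: take the first parsed row if there is one,
// otherwise return null (e.g. when the parser found no JSON tokens at all).
val converter: Iterator[Row] => Row =
  rows => if (rows.hasNext) rows.next() else null
```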
Github user MaxGekk commented on a diff in the pull request:

https://github.com/apache/spark/pull/22237#discussion_r223836030

    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala ---
    @@ -554,18 +554,30 @@ case class JsonToStructs(
       @transient lazy val converter = nullableSchema match {
         case _: StructType =>
    -      (rows: Seq[InternalRow]) => if (rows.length == 1) rows.head else null
    +      (rows: Iterator[InternalRow]) => if (rows.hasNext) rows.next() else null
    --- End diff --

We shouldn't return `null`. I will replace the `null` with exceptions, as I did for `from_csv`: https://github.com/apache/spark/pull/22379/files#diff-5321c01e95bffc4413c5f3457696213eR83

---
Github user MaxGekk commented on a diff in the pull request:

https://github.com/apache/spark/pull/22237#discussion_r223833101

    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala ---
    @@ -90,6 +91,10 @@ class JacksonParser(
             // in such an array as a row, this case is possible.
             if (array.numElements() == 0) {
               Nil
    +        } else if (array.numElements() > 1 && !explodeArray) {
    --- End diff --

> ... but I think it's ok to drop it in from_json

> ... the new parameter can be named as allowArrayAsStructs

Is it OK if I do:

```
case START_ARRAY if allowArrayAsStructs =>
```

---
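The proposed guard can be illustrated with a hypothetical token type standing in for Jackson's `JsonToken`; this is not `JacksonParser`'s real code. The array-of-structs branch only fires when the caller opts in, and with `allowArrayAsStructs = false`, as proposed for `from_json`, a root-level array falls through to the bad-record path.

```scala
// Hypothetical stand-in for the first token Jackson reports at the root level.
sealed trait Token
case object StartObject extends Token
case object StartArray extends Token
case object ValueNumber extends Token

// Toy dispatcher: the guard on StartArray keeps the "array of structs" branch
// for callers that opt in (e.g. the JSON datasource) and rejects it otherwise.
def rootAction(first: Token, allowArrayAsStructs: Boolean): String = first match {
  case StartObject                       => "parse one struct"
  case StartArray if allowArrayAsStructs => "parse each element as a struct"
  case _                                 => "bad record"
}
```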
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22237#discussion_r223708500

    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala ---
    @@ -554,18 +554,30 @@ case class JsonToStructs(
       @transient lazy val converter = nullableSchema match {
         case _: StructType =>
    -      (rows: Seq[InternalRow]) => if (rows.length == 1) rows.head else null
    +      (rows: Iterator[InternalRow]) => if (rows.hasNext) rows.next() else null
    --- End diff --

So we may still return null in some cases. Can you list them?

---
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22237#discussion_r223707899

    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala ---
    @@ -15,50 +15,57 @@
      * limitations under the License.
      */
    -package org.apache.spark.sql.execution.datasources
    +package org.apache.spark.sql.catalyst.util
     import org.apache.spark.SparkException
     import org.apache.spark.sql.catalyst.InternalRow
     import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
    -import org.apache.spark.sql.catalyst.util._
    -import org.apache.spark.sql.internal.SQLConf
    -import org.apache.spark.sql.types.StructType
    +import org.apache.spark.sql.types.{DataType, StructType}
     import org.apache.spark.unsafe.types.UTF8String
     class FailureSafeParser[IN](
    --- End diff --

Instead of changing this file, can we update `JsonToStructs.parser` to wrap an array or map type in a struct before creating the `FailureSafeParser`?

---
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22237#discussion_r223703894

    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala ---
    @@ -90,6 +91,10 @@ class JacksonParser(
             // in such an array as a row, this case is possible.
             if (array.numElements() == 0) {
               Nil
    +        } else if (array.numElements() > 1 && !explodeArray) {
    --- End diff --

the new parameter can be named as `allowArrayAsStructs`

---
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22237#discussion_r223703462

    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala ---
    @@ -90,6 +91,10 @@ class JacksonParser(
             // in such an array as a row, this case is possible.
             if (array.numElements() == 0) {
               Nil
    +        } else if (array.numElements() > 1 && !explodeArray) {
    --- End diff --

Shall we completely forbid reading an array as a struct type in `from_json`? BTW, I think this feature is not needed now, since we can directly read top-level JSON arrays as an array type. We may still need to keep it when reading JSON files, but I think it's ok to drop it in `from_json`.

---
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22237#discussion_r223700226

    --- Diff: docs/sql-programming-guide.md ---
    @@ -1890,6 +1890,10 @@ working with timestamps in `pandas_udf`s to get the best performance, see
     # Migration Guide
    +## Upgrading From Spark SQL 2.4 to 3.0
    +
    --- End diff --

Shall we update the guide too?

---
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22237#discussion_r223696096

    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala ---
    @@ -547,4 +548,27 @@ class JsonFunctionsSuite extends QueryTest with SharedSQLContext {
             Map("pretty" -> "true"))),
           Seq(Row(expected)))
       }
    +
    +  test("from_json invalid json - check modes") {
    +    val df = Seq("""{"a" 1}""", """{"a": 2}""").toDS()
    +    val schema = new StructType().add("a", IntegerType)
    +
    +    checkAnswer(
    +      df.select(from_json($"value", schema, Map("mode" -> "PERMISSIVE"))),
    --- End diff --

@MaxGekk, can we add a test with a manually specified corrupt-record column in the schema? For instance, https://github.com/apache/spark/blob/a8a1ac01c4732f8a738b973c8486514cd88bf99b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala#L1125-L1133

---
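A toy version of what the new test checks, in plain Scala rather than Spark: `parseA` is a hypothetical parser that only accepts documents of the exact form `{"a": <int>}`, and `Mode` is a hypothetical stand-in for Spark's parse modes. For the malformed `{"a" 1}`, `PERMISSIVE` yields a row whose single field is null, while `FAILFAST` raises an error.

```scala
// Toy row type (an assumption of this sketch, not Spark's Row).
type Row = Seq[Any]

// Hypothetical stand-ins for the PERMISSIVE and FAILFAST parse modes.
sealed trait Mode
case object Permissive extends Mode
case object FailFast extends Mode

// Hypothetical parser for documents of the exact form {"a": <int>}.
val Pattern = """\{\s*"a"\s*:\s*(-?\d+)\s*\}""".r

def parseA(json: String): Option[Int] = json.trim match {
  case Pattern(n) => Some(n.toInt)
  case _          => None
}

def fromJson(json: String, mode: Mode): Row = parseA(json) match {
  case Some(a) => Seq(a)
  case None => mode match {
    case Permissive => Seq(null) // keep the row, null out every field
    case FailFast   => throw new IllegalArgumentException(s"Malformed JSON: $json")
  }
}
```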
Github user MaxGekk commented on a diff in the pull request:

https://github.com/apache/spark/pull/22237#discussion_r223675223

    --- Diff: docs/sql-programming-guide.md ---
    @@ -1890,6 +1890,10 @@ working with timestamps in `pandas_udf`s to get the best performance, see
     # Migration Guide
    +## Upgrading From Spark SQL 2.4 to 3.0
    +
    + - Since Spark 3.0, the `from_json` functions supports two modes - `PERMISSIVE` and `FAILFAST`. The modes can be set via the `mode` option. The default mode became `PERMISSIVE`. In previous versions, behavior of `from_json` did not conform to either `PERMISSIVE` nor `FAILFAST`, especially in processing of malformed JSON records. For example, the JSON string `{"a" 1}` with the schema `a INT` is converted to `null` by previous versions but Spark 3.0 converts it to `Row(null)`. In version 2.4 and earlier, arrays of JSON objects are considered as invalid and converted to `null` if specified schema is `StructType`. Since Spark 3.0, the input is considered as a valid JSON array and only its first element is parsed if it conforms to the specified `StructType`.
    --- End diff --

I introduced `explodeArray` in `JacksonParser` and modified a test to check this case.

---
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22237#discussion_r223550790

    --- Diff: docs/sql-programming-guide.md ---
    @@ -1890,6 +1890,10 @@ working with timestamps in `pandas_udf`s to get the best performance, see
     # Migration Guide
    +## Upgrading From Spark SQL 2.4 to 3.0
    +
    + - Since Spark 3.0, the `from_json` functions supports two modes - `PERMISSIVE` and `FAILFAST`. The modes can be set via the `mode` option. The default mode became `PERMISSIVE`. In previous versions, behavior of `from_json` did not conform to either `PERMISSIVE` nor `FAILFAST`, especially in processing of malformed JSON records. For example, the JSON string `{"a" 1}` with the schema `a INT` is converted to `null` by previous versions but Spark 3.0 converts it to `Row(null)`. In version 2.4 and earlier, arrays of JSON objects are considered as invalid and converted to `null` if specified schema is `StructType`. Since Spark 3.0, the input is considered as a valid JSON array and only its first element is parsed if it conforms to the specified `StructType`.
    --- End diff --

The last option sounds better to me, but can we fill in the corrupt-record column when the corrupt field name is specified in the schema?

---
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22237#discussion_r223545341

    --- Diff: docs/sql-programming-guide.md ---
    @@ -1890,6 +1890,10 @@ working with timestamps in `pandas_udf`s to get the best performance, see
     # Migration Guide
    +## Upgrading From Spark SQL 2.4 to 3.0
    +
    + - Since Spark 3.0, the `from_json` functions supports two modes - `PERMISSIVE` and `FAILFAST`. The modes can be set via the `mode` option. The default mode became `PERMISSIVE`. In previous versions, behavior of `from_json` did not conform to either `PERMISSIVE` nor `FAILFAST`, especially in processing of malformed JSON records. For example, the JSON string `{"a" 1}` with the schema `a INT` is converted to `null` by previous versions but Spark 3.0 converts it to `Row(null)`. In version 2.4 and earlier, arrays of JSON objects are considered as invalid and converted to `null` if specified schema is `StructType`. Since Spark 3.0, the input is considered as a valid JSON array and only its first element is parsed if it conforms to the specified `StructType`.
    --- End diff --

the last option LGTM

---
Github user MaxGekk commented on a diff in the pull request:

https://github.com/apache/spark/pull/22237#discussion_r223459186

    --- Diff: docs/sql-programming-guide.md ---
    @@ -1890,6 +1890,10 @@ working with timestamps in `pandas_udf`s to get the best performance, see
     # Migration Guide
    +## Upgrading From Spark SQL 2.4 to 3.0
    +
    + - Since Spark 3.0, the `from_json` functions supports two modes - `PERMISSIVE` and `FAILFAST`. The modes can be set via the `mode` option. The default mode became `PERMISSIVE`. In previous versions, behavior of `from_json` did not conform to either `PERMISSIVE` nor `FAILFAST`, especially in processing of malformed JSON records. For example, the JSON string `{"a" 1}` with the schema `a INT` is converted to `null` by previous versions but Spark 3.0 converts it to `Row(null)`. In version 2.4 and earlier, arrays of JSON objects are considered as invalid and converted to `null` if specified schema is `StructType`. Since Spark 3.0, the input is considered as a valid JSON array and only its first element is parsed if it conforms to the specified `StructType`.
    --- End diff --

This is the case where a user provides a `StructType` schema but the JSON input contains an array. The JSON datasource returns one row per struct in the array. Currently, `from_json` returns `null` in this case. With this PR, `from_json` returns one row built from the first element of the input array. Because we cannot return multiple rows from a function, we have the following options:

- return `null`, but this would be the first case in which we return `null` for non-`null` input (this is the current approach, before the PR)
- return a row with one element from the input array (this PR proposes that)
- throw an exception, which is not a nice option in the `PERMISSIVE` mode
- throw `BadRecordException` internally, and return `Row(null, null, ..., null)` in the `PERMISSIVE` mode or an exception in `FAILFAST`.

The last option seems more attractive than the others. WDYT?

---
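The last option can be sketched under toy assumptions: `BadRecord` is a hypothetical stand-in for Spark's `BadRecordException`, rows are `Seq[Any]`, and `parseStructA` handles only the schema `a INT`. The raw parser signals a bad record by throwing, and a failure-safe wrapper turns that into a row of nulls in `PERMISSIVE` or an error in `FAILFAST` (a plain `RuntimeException` here, not Spark's exception type).

```scala
// Toy row type (an assumption of this sketch, not Spark's InternalRow).
type Row = Seq[Any]

sealed trait Mode
case object Permissive extends Mode
case object FailFast extends Mode

// Hypothetical internal signal, standing in for Spark's BadRecordException.
final case class BadRecord(record: String) extends Exception(record)

// Hypothetical raw parser for the schema `a INT`: a root-level array (or any
// other unrecognised input) is a bad record; {"a": n} parses to a one-field row.
def parseStructA(json: String): Row = {
  val Obj = """\{\s*"a"\s*:\s*(-?\d+)\s*\}""".r
  json.trim match {
    case Obj(n) => Seq(n.toInt)
    case _      => throw BadRecord(json)
  }
}

// Failure-safe wrapper: the mode decides what the caller sees for a bad record.
def failureSafe(raw: String => Row, mode: Mode, numFields: Int, input: String): Row =
  try raw(input) catch {
    case BadRecord(_) if mode == Permissive => Seq.fill(numFields)(null)
    case e: BadRecord => throw new RuntimeException("Malformed record in FAILFAST mode", e)
  }
```

Throwing internally keeps the raw parser free of mode logic; only the wrapper needs to know whether to null out the row or fail.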
Github user MaxGekk commented on a diff in the pull request:

https://github.com/apache/spark/pull/22237#discussion_r223263164

    --- Diff: docs/sql-programming-guide.md ---
    @@ -1890,6 +1890,10 @@ working with timestamps in `pandas_udf`s to get the best performance, see
     # Migration Guide
    +## Upgrading From Spark SQL 2.4 to 3.0
    +
    + - Since Spark 3.0, the `from_json` functions supports two modes - `PERMISSIVE` and `FAILFAST`. The modes can be set via the `mode` option. The default mode became `PERMISSIVE`. In previous versions, behavior of `from_json` did not conform to either `PERMISSIVE` nor `FAILFAST`, especially in processing of malformed JSON records. For example, the JSON string `{"a" 1}` with the schema `a INT` is converted to `null` by previous versions but Spark 3.0 converts it to `Row(null)`. In version 2.4 and earlier, arrays of JSON objects are considered as invalid and converted to `null` if specified schema is `StructType`. Since Spark 3.0, the input is considered as a valid JSON array and only its first element is parsed if it conforms to the specified `StructType`.
    --- End diff --

I will check that.

---
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22237#discussion_r223254210

    --- Diff: docs/sql-programming-guide.md ---
    @@ -1890,6 +1890,10 @@ working with timestamps in `pandas_udf`s to get the best performance, see
     # Migration Guide
    +## Upgrading From Spark SQL 2.4 to 3.0
    +
    + - Since Spark 3.0, the `from_json` functions supports two modes - `PERMISSIVE` and `FAILFAST`. The modes can be set via the `mode` option. The default mode became `PERMISSIVE`. In previous versions, behavior of `from_json` did not conform to either `PERMISSIVE` nor `FAILFAST`, especially in processing of malformed JSON records. For example, the JSON string `{"a" 1}` with the schema `a INT` is converted to `null` by previous versions but Spark 3.0 converts it to `Row(null)`. In version 2.4 and earlier, arrays of JSON objects are considered as invalid and converted to `null` if specified schema is `StructType`. Since Spark 3.0, the input is considered as a valid JSON array and only its first element is parsed if it conforms to the specified `StructType`.
    --- End diff --

> Since Spark 3.0, the input is considered as a valid JSON array and only its first element is parsed if it conforms to the specified `StructType`.

Is this behavior the same as when reading JSON files?

---
Github user MaxGekk commented on a diff in the pull request:

https://github.com/apache/spark/pull/22237#discussion_r223144685

    --- Diff: docs/sql-programming-guide.md ---
    @@ -1879,6 +1879,10 @@ working with timestamps in `pandas_udf`s to get the best performance, see
     # Migration Guide
    +## Upgrading From Spark SQL 2.4 to 2.5
    +
    + - Since Spark 3.0, the `from_json` functions supports two modes - `PERMISSIVE` and `FAILFAST`. The modes can be set via the `mode` option. The default mode became `PERMISSIVE`. In previous versions, behavior of `from_json` did not conform to either `PERMISSIVE` nor `FAILFAST`, especially in processing of malformed JSON records. For example, the JSON string `{"a" 1}` with the schema `a INT` is converted to `null` by previous versions but Spark 3.0 converts it to `Row(null)`. In version 2.4 and earlier, arrays of JSON objects are considered as invalid and converted to `null` if specified schema is `StructType`. Since Spark 3.0, the input is considered as a valid JSON array and only its first element is parsed if it conforms to the specified `StructType`.
    --- End diff --

Reverted back to 3.0.

---
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22237#discussion_r222971397 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala --- @@ -15,50 +15,51 @@ * limitations under the License. */ -package org.apache.spark.sql.execution.datasources +package org.apache.spark.sql.catalyst.util import org.apache.spark.SparkException import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.GenericInternalRow -import org.apache.spark.sql.catalyst.util._ -import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.types.{DataType, StructType} import org.apache.spark.unsafe.types.UTF8String class FailureSafeParser[IN]( rawParser: IN => Seq[InternalRow], mode: ParseMode, -schema: StructType, +dataType: DataType, columnNameOfCorruptRecord: String, isMultiLine: Boolean) { - - private val corruptFieldIndex = schema.getFieldIndex(columnNameOfCorruptRecord) - private val actualSchema = StructType(schema.filterNot(_.name == columnNameOfCorruptRecord)) - private val resultRow = new GenericInternalRow(schema.length) - private val nullResult = new GenericInternalRow(schema.length) - // This function takes 2 parameters: an optional partial result, and the bad record. If the given // schema doesn't contain a field for corrupted record, we just return the partial result or a // row with all fields null. If the given schema contains a field for corrupted record, we will // set the bad record to this field, and set other fields according to the partial result or null. 
- private val toResultRow: (Option[InternalRow], () => UTF8String) => InternalRow = { -if (corruptFieldIndex.isDefined) { - (row, badRecord) => { -var i = 0 -while (i < actualSchema.length) { - val from = actualSchema(i) - resultRow(schema.fieldIndex(from.name)) = row.map(_.get(i, from.dataType)).orNull - i += 1 + private val toResultRow: (Option[InternalRow], () => UTF8String) => InternalRow = dataType match { +case struct: StructType => + val corruptFieldIndex = struct.getFieldIndex(columnNameOfCorruptRecord) --- End diff -- Would you mind moving this one (L38 to L54) into a private function and defining it around here?
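The refactoring requested above, keeping the struct-specific branch of `toResultRow` in its own private method and dispatching on the data type, can be sketched in plain Scala. All types here (`DataType`, `StructType`, `Field`, rows as `Vector[Any]`) are simplified stand-ins rather than Spark's classes, and `structResultRow` is a hypothetical method name.

```scala
// Hypothetical, simplified model of the suggested refactoring. The types are
// toy stand-ins, not Spark's DataType / StructType / InternalRow.
object ResultRowSketch {
  sealed trait DataType
  case object IntType extends DataType
  case object StringType extends DataType
  final case class Field(name: String, dataType: DataType)
  final case class StructType(fields: Vector[Field]) extends DataType

  type Row = Vector[Any] // null marks a missing value, as in Spark rows

  final class SafeParser(dataType: DataType, corruptColumn: String) {
    // (optional partial result, lazily produced bad record) => result row
    val toResultRow: (Option[Row], () => String) => Row = dataType match {
      case struct: StructType => structResultRow(struct)
      case _                  => (partial, _) => partial.orNull // non-struct: partial result or null
    }

    // Hypothetical private helper holding the struct-only logic.
    private def structResultRow(schema: StructType): (Option[Row], () => String) => Row = {
      val corruptIndex = schema.fields.indexWhere(_.name == corruptColumn)
      if (corruptIndex < 0) {
        // No corrupt-record column: the partial result, or a row with all fields null.
        (partial, _) => partial.getOrElse(Vector.fill[Any](schema.fields.length)(null))
      } else {
        val actual = schema.fields.filterNot(_.name == corruptColumn)
        (partial, badRecord) => {
          val result = Array.fill[Any](schema.fields.length)(null)
          // Copy parsed fields (if any) into their positions in the full schema.
          actual.zipWithIndex.foreach { case (f, i) =>
            result(schema.fields.indexOf(f)) = partial.map(_(i)).orNull
          }
          result(corruptIndex) = badRecord() // keep the raw bad record
          result.toVector
        }
      }
    }
  }
}
```

The dispatch on `dataType` stays a two-line match, which is roughly what keeping the original struct code in one place would buy for review and backporting.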
[GitHub] spark pull request #22237: [SPARK-25243][SQL] Use FailureSafeParser in from_...
Github user MaxGekk commented on a diff in the pull request: https://github.com/apache/spark/pull/22237#discussion_r222942666 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ParseMode.scala --- @@ -51,6 +56,8 @@ object ParseMode extends Logging { case PermissiveMode.name => PermissiveMode case DropMalformedMode.name => DropMalformedMode case FailFastMode.name => FailFastMode +case NullMalformedMode.name => NullMalformedMode --- End diff -- OK. I will revert the last changes.
[GitHub] spark pull request #22237: [SPARK-25243][SQL] Use FailureSafeParser in from_...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22237#discussion_r222926650 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ParseMode.scala --- @@ -51,6 +56,8 @@ object ParseMode extends Logging { case PermissiveMode.name => PermissiveMode case DropMalformedMode.name => DropMalformedMode case FailFastMode.name => FailFastMode +case NullMalformedMode.name => NullMalformedMode --- End diff -- I suggested keeping the previous behaviour only because I thought we were in transition to 2.5. Since we are going ahead with 3.0, I am okay with not keeping it.
[GitHub] spark pull request #22237: [SPARK-25243][SQL] Use FailureSafeParser in from_...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22237#discussion_r222924928 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ParseMode.scala --- @@ -51,6 +56,8 @@ object ParseMode extends Logging { case PermissiveMode.name => PermissiveMode case DropMalformedMode.name => DropMalformedMode case FailFastMode.name => FailFastMode +case NullMalformedMode.name => NullMalformedMode --- End diff -- After looking at it again, the `mode` option is already there, but `from_json` ignores it. Now this PR looks like a bug fix to me. I'm wondering whether we really need to keep the previous behavior. If we do, I think using a SQL config is more reasonable than adding a new mode.
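The two alternatives weighed here, a new parse mode versus a config flag gating the old behavior, can be contrasted with a small sketch. `fromString` below is a simplified version of the lookup shown in the diff (case-insensitive, falling back to `PERMISSIVE`; the real method is only partially visible here, and any logging is omitted). `Outcome`, `fromJsonBehavior` and the `legacyConf` flag are hypothetical names, not an existing Spark config.

```scala
// Simplified sketch modeled on the ParseMode lookup in the diff, plus a
// hypothetical boolean flag for the legacy from_json behavior.
object ParseModeSketch {
  import java.util.Locale

  sealed trait ParseMode { def name: String }
  case object PermissiveMode extends ParseMode { val name = "PERMISSIVE" }
  case object DropMalformedMode extends ParseMode { val name = "DROPMALFORMED" }
  case object FailFastMode extends ParseMode { val name = "FAILFAST" }

  // Case-insensitive lookup; unknown values fall back to PERMISSIVE.
  def fromString(mode: String): ParseMode = mode.toUpperCase(Locale.ROOT) match {
    case PermissiveMode.name    => PermissiveMode
    case DropMalformedMode.name => DropMalformedMode
    case FailFastMode.name      => FailFastMode
    case _                      => PermissiveMode
  }

  // Config-based alternative: the set of modes stays unchanged, and a separate
  // (hypothetical) boolean switches from_json back to "null for malformed input".
  sealed trait Outcome
  case object NullForMalformed extends Outcome
  final case class UseMode(mode: ParseMode) extends Outcome

  def fromJsonBehavior(options: Map[String, String], legacyConf: Boolean): Outcome =
    if (legacyConf) NullForMalformed
    else UseMode(fromString(options.getOrElse("mode", PermissiveMode.name)))
}
```

With the flag approach, the shared `mode` values stay meaningful for both the JSON datasource and `from_json`, and the legacy behavior does not leak into the datasource.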
[GitHub] spark pull request #22237: [SPARK-25243][SQL] Use FailureSafeParser in from_...
Github user MaxGekk commented on a diff in the pull request: https://github.com/apache/spark/pull/22237#discussion_r222814860 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ParseMode.scala --- @@ -51,6 +56,8 @@ object ParseMode extends Logging { case PermissiveMode.name => PermissiveMode case DropMalformedMode.name => DropMalformedMode case FailFastMode.name => FailFastMode +case NullMalformedMode.name => NullMalformedMode --- End diff -- A separate boolean flag in JSONOptions, or a SQL config?
[GitHub] spark pull request #22237: [SPARK-25243][SQL] Use FailureSafeParser in from_...
Github user MaxGekk commented on a diff in the pull request: https://github.com/apache/spark/pull/22237#discussion_r222812531 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ParseMode.scala --- @@ -51,6 +56,8 @@ object ParseMode extends Logging { case PermissiveMode.name => PermissiveMode case DropMalformedMode.name => DropMalformedMode case FailFastMode.name => FailFastMode +case NullMalformedMode.name => NullMalformedMode --- End diff -- I would add it to `from_json` only, but the function shares common classes with the JSON data source, and the mode is used in those common classes. Any ideas on how to restrict the mode value to `from_json` only?
[GitHub] spark pull request #22237: [SPARK-25243][SQL] Use FailureSafeParser in from_...
Github user MaxGekk commented on a diff in the pull request: https://github.com/apache/spark/pull/22237#discussion_r222811744 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala --- @@ -550,59 +550,93 @@ case class JsonToStructs( s"Input schema ${nullableSchema.catalogString} must be a struct, an array or a map.") } - // This converts parsed rows to the desired output by the given schema. - @transient - lazy val converter = nullableSchema match { -case _: StructType => - (rows: Seq[InternalRow]) => if (rows.length == 1) rows.head else null -case _: ArrayType => - (rows: Seq[InternalRow]) => rows.head.getArray(0) -case _: MapType => - (rows: Seq[InternalRow]) => rows.head.getMap(0) + abstract class RowParser { --- End diff -- Sure. I just didn't intend to mix it into other classes, and subclasses should extend only `RowParser`.
[GitHub] spark pull request #22237: [SPARK-25243][SQL] Use FailureSafeParser in from_...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22237#discussion_r222653808 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ParseMode.scala --- @@ -51,6 +56,8 @@ object ParseMode extends Logging { case PermissiveMode.name => PermissiveMode case DropMalformedMode.name => DropMalformedMode case FailFastMode.name => FailFastMode +case NullMalformedMode.name => NullMalformedMode --- End diff -- I think we should add the new mode only for `from_json`, not for the entire JSON data source?
[GitHub] spark pull request #22237: [SPARK-25243][SQL] Use FailureSafeParser in from_...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22237#discussion_r222651484 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala --- @@ -550,59 +550,93 @@ case class JsonToStructs( s"Input schema ${nullableSchema.catalogString} must be a struct, an array or a map.") } - // This converts parsed rows to the desired output by the given schema. - @transient - lazy val converter = nullableSchema match { -case _: StructType => - (rows: Seq[InternalRow]) => if (rows.length == 1) rows.head else null -case _: ArrayType => - (rows: Seq[InternalRow]) => rows.head.getArray(0) -case _: MapType => - (rows: Seq[InternalRow]) => rows.head.getMap(0) + abstract class RowParser { --- End diff -- Can it be a `trait`?
[GitHub] spark pull request #22237: [SPARK-25243][SQL] Use FailureSafeParser in from_...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22237#discussion_r222651368 --- Diff: docs/sql-programming-guide.md --- @@ -1890,6 +1890,10 @@ working with timestamps in `pandas_udf`s to get the best performance, see # Migration Guide +## Upgrading From Spark SQL 2.4 to 3.0 + + - Since Spark 3.0, the `from_json` function supports three modes - `PERMISSIVE`, `FAILFAST` and `NULLMALFORMED`. The modes can be set via the `mode` option. `PERMISSIVE` became the default mode. In previous versions, behavior of `from_json` did not conform to either `PERMISSIVE` nor `FAILFAST`, especially in processing of malformed JSON records. For example, the JSON string `{"a" 1}` with the schema `a INT` is converted to `null` by previous versions but Spark 3.0 converts it to `Row(null)`. In version 2.4 and earlier, arrays of JSON objects are considered as invalid and converted to `null` if specified schema is `StructType`. Since Spark 3.0, the input is considered as a valid JSON array and only its first element is parsed if it conforms to the specified `StructType`. To restore the previous behavior, set the JSON option `mode` to `NULLMALFORMED`. --- End diff -- How about `PERMISSIVE`, `FAILFAST` and `LEGACY`?
[GitHub] spark pull request #22237: [SPARK-25243][SQL] Use FailureSafeParser in from_...
Github user MaxGekk commented on a diff in the pull request: https://github.com/apache/spark/pull/22237#discussion_r222554954 --- Diff: docs/sql-programming-guide.md --- @@ -1877,6 +1877,10 @@ working with timestamps in `pandas_udf`s to get the best performance, see # Migration Guide +## Upgrading From Spark SQL 2.4 to 3.0 + + - Since Spark 3.0, the `from_json` functions supports two modes - `PERMISSIVE` and `FAILFAST`. The modes can be set via the `mode` option. The default mode became `PERMISSIVE`. In previous versions, behavior of `from_json` did not conform to either `PERMISSIVE` nor `FAILFAST`, especially in processing of malformed JSON records. For example, the JSON string `{"a" 1}` with the schema `a INT` is converted to `null` by previous versions but Spark 3.0 converts it to `Row(null)`. In version 2.4 and earlier, arrays of JSON objects are considered as invalid and converted to `null` if specified schema is `StructType`. Since Spark 3.0, the input is considered as a valid JSON array and only its first element is parsed if it conforms to the specified `StructType`. + --- End diff -- > Do we have a clear definition of the current behavior? It's important to let user know how the behavior changes. I added a new mode whose behavior is the same as the current one.
[GitHub] spark pull request #22237: [SPARK-25243][SQL] Use FailureSafeParser in from_...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22237#discussion_r220908825 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala --- @@ -15,50 +15,51 @@ * limitations under the License. */ -package org.apache.spark.sql.execution.datasources +package org.apache.spark.sql.catalyst.util import org.apache.spark.SparkException import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.GenericInternalRow -import org.apache.spark.sql.catalyst.util._ -import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.types.{DataType, StructType} import org.apache.spark.unsafe.types.UTF8String class FailureSafeParser[IN]( rawParser: IN => Seq[InternalRow], mode: ParseMode, -schema: StructType, +dataType: DataType, columnNameOfCorruptRecord: String, isMultiLine: Boolean) { - - private val corruptFieldIndex = schema.getFieldIndex(columnNameOfCorruptRecord) - private val actualSchema = StructType(schema.filterNot(_.name == columnNameOfCorruptRecord)) - private val resultRow = new GenericInternalRow(schema.length) - private val nullResult = new GenericInternalRow(schema.length) - // This function takes 2 parameters: an optional partial result, and the bad record. If the given // schema doesn't contain a field for corrupted record, we just return the partial result or a // row with all fields null. If the given schema contains a field for corrupted record, we will // set the bad record to this field, and set other fields according to the partial result or null. - private val toResultRow: (Option[InternalRow], () => UTF8String) => InternalRow = { --- End diff -- Just to make the review and backporting easier, and to keep the original authorship of the code.
[GitHub] spark pull request #22237: [SPARK-25243][SQL] Use FailureSafeParser in from_...
Github user MaxGekk commented on a diff in the pull request: https://github.com/apache/spark/pull/22237#discussion_r220905353 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala --- @@ -15,50 +15,51 @@ * limitations under the License. */ -package org.apache.spark.sql.execution.datasources +package org.apache.spark.sql.catalyst.util import org.apache.spark.SparkException import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.GenericInternalRow -import org.apache.spark.sql.catalyst.util._ -import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.types.{DataType, StructType} import org.apache.spark.unsafe.types.UTF8String class FailureSafeParser[IN]( rawParser: IN => Seq[InternalRow], mode: ParseMode, -schema: StructType, +dataType: DataType, columnNameOfCorruptRecord: String, isMultiLine: Boolean) { - - private val corruptFieldIndex = schema.getFieldIndex(columnNameOfCorruptRecord) - private val actualSchema = StructType(schema.filterNot(_.name == columnNameOfCorruptRecord)) - private val resultRow = new GenericInternalRow(schema.length) - private val nullResult = new GenericInternalRow(schema.length) - // This function takes 2 parameters: an optional partial result, and the bad record. If the given // schema doesn't contain a field for corrupted record, we just return the partial result or a // row with all fields null. If the given schema contains a field for corrupted record, we will // set the bad record to this field, and set other fields according to the partial result or null. - private val toResultRow: (Option[InternalRow], () => UTF8String) => InternalRow = { --- End diff -- > Can we make the diff small? Just wondering: what is the reason to make the diff small?
[GitHub] spark pull request #22237: [SPARK-25243][SQL] Use FailureSafeParser in from_...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22237#discussion_r220793666 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala --- @@ -554,18 +554,30 @@ case class JsonToStructs( @transient lazy val converter = nullableSchema match { case _: StructType => - (rows: Seq[InternalRow]) => if (rows.length == 1) rows.head else null + (rows: Iterator[InternalRow]) => if (rows.hasNext) rows.next() else null case _: ArrayType => - (rows: Seq[InternalRow]) => rows.head.getArray(0) + (rows: Iterator[InternalRow]) => if (rows.hasNext) rows.next().getArray(0) else null case _: MapType => - (rows: Seq[InternalRow]) => rows.head.getMap(0) + (rows: Iterator[InternalRow]) => if (rows.hasNext) rows.next().getMap(0) else null } - @transient - lazy val parser = -new JacksonParser( - nullableSchema, - new JSONOptions(options + ("mode" -> FailFastMode.name), timeZoneId.get)) + @transient lazy val parser = { +val parsedOptions = new JSONOptions(options, timeZoneId.get) +val mode = parsedOptions.parseMode +if (mode != PermissiveMode && mode != FailFastMode) { --- End diff -- Ah, I see. If the `mode` option already exists, let's keep it.
[GitHub] spark pull request #22237: [SPARK-25243][SQL] Use FailureSafeParser in from_...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22237#discussion_r220789449 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala --- @@ -15,50 +15,51 @@ * limitations under the License. */ -package org.apache.spark.sql.execution.datasources +package org.apache.spark.sql.catalyst.util import org.apache.spark.SparkException import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.GenericInternalRow -import org.apache.spark.sql.catalyst.util._ -import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.types.{DataType, StructType} import org.apache.spark.unsafe.types.UTF8String class FailureSafeParser[IN]( rawParser: IN => Seq[InternalRow], mode: ParseMode, -schema: StructType, +dataType: DataType, columnNameOfCorruptRecord: String, isMultiLine: Boolean) { - - private val corruptFieldIndex = schema.getFieldIndex(columnNameOfCorruptRecord) - private val actualSchema = StructType(schema.filterNot(_.name == columnNameOfCorruptRecord)) - private val resultRow = new GenericInternalRow(schema.length) - private val nullResult = new GenericInternalRow(schema.length) - // This function takes 2 parameters: an optional partial result, and the bad record. If the given // schema doesn't contain a field for corrupted record, we just return the partial result or a // row with all fields null. If the given schema contains a field for corrupted record, we will // set the bad record to this field, and set other fields according to the partial result or null. - private val toResultRow: (Option[InternalRow], () => UTF8String) => InternalRow = { --- End diff -- From a cursory look, it seems we keep the original code and just add one case for non-struct types. Can we make the diff smaller?
[GitHub] spark pull request #22237: [SPARK-25243][SQL] Use FailureSafeParser in from_...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22237#discussion_r220788994 --- Diff: docs/sql-programming-guide.md --- @@ -1879,6 +1879,10 @@ working with timestamps in `pandas_udf`s to get the best performance, see # Migration Guide +## Upgrading From Spark SQL 2.4 to 2.5 + + - Since Spark 3.0, the `from_json` functions supports two modes - `PERMISSIVE` and `FAILFAST`. The modes can be set via the `mode` option. The default mode became `PERMISSIVE`. In previous versions, behavior of `from_json` did not conform to either `PERMISSIVE` nor `FAILFAST`, especially in processing of malformed JSON records. For example, the JSON string `{"a" 1}` with the schema `a INT` is converted to `null` by previous versions but Spark 3.0 converts it to `Row(null)`. In version 2.4 and earlier, arrays of JSON objects are considered as invalid and converted to `null` if specified schema is `StructType`. Since Spark 3.0, the input is considered as a valid JSON array and only its first element is parsed if it conforms to the specified `StructType`. --- End diff -- `Since Spark 3.0` -> `Since Spark 2.5` :-)
[GitHub] spark pull request #22237: [SPARK-25243][SQL] Use FailureSafeParser in from_...
Github user MaxGekk commented on a diff in the pull request: https://github.com/apache/spark/pull/22237#discussion_r220668461 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala --- @@ -554,18 +554,30 @@ case class JsonToStructs( @transient lazy val converter = nullableSchema match { case _: StructType => - (rows: Seq[InternalRow]) => if (rows.length == 1) rows.head else null + (rows: Iterator[InternalRow]) => if (rows.hasNext) rows.next() else null case _: ArrayType => - (rows: Seq[InternalRow]) => rows.head.getArray(0) + (rows: Iterator[InternalRow]) => if (rows.hasNext) rows.next().getArray(0) else null case _: MapType => - (rows: Seq[InternalRow]) => rows.head.getMap(0) + (rows: Iterator[InternalRow]) => if (rows.hasNext) rows.next().getMap(0) else null } - @transient - lazy val parser = -new JacksonParser( - nullableSchema, - new JSONOptions(options + ("mode" -> FailFastMode.name), timeZoneId.get)) + @transient lazy val parser = { +val parsedOptions = new JSONOptions(options, timeZoneId.get) +val mode = parsedOptions.parseMode +if (mode != PermissiveMode && mode != FailFastMode) { --- End diff -- `JSONOptions` is shared between built-in JSON functions like `from_json` and the JSON datasource, and it supports three modes: `FAILFAST`, `DROPMALFORMED` and `PERMISSIVE`. I am not sure how the `mode` option could be replaced. The approach I can imagine is to inherit from `JSONOptions` and add a new `val`. The `mode` itself cannot be removed because it is used, for example, in `FailureSafeParser`, where `DropMalformedMode` in particular is handled explicitly.
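A sketch of the kind of check the diff adds to `JsonToStructs` may make the point concrete: the shared options keep parsing all three modes, and only the `from_json` side rejects the ones it cannot honor. A plausible reason to reject `DROPMALFORMED` is that an expression must return exactly one value per input row, so it has no way to drop a record. The object name, the method `validatedForFromJson` and the error message are hypothetical.

```scala
// Hypothetical sketch: shared options accept all three modes, while the
// from_json expression validates the subset it supports.
object FromJsonModeCheck {
  sealed trait ParseMode { def name: String }
  case object PermissiveMode extends ParseMode { val name = "PERMISSIVE" }
  case object DropMalformedMode extends ParseMode { val name = "DROPMALFORMED" }
  case object FailFastMode extends ParseMode { val name = "FAILFAST" }

  // Illustrative message; not Spark's actual error text.
  def validatedForFromJson(mode: ParseMode): ParseMode = mode match {
    case PermissiveMode | FailFastMode => mode
    case other =>
      throw new IllegalArgumentException(
        s"from_json() doesn't support the ${other.name} mode. " +
          s"Acceptable modes are ${PermissiveMode.name} and ${FailFastMode.name}.")
  }
}
```

Keeping the validation at the `from_json` call site leaves `JSONOptions` and `FailureSafeParser` untouched for the datasource.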
[GitHub] spark pull request #22237: [SPARK-25243][SQL] Use FailureSafeParser in from_...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22237#discussion_r220658815 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala --- @@ -554,18 +554,30 @@ case class JsonToStructs( @transient lazy val converter = nullableSchema match { case _: StructType => - (rows: Seq[InternalRow]) => if (rows.length == 1) rows.head else null + (rows: Iterator[InternalRow]) => if (rows.hasNext) rows.next() else null case _: ArrayType => - (rows: Seq[InternalRow]) => rows.head.getArray(0) + (rows: Iterator[InternalRow]) => if (rows.hasNext) rows.next().getArray(0) else null case _: MapType => - (rows: Seq[InternalRow]) => rows.head.getMap(0) + (rows: Iterator[InternalRow]) => if (rows.hasNext) rows.next().getMap(0) else null } - @transient - lazy val parser = -new JacksonParser( - nullableSchema, - new JSONOptions(options + ("mode" -> FailFastMode.name), timeZoneId.get)) + @transient lazy val parser = { +val parsedOptions = new JSONOptions(options, timeZoneId.get) +val mode = parsedOptions.parseMode +if (mode != PermissiveMode && mode != FailFastMode) { --- End diff -- Yeah, instead of the "mode" option.
[GitHub] spark pull request #22237: [SPARK-25243][SQL] Use FailureSafeParser in from_...
Github user MaxGekk commented on a diff in the pull request: https://github.com/apache/spark/pull/22237#discussion_r220533794 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala --- @@ -554,18 +554,30 @@ case class JsonToStructs( @transient lazy val converter = nullableSchema match { case _: StructType => - (rows: Seq[InternalRow]) => if (rows.length == 1) rows.head else null + (rows: Iterator[InternalRow]) => if (rows.hasNext) rows.next() else null case _: ArrayType => - (rows: Seq[InternalRow]) => rows.head.getArray(0) + (rows: Iterator[InternalRow]) => if (rows.hasNext) rows.next().getArray(0) else null case _: MapType => - (rows: Seq[InternalRow]) => rows.head.getMap(0) + (rows: Iterator[InternalRow]) => if (rows.hasNext) rows.next().getMap(0) else null } - @transient - lazy val parser = -new JacksonParser( - nullableSchema, - new JSONOptions(options + ("mode" -> FailFastMode.name), timeZoneId.get)) + @transient lazy val parser = { +val parsedOptions = new JSONOptions(options, timeZoneId.get) +val mode = parsedOptions.parseMode +if (mode != PermissiveMode && mode != FailFastMode) { --- End diff -- Do you mean introducing a new val `failFast` in JSONOptions?
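The converter change quoted in this diff (from `Seq` with `rows.head` to `Iterator` with a `hasNext` guard) can be modeled with simplified types. The point of the guard is that `rows.head` on an empty `Seq` throws, while the `Iterator` version returns `null` when the parser yields no row. The `Schema` objects and `Vector[Any]` rows below are illustrative stand-ins, not Spark's `DataType` or `InternalRow`.

```scala
// Toy model of the Iterator-based converter in the diff. Schema objects and
// Vector[Any] rows are hypothetical stand-ins for Spark's types.
object ConverterSketch {
  sealed trait Schema
  case object StructSchema extends Schema
  case object ArraySchema extends Schema
  case object MapSchema extends Schema

  type Row = Vector[Any]

  // Take the first parsed row if there is one; otherwise return null.
  // For array/map schemas the value is wrapped as field 0 of a one-field row,
  // mirroring getArray(0) / getMap(0) in the diff.
  def converter(schema: Schema): Iterator[Row] => Any = schema match {
    case StructSchema            => rows => if (rows.hasNext) rows.next() else null
    case ArraySchema | MapSchema => rows => if (rows.hasNext) rows.next()(0) else null
  }
}
```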
[GitHub] spark pull request #22237: [SPARK-25243][SQL] Use FailureSafeParser in from_...
Github user MaxGekk commented on a diff in the pull request: https://github.com/apache/spark/pull/22237#discussion_r220486164 --- Diff: docs/sql-programming-guide.md --- @@ -1877,6 +1877,10 @@ working with timestamps in `pandas_udf`s to get the best performance, see # Migration Guide +## Upgrading From Spark SQL 2.4 to 3.0 + + - Since Spark 3.0, the `from_json` functions supports two modes - `PERMISSIVE` and `FAILFAST`. The modes can be set via the `mode` option. The default mode became `PERMISSIVE`. In previous versions, behavior of `from_json` did not conform to either `PERMISSIVE` nor `FAILFAST`, especially in processing of malformed JSON records. For example, the JSON string `{"a" 1}` with the schema `a INT` is converted to `null` by previous versions but Spark 3.0 converts it to `Row(null)`. In version 2.4 and earlier, arrays of JSON objects are considered as invalid and converted to `null` if specified schema is `StructType`. Since Spark 3.0, the input is considered as a valid JSON array and only its first element is parsed if it conforms to the specified `StructType`. + --- End diff -- Existing tests fail in the `PERMISSIVE` mode only in a few particular cases. I tried to describe those cases here in the migration guide.
[GitHub] spark pull request #22237: [SPARK-25243][SQL] Use FailureSafeParser in from_...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22237#discussion_r220400809 --- Diff: docs/sql-programming-guide.md --- @@ -1877,6 +1877,10 @@ working with timestamps in `pandas_udf`s to get the best performance, see # Migration Guide +## Upgrading From Spark SQL 2.4 to 3.0 + + - Since Spark 3.0, the `from_json` functions supports two modes - `PERMISSIVE` and `FAILFAST`. The modes can be set via the `mode` option. The default mode became `PERMISSIVE`. In previous versions, behavior of `from_json` did not conform to either `PERMISSIVE` nor `FAILFAST`, especially in processing of malformed JSON records. For example, the JSON string `{"a" 1}` with the schema `a INT` is converted to `null` by previous versions but Spark 3.0 converts it to `Row(null)`. In version 2.4 and earlier, arrays of JSON objects are considered as invalid and converted to `null` if specified schema is `StructType`. Since Spark 3.0, the input is considered as a valid JSON array and only its first element is parsed if it conforms to the specified `StructType`. + --- End diff -- > In version 2.4 and earlier, arrays of JSON objects are considered as invalid and converted to `null` if specified schema is `StructType`. Since Spark 3.0, the input is considered as a valid JSON array and only its first element is parsed if it conforms to the specified `StructType`. Why this change?
[GitHub] spark pull request #22237: [SPARK-25243][SQL] Use FailureSafeParser in from_...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22237#discussion_r220400637 --- Diff: docs/sql-programming-guide.md --- @@ -1877,6 +1877,10 @@ working with timestamps in `pandas_udf`s to get the best performance, see # Migration Guide +## Upgrading From Spark SQL 2.4 to 3.0 --- End diff -- 2.5
[GitHub] spark pull request #22237: [SPARK-25243][SQL] Use FailureSafeParser in from_...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22237#discussion_r220401132 --- Diff: docs/sql-programming-guide.md --- @@ -1877,6 +1877,10 @@ working with timestamps in `pandas_udf`s to get the best performance, see # Migration Guide +## Upgrading From Spark SQL 2.4 to 3.0 + + - Since Spark 3.0, the `from_json` functions supports two modes - `PERMISSIVE` and `FAILFAST`. The modes can be set via the `mode` option. The default mode became `PERMISSIVE`. In previous versions, behavior of `from_json` did not conform to either `PERMISSIVE` nor `FAILFAST`, especially in processing of malformed JSON records. For example, the JSON string `{"a" 1}` with the schema `a INT` is converted to `null` by previous versions but Spark 3.0 converts it to `Row(null)`. In version 2.4 and earlier, arrays of JSON objects are considered as invalid and converted to `null` if specified schema is `StructType`. Since Spark 3.0, the input is considered as a valid JSON array and only its first element is parsed if it conforms to the specified `StructType`. + --- End diff -- > In previous versions, behavior of `from_json` did not conform to either `PERMISSIVE` nor `FAILFAST`, especially in processing of malformed JSON records. Do we have a clear definition of the current behavior? It's important to let user know how the behavior changes.
[GitHub] spark pull request #22237: [SPARK-25243][SQL] Use FailureSafeParser in from_...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22237#discussion_r220401206 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala --- @@ -554,18 +554,30 @@ case class JsonToStructs( @transient lazy val converter = nullableSchema match { case _: StructType => - (rows: Seq[InternalRow]) => if (rows.length == 1) rows.head else null + (rows: Iterator[InternalRow]) => if (rows.hasNext) rows.next() else null case _: ArrayType => - (rows: Seq[InternalRow]) => rows.head.getArray(0) + (rows: Iterator[InternalRow]) => if (rows.hasNext) rows.next().getArray(0) else null case _: MapType => - (rows: Seq[InternalRow]) => rows.head.getMap(0) + (rows: Iterator[InternalRow]) => if (rows.hasNext) rows.next().getMap(0) else null } - @transient - lazy val parser = -new JacksonParser( - nullableSchema, - new JSONOptions(options + ("mode" -> FailFastMode.name), timeZoneId.get)) + @transient lazy val parser = { +val parsedOptions = new JSONOptions(options, timeZoneId.get) +val mode = parsedOptions.parseMode +if (mode != PermissiveMode && mode != FailFastMode) { --- End diff -- Since we only support two modes, how about using a boolean option, e.g. `failFast`?
[GitHub] spark pull request #22237: [SPARK-25243][SQL] Use FailureSafeParser in from_...
Github user MaxGekk commented on a diff in the pull request: https://github.com/apache/spark/pull/22237#discussion_r218585584 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala --- @@ -450,7 +450,7 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper with test("from_json - input=array, schema=struct, output=null") { val input = """[{"a": 1}, {"a": 2}]""" val schema = StructType(StructField("a", IntegerType) :: Nil) -val output = null +val output = InternalRow(1) checkEvaluation(JsonToStructs(schema, Map.empty, Literal(input), gmtId), output) --- End diff -- I updated the migration guide and the test's title.
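The behavior change checked by this test, `[{"a": 1}, {"a": 2}]` with struct schema `a INT` now producing `InternalRow(1)` instead of `null`, can be illustrated with a toy document model. `Doc`, `Obj`, `Arr`, `legacy` and `firstElement` are hypothetical names; returning `null` for an empty array is an assumption of the toy, not something stated in the thread.

```scala
// Hypothetical toy model: a parsed JSON document is one object or an array of
// objects; a struct schema is just a list of field names here.
object ArrayAsStructSketch {
  sealed trait Doc
  final case class Obj(fields: Map[String, Any]) extends Doc
  final case class Arr(items: Vector[Obj]) extends Doc

  // Project an object onto the requested fields; missing fields become null.
  private def project(o: Obj, schema: Seq[String]): Vector[Any] =
    schema.toVector.map(name => o.fields.getOrElse(name, null))

  // Pre-3.0 behavior described in the thread: an array under a struct schema is null.
  def legacy(doc: Doc, schema: Seq[String]): Vector[Any] = doc match {
    case o: Obj => project(o, schema)
    case Arr(_) => null
  }

  // Behavior described in the migration note: only the first element is parsed.
  // An empty array yields null in this toy (an assumption).
  def firstElement(doc: Doc, schema: Seq[String]): Vector[Any] = doc match {
    case o: Obj     => project(o, schema)
    case Arr(items) => items.headOption.map(project(_, schema)).orNull
  }
}
```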
[GitHub] spark pull request #22237: [SPARK-25243][SQL] Use FailureSafeParser in from_...
Github user MaxGekk commented on a diff in the pull request: https://github.com/apache/spark/pull/22237#discussion_r218584877 --- Diff: docs/sql-programming-guide.md --- @@ -1877,6 +1877,10 @@ working with timestamps in `pandas_udf`s to get the best performance, see # Migration Guide +## Upgrading From Spark SQL 2.4 to 3.0 + + - Since Spark 3.0, the `from_json` functions supports two modes - `PERMISSIVE` and `FAILFAST`. The modes can be set via the `mode` option. The default mode became `PERMISSIVE`. In previous versions, behavior of `from_json` did not conform to either `PERMISSIVE` nor `FAILFAST`, especially in processing of malformed JSON records. --- End diff -- Sure, I added a couple of sentences.
[GitHub] spark pull request #22237: [SPARK-25243][SQL] Use FailureSafeParser in from_...
Github user MaxGekk commented on a diff in the pull request: https://github.com/apache/spark/pull/22237#discussion_r218577348 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala --- @@ -595,10 +607,7 @@ case class JsonToStructs( if (json.toString.trim.isEmpty) return null try { - converter(parser.parse( -json.asInstanceOf[UTF8String], -CreateJacksonParser.utf8String, -identity[UTF8String])) + converter(parser.parse(json.asInstanceOf[UTF8String])) } catch { case _: BadRecordException => null --- End diff -- `BadRecordException` shouldn't propagate here anymore. I will remove the exception handler and re-run the tests.
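Why `BadRecordException` no longer reaches `JsonToStructs` can be modeled with a small Spark-free sketch of a failure-safe wrapper. `Row`, `BadRecord`, the toy `rawParse`, and `failureSafe` are hypothetical simplifications of `InternalRow`, `BadRecordException`, `JacksonParser`, and `FailureSafeParser`: the wrapper catches the bad-record exception itself and, depending on the mode, either substitutes a row with null fields or rethrows (Spark throws a `SparkException` in FAILFAST; the sketch uses `IllegalStateException`).

```scala
object FailureSafeSketch {
  sealed trait Mode
  case object Permissive extends Mode
  case object FailFast extends Mode

  // Simplified stand-in for InternalRow: one nullable Int field "a".
  final case class Row(a: Option[Int])

  // Stand-in for BadRecordException: may carry a partially parsed row.
  final class BadRecord(val partial: Option[Row], msg: String) extends Exception(msg)

  // Toy raw parser (hypothetical): accepts only objects of the form {"a": <digits>}.
  private val ObjectA = """\{"a":\s*(\d+)\}""".r

  def rawParse(json: String): Seq[Row] = json.trim match {
    case ObjectA(n) => Seq(Row(Some(n.toInt)))
    case other      => throw new BadRecord(None, s"Cannot parse: $other")
  }

  // Failure-safe wrapper: BadRecord never escapes to the caller.
  // PERMISSIVE yields the partial row (or an all-null row); FAILFAST rethrows.
  def failureSafe(mode: Mode)(json: String): Iterator[Row] =
    try rawParse(json).iterator
    catch {
      case e: BadRecord =>
        mode match {
          case Permissive => Iterator.single(e.partial.getOrElse(Row(None)))
          case FailFast => throw new IllegalStateException(
            "Malformed records are detected in record parsing. Parse Mode: FAILFAST.", e)
        }
    }
}
```

Feeding `{"a" 1}` and `{"a": 2}` through `failureSafe(Permissive)` gives `Row(None)` and `Row(Some(2))`, mirroring the `Row(Row(null)) :: Row(Row(2))` expectation in the new `JsonFunctionsSuite` test, so a `catch` around the call in the expression has nothing left to handle.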
[GitHub] spark pull request #22237: [SPARK-25243][SQL] Use FailureSafeParser in from_...
Github user MaxGekk commented on a diff in the pull request: https://github.com/apache/spark/pull/22237#discussion_r218574370 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala --- @@ -402,13 +402,13 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper with val schema = StructType(StructField("a", IntegerType) :: Nil) checkEvaluation( JsonToStructs(schema, Map.empty, Literal(jsonData), gmtId), - null + InternalRow(null) ) // Other modes should still return `null`. checkEvaluation( JsonToStructs(schema, Map("mode" -> PermissiveMode.name), Literal(jsonData), gmtId), --- End diff -- Changed to `FailFastMode.name`
[GitHub] spark pull request #22237: [SPARK-25243][SQL] Use FailureSafeParser in from_...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/22237#discussion_r217995920 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala --- @@ -402,13 +402,13 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper with val schema = StructType(StructField("a", IntegerType) :: Nil) checkEvaluation( JsonToStructs(schema, Map.empty, Literal(jsonData), gmtId), - null + InternalRow(null) ) // Other modes should still return `null`. checkEvaluation( JsonToStructs(schema, Map("mode" -> PermissiveMode.name), Literal(jsonData), gmtId), --- End diff -- The default mode is now `PermissiveMode`, so it looks like this line needs to change.
[GitHub] spark pull request #22237: [SPARK-25243][SQL] Use FailureSafeParser in from_...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/22237#discussion_r218000227 --- Diff: docs/sql-programming-guide.md --- @@ -1877,6 +1877,10 @@ working with timestamps in `pandas_udf`s to get the best performance, see # Migration Guide +## Upgrading From Spark SQL 2.4 to 3.0 + + - Since Spark 3.0, the `from_json` functions supports two modes - `PERMISSIVE` and `FAILFAST`. The modes can be set via the `mode` option. The default mode became `PERMISSIVE`. In previous versions, behavior of `from_json` did not conform to either `PERMISSIVE` nor `FAILFAST`, especially in processing of malformed JSON records. --- End diff -- Can we briefly explain the difference between the updated behavior and the previous one?
[GitHub] spark pull request #22237: [SPARK-25243][SQL] Use FailureSafeParser in from_...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/22237#discussion_r218000572 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala --- @@ -595,10 +607,7 @@ case class JsonToStructs( if (json.toString.trim.isEmpty) return null try { - converter(parser.parse( -json.asInstanceOf[UTF8String], -CreateJacksonParser.utf8String, -identity[UTF8String])) + converter(parser.parse(json.asInstanceOf[UTF8String])) } catch { case _: BadRecordException => null --- End diff -- Do we still need to catch `BadRecordException` here?
[GitHub] spark pull request #22237: [SPARK-25243][SQL] Use FailureSafeParser in from_...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/22237#discussion_r218003046 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala --- @@ -450,7 +450,7 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper with test("from_json - input=array, schema=struct, output=null") { val input = """[{"a": 1}, {"a": 2}]""" val schema = StructType(StructField("a", IntegerType) :: Nil) -val output = null +val output = InternalRow(1) checkEvaluation(JsonToStructs(schema, Map.empty, Literal(input), gmtId), output) --- End diff -- This looks like a behavior change too. Shall we also note it in the migration guide? The test name is also no longer correct.
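The behavior change follows from the converter diff. For a struct schema, a root-level JSON array such as `[{"a": 1}, {"a": 2}]` yields one parsed row per element, as the updated test expectation implies. The old converter returned `null` unless there was exactly one row; the new one takes the first. A minimal sketch with a hypothetical `Row` stand-in for `InternalRow`:

```scala
object ConverterSketch {
  // Simplified stand-in for InternalRow with one nullable Int field "a".
  final case class Row(a: Option[Int])

  // Before the PR: a struct result only if the parser produced exactly one row.
  def oldConverter(rows: Seq[Row]): Row = if (rows.length == 1) rows.head else null

  // After the PR: take the first row if there is one, otherwise null.
  def newConverter(rows: Iterator[Row]): Row = if (rows.hasNext) rows.next() else null
}
```

For the two rows parsed from the array above, `oldConverter` gives `null` and `newConverter` gives the row with `a = 1`, which matches the test's change from `null` to `InternalRow(1)`.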
[GitHub] spark pull request #22237: [SPARK-25243][SQL] Use FailureSafeParser in from_...
Github user MaxGekk commented on a diff in the pull request: https://github.com/apache/spark/pull/22237#discussion_r21650 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala --- @@ -469,4 +470,26 @@ class JsonFunctionsSuite extends QueryTest with SharedSQLContext { checkAnswer(sql("""select json[0] from jsonTable"""), Seq(Row(null))) } + + test("from_json invalid json - check modes") { +val df = Seq("""{"a" 1}""", """{"a": 2}""").toDS() +val schema = new StructType().add("a", IntegerType) + +checkAnswer( + df.select(from_json($"value", schema, Map("mode" -> "PERMISSIVE"))), + Row(Row(null)) :: Row(Row(2)) :: Nil) + +val exception1 = intercept[SparkException] { + df.select(from_json($"value", schema, Map("mode" -> "FAILFAST"))).collect() +}.getMessage +assert(exception1.contains( + "Malformed records are detected in record parsing. Parse Mode: FAILFAST.")) + +val exception2 = intercept[AnalysisException] { + df.select(from_json($"value", schema, Map("mode" -> "DROPMALFORMED"))).collect() --- End diff -- I replaced it with `AnalysisException`, but I think that was the wrong decision. Throwing `AnalysisException` at run time looks ugly:
```
Caused by: org.apache.spark.sql.AnalysisException: from_json() doesn't support the DROPMALFORMED mode. Acceptable modes are PERMISSIVE and FAILFAST.;
  at org.apache.spark.sql.catalyst.expressions.JsonToStructs.parser$lzycompute(jsonExpressions.scala:568)
  at org.apache.spark.sql.catalyst.expressions.JsonToStructs.parser(jsonExpressions.scala:564)
  ...
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  at java.lang.Thread.run(Thread.java:748)
```
I am going to replace it with something else or revert to `IllegalArgumentException`.
[GitHub] spark pull request #22237: [SPARK-25243][SQL] Use FailureSafeParser in from_...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22237#discussion_r214302343 --- Diff: docs/sql-programming-guide.md --- @@ -1897,6 +1897,7 @@ working with timestamps in `pandas_udf`s to get the best performance, see - In version 2.3 and earlier, CSV rows are considered as malformed if at least one column value in the row is malformed. CSV parser dropped such rows in the DROPMALFORMED mode or outputs an error in the FAILFAST mode. Since Spark 2.4, CSV row is considered as malformed only when it contains malformed column values requested from CSV datasource, other values can be ignored. As an example, CSV file contains the "id,name" header and one row "1234". In Spark 2.4, selection of the id column consists of a row with one column value 1234 but in Spark 2.3 and earlier it is empty in the DROPMALFORMED mode. To restore the previous behavior, set `spark.sql.csv.parser.columnPruning.enabled` to `false`. - Since Spark 2.4, File listing for compute statistics is done in parallel by default. This can be disabled by setting `spark.sql.parallelFileListingInStatsComputation.enabled` to `False`. - Since Spark 2.4, Metadata files (e.g. Parquet summary files) and temporary files are not counted as data files when calculating table size during Statistics computation. + - Since Spark 2.4, the from_json functions supports two modes - PERMISSIVE and FAILFAST. The modes can be set via the `mode` option. The default mode became PERMISSIVE. In previous versions, behavior of from_json did not conform to either PERMISSIVE nor FAILFAST, especially in processing of malformed JSON records. --- End diff -- nit: from_json -> `` `from_json` ``.
[GitHub] spark pull request #22237: [SPARK-25243][SQL] Use FailureSafeParser in from_...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/22237#discussion_r214218549 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala --- @@ -469,4 +470,26 @@ class JsonFunctionsSuite extends QueryTest with SharedSQLContext { checkAnswer(sql("""select json[0] from jsonTable"""), Seq(Row(null))) } + + test("from_json invalid json - check modes") { +val df = Seq("""{"a" 1}""", """{"a": 2}""").toDS() +val schema = new StructType().add("a", IntegerType) + +checkAnswer( + df.select(from_json($"value", schema, Map("mode" -> "PERMISSIVE"))), + Row(Row(null)) :: Row(Row(2)) :: Nil) + +val exception1 = intercept[SparkException] { + df.select(from_json($"value", schema, Map("mode" -> "FAILFAST"))).collect() +}.getMessage +assert(exception1.contains( + "Malformed records are detected in record parsing. Parse Mode: FAILFAST.")) + +val exception2 = intercept[AnalysisException] { + df.select(from_json($"value", schema, Map("mode" -> "DROPMALFORMED"))).collect() --- End diff -- Can you change the code to throw the analysis exception during the analysis phase instead of the execution phase (when `.collect()` is called)?
[GitHub] spark pull request #22237: [SPARK-25243][SQL] Use FailureSafeParser in from_...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/22237#discussion_r213980110 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala --- @@ -554,18 +554,28 @@ case class JsonToStructs( @transient lazy val converter = nullableSchema match { case _: StructType => - (rows: Seq[InternalRow]) => if (rows.length == 1) rows.head else null + (rows: Iterator[InternalRow]) => if (rows.hasNext) rows.next() else null case _: ArrayType => - (rows: Seq[InternalRow]) => rows.head.getArray(0) + (rows: Iterator[InternalRow]) => if (rows.hasNext) rows.next().getArray(0) else null case _: MapType => - (rows: Seq[InternalRow]) => rows.head.getMap(0) + (rows: Iterator[InternalRow]) => if (rows.hasNext) rows.next().getMap(0) else null } - @transient - lazy val parser = -new JacksonParser( - nullableSchema, - new JSONOptions(options + ("mode" -> FailFastMode.name), timeZoneId.get)) + @transient lazy val parser = { +val parsedOptions = new JSONOptions(options, timeZoneId.get) +val mode = parsedOptions.parseMode +require(mode == PermissiveMode || mode == FailFastMode, --- End diff -- OK, thanks!
[GitHub] spark pull request #22237: [SPARK-25243][SQL] Use FailureSafeParser in from_...
Github user MaxGekk commented on a diff in the pull request: https://github.com/apache/spark/pull/22237#discussion_r213975610 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala --- @@ -554,18 +554,28 @@ case class JsonToStructs( @transient lazy val converter = nullableSchema match { case _: StructType => - (rows: Seq[InternalRow]) => if (rows.length == 1) rows.head else null + (rows: Iterator[InternalRow]) => if (rows.hasNext) rows.next() else null case _: ArrayType => - (rows: Seq[InternalRow]) => rows.head.getArray(0) + (rows: Iterator[InternalRow]) => if (rows.hasNext) rows.next().getArray(0) else null case _: MapType => - (rows: Seq[InternalRow]) => rows.head.getMap(0) + (rows: Iterator[InternalRow]) => if (rows.hasNext) rows.next().getMap(0) else null } - @transient - lazy val parser = -new JacksonParser( - nullableSchema, - new JSONOptions(options + ("mode" -> FailFastMode.name), timeZoneId.get)) + @transient lazy val parser = { +val parsedOptions = new JSONOptions(options, timeZoneId.get) +val mode = parsedOptions.parseMode +require(mode == PermissiveMode || mode == FailFastMode, --- End diff -- I didn't put `require` directly into the constructor body because of `timeZoneId`. If I move the check up, I also need to move `val parsedOptions = new JSONOptions(options, timeZoneId.get)` (lazy or not). The check would then force evaluation of `timeZoneId.get`, which will raise an exception if the time zone has not been set yet. I will look into this today or tomorrow.
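The ordering problem described above can be illustrated without Spark. In this sketch `FromJsonLike` is a hypothetical class, not `JsonToStructs`: its time zone is unset (`None`) when the object is built and is supplied later, as happens for time-zone-aware expressions during analysis. A check that reads only the raw `options` map can run eagerly in the constructor, whereas anything that calls `timeZoneId.get` has to stay `lazy`. Whether the real expression can validate the mode without constructing `JSONOptions` is an assumption here.

```scala
object ValidationSketch {
  // Hypothetical expression-like class, not Spark's JsonToStructs.
  // Its time zone is unset when it is constructed and supplied later.
  final case class FromJsonLike(options: Map[String, String], timeZoneId: Option[String] = None) {
    // Eager check in the constructor body: it reads only the raw options map,
    // so it never touches timeZoneId and fails as soon as the object is built.
    private val mode = options.getOrElse("mode", "PERMISSIVE").toUpperCase
    require(mode == "PERMISSIVE" || mode == "FAILFAST",
      s"from_json() doesn't support the $mode mode. Acceptable modes are PERMISSIVE and FAILFAST.")

    // Deferred work: evaluated on first access. Before the time zone is set,
    // timeZoneId.get throws NoSuchElementException.
    lazy val zone: String = timeZoneId.get

    def withTimeZone(tz: String): FromJsonLike = copy(timeZoneId = Some(tz))
  }
}
```

Constructing `FromJsonLike(Map("mode" -> "DROPMALFORMED"))` fails immediately, while accessing `zone` only succeeds after `withTimeZone` has been called.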
[GitHub] spark pull request #22237: [SPARK-25243][SQL] Use FailureSafeParser in from_...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/22237#discussion_r213886899 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala --- @@ -15,50 +15,51 @@ * limitations under the License. */ -package org.apache.spark.sql.execution.datasources +package org.apache.spark.sql.catalyst.util import org.apache.spark.SparkException import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.GenericInternalRow -import org.apache.spark.sql.catalyst.util._ -import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.types.{DataType, StructType} import org.apache.spark.unsafe.types.UTF8String class FailureSafeParser[IN]( rawParser: IN => Seq[InternalRow], mode: ParseMode, -schema: StructType, +schema: DataType, --- End diff -- `schema` -> `dataType`?
[GitHub] spark pull request #22237: [SPARK-25243][SQL] Use FailureSafeParser in from_...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/22237#discussion_r213885777 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala --- @@ -554,18 +554,28 @@ case class JsonToStructs( @transient lazy val converter = nullableSchema match { case _: StructType => - (rows: Seq[InternalRow]) => if (rows.length == 1) rows.head else null + (rows: Iterator[InternalRow]) => if (rows.hasNext) rows.next() else null case _: ArrayType => - (rows: Seq[InternalRow]) => rows.head.getArray(0) + (rows: Iterator[InternalRow]) => if (rows.hasNext) rows.next().getArray(0) else null case _: MapType => - (rows: Seq[InternalRow]) => rows.head.getMap(0) + (rows: Iterator[InternalRow]) => if (rows.hasNext) rows.next().getMap(0) else null } - @transient - lazy val parser = -new JacksonParser( - nullableSchema, - new JSONOptions(options + ("mode" -> FailFastMode.name), timeZoneId.get)) + @transient lazy val parser = { +val parsedOptions = new JSONOptions(options, timeZoneId.get) +val mode = parsedOptions.parseMode +require(mode == PermissiveMode || mode == FailFastMode, --- End diff -- Also, can we use `AnalysisException` instead of `require`?
[GitHub] spark pull request #22237: [SPARK-25243][SQL] Use FailureSafeParser in from_...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/22237#discussion_r213885328 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala --- @@ -554,18 +554,28 @@ case class JsonToStructs( @transient lazy val converter = nullableSchema match { case _: StructType => - (rows: Seq[InternalRow]) => if (rows.length == 1) rows.head else null + (rows: Iterator[InternalRow]) => if (rows.hasNext) rows.next() else null case _: ArrayType => - (rows: Seq[InternalRow]) => rows.head.getArray(0) + (rows: Iterator[InternalRow]) => if (rows.hasNext) rows.next().getArray(0) else null case _: MapType => - (rows: Seq[InternalRow]) => rows.head.getMap(0) + (rows: Iterator[InternalRow]) => if (rows.hasNext) rows.next().getMap(0) else null } - @transient - lazy val parser = -new JacksonParser( - nullableSchema, - new JSONOptions(options + ("mode" -> FailFastMode.name), timeZoneId.get)) + @transient lazy val parser = { +val parsedOptions = new JSONOptions(options, timeZoneId.get) +val mode = parsedOptions.parseMode +require(mode == PermissiveMode || mode == FailFastMode, --- End diff -- I think we should move this verification into the constructor.
[GitHub] spark pull request #22237: [SPARK-25243][SQL] Use FailureSafeParser in from_...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22237#discussion_r213003614 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala --- @@ -469,4 +470,23 @@ class JsonFunctionsSuite extends QueryTest with SharedSQLContext { checkAnswer(sql("""select json[0] from jsonTable"""), Seq(Row(null))) } + + test("from_json invalid json - check modes") { +val df = Seq("""{"a" 1}""", """{"a": 2}""").toDS() +val schema = new StructType().add("a", IntegerType) + +checkAnswer( + df.select(from_json($"value", schema, Map("mode" -> "PERMISSIVE"))), + Row(Row(null)) :: Row(Row(2)) :: Nil) + +val exceptionOne = intercept[SparkException] { + df.select(from_json($"value", schema, Map("mode" -> "FAILFAST"))).collect() +}.getMessage +assert(exceptionOne.contains( + "Malformed records are detected in record parsing. Parse Mode: FAILFAST.")) + +checkAnswer( + df.select(from_json($"value", schema, Map("mode" -> "DROPMALFORMED"))), + Row(null) :: Row(Row(2)) :: Nil) --- End diff -- Nope, the only possibility I raised was to make it a generator expression. That is why I haven't proposed a parse mode so far.
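Why a `UnaryExpression` cannot implement DROPMALFORMED can be shown with ordinary Scala collections. An expression is evaluated once per input row and must produce exactly one value (like `map`), while a generator may produce zero or more rows per input (like `flatMap`). The toy `parse` function below is a hypothetical stand-in for JSON parsing, not Spark code.

```scala
object DropSketch {
  // Toy parser (hypothetical): Some(value) for {"a": <digits>}, None otherwise.
  private val ObjectA = """\{"a":\s*(\d+)\}""".r

  def parse(json: String): Option[Int] = json.trim match {
    case ObjectA(n) => Some(n.toInt)
    case _          => None
  }

  // Like a UnaryExpression: exactly one output per input, so a malformed
  // record can only become null (None here); it cannot disappear.
  def asUnaryExpression(input: Seq[String]): Seq[Option[Int]] = input.map(parse)

  // Like a generator: zero or more outputs per input, so malformed records can be dropped.
  def asGenerator(input: Seq[String]): Seq[Int] = input.flatMap(s => parse(s))
}
```

For the test input `{"a" 1}`, `{"a": 2}`, the unary version yields `Seq(None, Some(2))` and the generator yields `Seq(2)`. Filtering the nulls afterwards, e.g. `asUnaryExpression(input).flatten`, recovers the generator's result.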
[GitHub] spark pull request #22237: [SPARK-25243][SQL] Use FailureSafeParser in from_...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/22237#discussion_r212928747 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala --- @@ -554,18 +554,22 @@ case class JsonToStructs( @transient lazy val converter = nullableSchema match { case _: StructType => - (rows: Seq[InternalRow]) => if (rows.length == 1) rows.head else null + (rows: Iterator[InternalRow]) => if (rows.hasNext) rows.next() else null case _: ArrayType => - (rows: Seq[InternalRow]) => rows.head.getArray(0) + (rows: Iterator[InternalRow]) => if (rows.hasNext) rows.next().getArray(0) else null case _: MapType => - (rows: Seq[InternalRow]) => rows.head.getMap(0) + (rows: Iterator[InternalRow]) => if (rows.hasNext) rows.next().getMap(0) else null } - @transient - lazy val parser = -new JacksonParser( - nullableSchema, - new JSONOptions(options + ("mode" -> FailFastMode.name), timeZoneId.get)) + @transient lazy val parsedOptions = new JSONOptions(options, timeZoneId.get) + @transient lazy val rawParser = new JacksonParser(nullableSchema, parsedOptions) + @transient lazy val createParser = CreateJacksonParser.utf8String _ + @transient lazy val parser = new FailureSafeParser[UTF8String]( +input => rawParser.parse(input, createParser, identity[UTF8String]), +parsedOptions.parseMode, --- End diff -- It is not handled by `JacksonParser`, and the behavior is somewhat similar to `PermissiveMode`, as @HyukjinKwon pointed out at https://github.com/apache/spark/pull/22237/files#r212850156, but not exactly the same. It seems that `PermissiveMode` in `FailureSafeParser` now produces a different result for corrupted records. I noticed that some existing tests may have changed because of that.
[GitHub] spark pull request #22237: [SPARK-25243][SQL] Use FailureSafeParser in from_...
Github user MaxGekk commented on a diff in the pull request: https://github.com/apache/spark/pull/22237#discussion_r212925256 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala --- @@ -469,4 +470,23 @@ class JsonFunctionsSuite extends QueryTest with SharedSQLContext { checkAnswer(sql("""select json[0] from jsonTable"""), Seq(Row(null))) } + + test("from_json invalid json - check modes") { +val df = Seq("""{"a" 1}""", """{"a": 2}""").toDS() +val schema = new StructType().add("a", IntegerType) + +checkAnswer( + df.select(from_json($"value", schema, Map("mode" -> "PERMISSIVE"))), + Row(Row(null)) :: Row(Row(2)) :: Nil) + +val exceptionOne = intercept[SparkException] { + df.select(from_json($"value", schema, Map("mode" -> "FAILFAST"))).collect() --- End diff -- The behavior of `JsonToStructs` is actually pretty close to `PERMISSIVE`. I only need to make a few small changes in the tests that check the processing of malformed inputs.
[GitHub] spark pull request #22237: [SPARK-25243][SQL] Use FailureSafeParser in from_...
Github user MaxGekk commented on a diff in the pull request: https://github.com/apache/spark/pull/22237#discussion_r212924389 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala --- @@ -469,4 +470,23 @@ class JsonFunctionsSuite extends QueryTest with SharedSQLContext { checkAnswer(sql("""select json[0] from jsonTable"""), Seq(Row(null))) } + + test("from_json invalid json - check modes") { +val df = Seq("""{"a" 1}""", """{"a": 2}""").toDS() +val schema = new StructType().add("a", IntegerType) + +checkAnswer( + df.select(from_json($"value", schema, Map("mode" -> "PERMISSIVE"))), + Row(Row(null)) :: Row(Row(2)) :: Nil) + +val exceptionOne = intercept[SparkException] { + df.select(from_json($"value", schema, Map("mode" -> "FAILFAST"))).collect() +}.getMessage +assert(exceptionOne.contains( + "Malformed records are detected in record parsing. Parse Mode: FAILFAST.")) + +checkAnswer( + df.select(from_json($"value", schema, Map("mode" -> "DROPMALFORMED"))), + Row(null) :: Row(Row(2)) :: Nil) --- End diff -- The `DROPMALFORMED` mode returns `null` for malformed JSON lines. Users can filter them out later. @HyukjinKwon, do you know how to drop rows in `UnaryExpression`s?
[GitHub] spark pull request #22237: [SPARK-25243][SQL] Use FailureSafeParser in from_...
Github user MaxGekk commented on a diff in the pull request: https://github.com/apache/spark/pull/22237#discussion_r212922787 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala --- @@ -554,18 +554,22 @@ case class JsonToStructs( @transient lazy val converter = nullableSchema match { case _: StructType => - (rows: Seq[InternalRow]) => if (rows.length == 1) rows.head else null + (rows: Iterator[InternalRow]) => if (rows.hasNext) rows.next() else null case _: ArrayType => - (rows: Seq[InternalRow]) => rows.head.getArray(0) + (rows: Iterator[InternalRow]) => if (rows.hasNext) rows.next().getArray(0) else null case _: MapType => - (rows: Seq[InternalRow]) => rows.head.getMap(0) + (rows: Iterator[InternalRow]) => if (rows.hasNext) rows.next().getMap(0) else null } - @transient - lazy val parser = -new JacksonParser( - nullableSchema, - new JSONOptions(options + ("mode" -> FailFastMode.name), timeZoneId.get)) + @transient lazy val parsedOptions = new JSONOptions(options, timeZoneId.get) + @transient lazy val rawParser = new JacksonParser(nullableSchema, parsedOptions) + @transient lazy val createParser = CreateJacksonParser.utf8String _ + @transient lazy val parser = new FailureSafeParser[UTF8String]( +input => rawParser.parse(input, createParser, identity[UTF8String]), +parsedOptions.parseMode, --- End diff -- The previous setting of `FailFastMode` had no effect on the behavior because the `mode` option wasn't handled at all.
[GitHub] spark pull request #22237: [SPARK-25243][SQL] Use FailureSafeParser in from_...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/22237#discussion_r212891660 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala --- @@ -15,50 +15,51 @@ * limitations under the License. */ -package org.apache.spark.sql.execution.datasources +package org.apache.spark.sql.catalyst.util import org.apache.spark.SparkException import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.GenericInternalRow -import org.apache.spark.sql.catalyst.util._ -import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.types.{DataType, StructType} import org.apache.spark.unsafe.types.UTF8String class FailureSafeParser[IN]( rawParser: IN => Seq[InternalRow], mode: ParseMode, -schema: StructType, +schema: DataType, columnNameOfCorruptRecord: String, isMultiLine: Boolean) { - - private val corruptFieldIndex = schema.getFieldIndex(columnNameOfCorruptRecord) - private val actualSchema = StructType(schema.filterNot(_.name == columnNameOfCorruptRecord)) - private val resultRow = new GenericInternalRow(schema.length) - private val nullResult = new GenericInternalRow(schema.length) - // This function takes 2 parameters: an optional partial result, and the bad record. If the given // schema doesn't contain a field for corrupted record, we just return the partial result or a // row with all fields null. If the given schema contains a field for corrupted record, we will // set the bad record to this field, and set other fields according to the partial result or null. - private val toResultRow: (Option[InternalRow], () => UTF8String) => InternalRow = { -if (corruptFieldIndex.isDefined) { - (row, badRecord) => { -var i = 0 -while (i < actualSchema.length) { - val from = actualSchema(i) - resultRow(schema.fieldIndex(from.name)) = row.map(_.get(i, from.dataType)).orNull - i += 1 + private val toResultRow: (Option[InternalRow], () => UTF8String) => InternalRow = schema match { +case struct: StructType => + val corruptFieldIndex = struct.getFieldIndex(columnNameOfCorruptRecord) + val actualSchema = StructType(struct.filterNot(_.name == columnNameOfCorruptRecord)) + val resultRow = new GenericInternalRow(struct.length) + val nullResult = new GenericInternalRow(struct.length) + if (corruptFieldIndex.isDefined) { --- End diff -- Can we move `actualSchema` and `resultRow` inside the `if (corruptFieldIndex.isDefined) {` block?
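The suggested refactoring, allocating `actualSchema` and `resultRow` only in the branch where the schema has a corrupt-record column, can be sketched in plain Scala. Here a schema is a hypothetical list of field names and a row is an `Array[Any]`; this is not `FailureSafeParser` itself. As in the diff, the result row is allocated once and reused across calls, so a caller must copy it before the next call.

```scala
object ToResultRowSketch {
  type Row = Array[Any]

  // Returns a function (partial result, bad record) => result row.
  def toResultRow(schema: Seq[String], corruptColumn: String): (Option[Row], () => String) => Row =
    schema.indexOf(corruptColumn) match {
      case -1 =>
        // No corrupt-record column: return the partial result or an all-null row.
        val nullResult = new Array[Any](schema.length)
        (partial, _) => partial.getOrElse(nullResult)
      case corruptIdx =>
        // Allocated only when the schema actually has a corrupt-record column.
        val actualSchema = schema.filterNot(_ == corruptColumn)
        val resultRow = new Array[Any](schema.length)
        (partial, badRecord) => {
          // Copy each parsed field to its position in the full schema (null if absent).
          actualSchema.zipWithIndex.foreach { case (name, i) =>
            resultRow(schema.indexOf(name)) = partial.map(_(i)).orNull
          }
          // Store the raw text of the bad record in the corrupt-record column.
          resultRow(corruptIdx) = badRecord()
          resultRow
        }
    }
}
```

With the schema `a, _corrupt_record` and no partial result, the sketch returns a row holding `null` for `a` and the raw input text in `_corrupt_record`.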
[GitHub] spark pull request #22237: [SPARK-25243][SQL] Use FailureSafeParser in from_...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/22237#discussion_r212887790 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala --- @@ -554,18 +554,22 @@ case class JsonToStructs( @transient lazy val converter = nullableSchema match { case _: StructType => - (rows: Seq[InternalRow]) => if (rows.length == 1) rows.head else null + (rows: Iterator[InternalRow]) => if (rows.hasNext) rows.next() else null case _: ArrayType => - (rows: Seq[InternalRow]) => rows.head.getArray(0) + (rows: Iterator[InternalRow]) => if (rows.hasNext) rows.next().getArray(0) else null case _: MapType => - (rows: Seq[InternalRow]) => rows.head.getMap(0) + (rows: Iterator[InternalRow]) => if (rows.hasNext) rows.next().getMap(0) else null } - @transient - lazy val parser = -new JacksonParser( - nullableSchema, - new JSONOptions(options + ("mode" -> FailFastMode.name), timeZoneId.get)) + @transient lazy val parsedOptions = new JSONOptions(options, timeZoneId.get) + @transient lazy val rawParser = new JacksonParser(nullableSchema, parsedOptions) --- End diff -- How about this?
```
@transient lazy val parser = {
  val parsedOptions = new JSONOptions(options, timeZoneId.get)
  val rawParser = new JacksonParser(nullableSchema, parsedOptions)
  val createParser = CreateJacksonParser.utf8String _
  new FailureSafeParser[UTF8String](
    input => rawParser.parse(input, createParser, identity[UTF8String]),
    parsedOptions.parseMode,
    schema,
    parsedOptions.columnNameOfCorruptRecord,
    parsedOptions.multiLine)
}
```
[GitHub] spark pull request #22237: [SPARK-25243][SQL] Use FailureSafeParser in from_...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22237#discussion_r212850156 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala --- @@ -469,4 +470,23 @@ class JsonFunctionsSuite extends QueryTest with SharedSQLContext { checkAnswer(sql("""select json[0] from jsonTable"""), Seq(Row(null))) } + + test("from_json invalid json - check modes") { +val df = Seq("""{"a" 1}""", """{"a": 2}""").toDS() +val schema = new StructType().add("a", IntegerType) + +checkAnswer( + df.select(from_json($"value", schema, Map("mode" -> "PERMISSIVE"))), + Row(Row(null)) :: Row(Row(2)) :: Nil) + +val exceptionOne = intercept[SparkException] { + df.select(from_json($"value", schema, Map("mode" -> "FAILFAST"))).collect() --- End diff -- `JsonToStructs` has resembled PERMISSIVE mode from the start, although the behaviours are slightly different. With this change, its results will differ in both PERMISSIVE and FAILFAST modes. Simply using PERMISSIVE mode here by default is actually a behaviour change (as @viirya pointed out).
[GitHub] spark pull request #22237: [SPARK-25243][SQL] Use FailureSafeParser in from_...
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22237#discussion_r212849989

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala ---
@@ -469,4 +470,23 @@ class JsonFunctionsSuite extends QueryTest with SharedSQLContext {
     checkAnswer(sql("""select json[0] from jsonTable"""), Seq(Row(null)))
   }
+
+  test("from_json invalid json - check modes") {
+    val df = Seq("""{"a" 1}""", """{"a": 2}""").toDS()
+    val schema = new StructType().add("a", IntegerType)
+
+    checkAnswer(
+      df.select(from_json($"value", schema, Map("mode" -> "PERMISSIVE"))),
+      Row(Row(null)) :: Row(Row(2)) :: Nil)
+
+    val exceptionOne = intercept[SparkException] {
+      df.select(from_json($"value", schema, Map("mode" -> "FAILFAST"))).collect()
+    }.getMessage
+    assert(exceptionOne.contains(
+      "Malformed records are detected in record parsing. Parse Mode: FAILFAST."))
+
+    checkAnswer(
+      df.select(from_json($"value", schema, Map("mode" -> "DROPMALFORMED"))),
+      Row(null) :: Row(Row(2)) :: Nil)
--- End diff --

How does this work in DROPMALFORMED mode? It doesn't actually drop the record the way the JSON datasource does.
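One way to read the DROPMALFORMED result: the failure-safe parser produces no rows for a malformed record. The new `StructType` converter in the diff (`if (rows.hasNext) rows.next() else null`) then turns that empty iterator into `null`. So the column value becomes `null`, but the surrounding row is not dropped. The sketch below models this in plain Scala under simplifying assumptions and is not Spark's `FailureSafeParser`. Rows are `Seq[Any]`. The toy `rawParse` accepts only records shaped like `{"a": 2}`. `ModesSketch`, `safeParse`, `toStruct` and `fromJson` are hypothetical names, and the `RuntimeException` stands in for the `SparkException` asserted in the test.

```scala
// Hypothetical, simplified model of per-mode handling of malformed records.
// Not Spark code: rows are Seq[Any] and the raw parser is a toy regex match.
object ModesSketch {
  sealed trait ParseMode
  case object Permissive extends ParseMode
  case object DropMalformed extends ParseMode
  case object FailFast extends ParseMode

  final class BadRecordException(msg: String) extends RuntimeException(msg)

  // Toy pattern for a record like {"a": 2}; anything else counts as malformed.
  private val Record = """\{\s*"a"\s*:\s*(-?\d+)\s*\}""".r

  // Toy raw parser: one row per valid record, throws on malformed input.
  def rawParse(input: String): Seq[Seq[Any]] = input match {
    case Record(n) => Seq(Seq(n.toInt))
    case _         => throw new BadRecordException(s"cannot parse: $input")
  }

  // Failure-safe wrapper: the mode decides what a malformed record becomes.
  def safeParse(input: String, mode: ParseMode, numFields: Int): Iterator[Seq[Any]] =
    try rawParse(input).iterator
    catch {
      case e: BadRecordException =>
        mode match {
          case Permissive    => Iterator.single(Seq.fill[Any](numFields)(null)) // row of nulls
          case DropMalformed => Iterator.empty                                  // no row at all
          case FailFast =>
            throw new RuntimeException(
              "Malformed records are detected in record parsing. Parse Mode: FAILFAST.", e)
        }
    }

  // Mirrors the StructType branch of the new converter: empty iterator => null.
  def toStruct(rows: Iterator[Seq[Any]]): Seq[Any] =
    if (rows.hasNext) rows.next() else null

  def fromJson(input: String, mode: ParseMode): Seq[Any] =
    toStruct(safeParse(input, mode, numFields = 1))
}
```

Because `from_json` is an expression that must return exactly one value per input row, "dropping" can only surface as `null` here. Actually removing the row would require filtering at the datasource level.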
[GitHub] spark pull request #22237: [SPARK-25243][SQL] Use FailureSafeParser in from_...
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22237#discussion_r212847031

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala ---
@@ -554,18 +554,22 @@ case class JsonToStructs(
   @transient lazy val converter = nullableSchema match {
     case _: StructType =>
-      (rows: Seq[InternalRow]) => if (rows.length == 1) rows.head else null
+      (rows: Iterator[InternalRow]) => if (rows.hasNext) rows.next() else null
     case _: ArrayType =>
-      (rows: Seq[InternalRow]) => rows.head.getArray(0)
+      (rows: Iterator[InternalRow]) => if (rows.hasNext) rows.next().getArray(0) else null
     case _: MapType =>
-      (rows: Seq[InternalRow]) => rows.head.getMap(0)
+      (rows: Iterator[InternalRow]) => if (rows.hasNext) rows.next().getMap(0) else null
   }
 
-  @transient
-  lazy val parser =
-    new JacksonParser(
-      nullableSchema,
-      new JSONOptions(options + ("mode" -> FailFastMode.name), timeZoneId.get))
+  @transient lazy val parsedOptions = new JSONOptions(options, timeZoneId.get)
+  @transient lazy val rawParser = new JacksonParser(nullableSchema, parsedOptions)
+  @transient lazy val createParser = CreateJacksonParser.utf8String _
+  @transient lazy val parser = new FailureSafeParser[UTF8String](
+    input => rawParser.parse(input, createParser, identity[UTF8String]),
+    parsedOptions.parseMode,
--- End diff --

Shouldn't we keep using the previous default mode, `FailFastMode`?
[GitHub] spark pull request #22237: [SPARK-25243][SQL] Use FailureSafeParser in from_...
GitHub user MaxGekk opened a pull request:

    https://github.com/apache/spark/pull/22237

    [SPARK-25243][SQL] Use FailureSafeParser in from_json

## What changes were proposed in this pull request?

In this PR, I propose to switch `from_json` to `FailureSafeParser`, to make `PERMISSIVE` the function's default mode, and, as a consequence, to support the `FAILFAST` and `DROPMALFORMED` modes.

## How was this patch tested?

It was tested by the existing `JsonSuite`/`CSVSuite`, `JsonFunctionsSuite` and `JsonExpressionsSuite` suites, as well as by new tests for `from_json` that check the different modes.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/MaxGekk/spark-1 from_json-failuresafe

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/22237.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #22237

commit d639cb353b8575b66700544898148e6d65595fae
Author: Maxim Gekk
Date:   2018-08-26T13:24:51Z

    Moving FailureSafeParser to catalyst

commit e20615dbfafd03e44435257c57d5930e5aaef027
Author: Maxim Gekk
Date:   2018-08-26T13:47:55Z

    Support any type by FailureSafeParser

commit 5574c61fe7a369c3b549342c84b601dd98794014
Author: Maxim Gekk
Date:   2018-08-26T14:56:18Z

    Use FailSafeParser in from_json

commit 3fd9ba00a213851185b28bfa88634c43f9ed00bb
Author: Maxim Gekk
Date:   2018-08-26T15:02:37Z

    Fix tests

commit 0d30d0c51fb1c09cdcedee57e6979e00d6c8
Author: Maxim Gekk
Date:   2018-08-26T15:05:16Z

    Removing unused bind variable

commit 5bd5eb39b7b6964be025aee744e948027b15a062
Author: Maxim Gekk
Date:   2018-08-26T15:28:23Z

    Adding tests for different modes

commit 0452a2ff96a6690d4e99cc0463974cc8b98c56b6
Author: Maxim Gekk
Date:   2018-08-26T15:43:25Z

    Improve a test