[GitHub] spark pull request #22237: [SPARK-25243][SQL] Use FailureSafeParser in from_...

2018-11-19 Thread MaxGekk
Github user MaxGekk commented on a diff in the pull request:

https://github.com/apache/spark/pull/22237#discussion_r234792965
  
--- Diff: R/pkg/tests/fulltests/test_sparkSQL.R ---
@@ -1694,7 +1694,7 @@ test_that("column functions", {
   df <- as.DataFrame(list(list("col" = "{\"date\":\"21/10/2014\"}")))
   schema2 <- structType(structField("date", "date"))
   s <- collect(select(df, from_json(df$col, schema2)))
-  expect_equal(s[[1]][[1]], NA)
+  expect_equal(s[[1]][[1]]$date, NA)
--- End diff --

Do you mean this particular line or in general?

This line was changed because in the `PERMISSIVE` mode we now return a
`Row` with null fields for the values we weren't able to parse, instead of
just `null` for the whole row.

In general, the goal is to fully support the `PERMISSIVE` mode without
exceptions for the case when the Jackson parser cannot detect any JSON
tokens at the root level. We recently switched `from_json` to
`FailureSafeParser` with `PERMISSIVE` as the default mode, here in #22237.
Previously `from_json` didn't support any modes, in contrast to the JSON
datasource.
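
For illustration, a minimal Scala sketch of the behavior the R test above
asserts (the session setup is boilerplate; the date string does not match the
default date format, so the parsed struct carries a null field):

```
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types.{DateType, StructType}

object PermissiveFromJsonDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("demo").getOrCreate()
    import spark.implicits._

    val df = Seq("""{"date":"21/10/2014"}""").toDF("col")
    val schema = new StructType().add("date", DateType)

    // "21/10/2014" does not match the default yyyy-MM-dd format, so in the
    // PERMISSIVE mode the result is Row(null), not null for the whole row.
    df.select(from_json($"col", schema)).show(truncate = false)

    spark.stop()
  }
}
```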


---




[GitHub] spark pull request #22237: [SPARK-25243][SQL] Use FailureSafeParser in from_...

2018-11-19 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/22237#discussion_r234789275
  
--- Diff: R/pkg/tests/fulltests/test_sparkSQL.R ---
@@ -1694,7 +1694,7 @@ test_that("column functions", {
   df <- as.DataFrame(list(list("col" = "{\"date\":\"21/10/2014\"}")))
   schema2 <- structType(structField("date", "date"))
   s <- collect(select(df, from_json(df$col, schema2)))
-  expect_equal(s[[1]][[1]], NA)
+  expect_equal(s[[1]][[1]]$date, NA)
--- End diff --

What is the reason we made this change?


---




[GitHub] spark pull request #22237: [SPARK-25243][SQL] Use FailureSafeParser in from_...

2018-10-24 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/22237


---




[GitHub] spark pull request #22237: [SPARK-25243][SQL] Use FailureSafeParser in from_...

2018-10-10 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22237#discussion_r224069683
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala
 ---
@@ -15,50 +15,57 @@
  * limitations under the License.
  */
 
-package org.apache.spark.sql.execution.datasources
+package org.apache.spark.sql.catalyst.util
 
 import org.apache.spark.SparkException
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
-import org.apache.spark.sql.catalyst.util._
-import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.types.{DataType, StructType}
 import org.apache.spark.unsafe.types.UTF8String
 
 class FailureSafeParser[IN](
--- End diff --

It's not about the real business logic, but an idea to make the code 
simpler.
```
// Wrap non-struct schemas into a single-field struct so FailureSafeParser
// can keep operating on StructType only.
val parserSchema = nullableSchema match {
  case s: StructType => s
  case other => new StructType().add("value", other)
}
new FailureSafeParser[UTF8String](
  input => rawParser.parse(input, createParser, identity[UTF8String]),
  mode,
  parserSchema,
  parsedOptions.columnNameOfCorruptRecord,
  parsedOptions.multiLine)
```

Then we don't need to change `FailureSafeParser`


---




[GitHub] spark pull request #22237: [SPARK-25243][SQL] Use FailureSafeParser in from_...

2018-10-10 Thread MaxGekk
Github user MaxGekk commented on a diff in the pull request:

https://github.com/apache/spark/pull/22237#discussion_r224044435
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala
 ---
@@ -15,50 +15,57 @@
  * limitations under the License.
  */
 
-package org.apache.spark.sql.execution.datasources
+package org.apache.spark.sql.catalyst.util
 
 import org.apache.spark.SparkException
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
-import org.apache.spark.sql.catalyst.util._
-import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.types.{DataType, StructType}
 import org.apache.spark.unsafe.types.UTF8String
 
 class FailureSafeParser[IN](
--- End diff --

Frankly speaking, I don't fully understand the idea. Let's look at an
example. We should parse JSON arrays (one array per row) like:
```
[1, 2, 3]
[4, 5]
```
and a user provided the schema `ArrayType(IntegerType, true)`. So, you
propose to wrap the array in `StructType(Seq(StructField(ArrayType(IntegerType,
...`, right? And use the code inside `JacksonParser` which we disabled
via `allowArrayAsStructs` for now?
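
For reference, a minimal sketch of that array-per-row case with the current
API (assuming a `spark` session is in scope, e.g. in spark-shell):

```
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types.{ArrayType, IntegerType}
import spark.implicits._

// One top-level JSON array per input row, parsed with an ArrayType schema.
val df = Seq("[1, 2, 3]", "[4, 5]").toDF("col")
df.select(from_json($"col", ArrayType(IntegerType, true))).show()
```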


---




[GitHub] spark pull request #22237: [SPARK-25243][SQL] Use FailureSafeParser in from_...

2018-10-10 Thread MaxGekk
Github user MaxGekk commented on a diff in the pull request:

https://github.com/apache/spark/pull/22237#discussion_r224042803
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
 ---
@@ -554,18 +554,30 @@ case class JsonToStructs(
   @transient
   lazy val converter = nullableSchema match {
 case _: StructType =>
-  (rows: Seq[InternalRow]) => if (rows.length == 1) rows.head else null
+  (rows: Iterator[InternalRow]) => if (rows.hasNext) rows.next() else 
null
--- End diff --

For now the case is more concrete: we return null if the `Jackson` parser
doesn't find any token in the input. I am not sure this detailed info about
the underlying problem would help users much.
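
For example (a minimal sketch of that case, assuming a `spark` session in
scope):

```
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types.{IntegerType, StructType}
import spark.implicits._

// An empty string contains no JSON tokens at all, so the result is null
// as a whole, not a Row with null fields.
val df = Seq("").toDF("col")
df.select(from_json($"col", new StructType().add("a", IntegerType))).show()
```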


---




[GitHub] spark pull request #22237: [SPARK-25243][SQL] Use FailureSafeParser in from_...

2018-10-10 Thread MaxGekk
Github user MaxGekk commented on a diff in the pull request:

https://github.com/apache/spark/pull/22237#discussion_r224042251
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
 ---
@@ -554,18 +554,30 @@ case class JsonToStructs(
   @transient
   lazy val converter = nullableSchema match {
 case _: StructType =>
-  (rows: Seq[InternalRow]) => if (rows.length == 1) rows.head else null
+  (rows: Iterator[InternalRow]) => if (rows.hasNext) rows.next() else 
null
--- End diff --

> we don't have to do it in this PR, but it would be great to document when 
this expression will return null ...

We already state in the docs for `from_json()`: `Returns null, in the case of an unparseable string.`


---




[GitHub] spark pull request #22237: [SPARK-25243][SQL] Use FailureSafeParser in from_...

2018-10-09 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22237#discussion_r223912044
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
 ---
@@ -450,21 +456,24 @@ class JsonExpressionsSuite extends SparkFunSuite with 
ExpressionEvalHelper with
   test("from_json - input=array of single object, schema=struct, 
output=single row") {
 val input = """[{"a": 1}]"""
 val schema = StructType(StructField("a", IntegerType) :: Nil)
-val output = InternalRow(1)
+val output = InternalRow(null)
 checkEvaluation(JsonToStructs(schema, Map.empty, Literal(input), 
gmtId), output)
   }
 
-  test("from_json - input=array, schema=struct, output=null") {
+  test("from_json - input=array, schema=struct, output=single row") {
 val input = """[{"a": 1}, {"a": 2}]"""
-val schema = StructType(StructField("a", IntegerType) :: Nil)
-val output = null
-checkEvaluation(JsonToStructs(schema, Map.empty, Literal(input), 
gmtId), output)
+val corrupted = "corrupted"
+val schema = new StructType().add("a", IntegerType).add(corrupted, 
StringType)
+StructType(StructField("a", IntegerType) :: Nil)
+val output = InternalRow(null, UTF8String.fromString(input))
+val options = Map("columnNameOfCorruptRecord" -> corrupted)
+checkEvaluation(JsonToStructs(schema, options, Literal(input), gmtId), 
output)
   }
 
   test("from_json - input=empty array, schema=struct, output=null") {
--- End diff --

`output=single row`


---




[GitHub] spark pull request #22237: [SPARK-25243][SQL] Use FailureSafeParser in from_...

2018-10-09 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22237#discussion_r223911893
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
 ---
@@ -450,21 +456,24 @@ class JsonExpressionsSuite extends SparkFunSuite with 
ExpressionEvalHelper with
   test("from_json - input=array of single object, schema=struct, 
output=single row") {
 val input = """[{"a": 1}]"""
 val schema = StructType(StructField("a", IntegerType) :: Nil)
-val output = InternalRow(1)
+val output = InternalRow(null)
 checkEvaluation(JsonToStructs(schema, Map.empty, Literal(input), 
gmtId), output)
   }
 
-  test("from_json - input=array, schema=struct, output=null") {
+  test("from_json - input=array, schema=struct, output=single row") {
 val input = """[{"a": 1}, {"a": 2}]"""
-val schema = StructType(StructField("a", IntegerType) :: Nil)
-val output = null
-checkEvaluation(JsonToStructs(schema, Map.empty, Literal(input), 
gmtId), output)
+val corrupted = "corrupted"
+val schema = new StructType().add("a", IntegerType).add(corrupted, 
StringType)
+StructType(StructField("a", IntegerType) :: Nil)
--- End diff --

unnecessary line?


---




[GitHub] spark pull request #22237: [SPARK-25243][SQL] Use FailureSafeParser in from_...

2018-10-09 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22237#discussion_r223911795
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
 ---
@@ -554,18 +554,30 @@ case class JsonToStructs(
   @transient
   lazy val converter = nullableSchema match {
 case _: StructType =>
-  (rows: Seq[InternalRow]) => if (rows.length == 1) rows.head else null
+  (rows: Iterator[InternalRow]) => if (rows.hasNext) rows.next() else 
null
--- End diff --

... and with some tests to verify it.


---




[GitHub] spark pull request #22237: [SPARK-25243][SQL] Use FailureSafeParser in from_...

2018-10-09 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22237#discussion_r223911576
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
 ---
@@ -554,18 +554,30 @@ case class JsonToStructs(
   @transient
   lazy val converter = nullableSchema match {
 case _: StructType =>
-  (rows: Seq[InternalRow]) => if (rows.length == 1) rows.head else null
+  (rows: Iterator[InternalRow]) => if (rows.hasNext) rows.next() else 
null
--- End diff --

we don't have to do it in this PR, but it would be great to document when
this expression will return null, in the class doc of `JsonToStructs`
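
e.g. something along these lines (a sketch; the exact wording is assumed):

```
/**
 * Converts a JSON string to a [[StructType]], [[ArrayType]] or [[MapType]].
 * Returns null if the input is null, or if no JSON token can be found in
 * the input (for example, an empty string).
 */
```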


---




[GitHub] spark pull request #22237: [SPARK-25243][SQL] Use FailureSafeParser in from_...

2018-10-09 Thread MaxGekk
Github user MaxGekk commented on a diff in the pull request:

https://github.com/apache/spark/pull/22237#discussion_r223852220
  
--- Diff: docs/sql-programming-guide.md ---
@@ -1890,6 +1890,10 @@ working with timestamps in `pandas_udf`s to get the 
best performance, see
 
 # Migration Guide
 
+## Upgrading From Spark SQL 2.4 to 3.0
+
--- End diff --

updated


---




[GitHub] spark pull request #22237: [SPARK-25243][SQL] Use FailureSafeParser in from_...

2018-10-09 Thread MaxGekk
Github user MaxGekk commented on a diff in the pull request:

https://github.com/apache/spark/pull/22237#discussion_r223849217
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
 ---
@@ -554,18 +554,30 @@ case class JsonToStructs(
   @transient
   lazy val converter = nullableSchema match {
 case _: StructType =>
-  (rows: Seq[InternalRow]) => if (rows.length == 1) rows.head else null
+  (rows: Iterator[InternalRow]) => if (rows.hasNext) rows.next() else 
null
--- End diff --

Actually I am wrong: we return an empty iterator and, as a consequence,
null in the case when there are no input tokens, here:
https://github.com/apache/spark/blob/a8a1ac01c4732f8a738b973c8486514cd88bf99b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala#L398

We could throw `BadRecordException` instead of `Nil` but this would change
the behavior of the JSON/CSV datasources.
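
A sketch of that alternative (the helper name is hypothetical;
`BadRecordException` as in `org.apache.spark.sql.catalyst.util`):

```
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.util.BadRecordException
import org.apache.spark.unsafe.types.UTF8String

// Hypothetical helper: signal "no JSON tokens" to FailureSafeParser
// instead of silently returning Nil.
def failOnEmptyInput(record: UTF8String): Seq[InternalRow] =
  throw BadRecordException(
    () => record,  // the bad record, reported to the corrupt-record column
    () => None,    // no partial result is available
    new RuntimeException("No JSON token found in the input"))
```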


---




[GitHub] spark pull request #22237: [SPARK-25243][SQL] Use FailureSafeParser in from_...

2018-10-09 Thread MaxGekk
Github user MaxGekk commented on a diff in the pull request:

https://github.com/apache/spark/pull/22237#discussion_r223836030
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
 ---
@@ -554,18 +554,30 @@ case class JsonToStructs(
   @transient
   lazy val converter = nullableSchema match {
 case _: StructType =>
-  (rows: Seq[InternalRow]) => if (rows.length == 1) rows.head else null
+  (rows: Iterator[InternalRow]) => if (rows.hasNext) rows.next() else 
null
--- End diff --

We shouldn't return `null`. I will replace the `null` with an exception like
I did for `from_csv`:
https://github.com/apache/spark/pull/22379/files#diff-5321c01e95bffc4413c5f3457696213eR83


---




[GitHub] spark pull request #22237: [SPARK-25243][SQL] Use FailureSafeParser in from_...

2018-10-09 Thread MaxGekk
Github user MaxGekk commented on a diff in the pull request:

https://github.com/apache/spark/pull/22237#discussion_r223833101
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
 ---
@@ -90,6 +91,10 @@ class JacksonParser(
 // in such an array as a row, this case is possible.
 if (array.numElements() == 0) {
   Nil
+} else if (array.numElements() > 1 && !explodeArray) {
--- End diff --

> ... but I think it's ok to drop it in from_json
> ... the new parameter can be named as allowArrayAsStructs

Is it ok if I do:

```
case START_ARRAY if allowArrayAsStructs =>
```


---




[GitHub] spark pull request #22237: [SPARK-25243][SQL] Use FailureSafeParser in from_...

2018-10-09 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22237#discussion_r223708500
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
 ---
@@ -554,18 +554,30 @@ case class JsonToStructs(
   @transient
   lazy val converter = nullableSchema match {
 case _: StructType =>
-  (rows: Seq[InternalRow]) => if (rows.length == 1) rows.head else null
+  (rows: Iterator[InternalRow]) => if (rows.hasNext) rows.next() else 
null
--- End diff --

so we may still return null in some cases, can you list them?


---




[GitHub] spark pull request #22237: [SPARK-25243][SQL] Use FailureSafeParser in from_...

2018-10-09 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22237#discussion_r223707899
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala
 ---
@@ -15,50 +15,57 @@
  * limitations under the License.
  */
 
-package org.apache.spark.sql.execution.datasources
+package org.apache.spark.sql.catalyst.util
 
 import org.apache.spark.SparkException
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
-import org.apache.spark.sql.catalyst.util._
-import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.types.{DataType, StructType}
 import org.apache.spark.unsafe.types.UTF8String
 
 class FailureSafeParser[IN](
--- End diff --

instead of changing this file, can we update `JsonToStructs.parser` to wrap
an array or map schema with a struct before creating `FailureSafeParser`?


---




[GitHub] spark pull request #22237: [SPARK-25243][SQL] Use FailureSafeParser in from_...

2018-10-09 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22237#discussion_r223703894
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
 ---
@@ -90,6 +91,10 @@ class JacksonParser(
 // in such an array as a row, this case is possible.
 if (array.numElements() == 0) {
   Nil
+} else if (array.numElements() > 1 && !explodeArray) {
--- End diff --

the new parameter can be named as `allowArrayAsStructs`


---




[GitHub] spark pull request #22237: [SPARK-25243][SQL] Use FailureSafeParser in from_...

2018-10-09 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22237#discussion_r223703462
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
 ---
@@ -90,6 +91,10 @@ class JacksonParser(
 // in such an array as a row, this case is possible.
 if (array.numElements() == 0) {
   Nil
+} else if (array.numElements() > 1 && !explodeArray) {
--- End diff --

shall we forbid reading an array as a struct type completely for `from_json`?

BTW I think this feature is not needed now, since we can directly read
top-level JSON arrays as an array type. We may still need to keep it when
reading JSON files, but I think it's ok to drop it in `from_json`.


---




[GitHub] spark pull request #22237: [SPARK-25243][SQL] Use FailureSafeParser in from_...

2018-10-09 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22237#discussion_r223700226
  
--- Diff: docs/sql-programming-guide.md ---
@@ -1890,6 +1890,10 @@ working with timestamps in `pandas_udf`s to get the 
best performance, see
 
 # Migration Guide
 
+## Upgrading From Spark SQL 2.4 to 3.0
+
--- End diff --

shall we update the guide too?


---




[GitHub] spark pull request #22237: [SPARK-25243][SQL] Use FailureSafeParser in from_...

2018-10-09 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22237#discussion_r223696096
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala ---
@@ -547,4 +548,27 @@ class JsonFunctionsSuite extends QueryTest with 
SharedSQLContext {
   Map("pretty" -> "true"))),
   Seq(Row(expected)))
   }
+
+  test("from_json invalid json - check modes") {
+val df = Seq("""{"a" 1}""", """{"a": 2}""").toDS()
+val schema = new StructType().add("a", IntegerType)
+
+checkAnswer(
+  df.select(from_json($"value", schema, Map("mode" -> "PERMISSIVE"))),
--- End diff --

@MaxGekk, can we add a test with a manually specified malformed column in
the schema? For instance,


https://github.com/apache/spark/blob/a8a1ac01c4732f8a738b973c8486514cd88bf99b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala#L1125-L1133
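
Something like this, for instance (a sketch in the style of
`JsonFunctionsSuite`; the column name and expected answer are assumptions):

```
test("from_json in PERMISSIVE mode with a corrupt record column") {
  val df = Seq("""{"a" 1}""").toDF("value")
  val schema = new StructType()
    .add("a", IntegerType)
    .add("_corrupt", StringType)

  checkAnswer(
    df.select(from_json($"value", schema,
      Map("columnNameOfCorruptRecord" -> "_corrupt"))),
    Row(Row(null, """{"a" 1}""")))
}
```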


---




[GitHub] spark pull request #22237: [SPARK-25243][SQL] Use FailureSafeParser in from_...

2018-10-09 Thread MaxGekk
Github user MaxGekk commented on a diff in the pull request:

https://github.com/apache/spark/pull/22237#discussion_r223675223
  
--- Diff: docs/sql-programming-guide.md ---
@@ -1890,6 +1890,10 @@ working with timestamps in `pandas_udf`s to get the 
best performance, see
 
 # Migration Guide
 
+## Upgrading From Spark SQL 2.4 to 3.0
+
+  - Since Spark 3.0, the `from_json` function supports two modes -
`PERMISSIVE` and `FAILFAST`. The modes can be set via the `mode` option. The
default mode became `PERMISSIVE`. In previous versions, the behavior of
`from_json` conformed to neither `PERMISSIVE` nor `FAILFAST`, especially in
the processing of malformed JSON records. For example, the JSON string
`{"a" 1}` with the schema `a INT` is converted to `null` by previous versions,
but Spark 3.0 converts it to `Row(null)`. In version 2.4 and earlier, arrays
of JSON objects are considered invalid and converted to `null` if the
specified schema is `StructType`. Since Spark 3.0, the input is considered a
valid JSON array and only its first element is parsed if it conforms to the
specified `StructType`.
--- End diff --

Introduced `explodeArray` for `JacksonParser`, and modified a test to check 
the situation.


---




[GitHub] spark pull request #22237: [SPARK-25243][SQL] Use FailureSafeParser in from_...

2018-10-08 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22237#discussion_r223550790
  
--- Diff: docs/sql-programming-guide.md ---
@@ -1890,6 +1890,10 @@ working with timestamps in `pandas_udf`s to get the 
best performance, see
 
 # Migration Guide
 
+## Upgrading From Spark SQL 2.4 to 3.0
+
+  - Since Spark 3.0, the `from_json` function supports two modes -
`PERMISSIVE` and `FAILFAST`. The modes can be set via the `mode` option. The
default mode became `PERMISSIVE`. In previous versions, the behavior of
`from_json` conformed to neither `PERMISSIVE` nor `FAILFAST`, especially in
the processing of malformed JSON records. For example, the JSON string
`{"a" 1}` with the schema `a INT` is converted to `null` by previous versions,
but Spark 3.0 converts it to `Row(null)`. In version 2.4 and earlier, arrays
of JSON objects are considered invalid and converted to `null` if the
specified schema is `StructType`. Since Spark 3.0, the input is considered a
valid JSON array and only its first element is parsed if it conforms to the
specified `StructType`.
--- End diff --

The last option sounds better to me, but can we fill the corrupt record field
when the corrupt field name is specified in the schema?


---




[GitHub] spark pull request #22237: [SPARK-25243][SQL] Use FailureSafeParser in from_...

2018-10-08 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22237#discussion_r223545341
  
--- Diff: docs/sql-programming-guide.md ---
@@ -1890,6 +1890,10 @@ working with timestamps in `pandas_udf`s to get the 
best performance, see
 
 # Migration Guide
 
+## Upgrading From Spark SQL 2.4 to 3.0
+
+  - Since Spark 3.0, the `from_json` function supports two modes -
`PERMISSIVE` and `FAILFAST`. The modes can be set via the `mode` option. The
default mode became `PERMISSIVE`. In previous versions, the behavior of
`from_json` conformed to neither `PERMISSIVE` nor `FAILFAST`, especially in
the processing of malformed JSON records. For example, the JSON string
`{"a" 1}` with the schema `a INT` is converted to `null` by previous versions,
but Spark 3.0 converts it to `Row(null)`. In version 2.4 and earlier, arrays
of JSON objects are considered invalid and converted to `null` if the
specified schema is `StructType`. Since Spark 3.0, the input is considered a
valid JSON array and only its first element is parsed if it conforms to the
specified `StructType`.
--- End diff --

the last option LGTM


---




[GitHub] spark pull request #22237: [SPARK-25243][SQL] Use FailureSafeParser in from_...

2018-10-08 Thread MaxGekk
Github user MaxGekk commented on a diff in the pull request:

https://github.com/apache/spark/pull/22237#discussion_r223459186
  
--- Diff: docs/sql-programming-guide.md ---
@@ -1890,6 +1890,10 @@ working with timestamps in `pandas_udf`s to get the 
best performance, see
 
 # Migration Guide
 
+## Upgrading From Spark SQL 2.4 to 3.0
+
+  - Since Spark 3.0, the `from_json` function supports two modes -
`PERMISSIVE` and `FAILFAST`. The modes can be set via the `mode` option. The
default mode became `PERMISSIVE`. In previous versions, the behavior of
`from_json` conformed to neither `PERMISSIVE` nor `FAILFAST`, especially in
the processing of malformed JSON records. For example, the JSON string
`{"a" 1}` with the schema `a INT` is converted to `null` by previous versions,
but Spark 3.0 converts it to `Row(null)`. In version 2.4 and earlier, arrays
of JSON objects are considered invalid and converted to `null` if the
specified schema is `StructType`. Since Spark 3.0, the input is considered a
valid JSON array and only its first element is parsed if it conforms to the
specified `StructType`.
--- End diff --

This is the case when a user provided a `StructType` schema but we observe
an `array` in the JSON input. The JSON datasource returns a row per struct
in the array. Currently `from_json` returns `null` in this case. With this
PR, `from_json` returns one row with the first element of the input array.

Because we cannot return multiple rows from a function, we have the
following options (a sketch of the last one follows below):
- return `null`, but this would be the first case where we return `null` for
non-`null` input (the current approach before this PR)
- return a row with one element from the input array (what this PR proposes)
- throw an exception, which is not a nice option in the `PERMISSIVE` mode
- throw `BadRecordException` internally, and return `Row(null, null, ...,
null)` in the `PERMISSIVE` mode or an exception in `FAILFAST`.

It seems the last option is more attractive than the others. WDYT?
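
To make the last option concrete, a hypothetical illustration of its
semantics (not the final behavior of this PR; assuming a `spark` session in
scope):

```
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types.{IntegerType, StructType}
import spark.implicits._

val df = Seq("""[{"a": 1}, {"a": 2}]""").toDF("col")
val schema = new StructType().add("a", IntegerType)

// Under the last option, an array input would be treated as a bad record
// for a struct schema: PERMISSIVE yields Row(null), FAILFAST throws.
df.select(from_json($"col", schema, Map("mode" -> "PERMISSIVE"))).show()
df.select(from_json($"col", schema, Map("mode" -> "FAILFAST"))).show()
```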




---




[GitHub] spark pull request #22237: [SPARK-25243][SQL] Use FailureSafeParser in from_...

2018-10-08 Thread MaxGekk
Github user MaxGekk commented on a diff in the pull request:

https://github.com/apache/spark/pull/22237#discussion_r223263164
  
--- Diff: docs/sql-programming-guide.md ---
@@ -1890,6 +1890,10 @@ working with timestamps in `pandas_udf`s to get the 
best performance, see
 
 # Migration Guide
 
+## Upgrading From Spark SQL 2.4 to 3.0
+
+  - Since Spark 3.0, the `from_json` function supports two modes -
`PERMISSIVE` and `FAILFAST`. The modes can be set via the `mode` option. The
default mode became `PERMISSIVE`. In previous versions, the behavior of
`from_json` conformed to neither `PERMISSIVE` nor `FAILFAST`, especially in
the processing of malformed JSON records. For example, the JSON string
`{"a" 1}` with the schema `a INT` is converted to `null` by previous versions,
but Spark 3.0 converts it to `Row(null)`. In version 2.4 and earlier, arrays
of JSON objects are considered invalid and converted to `null` if the
specified schema is `StructType`. Since Spark 3.0, the input is considered a
valid JSON array and only its first element is parsed if it conforms to the
specified `StructType`.
--- End diff --

I will check that.


---




[GitHub] spark pull request #22237: [SPARK-25243][SQL] Use FailureSafeParser in from_...

2018-10-07 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22237#discussion_r223254210
  
--- Diff: docs/sql-programming-guide.md ---
@@ -1890,6 +1890,10 @@ working with timestamps in `pandas_udf`s to get the 
best performance, see
 
 # Migration Guide
 
+## Upgrading From Spark SQL 2.4 to 3.0
+
+  - Since Spark 3.0, the `from_json` function supports two modes -
`PERMISSIVE` and `FAILFAST`. The modes can be set via the `mode` option. The
default mode became `PERMISSIVE`. In previous versions, the behavior of
`from_json` conformed to neither `PERMISSIVE` nor `FAILFAST`, especially in
the processing of malformed JSON records. For example, the JSON string
`{"a" 1}` with the schema `a INT` is converted to `null` by previous versions,
but Spark 3.0 converts it to `Row(null)`. In version 2.4 and earlier, arrays
of JSON objects are considered invalid and converted to `null` if the
specified schema is `StructType`. Since Spark 3.0, the input is considered a
valid JSON array and only its first element is parsed if it conforms to the
specified `StructType`.
--- End diff --

> Since Spark 3.0, the input is considered a valid JSON array and only
its first element is parsed if it conforms to the specified `StructType`.

Is this behavior the same as when reading JSON files?


---




[GitHub] spark pull request #22237: [SPARK-25243][SQL] Use FailureSafeParser in from_...

2018-10-05 Thread MaxGekk
Github user MaxGekk commented on a diff in the pull request:

https://github.com/apache/spark/pull/22237#discussion_r223144685
  
--- Diff: docs/sql-programming-guide.md ---
@@ -1879,6 +1879,10 @@ working with timestamps in `pandas_udf`s to get the 
best performance, see
 
 # Migration Guide
 
+## Upgrading From Spark SQL 2.4 to 2.5
+
+  - Since Spark 3.0, the `from_json` function supports two modes -
`PERMISSIVE` and `FAILFAST`. The modes can be set via the `mode` option. The
default mode became `PERMISSIVE`. In previous versions, the behavior of
`from_json` conformed to neither `PERMISSIVE` nor `FAILFAST`, especially in
the processing of malformed JSON records. For example, the JSON string
`{"a" 1}` with the schema `a INT` is converted to `null` by previous versions,
but Spark 3.0 converts it to `Row(null)`. In version 2.4 and earlier, arrays
of JSON objects are considered invalid and converted to `null` if the
specified schema is `StructType`. Since Spark 3.0, the input is considered a
valid JSON array and only its first element is parsed if it conforms to the
specified `StructType`.
--- End diff --

Reverted back to 3.0


---




[GitHub] spark pull request #22237: [SPARK-25243][SQL] Use FailureSafeParser in from_...

2018-10-05 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22237#discussion_r222971397
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala
 ---
@@ -15,50 +15,51 @@
  * limitations under the License.
  */
 
-package org.apache.spark.sql.execution.datasources
+package org.apache.spark.sql.catalyst.util
 
 import org.apache.spark.SparkException
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
-import org.apache.spark.sql.catalyst.util._
-import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.types.{DataType, StructType}
 import org.apache.spark.unsafe.types.UTF8String
 
 class FailureSafeParser[IN](
 rawParser: IN => Seq[InternalRow],
 mode: ParseMode,
-schema: StructType,
+dataType: DataType,
 columnNameOfCorruptRecord: String,
 isMultiLine: Boolean) {
-
-  private val corruptFieldIndex = 
schema.getFieldIndex(columnNameOfCorruptRecord)
-  private val actualSchema = StructType(schema.filterNot(_.name == 
columnNameOfCorruptRecord))
-  private val resultRow = new GenericInternalRow(schema.length)
-  private val nullResult = new GenericInternalRow(schema.length)
-
   // This function takes 2 parameters: an optional partial result, and the 
bad record. If the given
   // schema doesn't contain a field for corrupted record, we just return 
the partial result or a
   // row with all fields null. If the given schema contains a field for 
corrupted record, we will
   // set the bad record to this field, and set other fields according to 
the partial result or null.
-  private val toResultRow: (Option[InternalRow], () => UTF8String) => 
InternalRow = {
-if (corruptFieldIndex.isDefined) {
-  (row, badRecord) => {
-var i = 0
-while (i < actualSchema.length) {
-  val from = actualSchema(i)
-  resultRow(schema.fieldIndex(from.name)) = row.map(_.get(i, 
from.dataType)).orNull
-  i += 1
+  private val toResultRow: (Option[InternalRow], () => UTF8String) => 
InternalRow = dataType match {
+case struct: StructType =>
+  val corruptFieldIndex = 
struct.getFieldIndex(columnNameOfCorruptRecord)
--- End diff --

Do you mind if I ask you to make a private function for this one (L38 to
L54) and define it around here?


---




[GitHub] spark pull request #22237: [SPARK-25243][SQL] Use FailureSafeParser in from_...

2018-10-05 Thread MaxGekk
Github user MaxGekk commented on a diff in the pull request:

https://github.com/apache/spark/pull/22237#discussion_r222942666
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ParseMode.scala 
---
@@ -51,6 +56,8 @@ object ParseMode extends Logging {
 case PermissiveMode.name => PermissiveMode
 case DropMalformedMode.name => DropMalformedMode
 case FailFastMode.name => FailFastMode
+case NullMalformedMode.name => NullMalformedMode
--- End diff --

ok. I will revert the last changes.


---




[GitHub] spark pull request #22237: [SPARK-25243][SQL] Use FailureSafeParser in from_...

2018-10-05 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22237#discussion_r222926650
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ParseMode.scala 
---
@@ -51,6 +56,8 @@ object ParseMode extends Logging {
 case PermissiveMode.name => PermissiveMode
 case DropMalformedMode.name => DropMalformedMode
 case FailFastMode.name => FailFastMode
+case NullMalformedMode.name => NullMalformedMode
--- End diff --

I suggested keeping the previous behaviour only because I thought we were in
transition to 2.5. Since we are going ahead with 3.0, I think I am okay
without keeping it.


---




[GitHub] spark pull request #22237: [SPARK-25243][SQL] Use FailureSafeParser in from_...

2018-10-05 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22237#discussion_r222924928
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ParseMode.scala 
---
@@ -51,6 +56,8 @@ object ParseMode extends Logging {
 case PermissiveMode.name => PermissiveMode
 case DropMalformedMode.name => DropMalformedMode
 case FailFastMode.name => FailFastMode
+case NullMalformedMode.name => NullMalformedMode
--- End diff --

After looking at it again, the `mode` option is already there, but 
`from_json` ignores it.

Now this PR looks like a bug fix to me. I'm wondering if we do need to keep 
the previous behavior. But if we do, I think using a SQL config is more 
reasonable than adding a new mode.
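
For example, a sketch of such a SQL config (the name and wording are
assumptions, not part of this PR):

```
// Hypothetical addition to object SQLConf:
val LEGACY_FROM_JSON_NULL_ON_MALFORMED =
  buildConf("spark.sql.legacy.fromJson.nullOnMalformed")
    .doc("When true, from_json returns null for a malformed record " +
      "instead of a row with null fields, restoring the 2.4 behavior.")
    .booleanConf
    .createWithDefault(false)
```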


---




[GitHub] spark pull request #22237: [SPARK-25243][SQL] Use FailureSafeParser in from_...

2018-10-04 Thread MaxGekk
Github user MaxGekk commented on a diff in the pull request:

https://github.com/apache/spark/pull/22237#discussion_r222814860
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ParseMode.scala 
---
@@ -51,6 +56,8 @@ object ParseMode extends Logging {
 case PermissiveMode.name => PermissiveMode
 case DropMalformedMode.name => DropMalformedMode
 case FailFastMode.name => FailFastMode
+case NullMalformedMode.name => NullMalformedMode
--- End diff --

A separate boolean flag in `JSONOptions`, or a SQL config?


---




[GitHub] spark pull request #22237: [SPARK-25243][SQL] Use FailureSafeParser in from_...

2018-10-04 Thread MaxGekk
Github user MaxGekk commented on a diff in the pull request:

https://github.com/apache/spark/pull/22237#discussion_r222812531
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ParseMode.scala 
---
@@ -51,6 +56,8 @@ object ParseMode extends Logging {
 case PermissiveMode.name => PermissiveMode
 case DropMalformedMode.name => DropMalformedMode
 case FailFastMode.name => FailFastMode
+case NullMalformedMode.name => NullMalformedMode
--- End diff --

I would add it to `from_json` but the function shares the same common
classes as the JSON datasource, and the mode is used in those common classes.
Any ideas on how to restrict the mode value to `from_json` only?


---




[GitHub] spark pull request #22237: [SPARK-25243][SQL] Use FailureSafeParser in from_...

2018-10-04 Thread MaxGekk
Github user MaxGekk commented on a diff in the pull request:

https://github.com/apache/spark/pull/22237#discussion_r222811744
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
 ---
@@ -550,59 +550,93 @@ case class JsonToStructs(
   s"Input schema ${nullableSchema.catalogString} must be a struct, an 
array or a map.")
   }
 
-  // This converts parsed rows to the desired output by the given schema.
-  @transient
-  lazy val converter = nullableSchema match {
-case _: StructType =>
-  (rows: Seq[InternalRow]) => if (rows.length == 1) rows.head else null
-case _: ArrayType =>
-  (rows: Seq[InternalRow]) => rows.head.getArray(0)
-case _: MapType =>
-  (rows: Seq[InternalRow]) => rows.head.getMap(0)
+  abstract class RowParser {
--- End diff --

Sure, I just wasn't going to mix it into other classes, and sub-classes
should extend only `RowParser`.


---




[GitHub] spark pull request #22237: [SPARK-25243][SQL] Use FailureSafeParser in from_...

2018-10-04 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22237#discussion_r222653808
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ParseMode.scala 
---
@@ -51,6 +56,8 @@ object ParseMode extends Logging {
 case PermissiveMode.name => PermissiveMode
 case DropMalformedMode.name => DropMalformedMode
 case FailFastMode.name => FailFastMode
+case NullMalformedMode.name => NullMalformedMode
--- End diff --

I think we should only add the new mode for `from_json`, not for the entire
JSON datasource?


---




[GitHub] spark pull request #22237: [SPARK-25243][SQL] Use FailureSafeParser in from_...

2018-10-04 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22237#discussion_r222651484
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
 ---
@@ -550,59 +550,93 @@ case class JsonToStructs(
   s"Input schema ${nullableSchema.catalogString} must be a struct, an 
array or a map.")
   }
 
-  // This converts parsed rows to the desired output by the given schema.
-  @transient
-  lazy val converter = nullableSchema match {
-case _: StructType =>
-  (rows: Seq[InternalRow]) => if (rows.length == 1) rows.head else null
-case _: ArrayType =>
-  (rows: Seq[InternalRow]) => rows.head.getArray(0)
-case _: MapType =>
-  (rows: Seq[InternalRow]) => rows.head.getMap(0)
+  abstract class RowParser {
--- End diff --

can it be `trait`?


---




[GitHub] spark pull request #22237: [SPARK-25243][SQL] Use FailureSafeParser in from_...

2018-10-04 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22237#discussion_r222651368
  
--- Diff: docs/sql-programming-guide.md ---
@@ -1890,6 +1890,10 @@ working with timestamps in `pandas_udf`s to get the 
best performance, see
 
 # Migration Guide
 
+## Upgrading From Spark SQL 2.4 to 3.0
+
+  - Since Spark 3.0, the `from_json` function supports three modes -
`PERMISSIVE`, `FAILFAST` and `NULLMALFORMED`. The modes can be set via the
`mode` option. `PERMISSIVE` became the default mode. In previous versions, the
behavior of `from_json` conformed to neither `PERMISSIVE` nor `FAILFAST`,
especially in the processing of malformed JSON records. For example, the JSON
string `{"a" 1}` with the schema `a INT` is converted to `null` by previous
versions, but Spark 3.0 converts it to `Row(null)`. In version 2.4 and
earlier, arrays of JSON objects are considered invalid and converted to `null`
if the specified schema is `StructType`. Since Spark 3.0, the input is
considered a valid JSON array and only its first element is parsed if it
conforms to the specified `StructType`. To restore the previous behavior, set
the JSON option `mode` to `NULLMALFORMED`.
--- End diff --

how about `PERMISSIVE`, `FAILFAST` and `LEGACY`?


---




[GitHub] spark pull request #22237: [SPARK-25243][SQL] Use FailureSafeParser in from_...

2018-10-03 Thread MaxGekk
Github user MaxGekk commented on a diff in the pull request:

https://github.com/apache/spark/pull/22237#discussion_r222554954
  
--- Diff: docs/sql-programming-guide.md ---
@@ -1877,6 +1877,10 @@ working with timestamps in `pandas_udf`s to get the 
best performance, see
 
 # Migration Guide
 
+## Upgrading From Spark SQL 2.4 to 3.0
+
+  - Since Spark 3.0, the `from_json` functions supports two modes - 
`PERMISSIVE` and `FAILFAST`. The modes can be set via the `mode` option. The 
default mode became `PERMISSIVE`. In previous versions, behavior of `from_json` 
did not conform to either `PERMISSIVE` nor `FAILFAST`, especially in processing 
of malformed JSON records. For example, the JSON string `{"a" 1}` with the 
schema `a INT` is converted to `null` by previous versions but Spark 3.0 
converts it to `Row(null)`. In version 2.4 and earlier, arrays of JSON objects 
are considered as invalid and converted to `null` if specified schema is 
`StructType`. Since Spark 3.0, the input is considered as a valid JSON array 
and only its first element is parsed if it conforms to the specified 
`StructType`.
+
--- End diff --

> Do we have a clear definition of the current behavior? It's important to
let users know how the behavior changes.

I added a new mode in which the behavior is the same as the current one.
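
In `ParseMode` terms the addition looks roughly like this (a sketch matching
the diff above; it has to live in `ParseMode.scala` because the trait is
sealed):

```
/**
 * Keeps the pre-3.0 from_json behavior: a malformed record is converted
 * to null as a whole instead of a row with null fields.
 */
case object NullMalformedMode extends ParseMode {
  val name: String = "NULLMALFORMED"
}
```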


---




[GitHub] spark pull request #22237: [SPARK-25243][SQL] Use FailureSafeParser in from_...

2018-09-27 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22237#discussion_r220908825
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala
 ---
@@ -15,50 +15,51 @@
  * limitations under the License.
  */
 
-package org.apache.spark.sql.execution.datasources
+package org.apache.spark.sql.catalyst.util
 
 import org.apache.spark.SparkException
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
-import org.apache.spark.sql.catalyst.util._
-import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.types.{DataType, StructType}
 import org.apache.spark.unsafe.types.UTF8String
 
 class FailureSafeParser[IN](
 rawParser: IN => Seq[InternalRow],
 mode: ParseMode,
-schema: StructType,
+dataType: DataType,
 columnNameOfCorruptRecord: String,
 isMultiLine: Boolean) {
-
-  private val corruptFieldIndex = 
schema.getFieldIndex(columnNameOfCorruptRecord)
-  private val actualSchema = StructType(schema.filterNot(_.name == 
columnNameOfCorruptRecord))
-  private val resultRow = new GenericInternalRow(schema.length)
-  private val nullResult = new GenericInternalRow(schema.length)
-
   // This function takes 2 parameters: an optional partial result, and the 
bad record. If the given
   // schema doesn't contain a field for corrupted record, we just return 
the partial result or a
   // row with all fields null. If the given schema contains a field for 
corrupted record, we will
   // set the bad record to this field, and set other fields according to 
the partial result or null.
-  private val toResultRow: (Option[InternalRow], () => UTF8String) => 
InternalRow = {
--- End diff --

Just to make the review easier, backporting easier, and to keep the original
authorship of the code.


---




[GitHub] spark pull request #22237: [SPARK-25243][SQL] Use FailureSafeParser in from_...

2018-09-27 Thread MaxGekk
Github user MaxGekk commented on a diff in the pull request:

https://github.com/apache/spark/pull/22237#discussion_r220905353
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala
 ---
@@ -15,50 +15,51 @@
  * limitations under the License.
  */
 
-package org.apache.spark.sql.execution.datasources
+package org.apache.spark.sql.catalyst.util
 
 import org.apache.spark.SparkException
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
-import org.apache.spark.sql.catalyst.util._
-import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.types.{DataType, StructType}
 import org.apache.spark.unsafe.types.UTF8String
 
 class FailureSafeParser[IN](
 rawParser: IN => Seq[InternalRow],
 mode: ParseMode,
-schema: StructType,
+dataType: DataType,
 columnNameOfCorruptRecord: String,
 isMultiLine: Boolean) {
-
-  private val corruptFieldIndex = 
schema.getFieldIndex(columnNameOfCorruptRecord)
-  private val actualSchema = StructType(schema.filterNot(_.name == 
columnNameOfCorruptRecord))
-  private val resultRow = new GenericInternalRow(schema.length)
-  private val nullResult = new GenericInternalRow(schema.length)
-
   // This function takes 2 parameters: an optional partial result, and the 
bad record. If the given
   // schema doesn't contain a field for corrupted record, we just return 
the partial result or a
   // row with all fields null. If the given schema contains a field for 
corrupted record, we will
   // set the bad record to this field, and set other fields according to 
the partial result or null.
-  private val toResultRow: (Option[InternalRow], () => UTF8String) => 
InternalRow = {
--- End diff --

> Can we make the diff smaller?

Just wondering, what is the reason to make the diff smaller?


---




[GitHub] spark pull request #22237: [SPARK-25243][SQL] Use FailureSafeParser in from_...

2018-09-26 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22237#discussion_r220793666
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
 ---
@@ -554,18 +554,30 @@ case class JsonToStructs(
   @transient
   lazy val converter = nullableSchema match {
 case _: StructType =>
-  (rows: Seq[InternalRow]) => if (rows.length == 1) rows.head else null
+  (rows: Iterator[InternalRow]) => if (rows.hasNext) rows.next() else 
null
 case _: ArrayType =>
-  (rows: Seq[InternalRow]) => rows.head.getArray(0)
+  (rows: Iterator[InternalRow]) => if (rows.hasNext) 
rows.next().getArray(0) else null
 case _: MapType =>
-  (rows: Seq[InternalRow]) => rows.head.getMap(0)
+  (rows: Iterator[InternalRow]) => if (rows.hasNext) 
rows.next().getMap(0) else null
   }
 
-  @transient
-  lazy val parser =
-new JacksonParser(
-  nullableSchema,
-  new JSONOptions(options + ("mode" -> FailFastMode.name), 
timeZoneId.get))
+  @transient lazy val parser = {
+val parsedOptions = new JSONOptions(options, timeZoneId.get)
+val mode = parsedOptions.parseMode
+if (mode != PermissiveMode && mode != FailFastMode) {
--- End diff --

ah, I see. If the `mode` option already exists, let's keep it.


---




[GitHub] spark pull request #22237: [SPARK-25243][SQL] Use FailureSafeParser in from_...

2018-09-26 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22237#discussion_r220789449
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala
 ---
@@ -15,50 +15,51 @@
  * limitations under the License.
  */
 
-package org.apache.spark.sql.execution.datasources
+package org.apache.spark.sql.catalyst.util
 
 import org.apache.spark.SparkException
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
-import org.apache.spark.sql.catalyst.util._
-import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.types.{DataType, StructType}
 import org.apache.spark.unsafe.types.UTF8String
 
 class FailureSafeParser[IN](
 rawParser: IN => Seq[InternalRow],
 mode: ParseMode,
-schema: StructType,
+dataType: DataType,
 columnNameOfCorruptRecord: String,
 isMultiLine: Boolean) {
-
-  private val corruptFieldIndex = 
schema.getFieldIndex(columnNameOfCorruptRecord)
-  private val actualSchema = StructType(schema.filterNot(_.name == 
columnNameOfCorruptRecord))
-  private val resultRow = new GenericInternalRow(schema.length)
-  private val nullResult = new GenericInternalRow(schema.length)
-
   // This function takes 2 parameters: an optional partial result, and the 
bad record. If the given
   // schema doesn't contain a field for corrupted record, we just return 
the partial result or a
   // row with all fields null. If the given schema contains a field for 
corrupted record, we will
   // set the bad record to this field, and set other fields according to 
the partial result or null.
-  private val toResultRow: (Option[InternalRow], () => UTF8String) => 
InternalRow = {
--- End diff --

From a cursory look, it looks like we keep the original code and add one
case for the non-struct type. Can we make the diff smaller?


---




[GitHub] spark pull request #22237: [SPARK-25243][SQL] Use FailureSafeParser in from_...

2018-09-26 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22237#discussion_r220788994
  
--- Diff: docs/sql-programming-guide.md ---
@@ -1879,6 +1879,10 @@ working with timestamps in `pandas_udf`s to get the 
best performance, see
 
 # Migration Guide
 
+## Upgrading From Spark SQL 2.4 to 2.5
+
+  - Since Spark 3.0, the `from_json` function supports two modes -
`PERMISSIVE` and `FAILFAST`. The modes can be set via the `mode` option. The
default mode became `PERMISSIVE`. In previous versions, the behavior of
`from_json` conformed to neither `PERMISSIVE` nor `FAILFAST`, especially in
the processing of malformed JSON records. For example, the JSON string
`{"a" 1}` with the schema `a INT` is converted to `null` by previous versions,
but Spark 3.0 converts it to `Row(null)`. In version 2.4 and earlier, arrays
of JSON objects are considered invalid and converted to `null` if the
specified schema is `StructType`. Since Spark 3.0, the input is considered a
valid JSON array and only its first element is parsed if it conforms to the
specified `StructType`.
--- End diff --

`Since Spark 3.0` -> `Since Spark 2.5` :-)


---




[GitHub] spark pull request #22237: [SPARK-25243][SQL] Use FailureSafeParser in from_...

2018-09-26 Thread MaxGekk
Github user MaxGekk commented on a diff in the pull request:

https://github.com/apache/spark/pull/22237#discussion_r220668461
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
 ---
@@ -554,18 +554,30 @@ case class JsonToStructs(
   @transient
   lazy val converter = nullableSchema match {
 case _: StructType =>
-  (rows: Seq[InternalRow]) => if (rows.length == 1) rows.head else null
+  (rows: Iterator[InternalRow]) => if (rows.hasNext) rows.next() else 
null
 case _: ArrayType =>
-  (rows: Seq[InternalRow]) => rows.head.getArray(0)
+  (rows: Iterator[InternalRow]) => if (rows.hasNext) 
rows.next().getArray(0) else null
 case _: MapType =>
-  (rows: Seq[InternalRow]) => rows.head.getMap(0)
+  (rows: Iterator[InternalRow]) => if (rows.hasNext) 
rows.next().getMap(0) else null
   }
 
-  @transient
-  lazy val parser =
-new JacksonParser(
-  nullableSchema,
-  new JSONOptions(options + ("mode" -> FailFastMode.name), 
timeZoneId.get))
+  @transient lazy val parser = {
+val parsedOptions = new JSONOptions(options, timeZoneId.get)
+val mode = parsedOptions.parseMode
+if (mode != PermissiveMode && mode != FailFastMode) {
--- End diff --

`JSONOptions` is shared among built-in JSON functions like `from_json` and the JSON datasource, and the latter uses 3 modes - `FAILFAST`, `DROPMALFORMED` and `PERMISSIVE`. I am not sure how the `mode` option could be replaced. The approach I could imagine is to inherit from `JSONOptions` and add a new `val` (see the sketch below). The `mode` itself cannot be removed because it is used in `FailureSafeParser`, for example; in particular, `DropMalformedMode` is handled explicitly.
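
A hypothetical sketch of that idea (the class name and constructor shape are assumptions, and since `JSONOptions` is `private[sql]` such a subclass would have to live under `org.apache.spark.sql`):

```
import org.apache.spark.sql.catalyst.json.JSONOptions
import org.apache.spark.sql.catalyst.util.FailFastMode

// Hypothetical subclass: keeps the shared options untouched and adds a
// from_json-specific flag derived from the existing "mode" option.
class FromJsonOptions(
    parameters: Map[String, String],
    defaultTimeZoneId: String)
  extends JSONOptions(parameters, defaultTimeZoneId) {

  // true only for FAILFAST; everything else falls back to PERMISSIVE
  val failFast: Boolean =
    parameters.get("mode").exists(_.equalsIgnoreCase(FailFastMode.name))
}
```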


---




[GitHub] spark pull request #22237: [SPARK-25243][SQL] Use FailureSafeParser in from_...

2018-09-26 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22237#discussion_r220658815
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
 ---
@@ -554,18 +554,30 @@ case class JsonToStructs(
   @transient
   lazy val converter = nullableSchema match {
 case _: StructType =>
-  (rows: Seq[InternalRow]) => if (rows.length == 1) rows.head else null
+  (rows: Iterator[InternalRow]) => if (rows.hasNext) rows.next() else 
null
 case _: ArrayType =>
-  (rows: Seq[InternalRow]) => rows.head.getArray(0)
+  (rows: Iterator[InternalRow]) => if (rows.hasNext) 
rows.next().getArray(0) else null
 case _: MapType =>
-  (rows: Seq[InternalRow]) => rows.head.getMap(0)
+  (rows: Iterator[InternalRow]) => if (rows.hasNext) 
rows.next().getMap(0) else null
   }
 
-  @transient
-  lazy val parser =
-new JacksonParser(
-  nullableSchema,
-  new JSONOptions(options + ("mode" -> FailFastMode.name), 
timeZoneId.get))
+  @transient lazy val parser = {
+val parsedOptions = new JSONOptions(options, timeZoneId.get)
+val mode = parsedOptions.parseMode
+if (mode != PermissiveMode && mode != FailFastMode) {
--- End diff --

yea, instead of the "mode" option.


---




[GitHub] spark pull request #22237: [SPARK-25243][SQL] Use FailureSafeParser in from_...

2018-09-26 Thread MaxGekk
Github user MaxGekk commented on a diff in the pull request:

https://github.com/apache/spark/pull/22237#discussion_r220533794
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
 ---
@@ -554,18 +554,30 @@ case class JsonToStructs(
   @transient
   lazy val converter = nullableSchema match {
 case _: StructType =>
-  (rows: Seq[InternalRow]) => if (rows.length == 1) rows.head else null
+  (rows: Iterator[InternalRow]) => if (rows.hasNext) rows.next() else 
null
 case _: ArrayType =>
-  (rows: Seq[InternalRow]) => rows.head.getArray(0)
+  (rows: Iterator[InternalRow]) => if (rows.hasNext) 
rows.next().getArray(0) else null
 case _: MapType =>
-  (rows: Seq[InternalRow]) => rows.head.getMap(0)
+  (rows: Iterator[InternalRow]) => if (rows.hasNext) 
rows.next().getMap(0) else null
   }
 
-  @transient
-  lazy val parser =
-new JacksonParser(
-  nullableSchema,
-  new JSONOptions(options + ("mode" -> FailFastMode.name), 
timeZoneId.get))
+  @transient lazy val parser = {
+val parsedOptions = new JSONOptions(options, timeZoneId.get)
+val mode = parsedOptions.parseMode
+if (mode != PermissiveMode && mode != FailFastMode) {
--- End diff --

Do you mean introducing a new val `failFast` in `JSONOptions`?


---




[GitHub] spark pull request #22237: [SPARK-25243][SQL] Use FailureSafeParser in from_...

2018-09-26 Thread MaxGekk
Github user MaxGekk commented on a diff in the pull request:

https://github.com/apache/spark/pull/22237#discussion_r220486164
  
--- Diff: docs/sql-programming-guide.md ---
@@ -1877,6 +1877,10 @@ working with timestamps in `pandas_udf`s to get the 
best performance, see
 
 # Migration Guide
 
+## Upgrading From Spark SQL 2.4 to 3.0
+
+  - Since Spark 3.0, the `from_json` functions supports two modes - 
`PERMISSIVE` and `FAILFAST`. The modes can be set via the `mode` option. The 
default mode became `PERMISSIVE`. In previous versions, behavior of `from_json` 
did not conform to either `PERMISSIVE` nor `FAILFAST`, especially in processing 
of malformed JSON records. For example, the JSON string `{"a" 1}` with the 
schema `a INT` is converted to `null` by previous versions but Spark 3.0 
converts it to `Row(null)`. In version 2.4 and earlier, arrays of JSON objects 
are considered as invalid and converted to `null` if specified schema is 
`StructType`. Since Spark 3.0, the input is considered as a valid JSON array 
and only its first element is parsed if it conforms to the specified 
`StructType`.
+
--- End diff --

Only a few particular cases of the existing tests fail in the `PERMISSIVE` mode. I tried to describe those cases here in the migration guide; a small illustration follows below.
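
For illustration, a minimal sketch of the two cases (assuming a SparkSession named `spark`):

```
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types.{IntegerType, StructType}
import spark.implicits._

val schema = new StructType().add("a", IntegerType)

// malformed record: previously from_json returned null for the whole row,
// with PERMISSIVE as the default it returns Row(null) instead
Seq("""{"a" 1}""").toDF("value").select(from_json($"value", schema))

// array of JSON objects with a StructType schema: previously null,
// now the first element, {"a": 1}, is parsed
Seq("""[{"a": 1}, {"a": 2}]""").toDF("value").select(from_json($"value", schema))
```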


---




[GitHub] spark pull request #22237: [SPARK-25243][SQL] Use FailureSafeParser in from_...

2018-09-25 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22237#discussion_r220400809
  
--- Diff: docs/sql-programming-guide.md ---
@@ -1877,6 +1877,10 @@ working with timestamps in `pandas_udf`s to get the 
best performance, see
 
 # Migration Guide
 
+## Upgrading From Spark SQL 2.4 to 3.0
+
+  - Since Spark 3.0, the `from_json` functions supports two modes - 
`PERMISSIVE` and `FAILFAST`. The modes can be set via the `mode` option. The 
default mode became `PERMISSIVE`. In previous versions, behavior of `from_json` 
did not conform to either `PERMISSIVE` nor `FAILFAST`, especially in processing 
of malformed JSON records. For example, the JSON string `{"a" 1}` with the 
schema `a INT` is converted to `null` by previous versions but Spark 3.0 
converts it to `Row(null)`. In version 2.4 and earlier, arrays of JSON objects 
are considered as invalid and converted to `null` if specified schema is 
`StructType`. Since Spark 3.0, the input is considered as a valid JSON array 
and only its first element is parsed if it conforms to the specified 
`StructType`.
+
--- End diff --

> In version 2.4 and earlier, arrays of JSON objects are considered as 
invalid and converted to `null` if specified schema is `StructType`. Since 
Spark 3.0, the input is considered as a valid JSON array and only its first 
element is parsed if it conforms to the specified `StructType`.

Why this change?


---




[GitHub] spark pull request #22237: [SPARK-25243][SQL] Use FailureSafeParser in from_...

2018-09-25 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22237#discussion_r220400637
  
--- Diff: docs/sql-programming-guide.md ---
@@ -1877,6 +1877,10 @@ working with timestamps in `pandas_udf`s to get the 
best performance, see
 
 # Migration Guide
 
+## Upgrading From Spark SQL 2.4 to 3.0
--- End diff --

2.5


---




[GitHub] spark pull request #22237: [SPARK-25243][SQL] Use FailureSafeParser in from_...

2018-09-25 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22237#discussion_r220401132
  
--- Diff: docs/sql-programming-guide.md ---
@@ -1877,6 +1877,10 @@ working with timestamps in `pandas_udf`s to get the 
best performance, see
 
 # Migration Guide
 
+## Upgrading From Spark SQL 2.4 to 3.0
+
+  - Since Spark 3.0, the `from_json` functions supports two modes - 
`PERMISSIVE` and `FAILFAST`. The modes can be set via the `mode` option. The 
default mode became `PERMISSIVE`. In previous versions, behavior of `from_json` 
did not conform to either `PERMISSIVE` nor `FAILFAST`, especially in processing 
of malformed JSON records. For example, the JSON string `{"a" 1}` with the 
schema `a INT` is converted to `null` by previous versions but Spark 3.0 
converts it to `Row(null)`. In version 2.4 and earlier, arrays of JSON objects 
are considered as invalid and converted to `null` if specified schema is 
`StructType`. Since Spark 3.0, the input is considered as a valid JSON array 
and only its first element is parsed if it conforms to the specified 
`StructType`.
+
--- End diff --

> In previous versions, behavior of `from_json` did not conform to either 
`PERMISSIVE` nor `FAILFAST`, especially in processing of malformed JSON records.

Do we have a clear definition of the current behavior? It's important to let users know how the behavior changes.


---




[GitHub] spark pull request #22237: [SPARK-25243][SQL] Use FailureSafeParser in from_...

2018-09-25 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22237#discussion_r220401206
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
 ---
@@ -554,18 +554,30 @@ case class JsonToStructs(
   @transient
   lazy val converter = nullableSchema match {
 case _: StructType =>
-  (rows: Seq[InternalRow]) => if (rows.length == 1) rows.head else null
+  (rows: Iterator[InternalRow]) => if (rows.hasNext) rows.next() else 
null
 case _: ArrayType =>
-  (rows: Seq[InternalRow]) => rows.head.getArray(0)
+  (rows: Iterator[InternalRow]) => if (rows.hasNext) 
rows.next().getArray(0) else null
 case _: MapType =>
-  (rows: Seq[InternalRow]) => rows.head.getMap(0)
+  (rows: Iterator[InternalRow]) => if (rows.hasNext) 
rows.next().getMap(0) else null
   }
 
-  @transient
-  lazy val parser =
-new JacksonParser(
-  nullableSchema,
-  new JSONOptions(options + ("mode" -> FailFastMode.name), 
timeZoneId.get))
+  @transient lazy val parser = {
+val parsedOptions = new JSONOptions(options, timeZoneId.get)
+val mode = parsedOptions.parseMode
+if (mode != PermissiveMode && mode != FailFastMode) {
--- End diff --

since we only support 2 modes, how about using a boolean option instead? e.g. `failFast`, as sketched below.
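
A minimal sketch of that alternative (the `failFast` option name is only the suggestion here, not an implemented API):

```
import org.apache.spark.sql.catalyst.util.{FailFastMode, ParseMode, PermissiveMode}

// Hypothetical: derive the ParseMode from a boolean `failFast` option
// instead of the free-form "mode" string.
def resolveMode(options: Map[String, String]): ParseMode = {
  val failFast = options.get("failFast").exists(_.toBoolean)
  if (failFast) FailFastMode else PermissiveMode
}
```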


---




[GitHub] spark pull request #22237: [SPARK-25243][SQL] Use FailureSafeParser in from_...

2018-09-18 Thread MaxGekk
Github user MaxGekk commented on a diff in the pull request:

https://github.com/apache/spark/pull/22237#discussion_r218585584
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
 ---
@@ -450,7 +450,7 @@ class JsonExpressionsSuite extends SparkFunSuite with 
ExpressionEvalHelper with
   test("from_json - input=array, schema=struct, output=null") {
 val input = """[{"a": 1}, {"a": 2}]"""
 val schema = StructType(StructField("a", IntegerType) :: Nil)
-val output = null
+val output = InternalRow(1)
 checkEvaluation(JsonToStructs(schema, Map.empty, Literal(input), 
gmtId), output)
--- End diff --

I updated the migration guide and the test's title.


---




[GitHub] spark pull request #22237: [SPARK-25243][SQL] Use FailureSafeParser in from_...

2018-09-18 Thread MaxGekk
Github user MaxGekk commented on a diff in the pull request:

https://github.com/apache/spark/pull/22237#discussion_r218584877
  
--- Diff: docs/sql-programming-guide.md ---
@@ -1877,6 +1877,10 @@ working with timestamps in `pandas_udf`s to get the 
best performance, see
 
 # Migration Guide
 
+## Upgrading From Spark SQL 2.4 to 3.0
+
+  - Since Spark 3.0, the `from_json` functions supports two modes - 
`PERMISSIVE` and `FAILFAST`. The modes can be set via the `mode` option. The 
default mode became `PERMISSIVE`. In previous versions, behavior of `from_json` 
did not conform to either `PERMISSIVE` nor `FAILFAST`, especially in processing 
of malformed JSON records.
--- End diff --

Sure, I added a couple of sentences.


---




[GitHub] spark pull request #22237: [SPARK-25243][SQL] Use FailureSafeParser in from_...

2018-09-18 Thread MaxGekk
Github user MaxGekk commented on a diff in the pull request:

https://github.com/apache/spark/pull/22237#discussion_r218577348
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
 ---
@@ -595,10 +607,7 @@ case class JsonToStructs(
 if (json.toString.trim.isEmpty) return null
 
 try {
-  converter(parser.parse(
-json.asInstanceOf[UTF8String],
-CreateJacksonParser.utf8String,
-identity[UTF8String]))
+  converter(parser.parse(json.asInstanceOf[UTF8String]))
 } catch {
   case _: BadRecordException => null
--- End diff --

`BadRecordException` shouldn't propagate here anymore. I will remove the exception handler and re-run the tests.


---




[GitHub] spark pull request #22237: [SPARK-25243][SQL] Use FailureSafeParser in from_...

2018-09-18 Thread MaxGekk
Github user MaxGekk commented on a diff in the pull request:

https://github.com/apache/spark/pull/22237#discussion_r218574370
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
 ---
@@ -402,13 +402,13 @@ class JsonExpressionsSuite extends SparkFunSuite with 
ExpressionEvalHelper with
 val schema = StructType(StructField("a", IntegerType) :: Nil)
 checkEvaluation(
   JsonToStructs(schema, Map.empty, Literal(jsonData), gmtId),
-  null
+  InternalRow(null)
 )
 
 // Other modes should still return `null`.
 checkEvaluation(
   JsonToStructs(schema, Map("mode" -> PermissiveMode.name), 
Literal(jsonData), gmtId),
--- End diff --

Changed to `FailFastMode.name`


---




[GitHub] spark pull request #22237: [SPARK-25243][SQL] Use FailureSafeParser in from_...

2018-09-17 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/22237#discussion_r217995920
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
 ---
@@ -402,13 +402,13 @@ class JsonExpressionsSuite extends SparkFunSuite with 
ExpressionEvalHelper with
 val schema = StructType(StructField("a", IntegerType) :: Nil)
 checkEvaluation(
   JsonToStructs(schema, Map.empty, Literal(jsonData), gmtId),
-  null
+  InternalRow(null)
 )
 
 // Other modes should still return `null`.
 checkEvaluation(
   JsonToStructs(schema, Map("mode" -> PermissiveMode.name), 
Literal(jsonData), gmtId),
--- End diff --

The default mode is `PermissiveMode` now, so this change looks necessary.


---




[GitHub] spark pull request #22237: [SPARK-25243][SQL] Use FailureSafeParser in from_...

2018-09-17 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/22237#discussion_r218000227
  
--- Diff: docs/sql-programming-guide.md ---
@@ -1877,6 +1877,10 @@ working with timestamps in `pandas_udf`s to get the 
best performance, see
 
 # Migration Guide
 
+## Upgrading From Spark SQL 2.4 to 3.0
+
+  - Since Spark 3.0, the `from_json` functions supports two modes - 
`PERMISSIVE` and `FAILFAST`. The modes can be set via the `mode` option. The 
default mode became `PERMISSIVE`. In previous versions, behavior of `from_json` 
did not conform to either `PERMISSIVE` nor `FAILFAST`, especially in processing 
of malformed JSON records.
--- End diff --

Can we briefly explain the difference between the updated behavior and the previous one?


---




[GitHub] spark pull request #22237: [SPARK-25243][SQL] Use FailureSafeParser in from_...

2018-09-17 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/22237#discussion_r218000572
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
 ---
@@ -595,10 +607,7 @@ case class JsonToStructs(
 if (json.toString.trim.isEmpty) return null
 
 try {
-  converter(parser.parse(
-json.asInstanceOf[UTF8String],
-CreateJacksonParser.utf8String,
-identity[UTF8String]))
+  converter(parser.parse(json.asInstanceOf[UTF8String]))
 } catch {
   case _: BadRecordException => null
--- End diff --

Do we still need to catch `BadRecordException` here?


---




[GitHub] spark pull request #22237: [SPARK-25243][SQL] Use FailureSafeParser in from_...

2018-09-17 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/22237#discussion_r218003046
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
 ---
@@ -450,7 +450,7 @@ class JsonExpressionsSuite extends SparkFunSuite with 
ExpressionEvalHelper with
   test("from_json - input=array, schema=struct, output=null") {
 val input = """[{"a": 1}, {"a": 2}]"""
 val schema = StructType(StructField("a", IntegerType) :: Nil)
-val output = null
+val output = InternalRow(1)
 checkEvaluation(JsonToStructs(schema, Map.empty, Literal(input), 
gmtId), output)
--- End diff --

This looks like a behavior change too. Shall we also note it in the migration guide?

The test name is no longer correct either.


---




[GitHub] spark pull request #22237: [SPARK-25243][SQL] Use FailureSafeParser in from_...

2018-09-10 Thread MaxGekk
Github user MaxGekk commented on a diff in the pull request:

https://github.com/apache/spark/pull/22237#discussion_r21650
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala ---
@@ -469,4 +470,26 @@ class JsonFunctionsSuite extends QueryTest with 
SharedSQLContext {
 
 checkAnswer(sql("""select json[0] from jsonTable"""), Seq(Row(null)))
   }
+
+  test("from_json invalid json - check modes") {
+val df = Seq("""{"a" 1}""", """{"a": 2}""").toDS()
+val schema = new StructType().add("a", IntegerType)
+
+checkAnswer(
+  df.select(from_json($"value", schema, Map("mode" -> "PERMISSIVE"))),
+  Row(Row(null)) :: Row(Row(2)) :: Nil)
+
+val exception1 = intercept[SparkException] {
+  df.select(from_json($"value", schema, Map("mode" -> 
"FAILFAST"))).collect()
+}.getMessage
+assert(exception1.contains(
+  "Malformed records are detected in record parsing. Parse Mode: 
FAILFAST."))
+
+val exception2 = intercept[AnalysisException] {
+  df.select(from_json($"value", schema, Map("mode" -> 
"DROPMALFORMED"))).collect()
--- End diff --

I replaced it with `AnalysisException`, but I think that was the wrong decision. Throwing an `AnalysisException` at run time looks ugly:
```
Caused by: org.apache.spark.sql.AnalysisException: from_json() doesn't support the DROPMALFORMED mode. Acceptable modes are PERMISSIVE and FAILFAST.;
at org.apache.spark.sql.catalyst.expressions.JsonToStructs.parser$lzycompute(jsonExpressions.scala:568)
at org.apache.spark.sql.catalyst.expressions.JsonToStructs.parser(jsonExpressions.scala:564)
...
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
```
I am going to replace it with something else or revert to `IllegalArgumentException`.
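
One way to surface this at analysis time instead - a hedged sketch meant to live inside `JsonToStructs` (where `options` is the expression's `Map[String, String]`), not necessarily what the PR ends up doing:

```
import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
import org.apache.spark.sql.catalyst.util.{FailFastMode, ParseMode, PermissiveMode}

// Overriding Expression.checkInputDataTypes makes the analyzer report the
// unsupported mode as an AnalysisException before any task is executed.
override def checkInputDataTypes(): TypeCheckResult = {
  val mode = ParseMode.fromString(options.getOrElse("mode", PermissiveMode.name))
  if (mode == PermissiveMode || mode == FailFastMode) {
    super.checkInputDataTypes()
  } else {
    TypeCheckResult.TypeCheckFailure(
      s"from_json() doesn't support the ${mode.name} mode. " +
      s"Acceptable modes are ${PermissiveMode.name} and ${FailFastMode.name}.")
  }
}
```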


---




[GitHub] spark pull request #22237: [SPARK-25243][SQL] Use FailureSafeParser in from_...

2018-08-31 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22237#discussion_r214302343
  
--- Diff: docs/sql-programming-guide.md ---
@@ -1897,6 +1897,7 @@ working with timestamps in `pandas_udf`s to get the 
best performance, see
   - In version 2.3 and earlier, CSV rows are considered as malformed if at 
least one column value in the row is malformed. CSV parser dropped such rows in 
the DROPMALFORMED mode or outputs an error in the FAILFAST mode. Since Spark 
2.4, CSV row is considered as malformed only when it contains malformed column 
values requested from CSV datasource, other values can be ignored. As an 
example, CSV file contains the "id,name" header and one row "1234". In Spark 
2.4, selection of the id column consists of a row with one column value 1234 
but in Spark 2.3 and earlier it is empty in the DROPMALFORMED mode. To restore 
the previous behavior, set `spark.sql.csv.parser.columnPruning.enabled` to 
`false`.
   - Since Spark 2.4, File listing for compute statistics is done in 
parallel by default. This can be disabled by setting 
`spark.sql.parallelFileListingInStatsComputation.enabled` to `False`.
   - Since Spark 2.4, Metadata files (e.g. Parquet summary files) and 
temporary files are not counted as data files when calculating table size 
during Statistics computation.
+  - Since Spark 2.4, the from_json functions supports two modes - 
PERMISSIVE and FAILFAST. The modes can be set via the `mode` option. The 
default mode became PERMISSIVE. In previous versions, behavior of from_json did 
not conform to either PERMISSIVE nor FAILFAST, especially in processing of 
malformed JSON records.
--- End diff --

nit: from_json -> `` `from_json` ``.


---




[GitHub] spark pull request #22237: [SPARK-25243][SQL] Use FailureSafeParser in from_...

2018-08-30 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/22237#discussion_r214218549
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala ---
@@ -469,4 +470,26 @@ class JsonFunctionsSuite extends QueryTest with 
SharedSQLContext {
 
 checkAnswer(sql("""select json[0] from jsonTable"""), Seq(Row(null)))
   }
+
+  test("from_json invalid json - check modes") {
+val df = Seq("""{"a" 1}""", """{"a": 2}""").toDS()
+val schema = new StructType().add("a", IntegerType)
+
+checkAnswer(
+  df.select(from_json($"value", schema, Map("mode" -> "PERMISSIVE"))),
+  Row(Row(null)) :: Row(Row(2)) :: Nil)
+
+val exception1 = intercept[SparkException] {
+  df.select(from_json($"value", schema, Map("mode" -> 
"FAILFAST"))).collect()
+}.getMessage
+assert(exception1.contains(
+  "Malformed records are detected in record parsing. Parse Mode: 
FAILFAST."))
+
+val exception2 = intercept[AnalysisException] {
+  df.select(from_json($"value", schema, Map("mode" -> 
"DROPMALFORMED"))).collect()
--- End diff --

Can you fix the code to throw an analysis exception during the analysis phase instead of the execution phase (when `.collect()` is called)?


---




[GitHub] spark pull request #22237: [SPARK-25243][SQL] Use FailureSafeParser in from_...

2018-08-30 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/22237#discussion_r213980110
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
 ---
@@ -554,18 +554,28 @@ case class JsonToStructs(
   @transient
   lazy val converter = nullableSchema match {
 case _: StructType =>
-  (rows: Seq[InternalRow]) => if (rows.length == 1) rows.head else null
+  (rows: Iterator[InternalRow]) => if (rows.hasNext) rows.next() else 
null
 case _: ArrayType =>
-  (rows: Seq[InternalRow]) => rows.head.getArray(0)
+  (rows: Iterator[InternalRow]) => if (rows.hasNext) 
rows.next().getArray(0) else null
 case _: MapType =>
-  (rows: Seq[InternalRow]) => rows.head.getMap(0)
+  (rows: Iterator[InternalRow]) => if (rows.hasNext) 
rows.next().getMap(0) else null
   }
 
-  @transient
-  lazy val parser =
-new JacksonParser(
-  nullableSchema,
-  new JSONOptions(options + ("mode" -> FailFastMode.name), 
timeZoneId.get))
+  @transient lazy val parser = {
+val parsedOptions = new JSONOptions(options, timeZoneId.get)
+val mode = parsedOptions.parseMode
+require(mode == PermissiveMode || mode == FailFastMode,
--- End diff --

ok, thanks!


---




[GitHub] spark pull request #22237: [SPARK-25243][SQL] Use FailureSafeParser in from_...

2018-08-30 Thread MaxGekk
Github user MaxGekk commented on a diff in the pull request:

https://github.com/apache/spark/pull/22237#discussion_r213975610
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
 ---
@@ -554,18 +554,28 @@ case class JsonToStructs(
   @transient
   lazy val converter = nullableSchema match {
 case _: StructType =>
-  (rows: Seq[InternalRow]) => if (rows.length == 1) rows.head else null
+  (rows: Iterator[InternalRow]) => if (rows.hasNext) rows.next() else 
null
 case _: ArrayType =>
-  (rows: Seq[InternalRow]) => rows.head.getArray(0)
+  (rows: Iterator[InternalRow]) => if (rows.hasNext) 
rows.next().getArray(0) else null
 case _: MapType =>
-  (rows: Seq[InternalRow]) => rows.head.getMap(0)
+  (rows: Iterator[InternalRow]) => if (rows.hasNext) 
rows.next().getMap(0) else null
   }
 
-  @transient
-  lazy val parser =
-new JacksonParser(
-  nullableSchema,
-  new JSONOptions(options + ("mode" -> FailFastMode.name), 
timeZoneId.get))
+  @transient lazy val parser = {
+val parsedOptions = new JSONOptions(options, timeZoneId.get)
+val mode = parsedOptions.parseMode
+require(mode == PermissiveMode || mode == FailFastMode,
--- End diff --

I didn't put the `require` into the constructor body directly because of `timeZoneId`. If I move the check up, I need to move `val parsedOptions = new JSONOptions(options, timeZoneId.get)` up too (lazy or not). The check would force evaluation of `timeZoneId.get`, which would raise an exception; see the sketch below. I will check this today or tomorrow.
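
The constraint, as a minimal sketch (assuming the analyzer fills in the time zone via a rule like `ResolveTimeZone`, so `timeZoneId` is `None` at construction, and ignoring that `JSONOptions` is `private[sql]`):

```
import org.apache.spark.sql.catalyst.json.JSONOptions

// Stand-in for JsonToStructs: created with timeZoneId = None and only given
// a zone during analysis, so anything calling timeZoneId.get must stay lazy.
case class JsonToStructsLike(options: Map[String, String], timeZoneId: Option[String]) {
  // eager version: would throw NoSuchElementException at construction
  // val parsedOptions = new JSONOptions(options, timeZoneId.get)

  // lazy version: evaluated only after the time zone has been resolved
  @transient lazy val parsedOptions = new JSONOptions(options, timeZoneId.get)
}
```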


---




[GitHub] spark pull request #22237: [SPARK-25243][SQL] Use FailureSafeParser in from_...

2018-08-29 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/22237#discussion_r213886899
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala
 ---
@@ -15,50 +15,51 @@
  * limitations under the License.
  */
 
-package org.apache.spark.sql.execution.datasources
+package org.apache.spark.sql.catalyst.util
 
 import org.apache.spark.SparkException
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
-import org.apache.spark.sql.catalyst.util._
-import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.types.{DataType, StructType}
 import org.apache.spark.unsafe.types.UTF8String
 
 class FailureSafeParser[IN](
 rawParser: IN => Seq[InternalRow],
 mode: ParseMode,
-schema: StructType,
+schema: DataType,
--- End diff --

`schema` -> `dataType`?


---




[GitHub] spark pull request #22237: [SPARK-25243][SQL] Use FailureSafeParser in from_...

2018-08-29 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/22237#discussion_r213885777
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
 ---
@@ -554,18 +554,28 @@ case class JsonToStructs(
   @transient
   lazy val converter = nullableSchema match {
 case _: StructType =>
-  (rows: Seq[InternalRow]) => if (rows.length == 1) rows.head else null
+  (rows: Iterator[InternalRow]) => if (rows.hasNext) rows.next() else 
null
 case _: ArrayType =>
-  (rows: Seq[InternalRow]) => rows.head.getArray(0)
+  (rows: Iterator[InternalRow]) => if (rows.hasNext) 
rows.next().getArray(0) else null
 case _: MapType =>
-  (rows: Seq[InternalRow]) => rows.head.getMap(0)
+  (rows: Iterator[InternalRow]) => if (rows.hasNext) 
rows.next().getMap(0) else null
   }
 
-  @transient
-  lazy val parser =
-new JacksonParser(
-  nullableSchema,
-  new JSONOptions(options + ("mode" -> FailFastMode.name), 
timeZoneId.get))
+  @transient lazy val parser = {
+val parsedOptions = new JSONOptions(options, timeZoneId.get)
+val mode = parsedOptions.parseMode
+require(mode == PermissiveMode || mode == FailFastMode,
--- End diff --

Also, can we use `AnalysisException` instead of `require`?


---




[GitHub] spark pull request #22237: [SPARK-25243][SQL] Use FailureSafeParser in from_...

2018-08-29 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/22237#discussion_r213885328
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
 ---
@@ -554,18 +554,28 @@ case class JsonToStructs(
   @transient
   lazy val converter = nullableSchema match {
 case _: StructType =>
-  (rows: Seq[InternalRow]) => if (rows.length == 1) rows.head else null
+  (rows: Iterator[InternalRow]) => if (rows.hasNext) rows.next() else 
null
 case _: ArrayType =>
-  (rows: Seq[InternalRow]) => rows.head.getArray(0)
+  (rows: Iterator[InternalRow]) => if (rows.hasNext) 
rows.next().getArray(0) else null
 case _: MapType =>
-  (rows: Seq[InternalRow]) => rows.head.getMap(0)
+  (rows: Iterator[InternalRow]) => if (rows.hasNext) 
rows.next().getMap(0) else null
   }
 
-  @transient
-  lazy val parser =
-new JacksonParser(
-  nullableSchema,
-  new JSONOptions(options + ("mode" -> FailFastMode.name), 
timeZoneId.get))
+  @transient lazy val parser = {
+val parsedOptions = new JSONOptions(options, timeZoneId.get)
+val mode = parsedOptions.parseMode
+require(mode == PermissiveMode || mode == FailFastMode,
--- End diff --

I think we should move this verification into the constructor.


---




[GitHub] spark pull request #22237: [SPARK-25243][SQL] Use FailureSafeParser in from_...

2018-08-27 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22237#discussion_r213003614
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala ---
@@ -469,4 +470,23 @@ class JsonFunctionsSuite extends QueryTest with 
SharedSQLContext {
 
 checkAnswer(sql("""select json[0] from jsonTable"""), Seq(Row(null)))
   }
+
+  test("from_json invalid json - check modes") {
+val df = Seq("""{"a" 1}""", """{"a": 2}""").toDS()
+val schema = new StructType().add("a", IntegerType)
+
+checkAnswer(
+  df.select(from_json($"value", schema, Map("mode" -> "PERMISSIVE"))),
+  Row(Row(null)) :: Row(Row(2)) :: Nil)
+
+val exceptionOne = intercept[SparkException] {
+  df.select(from_json($"value", schema, Map("mode" -> 
"FAILFAST"))).collect()
+}.getMessage
+assert(exceptionOne.contains(
+  "Malformed records are detected in record parsing. Parse Mode: 
FAILFAST."))
+
+checkAnswer(
+  df.select(from_json($"value", schema, Map("mode" -> 
"DROPMALFORMED"))),
+  Row(null) :: Row(Row(2)) :: Nil)
--- End diff --

Nope, the only possibility I raised was to make it a generator expression. I haven't proposed a parse mode for this reason so far.


---




[GitHub] spark pull request #22237: [SPARK-25243][SQL] Use FailureSafeParser in from_...

2018-08-27 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/22237#discussion_r212928747
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
 ---
@@ -554,18 +554,22 @@ case class JsonToStructs(
   @transient
   lazy val converter = nullableSchema match {
 case _: StructType =>
-  (rows: Seq[InternalRow]) => if (rows.length == 1) rows.head else null
+  (rows: Iterator[InternalRow]) => if (rows.hasNext) rows.next() else 
null
 case _: ArrayType =>
-  (rows: Seq[InternalRow]) => rows.head.getArray(0)
+  (rows: Iterator[InternalRow]) => if (rows.hasNext) 
rows.next().getArray(0) else null
 case _: MapType =>
-  (rows: Seq[InternalRow]) => rows.head.getMap(0)
+  (rows: Iterator[InternalRow]) => if (rows.hasNext) 
rows.next().getMap(0) else null
   }
 
-  @transient
-  lazy val parser =
-new JacksonParser(
-  nullableSchema,
-  new JSONOptions(options + ("mode" -> FailFastMode.name), 
timeZoneId.get))
+  @transient lazy val parsedOptions = new JSONOptions(options, 
timeZoneId.get)
+  @transient lazy val rawParser = new JacksonParser(nullableSchema, 
parsedOptions)
+  @transient lazy val createParser = CreateJacksonParser.utf8String _
+  @transient lazy val parser = new FailureSafeParser[UTF8String](
+input => rawParser.parse(input, createParser, identity[UTF8String]),
+parsedOptions.parseMode,
--- End diff --

It is not handled by `JacksonParser`, and the behavior is somewhat similar to `PermissiveMode`, as @HyukjinKwon pointed out at https://github.com/apache/spark/pull/22237/files#r212850156, but not exactly the same.

It seems that `PermissiveMode` on `FailureSafeParser` now produces a different result for corrupted records. I noticed that some existing tests may need to change because of that.





---




[GitHub] spark pull request #22237: [SPARK-25243][SQL] Use FailureSafeParser in from_...

2018-08-27 Thread MaxGekk
Github user MaxGekk commented on a diff in the pull request:

https://github.com/apache/spark/pull/22237#discussion_r212925256
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala ---
@@ -469,4 +470,23 @@ class JsonFunctionsSuite extends QueryTest with 
SharedSQLContext {
 
 checkAnswer(sql("""select json[0] from jsonTable"""), Seq(Row(null)))
   }
+
+  test("from_json invalid json - check modes") {
+val df = Seq("""{"a" 1}""", """{"a": 2}""").toDS()
+val schema = new StructType().add("a", IntegerType)
+
+checkAnswer(
+  df.select(from_json($"value", schema, Map("mode" -> "PERMISSIVE"))),
+  Row(Row(null)) :: Row(Row(2)) :: Nil)
+
+val exceptionOne = intercept[SparkException] {
+  df.select(from_json($"value", schema, Map("mode" -> 
"FAILFAST"))).collect()
--- End diff --

The behavior of `JsonToStructs` is actually pretty close to `PERMISSIVE`. I only have to make a few small changes in the tests that check the processing of malformed inputs.


---




[GitHub] spark pull request #22237: [SPARK-25243][SQL] Use FailureSafeParser in from_...

2018-08-27 Thread MaxGekk
Github user MaxGekk commented on a diff in the pull request:

https://github.com/apache/spark/pull/22237#discussion_r212924389
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala ---
@@ -469,4 +470,23 @@ class JsonFunctionsSuite extends QueryTest with 
SharedSQLContext {
 
 checkAnswer(sql("""select json[0] from jsonTable"""), Seq(Row(null)))
   }
+
+  test("from_json invalid json - check modes") {
+val df = Seq("""{"a" 1}""", """{"a": 2}""").toDS()
+val schema = new StructType().add("a", IntegerType)
+
+checkAnswer(
+  df.select(from_json($"value", schema, Map("mode" -> "PERMISSIVE"))),
+  Row(Row(null)) :: Row(Row(2)) :: Nil)
+
+val exceptionOne = intercept[SparkException] {
+  df.select(from_json($"value", schema, Map("mode" -> 
"FAILFAST"))).collect()
+}.getMessage
+assert(exceptionOne.contains(
+  "Malformed records are detected in record parsing. Parse Mode: 
FAILFAST."))
+
+checkAnswer(
+  df.select(from_json($"value", schema, Map("mode" -> 
"DROPMALFORMED"))),
+  Row(null) :: Row(Row(2)) :: Nil)
--- End diff --

The `DROPMALFORMED` mode returns `null` for malformed JSON lines; users can filter them out later, as sketched below. @HyukjinKwon Do you know how to drop rows in `UnaryExpression`s?
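
A sketch of that workflow (assuming a SparkSession named `spark`), approximating the datasource behavior by filtering the resulting `null`s, since a per-row expression cannot drop rows by itself:

```
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types.{IntegerType, StructType}
import spark.implicits._

val schema = new StructType().add("a", IntegerType)
val parsed = Seq("""{"a" 1}""", """{"a": 2}""").toDF("value")
  .select(from_json($"value", schema, Map("mode" -> "DROPMALFORMED")).as("json"))

// malformed inputs come back as null rather than disappearing,
// so drop them explicitly afterwards
parsed.filter($"json".isNotNull)
```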


---




[GitHub] spark pull request #22237: [SPARK-25243][SQL] Use FailureSafeParser in from_...

2018-08-27 Thread MaxGekk
Github user MaxGekk commented on a diff in the pull request:

https://github.com/apache/spark/pull/22237#discussion_r212922787
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
 ---
@@ -554,18 +554,22 @@ case class JsonToStructs(
   @transient
   lazy val converter = nullableSchema match {
 case _: StructType =>
-  (rows: Seq[InternalRow]) => if (rows.length == 1) rows.head else null
+  (rows: Iterator[InternalRow]) => if (rows.hasNext) rows.next() else 
null
 case _: ArrayType =>
-  (rows: Seq[InternalRow]) => rows.head.getArray(0)
+  (rows: Iterator[InternalRow]) => if (rows.hasNext) 
rows.next().getArray(0) else null
 case _: MapType =>
-  (rows: Seq[InternalRow]) => rows.head.getMap(0)
+  (rows: Iterator[InternalRow]) => if (rows.hasNext) 
rows.next().getMap(0) else null
   }
 
-  @transient
-  lazy val parser =
-new JacksonParser(
-  nullableSchema,
-  new JSONOptions(options + ("mode" -> FailFastMode.name), 
timeZoneId.get))
+  @transient lazy val parsedOptions = new JSONOptions(options, 
timeZoneId.get)
+  @transient lazy val rawParser = new JacksonParser(nullableSchema, 
parsedOptions)
+  @transient lazy val createParser = CreateJacksonParser.utf8String _
+  @transient lazy val parser = new FailureSafeParser[UTF8String](
+input => rawParser.parse(input, createParser, identity[UTF8String]),
+parsedOptions.parseMode,
--- End diff --

The previous setting of `FailFastMode` didn't impact the behavior because the `mode` option wasn't handled at all.


---




[GitHub] spark pull request #22237: [SPARK-25243][SQL] Use FailureSafeParser in from_...

2018-08-27 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/22237#discussion_r212891660
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala
 ---
@@ -15,50 +15,51 @@
  * limitations under the License.
  */
 
-package org.apache.spark.sql.execution.datasources
+package org.apache.spark.sql.catalyst.util
 
 import org.apache.spark.SparkException
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
-import org.apache.spark.sql.catalyst.util._
-import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.types.{DataType, StructType}
 import org.apache.spark.unsafe.types.UTF8String
 
 class FailureSafeParser[IN](
 rawParser: IN => Seq[InternalRow],
 mode: ParseMode,
-schema: StructType,
+schema: DataType,
 columnNameOfCorruptRecord: String,
 isMultiLine: Boolean) {
-
-  private val corruptFieldIndex = 
schema.getFieldIndex(columnNameOfCorruptRecord)
-  private val actualSchema = StructType(schema.filterNot(_.name == 
columnNameOfCorruptRecord))
-  private val resultRow = new GenericInternalRow(schema.length)
-  private val nullResult = new GenericInternalRow(schema.length)
-
   // This function takes 2 parameters: an optional partial result, and the 
bad record. If the given
   // schema doesn't contain a field for corrupted record, we just return 
the partial result or a
   // row with all fields null. If the given schema contains a field for 
corrupted record, we will
   // set the bad record to this field, and set other fields according to 
the partial result or null.
-  private val toResultRow: (Option[InternalRow], () => UTF8String) => 
InternalRow = {
-if (corruptFieldIndex.isDefined) {
-  (row, badRecord) => {
-var i = 0
-while (i < actualSchema.length) {
-  val from = actualSchema(i)
-  resultRow(schema.fieldIndex(from.name)) = row.map(_.get(i, 
from.dataType)).orNull
-  i += 1
+  private val toResultRow: (Option[InternalRow], () => UTF8String) => 
InternalRow = schema match {
+case struct: StructType =>
+  val corruptFieldIndex = 
struct.getFieldIndex(columnNameOfCorruptRecord)
+  val actualSchema = StructType(struct.filterNot(_.name == 
columnNameOfCorruptRecord))
+  val resultRow = new GenericInternalRow(struct.length)
+  val nullResult = new GenericInternalRow(struct.length)
+  if (corruptFieldIndex.isDefined) {
--- End diff --

Can we move `actualSchema` and `resultRow` inside the `if (corruptFieldIndex.isDefined) {` branch? Something like the sketch below.
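
A sketch of that reshuffle, mirroring the code quoted above (the free-standing function form is only for illustration; in the PR this would stay a branch of `toResultRow`):

```
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
import org.apache.spark.sql.types.StructType
import org.apache.spark.unsafe.types.UTF8String

def structResultRow(
    struct: StructType,
    columnNameOfCorruptRecord: String): (Option[InternalRow], () => UTF8String) => InternalRow = {
  val corruptFieldIndex = struct.getFieldIndex(columnNameOfCorruptRecord)
  if (corruptFieldIndex.isDefined) {
    // only this branch needs them, so they are declared here
    val actualSchema = StructType(struct.filterNot(_.name == columnNameOfCorruptRecord))
    val resultRow = new GenericInternalRow(struct.length)
    (row, badRecord) => {
      var i = 0
      while (i < actualSchema.length) {
        val from = actualSchema(i)
        resultRow(struct.fieldIndex(from.name)) = row.map(_.get(i, from.dataType)).orNull
        i += 1
      }
      resultRow(corruptFieldIndex.get) = badRecord()
      resultRow
    }
  } else {
    val nullResult = new GenericInternalRow(struct.length)
    (row, _) => row.getOrElse(nullResult)
  }
}
```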


---




[GitHub] spark pull request #22237: [SPARK-25243][SQL] Use FailureSafeParser in from_...

2018-08-27 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/22237#discussion_r212887790
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
 ---
@@ -554,18 +554,22 @@ case class JsonToStructs(
   @transient
   lazy val converter = nullableSchema match {
 case _: StructType =>
-  (rows: Seq[InternalRow]) => if (rows.length == 1) rows.head else null
+  (rows: Iterator[InternalRow]) => if (rows.hasNext) rows.next() else 
null
 case _: ArrayType =>
-  (rows: Seq[InternalRow]) => rows.head.getArray(0)
+  (rows: Iterator[InternalRow]) => if (rows.hasNext) 
rows.next().getArray(0) else null
 case _: MapType =>
-  (rows: Seq[InternalRow]) => rows.head.getMap(0)
+  (rows: Iterator[InternalRow]) => if (rows.hasNext) 
rows.next().getMap(0) else null
   }
 
-  @transient
-  lazy val parser =
-new JacksonParser(
-  nullableSchema,
-  new JSONOptions(options + ("mode" -> FailFastMode.name), 
timeZoneId.get))
+  @transient lazy val parsedOptions = new JSONOptions(options, 
timeZoneId.get)
+  @transient lazy val rawParser = new JacksonParser(nullableSchema, 
parsedOptions)
--- End diff --

How about this?
```
@transient lazy val parser = {
  val parsedOptions = new JSONOptions(options, timeZoneId.get)
  val rawParser = new JacksonParser(nullableSchema, parsedOptions)
  val createParser = CreateJacksonParser.utf8String _
  new FailureSafeParser[UTF8String](
    input => rawParser.parse(input, createParser, identity[UTF8String]),
    parsedOptions.parseMode,
    schema,
    parsedOptions.columnNameOfCorruptRecord,
    parsedOptions.multiLine)
}
```


---




[GitHub] spark pull request #22237: [SPARK-25243][SQL] Use FailureSafeParser in from_...

2018-08-26 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22237#discussion_r212850156
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala ---
@@ -469,4 +470,23 @@ class JsonFunctionsSuite extends QueryTest with 
SharedSQLContext {
 
 checkAnswer(sql("""select json[0] from jsonTable"""), Seq(Row(null)))
   }
+
+  test("from_json invalid json - check modes") {
+val df = Seq("""{"a" 1}""", """{"a": 2}""").toDS()
+val schema = new StructType().add("a", IntegerType)
+
+checkAnswer(
+  df.select(from_json($"value", schema, Map("mode" -> "PERMISSIVE"))),
+  Row(Row(null)) :: Row(Row(2)) :: Nil)
+
+val exceptionOne = intercept[SparkException] {
+  df.select(from_json($"value", schema, Map("mode" -> 
"FAILFAST"))).collect()
--- End diff --

`JsonToStructs` has resembled PERMISSIVE mode from the start, although their behaviours are slightly different. This is going to differ from both the PERMISSIVE and the FAILFAST modes. These are actually behaviour changes if we just use PERMISSIVE mode here by default (as @viirya pointed out).


---




[GitHub] spark pull request #22237: [SPARK-25243][SQL] Use FailureSafeParser in from_...

2018-08-26 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22237#discussion_r212849989
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala ---
@@ -469,4 +470,23 @@ class JsonFunctionsSuite extends QueryTest with 
SharedSQLContext {
 
 checkAnswer(sql("""select json[0] from jsonTable"""), Seq(Row(null)))
   }
+
+  test("from_json invalid json - check modes") {
+val df = Seq("""{"a" 1}""", """{"a": 2}""").toDS()
+val schema = new StructType().add("a", IntegerType)
+
+checkAnswer(
+  df.select(from_json($"value", schema, Map("mode" -> "PERMISSIVE"))),
+  Row(Row(null)) :: Row(Row(2)) :: Nil)
+
+val exceptionOne = intercept[SparkException] {
+  df.select(from_json($"value", schema, Map("mode" -> 
"FAILFAST"))).collect()
+}.getMessage
+assert(exceptionOne.contains(
+  "Malformed records are detected in record parsing. Parse Mode: 
FAILFAST."))
+
+checkAnswer(
+  df.select(from_json($"value", schema, Map("mode" -> 
"DROPMALFORMED"))),
+  Row(null) :: Row(Row(2)) :: Nil)
--- End diff --

How does it work for the DROPMALFORMED mode? This doesn't actually drop the record like the JSON datasource does.


---




[GitHub] spark pull request #22237: [SPARK-25243][SQL] Use FailureSafeParser in from_...

2018-08-26 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/22237#discussion_r212847031
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
 ---
@@ -554,18 +554,22 @@ case class JsonToStructs(
   @transient
   lazy val converter = nullableSchema match {
 case _: StructType =>
-  (rows: Seq[InternalRow]) => if (rows.length == 1) rows.head else null
+  (rows: Iterator[InternalRow]) => if (rows.hasNext) rows.next() else 
null
 case _: ArrayType =>
-  (rows: Seq[InternalRow]) => rows.head.getArray(0)
+  (rows: Iterator[InternalRow]) => if (rows.hasNext) 
rows.next().getArray(0) else null
 case _: MapType =>
-  (rows: Seq[InternalRow]) => rows.head.getMap(0)
+  (rows: Iterator[InternalRow]) => if (rows.hasNext) 
rows.next().getMap(0) else null
   }
 
-  @transient
-  lazy val parser =
-new JacksonParser(
-  nullableSchema,
-  new JSONOptions(options + ("mode" -> FailFastMode.name), 
timeZoneId.get))
+  @transient lazy val parsedOptions = new JSONOptions(options, 
timeZoneId.get)
+  @transient lazy val rawParser = new JacksonParser(nullableSchema, 
parsedOptions)
+  @transient lazy val createParser = CreateJacksonParser.utf8String _
+  @transient lazy val parser = new FailureSafeParser[UTF8String](
+input => rawParser.parse(input, createParser, identity[UTF8String]),
+parsedOptions.parseMode,
--- End diff --

I think we should keep using the previous default mode, `FailFastMode`?


---




[GitHub] spark pull request #22237: [SPARK-25243][SQL] Use FailureSafeParser in from_...

2018-08-26 Thread MaxGekk
GitHub user MaxGekk opened a pull request:

https://github.com/apache/spark/pull/22237

[SPARK-25243][SQL] Use FailureSafeParser in from_json

## What changes were proposed in this pull request?

In the PR, I propose to switch `from_json` to `FailureSafeParser`, to make the function compatible with the `PERMISSIVE` mode by default, and, as a consequence, to support the `FAILFAST` and `DROPMALFORMED` modes. A usage sketch follows below.
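
A usage sketch of the proposed behavior (assuming a SparkSession named `spark`):

```
import org.apache.spark.SparkException
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types.{IntegerType, StructType}
import spark.implicits._

val df = Seq("""{"a" 1}""", """{"a": 2}""").toDS()
val schema = new StructType().add("a", IntegerType)

// PERMISSIVE (the new default): the malformed record becomes Row(null)
df.select(from_json($"value", schema)).collect()

// FAILFAST: the malformed record makes the query fail
try {
  df.select(from_json($"value", schema, Map("mode" -> "FAILFAST"))).collect()
} catch {
  case e: SparkException => println(s"failed as expected: ${e.getMessage}")
}
```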

## How was this patch tested?

It was tested by the existing `JsonSuite`/`CSVSuite`, `JsonFunctionsSuite` and `JsonExpressionsSuite`, as well as by new tests for `from_json` which check the different modes.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/MaxGekk/spark-1 from_json-failuresafe

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/22237.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #22237


commit d639cb353b8575b66700544898148e6d65595fae
Author: Maxim Gekk 
Date:   2018-08-26T13:24:51Z

Moving FailureSafeParser to catalyst

commit e20615dbfafd03e44435257c57d5930e5aaef027
Author: Maxim Gekk 
Date:   2018-08-26T13:47:55Z

Support any type by FailureSafeParser

commit 5574c61fe7a369c3b549342c84b601dd98794014
Author: Maxim Gekk 
Date:   2018-08-26T14:56:18Z

Use FailSafeParser in from_json

commit 3fd9ba00a213851185b28bfa88634c43f9ed00bb
Author: Maxim Gekk 
Date:   2018-08-26T15:02:37Z

Fix tests

commit 0d30d0c51fb1c09cdcedee57e6979e00d6c8
Author: Maxim Gekk 
Date:   2018-08-26T15:05:16Z

Removing unused bind variable

commit 5bd5eb39b7b6964be025aee744e948027b15a062
Author: Maxim Gekk 
Date:   2018-08-26T15:28:23Z

Adding tests for different modes

commit 0452a2ff96a6690d4e99cc0463974cc8b98c56b6
Author: Maxim Gekk 
Date:   2018-08-26T15:43:25Z

Improve a test




---
