[jira] [Commented] (SPARK-22248) spark marks all columns as null when its unable to parse single column

2017-10-12 Thread Hyukjin Kwon (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-22248?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16202116#comment-16202116 ]

Hyukjin Kwon commented on SPARK-22248:
--

I think either way breaks existing behaviour. JSON support started in Spark 1.4.0 
and CSV started as a third-party package. We should match them, but I'm not sure 
which one should be treated as the bug to fix. And, yea, I guess it'd need a 
careful look. It'd be more complex if we support SPARK-20990, BTW.

> spark marks all columns as null when its unable to parse single column
> --
>
> Key: SPARK-22248
> URL: https://issues.apache.org/jira/browse/SPARK-22248
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.1.2, 2.2.0
>Reporter: Vignesh Mohan
>
> When parsing JSON data in `PERMISSIVE` mode, if a single column does not match the 
> schema, Spark marks every column value in that record as null.
> {code}
> import org.apache.spark.{SparkConf, SparkContext}
> import org.apache.spark.sql.SQLContext
> import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}
>
> val conf = new SparkConf().setMaster("local").setAppName("app")
> val sc = new SparkContext(conf)
> val sqlContext = new SQLContext(sc)
> val sparkschema: StructType = StructType(
>   StructField("name", StringType) :: StructField("count", LongType) :: Nil)
> val rdd = sc.parallelize(List(
>   """
>     |{"name": "foo", "count": 24.0}}
>     |""".stripMargin,
>   """
>     |{"name": "bar", "count": 24}}
>     |""".stripMargin
> ))
> sqlContext.read.schema(sparkschema).json(rdd).createOrReplaceTempView("events")
> sqlContext.sql(
>   """
>     | select name, count
>     | from events
>   """.stripMargin).collect.foreach(println)
> {code}
> Output:
> {code}
> 17/10/11 03:12:04 WARN JacksonParser: Found at least one malformed records (sample:
> {"name": "foo", "count": 24.0}}
> ). The JSON reader will replace all malformed records with placeholder null in current PERMISSIVE parser mode.
> To find out which corrupted records have been replaced with null, please use the default inferred schema instead of providing a custom schema.
> Code example to print all malformed records (scala):
> ===
> // The corrupted record exists in column _corrupt_record.
> val parsedJson = spark.read.json("/path/to/json/file/test.json")
>
> [null,null]
> [bar,24]
> {code}
> Expected output:
> {code}
> [foo,null]
> [bar,24]
> {code}
> The problem comes from 
> `spark-catalyst_2.11-2.1.0-sources.jar!/org/apache/spark/sql/catalyst/json/JacksonParser.scala`
> {code}
> private def failedConversion(
>     parser: JsonParser,
>     dataType: DataType): PartialFunction[JsonToken, Any] = {
>   case VALUE_STRING if parser.getTextLength < 1 =>
>     // If conversion is failed, this produces `null` rather than throwing exception.
>     // This will protect the mismatch of types.
>     null
>   case token =>
>     // We cannot parse this token based on the given data type. So, we throw a
>     // SparkSQLJsonProcessingException and this exception will be caught by
>     // `parse` method.
>     throw new SparkSQLJsonProcessingException(
>       s"Failed to parse a value for data type $dataType (current token: $token).")
> }
> {code}
> This raises an exception when parsing the mismatched column, and the `parse` method below
> {code}
> def parse(input: String): Seq[InternalRow] = {
>   if (input.trim.isEmpty) {
>     Nil
>   } else {
>     try {
>       Utils.tryWithResource(factory.createParser(input)) { parser =>
>         parser.nextToken()
>         rootConverter.apply(parser) match {
>           case null => failedRecord(input)
>           case row: InternalRow => row :: Nil
>           case array: ArrayData =>
>             // Here, as we support reading top level JSON arrays and take every element
>             // in such an array as a row, this case is possible.
>             if (array.numElements() == 0) {
>               Nil
>             } else {
>               array.toArray[InternalRow](schema)
>             }
>           case _ =>
>             failedRecord(input)
>         }
>       }
>     } catch {
>       case _: JsonProcessingException =>
>         failedRecord(input)
>       case _: SparkSQLJsonProcessingException =>
>         failedRecord(input)
>     }
>   }
> }
> {code}
> catches it and marks the whole record as a failedRecord.
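
For reference, a workaround that keeps malformed rows identifiable today (a sketch, assuming the default `_corrupt_record` column name and the 2.1 behaviour described above): including the corrupt-record column in the user-provided schema preserves the raw text of each dropped record, even though its data columns are still null.
{code}
// Sketch: capture malformed records alongside the typed columns.
// Assumes spark.sql.columnNameOfCorruptRecord is left at its default.
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

val schemaWithCorrupt = StructType(
  StructField("name", StringType) ::
  StructField("count", LongType) ::
  StructField("_corrupt_record", StringType) :: Nil)

sqlContext.read
  .schema(schemaWithCorrupt)
  .json(rdd)
  .show(false)
// The "foo" record should come back with name/count null and its raw JSON in
// _corrupt_record; the "bar" record parses normally.
{code}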






[jira] [Commented] (SPARK-22248) spark marks all columns as null when its unable to parse single column

2017-10-11 Thread Takeshi Yamamuro (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-22248?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16201438#comment-16201438 ]

Takeshi Yamamuro commented on SPARK-22248:
--

Yea, ok. You'd probably need to support both modes: single-line and multi-line.
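
For context, single-line is the default read path; multi-line reads go through a separate one (a sketch, assuming Spark 2.2's `multiLine` option, a `SparkSession` named `spark`, the `sparkschema` from the repro, and illustrative file paths):
{code}
// Default: one JSON record per line, so a bad record only affects that line.
val singleLine = spark.read.schema(sparkschema).json("/path/to/events.jsonl")

// multiLine mode (Spark 2.2+): a record may span lines, so a parse failure can
// affect a much larger chunk of input, which makes per-field recovery harder.
val multiLine = spark.read.option("multiLine", true).schema(sparkschema).json("/path/to/events.json")
{code}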







[jira] [Commented] (SPARK-22248) spark marks all columns as null when its unable to parse single column

2017-10-11 Thread Gaurav Shah (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-22248?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16201433#comment-16201433 ]

Gaurav Shah commented on SPARK-22248:
-

[~maropu] I am not sure about CSV, but for JSON we tokenize the input in the Spark 
layer and then parse each token via Jackson, so if a single token fails we can 
recover from it. Let me try and push a patch.
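
Roughly, the per-field recovery idea outside of Spark (a standalone sketch using Jackson's streaming parser directly; the field names and the long-only rule for `count` mirror the repro above, and it does not claim to match JacksonParser's exact conversion rules):
{code}
// A field whose token does not match the expected type becomes null on its own,
// instead of the whole record being nulled.
import com.fasterxml.jackson.core.{JsonFactory, JsonToken}

def parseRecord(json: String): (Option[String], Option[Long]) = {
  val parser = new JsonFactory().createParser(json)
  var name: Option[String] = None
  var count: Option[Long] = None
  try {
    parser.nextToken()                                 // START_OBJECT
    while (parser.nextToken() == JsonToken.FIELD_NAME) {
      val field = parser.getCurrentName
      val tok = parser.nextToken()                     // the value token
      field match {
        case "name" =>
          name = if (tok == JsonToken.VALUE_STRING) Option(parser.getText) else None
        case "count" =>
          // Accept only an integer token for a long column: 24.0 becomes null
          // here, but the already-parsed "name" is kept.
          count = if (tok == JsonToken.VALUE_NUMBER_INT) Some(parser.getLongValue) else None
        case _ => parser.skipChildren()
      }
    }
  } finally parser.close()
  (name, count)
}

// parseRecord("""{"name": "foo", "count": 24.0}""")  // (Some(foo), None)
// parseRecord("""{"name": "bar", "count": 24}""")    // (Some(bar), Some(24))
{code}
Doing the same inside JacksonParser would mean turning the per-field SparkSQLJsonProcessingException into a per-field null instead of letting `parse` swallow it for the whole row.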







[jira] [Commented] (SPARK-22248) spark marks all columns as null when its unable to parse single column

2017-10-11 Thread Takeshi Yamamuro (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-22248?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16201309#comment-16201309 ]

Takeshi Yamamuro commented on SPARK-22248:
--

Once parsing fails, I feel it is difficult to recover even in single-line mode 
(and even more difficult in multi-line mode). CSV has the same behaviour. 
cc: [~hyukjin.kwon]



