Github user HyukjinKwon commented on a diff in the pull request:
https://github.com/apache/spark/pull/16928#discussion_r120027093
--- Diff: python/pyspark/sql/readwriter.py ---
@@ -191,10 +191,13 @@ def json(self, path, schema=None,
primitivesAsString=None, prefersDecimal=None,
:param mode: allows a mode for dealing with corrupt records during parsing. If None is
set, it uses the default value, ``PERMISSIVE``.
- * ``PERMISSIVE`` : sets other fields to ``null`` when it meets a corrupted \
- record and puts the malformed string into a new field configured by \
- ``columnNameOfCorruptRecord``. When a schema is set by user, it sets \
- ``null`` for extra fields.
+ * ``PERMISSIVE`` : sets other fields to ``null`` when it meets a corrupted \
+ record, and puts the malformed string into a field configured by \
+ ``columnNameOfCorruptRecord``. To keep corrupt records, an user can set \
+ a string type field named ``columnNameOfCorruptRecord`` in an user-defined \
+ schema. If a schema does not have the field, it drops corrupt records during \
+ parsing. When inferring a schema, it implicitly adds a \
+ ``columnNameOfCorruptRecord`` field in an output schema.
--- End diff --
@gatorsmile, I just got to my laptop.
I checked that when a row has more tokens than the schema has fields, it fills
the malformed column. With the data below:
```
a,a
```
(BTW, it looks like it respects `spark.sql.columnNameOfCorruptRecord`?)
```scala
scala> spark.read.schema("a string, _corrupt_record string").csv("test.csv").show()
+---+---------------+
|  a|_corrupt_record|
+---+---------------+
|  a|            a,a|
+---+---------------+
```
```scala
scala> spark.conf.set("spark.sql.columnNameOfCorruptRecord", "abc")
scala> spark.read.schema("a string, abc string").csv("test.csv").show()
+---+---+
|  a|abc|
+---+---+
|  a|a,a|
+---+---+
```
I also found another bug, which occurs when a row has fewer tokens than the
schema has fields. With the data:
```
a
a
a
a
a
```
```scala
scala> spark.read.schema("a string, b string, _corrupt_record string").csv("test.csv").show()
```
prints ...
```
17/06/05 09:45:26 ERROR Executor: Exception in task 0.0 in stage 5.0 (TID 5)
java.lang.NullPointerException
at scala.collection.immutable.StringLike$class.stripLineEnd(StringLike.scala:89)
at scala.collection.immutable.StringOps.stripLineEnd(StringOps.scala:29)
at org.apache.spark.sql.execution.datasources.csv.UnivocityParser.org$apache$spark$sql$execution$datasources$csv$UnivocityParser$$getCurrentInput(UnivocityParser.scala:56)
at org.apache.spark.sql.execution.datasources.csv.UnivocityParser$$anonfun$org$apache$spark$sql$execution$datasources$csv$UnivocityParser$$convert$1.apply(UnivocityParser.scala:211)
at org.apache.spark.sql.execution.datasources.csv.UnivocityParser$$anonfun$org$apache$spark$sql$execution$datasources$csv$UnivocityParser$$convert$1.apply(UnivocityParser.scala:211)
at org.apache.spark.sql.execution.datasources.FailureSafeParser$$anonfun$2.apply(FailureSafeParser.scala:50)
at org.apache.spark.sql.execution.datasources.FailureSafeParser$$anonfun$2.apply(FailureSafeParser.scala:43)
at org.apache.spark.sql.execution.datasources.FailureSafeParser.parse(FailureSafeParser.scala:64)
at org.apache.spark.sql.execution.datasources.csv.UnivocityParser$$anonfun$parseIterator$1.apply(UnivocityParser.scala:312)
at org.apache.spark.sql.execution.datasources.csv.UnivocityParser$$anonfun$parseIterator$1.apply(UnivocityParser.scala:312)
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:105)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:177)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:105)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:236)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:230)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
```
It looks like `getCurrentInput` produces `null` because the input has already been fully parsed.
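To illustrate the suspected cause, here is a minimal standalone sketch. It is not the Spark code and not a proposed patch; `NullSafeInputSketch`, `currentInput` and `strippedInput` are hypothetical names used only for illustration. Calling `stripLineEnd` on a `null` string throws a `NullPointerException`, whereas wrapping the value in `Option` first skips the call:

```scala
// Illustrative only: these names are hypothetical and are not Spark's code.
object NullSafeInputSketch {
  // Stand-in for a tokenizer call that returns null once its input is exhausted.
  def currentInput(exhausted: Boolean): String =
    if (exhausted) null else "a\n"

  // Wrapping the possibly-null value in Option skips stripLineEnd for null.
  def strippedInput(raw: String): Option[String] =
    Option(raw).map(_.stripLineEnd)
}
```

Under this assumption, a guard of this kind around the raw input would let the malformed-record path carry on without the exception, though whether that is the right fix for `UnivocityParser` would need checking against the actual code.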
Another thing I would like to note: JSON produces `null` in the columns
and puts the contents in the malformed column.
With the input:
```json
{"a": 1, "b": "a"}
```
```scala
scala> spark.read.json("test.json").show()
+---+---+
|  a|  b|
+---+---+
|  1|  a|
+---+---+
```
```scala
scala> spark.read.schema("a string, b int, _corrupt_record string").json("test.json").show()
+----+----+------------------+
|   a|   b|   _corrupt_record|
+----+----+------------------+
|null|null|{"a": 1, "b": "a"}|
+----+----+------------------+
```