Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16928#discussion_r120027093
  
    --- Diff: python/pyspark/sql/readwriter.py ---
    @@ -191,10 +191,13 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
             :param mode: allows a mode for dealing with corrupt records during parsing. If None is
                          set, it uses the default value, ``PERMISSIVE``.
     
    -                *  ``PERMISSIVE`` : sets other fields to ``null`` when it meets a corrupted \
    -                  record and puts the malformed string into a new field configured by \
    -                 ``columnNameOfCorruptRecord``. When a schema is set by user, it sets \
    -                 ``null`` for extra fields.
    +                * ``PERMISSIVE`` : sets other fields to ``null`` when it meets a corrupted \
    +                 record, and puts the malformed string into a field configured by \
    +                 ``columnNameOfCorruptRecord``. To keep corrupt records, an user can set \
    +                 a string type field named ``columnNameOfCorruptRecord`` in an user-defined \
    +                 schema. If a schema does not have the field, it drops corrupt records during \
    +                 parsing. When inferring a schema, it implicitly adds a \
    +                 ``columnNameOfCorruptRecord`` field in an output schema.
    --- End diff --
    
    @gatorsmile, I just got to my laptop.
    
    I checked that, when there are more tokens than fields in the schema, it fills the malformed column. With the data below:
    
    ```
    a,a
    ```
    
    (BTW, it looks like it respects `spark.sql.columnNameOfCorruptRecord`?)
    
    ```scala
    scala> spark.read.schema("a string, _corrupt_record 
string").csv("test.csv").show()
    +---+---------------+
    |  a|_corrupt_record|
    +---+---------------+
    |  a|            a,a|
    +---+---------------+
    ```
    
    ```scala
    scala> spark.conf.set("spark.sql.columnNameOfCorruptRecord", "abc")
    
    scala> spark.read.schema("a string, abc string").csv("test.csv").show()
    +---+---+
    |  a|abc|
    +---+---+
    |  a|a,a|
    +---+---+
    ```
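
    For reference, the `mode` option the docstring describes also has the other two values; a rough, untested sketch against the same `test.csv`, just to contrast with the `PERMISSIVE` outputs above:

    ```scala
    // DROPMALFORMED should simply drop the "a,a" row instead of filling a corrupt-record column.
    spark.read.schema("a string").option("mode", "DROPMALFORMED").csv("test.csv").show()

    // FAILFAST should throw once it hits the malformed row.
    spark.read.schema("a string").option("mode", "FAILFAST").csv("test.csv").show()
    ```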
    
    Also, I found another bug (when there are fewer tokens than fields in the schema):
    
    with data
    
    ```
    a
    a
    a
    a
    a
    ```
    
    ```scala
    scala> spark.read.schema("a string, b string, _corrupt_record 
string").csv("test.csv").show()
    ```
    
    prints ... 
    
    ```
    17/06/05 09:45:26 ERROR Executor: Exception in task 0.0 in stage 5.0 (TID 5)
    java.lang.NullPointerException
        at scala.collection.immutable.StringLike$class.stripLineEnd(StringLike.scala:89)
        at scala.collection.immutable.StringOps.stripLineEnd(StringOps.scala:29)
        at org.apache.spark.sql.execution.datasources.csv.UnivocityParser.org$apache$spark$sql$execution$datasources$csv$UnivocityParser$$getCurrentInput(UnivocityParser.scala:56)
        at org.apache.spark.sql.execution.datasources.csv.UnivocityParser$$anonfun$org$apache$spark$sql$execution$datasources$csv$UnivocityParser$$convert$1.apply(UnivocityParser.scala:211)
        at org.apache.spark.sql.execution.datasources.csv.UnivocityParser$$anonfun$org$apache$spark$sql$execution$datasources$csv$UnivocityParser$$convert$1.apply(UnivocityParser.scala:211)
        at org.apache.spark.sql.execution.datasources.FailureSafeParser$$anonfun$2.apply(FailureSafeParser.scala:50)
        at org.apache.spark.sql.execution.datasources.FailureSafeParser$$anonfun$2.apply(FailureSafeParser.scala:43)
        at org.apache.spark.sql.execution.datasources.FailureSafeParser.parse(FailureSafeParser.scala:64)
        at org.apache.spark.sql.execution.datasources.csv.UnivocityParser$$anonfun$parseIterator$1.apply(UnivocityParser.scala:312)
        at org.apache.spark.sql.execution.datasources.csv.UnivocityParser$$anonfun$parseIterator$1.apply(UnivocityParser.scala:312)
        at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
        at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
        at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:105)
        at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:177)
        at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:105)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:236)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:230)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
        at org.apache.spark.scheduler.Task.run(Task.scala:108)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
    ```
    
    It looks like `getCurrentInput` produces `null` once the input has all been consumed.
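
    Not sure what the right fix is, but I guess we need a null guard before `stripLineEnd`. A standalone sketch with a made-up name (not the actual `UnivocityParser` internals):

    ```scala
    // `currentParsedContent` stands in for whatever the tokenizer reports as the
    // current input, which seems to become null once everything has been consumed.
    def safeCurrentInput(currentParsedContent: String): String = {
      if (currentParsedContent == null) null
      else currentParsedContent.stripLineEnd
    }
    ```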
    
    Another thing I would like to note is that JSON produces `null` in the columns and puts the contents in the malformed column.
    With the input:
    
    ```json
    {"a": 1, "b": "a"}
    ```
    
    ```scala
    scala> spark.read.json("test.json").show()
    +---+---+
    |  a|  b|
    +---+---+
    |  1|  a|
    +---+---+
    ```
    
    
    ```scala
    scala> spark.read.schema("a string, b int, _corrupt_record 
string").json("test.json").show()
    +----+----+------------------+
    |   a|   b|   _corrupt_record|
    +----+----+------------------+
    |null|null|{"a": 1, "b": "a"}|
    +----+----+------------------+
    ```
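
    Lastly, to line the JSON case up with the new wording ("If a schema does not have the field, it drops corrupt records during parsing"), an untested sketch with the same `test.json`:

    ```scala
    // Schema without the corrupt-record field: per the proposed docstring, the
    // mismatched record should be dropped rather than reported in a column.
    spark.read.schema("a string, b int").json("test.json").show()
    ```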

