HyukjinKwon commented on a change in pull request #24894: [SPARK-28058][DOC] Add a note to DROPMALFORMED mode of CSV for column pruning
URL: https://github.com/apache/spark/pull/24894#discussion_r294572885
 
 

 ##########
 File path: python/pyspark/sql/readwriter.py
 ##########
 @@ -441,7 +441,12 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
                   When it meets a record having fewer tokens than the length of the schema, \
                   sets ``null`` to extra fields. When the record has more tokens than the \
                   length of the schema, it drops extra tokens.
-                * ``DROPMALFORMED`` : ignores the whole corrupted records.
+                * ``DROPMALFORMED`` : ignores the whole corrupted records. Note that when CSV \
+                  parser column pruning (``spark.sql.csv.parser.columnPruning.enabled``) is \
 
 Review comment:
   Here is what I tested to show how the results differ depending on that configuration and the parse mode:
   
   PERMISSIVE:
   
   ```scala
   scala> spark.conf.set("spark.sql.csv.parser.columnPruning.enabled", "true")
   
   scala> spark.read.schema("fruit string, color string, price string, quantity string, _corrupt_record string").option("header", "true").csv("fruit.csv").select('fruit, 'color, '_corrupt_record).show()
   +------+------+---------------+
   | fruit| color|_corrupt_record|
   +------+------+---------------+
   | apple|   red|           null|
   |banana|yellow|           null|
   |orange|orange|           null|
   |   xxx|  null|           null|
   +------+------+---------------+
   
   
   scala> spark.conf.set("spark.sql.csv.parser.columnPruning.enabled", "false")
   
   scala> spark.read.schema("fruit string, color string, price string, quantity string, _corrupt_record string").option("header", "true").csv("fruit.csv").select('fruit, 'color, '_corrupt_record).show()
   +------+------+---------------+
   | fruit| color|_corrupt_record|
   +------+------+---------------+
   | apple|   red|           null|
   |banana|yellow|           null|
   |orange|orange|           null|
   |   xxx|  null|            xxx|
   +------+------+---------------+
   ```
   
   DROPMALFORMED:
   
   ```scala
   scala> spark.conf.set("spark.sql.csv.parser.columnPruning.enabled", "false")
   
   scala> spark.read.option("header", "true").option("mode", "DROPMALFORMED").csv("fruit.csv").select('fruit, 'color).show()
   +------+------+
   | fruit| color|
   +------+------+
   | apple|   red|
   |banana|yellow|
   |orange|orange|
   +------+------+
   
   
   scala> spark.conf.set("spark.sql.csv.parser.columnPruning.enabled", "true")
   
   scala> spark.read.option("header", "true").option("mode", "DROPMALFORMED").csv("fruit.csv").select('fruit, 'color).show()
   +------+------+
   | fruit| color|
   +------+------+
   | apple|   red|
   |banana|yellow|
   |orange|orange|
   |   xxx|  null|
   +------+------+
   ```
   
   FAILFAST:
   
   ```scala
   scala> spark.conf.set("spark.sql.csv.parser.columnPruning.enabled", "false")
   
   scala> spark.read.option("header", "true").option("mode", "FAILFAST").csv("fruit.csv").select('fruit, 'color).show()
   org.apache.spark.SparkException: Malformed records are detected in record parsing. Parse Mode: FAILFAST. To process malformed records as null result, try setting the option 'mode' as 'PERMISSIVE'.
        at org.apache.spark.sql.catalyst.util.FailureSafeParser.parse(FailureSafeParser.scala:70)
        at org.apache.spark.sql.catalyst.csv.UnivocityParser$.$anonfun$parseIterator$2(UnivocityParser.scala:347)
        at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:484)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:490)
        at org.apache.spark.sql.execution.datasources.v2.PartitionReaderFromIterator.next(PartitionReaderFromIterator.scala:26)
        at org.apache.spark.sql.execution.datasources.v2.PartitionReaderWithPartitionValues.next(PartitionReaderWithPartitionValues.scala:48)
        at org.apache.spark.sql.execution.datasources.v2.PartitionedFileReader.next(FilePartitionReaderFactory.scala:55)
        at org.apache.spark.sql.execution.datasources.v2.FilePartitionReader.next(FilePartitionReader.scala:70)
        at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.hasNext(DataSourceRDD.scala:62)
        at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:701)
        at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:292)
        at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:852)
        at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:852)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:327)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:291)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:327)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:291)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:125)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:426)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1350)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:429)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
   Caused by: org.apache.spark.sql.catalyst.util.BadRecordException: java.lang.RuntimeException: Malformed CSV record
        at org.apache.spark.sql.catalyst.csv.UnivocityParser.org$apache$spark$sql$catalyst$csv$UnivocityParser$$convert(UnivocityParser.scala:238)
        at org.apache.spark.sql.catalyst.csv.UnivocityParser.$anonfun$doParse$1(UnivocityParser.scala:192)
        at org.apache.spark.sql.catalyst.csv.UnivocityParser.parse(UnivocityParser.scala:203)
        at org.apache.spark.sql.catalyst.csv.UnivocityParser$.$anonfun$parseIterator$1(UnivocityParser.scala:343)
        at org.apache.spark.sql.catalyst.util.FailureSafeParser.parse(FailureSafeParser.scala:60)
        ... 29 more
   Caused by: java.lang.RuntimeException: Malformed CSV record
        ... 34 more
   ...
   
   scala> spark.conf.set("spark.sql.csv.parser.columnPruning.enabled", "true")
   
   scala> spark.read.option("header", "true").option("mode", "FAILFAST").csv("fruit.csv").select('fruit, 'color).show()
   +------+------+
   | fruit| color|
   +------+------+
   | apple|   red|
   |banana|yellow|
   |orange|orange|
   |   xxx|  null|
   +------+------+
   ```
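   
   The contents of `fruit.csv` are not shown above. For anyone trying to reproduce the runs, the sketch below writes a file that appears consistent with the printed results: a four-column header followed by a final record with a single token. The header names are inferred from the schema string and the selected columns; the `price` and `quantity` values are hypothetical placeholders, not the data actually used in the test.
   
   ```scala
   import java.nio.charset.StandardCharsets
   import java.nio.file.{Files, Paths}

   // Assumed layout: header and fruit/color values follow the output above;
   // price and quantity values are made up for illustration only.
   val lines = Seq(
     "fruit,color,price,quantity",
     "apple,red,1,3",
     "banana,yellow,2,4",
     "orange,orange,3,5",
     "xxx" // one token only: fewer fields than the header, so it counts as malformed
   )

   // Write the file into the current working directory of the REPL.
   Files.write(Paths.get("fruit.csv"), lines.mkString("\n").getBytes(StandardCharsets.UTF_8))
   ```
   
   Pasting this into the same `spark-shell` session before the commands above should be enough to try each parse mode with column pruning on and off.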

