HyukjinKwon commented on a change in pull request #24894: [SPARK-28058][DOC] Add a note to DROPMALFORMED mode of CSV for column pruning
URL: https://github.com/apache/spark/pull/24894#discussion_r294572885
##########
File path: python/pyspark/sql/readwriter.py
##########
@@ -441,7 +441,12 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
         When it meets a record having fewer tokens than the length of the schema, \
         sets ``null`` to extra fields. When the record has more tokens than the \
         length of the schema, it drops extra tokens.
-        * ``DROPMALFORMED`` : ignores the whole corrupted records.
+        * ``DROPMALFORMED`` : ignores the whole corrupted records. Note that when CSV \
+          parser column pruning (``spark.sql.csv.parser.columnPruning.enabled``) is \
Review comment:
Here is what I tested to show the difference made by that configuration under each parse mode:
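The input file itself isn't shown in the thread; from the outputs below one can infer a header of `fruit,color,price,quantity`, three well-formed rows, and one malformed row containing only `xxx`. A hypothetical reconstruction for reproducing the examples (the price and quantity values are placeholders, not taken from the thread):

```python
# Hypothetical reconstruction of the fruit.csv used below.
# Only the fruit/color values and the malformed "xxx" row are inferable
# from the outputs in this thread; price/quantity are placeholders.
fruit_csv = """\
fruit,color,price,quantity
apple,red,1,10
banana,yellow,2,20
orange,orange,3,30
xxx
"""

with open("fruit.csv", "w") as f:
    f.write(fruit_csv)
```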
PERMISSIVE:
```scala
scala> spark.conf.set("spark.sql.csv.parser.columnPruning.enabled", "true")
scala> spark.read.schema("fruit string, color string, price string, quantity string, _corrupt_record string").option("header", "true").csv("fruit.csv").select('fruit, 'color, '_corrupt_record).show()
+------+------+---------------+
| fruit| color|_corrupt_record|
+------+------+---------------+
| apple| red| null|
|banana|yellow| null|
|orange|orange| null|
| xxx| null| null|
+------+------+---------------+
scala> spark.conf.set("spark.sql.csv.parser.columnPruning.enabled", "false")
scala> spark.read.schema("fruit string, color string, price string, quantity string, _corrupt_record string").option("header", "true").csv("fruit.csv").select('fruit, 'color, '_corrupt_record).show()
+------+------+---------------+
| fruit| color|_corrupt_record|
+------+------+---------------+
| apple| red| null|
|banana|yellow| null|
|orange|orange| null|
| xxx| null| xxx|
+------+------+---------------+
```
DROPMALFORMED:
```scala
scala> spark.conf.set("spark.sql.csv.parser.columnPruning.enabled", "false")
scala> spark.read.option("header", "true").option("mode", "DROPMALFORMED").csv("fruit.csv").select('fruit, 'color).show()
+------+------+
| fruit| color|
+------+------+
| apple| red|
|banana|yellow|
|orange|orange|
+------+------+
scala> spark.conf.set("spark.sql.csv.parser.columnPruning.enabled", "true")
scala> spark.read.option("header", "true").option("mode", "DROPMALFORMED").csv("fruit.csv").select('fruit, 'color).show()
+------+------+
| fruit| color|
+------+------+
| apple| red|
|banana|yellow|
|orange|orange|
| xxx| null|
+------+------+
```
FAILFAST:
```scala
scala> spark.conf.set("spark.sql.csv.parser.columnPruning.enabled", "false")
scala> spark.read.option("header", "true").option("mode", "FAILFAST").csv("fruit.csv").select('fruit, 'color).show()
org.apache.spark.SparkException: Malformed records are detected in record parsing. Parse Mode: FAILFAST. To process malformed records as null result, try setting the option 'mode' as 'PERMISSIVE'.
  at org.apache.spark.sql.catalyst.util.FailureSafeParser.parse(FailureSafeParser.scala:70)
  at org.apache.spark.sql.catalyst.csv.UnivocityParser$.$anonfun$parseIterator$2(UnivocityParser.scala:347)
  at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:484)
  at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:490)
  at org.apache.spark.sql.execution.datasources.v2.PartitionReaderFromIterator.next(PartitionReaderFromIterator.scala:26)
  at org.apache.spark.sql.execution.datasources.v2.PartitionReaderWithPartitionValues.next(PartitionReaderWithPartitionValues.scala:48)
  at org.apache.spark.sql.execution.datasources.v2.PartitionedFileReader.next(FilePartitionReaderFactory.scala:55)
  at org.apache.spark.sql.execution.datasources.v2.FilePartitionReader.next(FilePartitionReader.scala:70)
  at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.hasNext(DataSourceRDD.scala:62)
  at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
  at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
  at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:701)
  at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:292)
  at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:852)
  at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:852)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:327)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:291)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:327)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:291)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
  at org.apache.spark.scheduler.Task.run(Task.scala:125)
  at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:426)
  at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1350)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:429)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.sql.catalyst.util.BadRecordException: java.lang.RuntimeException: Malformed CSV record
  at org.apache.spark.sql.catalyst.csv.UnivocityParser.org$apache$spark$sql$catalyst$csv$UnivocityParser$$convert(UnivocityParser.scala:238)
  at org.apache.spark.sql.catalyst.csv.UnivocityParser.$anonfun$doParse$1(UnivocityParser.scala:192)
  at org.apache.spark.sql.catalyst.csv.UnivocityParser.parse(UnivocityParser.scala:203)
  at org.apache.spark.sql.catalyst.csv.UnivocityParser$.$anonfun$parseIterator$1(UnivocityParser.scala:343)
  at org.apache.spark.sql.catalyst.util.FailureSafeParser.parse(FailureSafeParser.scala:60)
  ... 29 more
Caused by: java.lang.RuntimeException: Malformed CSV record
  ... 34 more
...
scala> spark.conf.set("spark.sql.csv.parser.columnPruning.enabled", "true")
scala> spark.read.option("header", "true").option("mode", "FAILFAST").csv("fruit.csv").select('fruit, 'color).show()
+------+------+
| fruit| color|
+------+------+
| apple| red|
|banana|yellow|
|orange|orange|
| xxx| null|
+------+------+
```
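The behavior demonstrated above can be modeled outside Spark. In this sketch, `cols` is the number of columns the parser actually materializes: the full schema width when pruning is disabled, or fewer when pruning lets the parser skip unselected columns. This is only a toy illustration of the observed behavior, not Spark's UnivocityParser logic:

```python
# Toy model of the CSV parse modes, illustrating why column pruning
# changes which records count as malformed. Not Spark's actual code.
def parse(records, cols, mode):
    def is_ok(rec):
        # A record is "malformed" when it has fewer tokens than the
        # number of columns the parser materializes.
        return len(rec) >= cols

    if mode == "PERMISSIVE":
        # Keep every record, padding short ones with None.
        return [rec + [None] * (cols - len(rec)) for rec in records]
    if mode == "DROPMALFORMED":
        return [rec for rec in records if is_ok(rec)]
    if mode == "FAILFAST":
        for rec in records:
            if not is_ok(rec):
                raise ValueError("Malformed CSV record: " + ",".join(rec))
        return records
    raise ValueError("unknown mode: " + mode)

records = [
    ["apple", "red", "1", "10"],
    ["banana", "yellow", "2", "20"],
    ["orange", "orange", "3", "30"],
    ["xxx"],  # malformed w.r.t. the full 4-column schema
]

# Pruning disabled (cols=4): "xxx" is malformed, so DROPMALFORMED drops
# it and FAILFAST raises. Pruning enabled: the parser materializes only
# what the query needs, so -- as in the outputs above -- the "xxx"
# record survives every mode.
```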