Repository: spark
Updated Branches:
  refs/heads/master dd7816758 -> 7d0a3ef4c


[SPARK-21610][SQL][FOLLOWUP] Corrupt records are not handled properly when 
creating a dataframe from a file

## What changes were proposed in this pull request?

When the `requiredSchema` only contains `_corrupt_record`, the derived 
`actualSchema` is empty and `_corrupt_record` is null for all rows. 
This PR detects the above situation and raises an exception with a reasonable 
workaround message so that users can know what happened and how to fix the query.

## How was this patch tested?

Added a unit test in `CSVSuite`.

Author: Jen-Ming Chung <jenmingi...@gmail.com>

Closes #19199 from jmchung/SPARK-21610-FOLLOWUP.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7d0a3ef4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7d0a3ef4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7d0a3ef4

Branch: refs/heads/master
Commit: 7d0a3ef4ced9684457ad6c5924c58b95249419e1
Parents: dd78167
Author: Jen-Ming Chung <jenmingi...@gmail.com>
Authored: Tue Sep 12 22:47:12 2017 +0900
Committer: hyukjinkwon <gurwls...@gmail.com>
Committed: Tue Sep 12 22:47:12 2017 +0900

----------------------------------------------------------------------
 .../datasources/csv/CSVFileFormat.scala         | 14 +++++++
 .../datasources/json/JsonFileFormat.scala       |  2 +-
 .../execution/datasources/csv/CSVSuite.scala    | 42 ++++++++++++++++++++
 3 files changed, 57 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/7d0a3ef4/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
index a99bdfe..e20977a 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
@@ -109,6 +109,20 @@ class CSVFileFormat extends TextBasedFileFormat with 
DataSourceRegister {
       }
     }
 
+    if (requiredSchema.length == 1 &&
+      requiredSchema.head.name == parsedOptions.columnNameOfCorruptRecord) {
+      throw new AnalysisException(
+        "Since Spark 2.3, the queries from raw JSON/CSV files are disallowed 
when the\n" +
+          "referenced columns only include the internal corrupt record 
column\n" +
+          s"(named _corrupt_record by default). For example:\n" +
+          
"spark.read.schema(schema).csv(file).filter($\"_corrupt_record\".isNotNull).count()\n"
 +
+          "and 
spark.read.schema(schema).csv(file).select(\"_corrupt_record\").show().\n" +
+          "Instead, you can cache or save the parsed results and then send the 
same query.\n" +
+          "For example, val df = spark.read.schema(schema).csv(file).cache() 
and then\n" +
+          "df.filter($\"_corrupt_record\".isNotNull).count()."
+      )
+    }
+
     (file: PartitionedFile) => {
       val conf = broadcastedHadoopConf.value.value
       val parser = new UnivocityParser(

http://git-wip-us.apache.org/repos/asf/spark/blob/7d0a3ef4/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
index b5ed6e4..0862c74 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
@@ -118,7 +118,7 @@ class JsonFileFormat extends TextBasedFileFormat with 
DataSourceRegister {
       throw new AnalysisException(
         "Since Spark 2.3, the queries from raw JSON/CSV files are disallowed 
when the\n" +
         "referenced columns only include the internal corrupt record column\n" 
+
-        s"(named ${parsedOptions.columnNameOfCorruptRecord} by default). For 
example:\n" +
+        s"(named _corrupt_record by default). For example:\n" +
         
"spark.read.schema(schema).json(file).filter($\"_corrupt_record\".isNotNull).count()\n"
 +
         "and 
spark.read.schema(schema).json(file).select(\"_corrupt_record\").show().\n" +
         "Instead, you can cache or save the parsed results and then send the 
same query.\n" +

http://git-wip-us.apache.org/repos/asf/spark/blob/7d0a3ef4/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index be89141..e439699 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -1203,4 +1203,46 @@ class CSVSuite extends QueryTest with SharedSQLContext 
with SQLTestUtils {
       .csv(Seq("a").toDS())
     checkAnswer(df, Row("a", null, "a"))
   }
+
+  test("SPARK-21610: Corrupt records are not handled properly when creating a 
dataframe " +
+    "from a file") {
+    val columnNameOfCorruptRecord = "_corrupt_record"
+    val schema = new StructType()
+      .add("a", IntegerType)
+      .add("b", TimestampType)
+      .add(columnNameOfCorruptRecord, StringType)
+    // negative cases
+    val msg = intercept[AnalysisException] {
+      spark
+        .read
+        .option("columnNameOfCorruptRecord", columnNameOfCorruptRecord)
+        .schema(schema)
+        .csv(testFile(valueMalformedFile))
+        .select(columnNameOfCorruptRecord)
+        .collect()
+    }.getMessage
+    assert(msg.contains("only include the internal corrupt record column"))
+    intercept[org.apache.spark.sql.catalyst.errors.TreeNodeException[_]] {
+      spark
+        .read
+        .option("columnNameOfCorruptRecord", columnNameOfCorruptRecord)
+        .schema(schema)
+        .csv(testFile(valueMalformedFile))
+        .filter($"_corrupt_record".isNotNull)
+        .count()
+    }
+    // workaround
+    val df = spark
+      .read
+      .option("columnNameOfCorruptRecord", columnNameOfCorruptRecord)
+      .schema(schema)
+      .csv(testFile(valueMalformedFile))
+      .cache()
+    assert(df.filter($"_corrupt_record".isNotNull).count() == 1)
+    assert(df.filter($"_corrupt_record".isNull).count() == 1)
+    checkAnswer(
+      df.select(columnNameOfCorruptRecord),
+      Row("0,2013-111-11 12:13:14") :: Row(null) :: Nil
+    )
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to