Repository: spark
Updated Branches:
  refs/heads/master dd7816758 -> 7d0a3ef4c

[SPARK-21610][SQL][FOLLOWUP] Corrupt records are not handled properly when creating a dataframe from a file

## What changes were proposed in this pull request?

When the `requiredSchema` only contains `_corrupt_record`, the derived `actualSchema` is empty and `_corrupt_record` is null for all rows. This PR detects this situation and raises an exception with a reasonable workaround message so that users know what happened and how to fix the query.

## How was this patch tested?

Added unit test in `CSVSuite`.

Author: Jen-Ming Chung <jenmingi...@gmail.com>

Closes #19199 from jmchung/SPARK-21610-FOLLOWUP.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7d0a3ef4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7d0a3ef4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7d0a3ef4

Branch: refs/heads/master
Commit: 7d0a3ef4ced9684457ad6c5924c58b95249419e1
Parents: dd78167
Author: Jen-Ming Chung <jenmingi...@gmail.com>
Authored: Tue Sep 12 22:47:12 2017 +0900
Committer: hyukjinkwon <gurwls...@gmail.com>
Committed: Tue Sep 12 22:47:12 2017 +0900

----------------------------------------------------------------------
 .../datasources/csv/CSVFileFormat.scala         | 14 +++++++
 .../datasources/json/JsonFileFormat.scala       |  2 +-
 .../execution/datasources/csv/CSVSuite.scala    | 42 ++++++++++++++++++++
 3 files changed, 57 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/7d0a3ef4/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
index a99bdfe..e20977a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
@@ -109,6 +109,20 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister {
       }
     }
 
+    if (requiredSchema.length == 1 &&
+      requiredSchema.head.name == parsedOptions.columnNameOfCorruptRecord) {
+      throw new AnalysisException(
+        "Since Spark 2.3, the queries from raw JSON/CSV files are disallowed when the\n" +
+          "referenced columns only include the internal corrupt record column\n" +
+          s"(named _corrupt_record by default). For example:\n" +
+          "spark.read.schema(schema).csv(file).filter($\"_corrupt_record\".isNotNull).count()\n" +
+          "and spark.read.schema(schema).csv(file).select(\"_corrupt_record\").show().\n" +
+          "Instead, you can cache or save the parsed results and then send the same query.\n" +
+          "For example, val df = spark.read.schema(schema).csv(file).cache() and then\n" +
+          "df.filter($\"_corrupt_record\".isNotNull).count()."
+      )
+    }
+
     (file: PartitionedFile) => {
       val conf = broadcastedHadoopConf.value.value
       val parser = new UnivocityParser(

http://git-wip-us.apache.org/repos/asf/spark/blob/7d0a3ef4/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
index b5ed6e4..0862c74 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
@@ -118,7 +118,7 @@ class JsonFileFormat extends TextBasedFileFormat with DataSourceRegister {
       throw new AnalysisException(
         "Since Spark 2.3, the queries from raw JSON/CSV files are disallowed when the\n" +
           "referenced columns only include the internal corrupt record column\n" +
-          s"(named ${parsedOptions.columnNameOfCorruptRecord} by default). For example:\n" +
+          s"(named _corrupt_record by default). For example:\n" +
           "spark.read.schema(schema).json(file).filter($\"_corrupt_record\".isNotNull).count()\n" +
           "and spark.read.schema(schema).json(file).select(\"_corrupt_record\").show().\n" +
           "Instead, you can cache or save the parsed results and then send the same query.\n" +

http://git-wip-us.apache.org/repos/asf/spark/blob/7d0a3ef4/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index be89141..e439699 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -1203,4 +1203,46 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
       .csv(Seq("a").toDS())
     checkAnswer(df, Row("a", null, "a"))
   }
+
+  test("SPARK-21610: Corrupt records are not handled properly when creating a dataframe " +
+    "from a file") {
+    val columnNameOfCorruptRecord = "_corrupt_record"
+    val schema = new StructType()
+      .add("a", IntegerType)
+      .add("b", TimestampType)
+      .add(columnNameOfCorruptRecord, StringType)
+    // negative cases
+    val msg = intercept[AnalysisException] {
+      spark
+        .read
+        .option("columnNameOfCorruptRecord", columnNameOfCorruptRecord)
+        .schema(schema)
+        .csv(testFile(valueMalformedFile))
+        .select(columnNameOfCorruptRecord)
+        .collect()
+    }.getMessage
+    assert(msg.contains("only include the internal corrupt record column"))
+    intercept[org.apache.spark.sql.catalyst.errors.TreeNodeException[_]] {
+      spark
+        .read
+        .option("columnNameOfCorruptRecord", columnNameOfCorruptRecord)
+        .schema(schema)
+        .csv(testFile(valueMalformedFile))
+        .filter($"_corrupt_record".isNotNull)
+        .count()
+    }
+    // workaround
+    val df = spark
+      .read
+      .option("columnNameOfCorruptRecord", columnNameOfCorruptRecord)
+      .schema(schema)
+      .csv(testFile(valueMalformedFile))
+      .cache()
+    assert(df.filter($"_corrupt_record".isNotNull).count() == 1)
+    assert(df.filter($"_corrupt_record".isNull).count() == 1)
+    checkAnswer(
+      df.select(columnNameOfCorruptRecord),
+      Row("0,2013-111-11 12:13:14") :: Row(null) :: Nil
+    )
+  }
 }