Repository: spark Updated Branches: refs/heads/master f14c4ba00 -> 603f4453a
[SPARK-15264][SPARK-15274][SQL] CSV Reader Error on Blank Column Names ## What changes were proposed in this pull request? When a CSV begins with: - `,,` OR - `"","",` meaning that the first column names are either empty or blank strings and `header` is specified to be `true`, then the column name is replaced with `_c` + the index number of that given column. For example, if you were to read in the CSV: ``` "","second column" "hello", "there" ``` Then column names would become `"_c0", "second column"`. This behavior aligns with what currently happens when `header` is specified to be `false` in recent versions of Spark. ### Current Behavior in Spark <=1.6 In Spark <=1.6, a CSV with a blank column name becomes a blank string, `""`, meaning that this column cannot be accessed. However, the CSV reads in without issue. ### Current Behavior in Spark 2.0 Spark throws a NullPointerException and will not read in the file. #### Reproduction in 2.0 https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/346304/2828750690305044/484361/latest.html ## How was this patch tested? A new test was added to `CSVSuite` to account for this issue. We then have asserts that test for being able to select both the empty column names as well as the regular column names. Author: Bill Chambers <b...@databricks.com> Author: Bill Chambers <wchamb...@ischool.berkeley.edu> Closes #13041 from anabranch/master. 
Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/603f4453 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/603f4453 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/603f4453 Branch: refs/heads/master Commit: 603f4453a16825cc5773cfe24d6ae4cee5ec949a Parents: f14c4ba Author: Bill Chambers <b...@databricks.com> Authored: Wed May 11 17:42:13 2016 -0700 Committer: Andrew Or <and...@databricks.com> Committed: Wed May 11 17:42:13 2016 -0700 ---------------------------------------------------------------------- python/pyspark/sql/readwriter.py | 2 +- .../execution/datasources/csv/DefaultSource.scala | 6 ++++-- .../src/test/resources/cars-blank-column-name.csv | 3 +++ .../sql/execution/datasources/csv/CSVSuite.scala | 16 ++++++++++++++-- 4 files changed, 22 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/603f4453/python/pyspark/sql/readwriter.py ---------------------------------------------------------------------- diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py index 7fd7583..5cb1860 100644 --- a/python/pyspark/sql/readwriter.py +++ b/python/pyspark/sql/readwriter.py @@ -358,7 +358,7 @@ class DataFrameReader(object): >>> df = spark.read.csv('python/test_support/sql/ages.csv') >>> df.dtypes - [('C0', 'string'), ('C1', 'string')] + [('_c0', 'string'), ('_c1', 'string')] """ if schema is not None: self.schema(schema) http://git-wip-us.apache.org/repos/asf/spark/blob/603f4453/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala index 948fac0..f47ed76 100644 --- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala @@ -61,9 +61,11 @@ class DefaultSource extends FileFormat with DataSourceRegister { val firstRow = new LineCsvReader(csvOptions).parseLine(firstLine) val header = if (csvOptions.headerFlag) { - firstRow + firstRow.zipWithIndex.map { case (value, index) => + if (value == null || value.isEmpty || value == csvOptions.nullValue) s"_c$index" else value + } } else { - firstRow.zipWithIndex.map { case (value, index) => s"C$index" } + firstRow.zipWithIndex.map { case (value, index) => s"_c$index" } } val parsedRdd = tokenRdd(sparkSession, csvOptions, header, paths) http://git-wip-us.apache.org/repos/asf/spark/blob/603f4453/sql/core/src/test/resources/cars-blank-column-name.csv ---------------------------------------------------------------------- diff --git a/sql/core/src/test/resources/cars-blank-column-name.csv b/sql/core/src/test/resources/cars-blank-column-name.csv new file mode 100644 index 0000000..0b804b1 --- /dev/null +++ b/sql/core/src/test/resources/cars-blank-column-name.csv @@ -0,0 +1,3 @@ +"",,make,customer,comment +2012,"Tesla","S","bill","blank" +2013,"Tesla","S","c","something" http://git-wip-us.apache.org/repos/asf/spark/blob/603f4453/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala index b6cdc8c..ae91e0f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala @@ -38,6 +38,7 @@ class CSVSuite extends QueryTest with SharedSQLContext with 
SQLTestUtils { private val carsAltFile = "cars-alternative.csv" private val carsUnbalancedQuotesFile = "cars-unbalanced-quotes.csv" private val carsNullFile = "cars-null.csv" + private val carsBlankColName = "cars-blank-column-name.csv" private val emptyFile = "empty.csv" private val commentsFile = "comments.csv" private val disableCommentsFile = "disable_comments.csv" @@ -71,14 +72,14 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils { if (withHeader) { assert(df.schema.fieldNames === Array("year", "make", "model", "comment", "blank")) } else { - assert(df.schema.fieldNames === Array("C0", "C1", "C2", "C3", "C4")) + assert(df.schema.fieldNames === Array("_c0", "_c1", "_c2", "_c3", "_c4")) } } if (checkValues) { val yearValues = List("2012", "1997", "2015") val actualYears = if (!withHeader) "year" :: yearValues else yearValues - val years = if (withHeader) df.select("year").collect() else df.select("C0").collect() + val years = if (withHeader) df.select("year").collect() else df.select("_c0").collect() years.zipWithIndex.foreach { case (year, index) => if (checkTypes) { @@ -224,6 +225,17 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils { assert(cars.select("year").collect().size === 2) } + test("test for blank column names on read and select columns") { + val cars = spark.read + .format("csv") + .options(Map("header" -> "true", "inferSchema" -> "true")) + .load(testFile(carsBlankColName)) + + assert(cars.select("customer").collect().size == 2) + assert(cars.select("_c0").collect().size == 2) + assert(cars.select("_c1").collect().size == 2) + } + test("test for FAILFAST parsing mode") { val exception = intercept[SparkException]{ spark.read --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org