Repository: spark
Updated Branches:
  refs/heads/master f14c4ba00 -> 603f4453a


[SPARK-15264][SPARK-15274][SQL] CSV Reader Error on Blank Column Names

## What changes were proposed in this pull request?

When a CSV begins with:
- `,,`
OR
- `"","",`

meaning that the first column names are either empty or blank strings, and
`header` is specified to be `true`, then each such column name is replaced with
`_c` + the index number of that column. For example, if you were to read in the
CSV:
```
"","second column"
"hello", "there"
```
Then the column names would become `"_c0", "second column"`.

This behavior aligns with what currently happens when `header` is specified to 
be `false` in recent versions of Spark.

### Current Behavior in Spark <=1.6
In Spark <=1.6, a blank column name in a CSV header becomes an empty string,
`""`, meaning that this column cannot be accessed. However, the CSV is read in
without issue.

### Current Behavior in Spark 2.0
Spark throws a `NullPointerException` and will not read in the file.

#### Reproduction in 2.0
https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/346304/2828750690305044/484361/latest.html

## How was this patch tested?
A new test was added to `CSVSuite` to cover this issue. It asserts that both
the columns with blank names (via their generated names) and the regularly
named columns can be selected.

Author: Bill Chambers <b...@databricks.com>
Author: Bill Chambers <wchamb...@ischool.berkeley.edu>

Closes #13041 from anabranch/master.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/603f4453
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/603f4453
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/603f4453

Branch: refs/heads/master
Commit: 603f4453a16825cc5773cfe24d6ae4cee5ec949a
Parents: f14c4ba
Author: Bill Chambers <b...@databricks.com>
Authored: Wed May 11 17:42:13 2016 -0700
Committer: Andrew Or <and...@databricks.com>
Committed: Wed May 11 17:42:13 2016 -0700

----------------------------------------------------------------------
 python/pyspark/sql/readwriter.py                    |  2 +-
 .../execution/datasources/csv/DefaultSource.scala   |  6 ++++--
 .../src/test/resources/cars-blank-column-name.csv   |  3 +++
 .../sql/execution/datasources/csv/CSVSuite.scala    | 16 ++++++++++++++--
 4 files changed, 22 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/603f4453/python/pyspark/sql/readwriter.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index 7fd7583..5cb1860 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -358,7 +358,7 @@ class DataFrameReader(object):
 
         >>> df = spark.read.csv('python/test_support/sql/ages.csv')
         >>> df.dtypes
-        [('C0', 'string'), ('C1', 'string')]
+        [('_c0', 'string'), ('_c1', 'string')]
         """
         if schema is not None:
             self.schema(schema)

http://git-wip-us.apache.org/repos/asf/spark/blob/603f4453/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala
index 948fac0..f47ed76 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala
@@ -61,9 +61,11 @@ class DefaultSource extends FileFormat with 
DataSourceRegister {
     val firstRow = new LineCsvReader(csvOptions).parseLine(firstLine)
 
     val header = if (csvOptions.headerFlag) {
-      firstRow
+      firstRow.zipWithIndex.map { case (value, index) =>
+        if (value == null || value.isEmpty || value == csvOptions.nullValue) 
s"_c$index" else value
+      }
     } else {
-      firstRow.zipWithIndex.map { case (value, index) => s"C$index" }
+      firstRow.zipWithIndex.map { case (value, index) => s"_c$index" }
     }
 
     val parsedRdd = tokenRdd(sparkSession, csvOptions, header, paths)

http://git-wip-us.apache.org/repos/asf/spark/blob/603f4453/sql/core/src/test/resources/cars-blank-column-name.csv
----------------------------------------------------------------------
diff --git a/sql/core/src/test/resources/cars-blank-column-name.csv 
b/sql/core/src/test/resources/cars-blank-column-name.csv
new file mode 100644
index 0000000..0b804b1
--- /dev/null
+++ b/sql/core/src/test/resources/cars-blank-column-name.csv
@@ -0,0 +1,3 @@
+"",,make,customer,comment
+2012,"Tesla","S","bill","blank"
+2013,"Tesla","S","c","something"

http://git-wip-us.apache.org/repos/asf/spark/blob/603f4453/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index b6cdc8c..ae91e0f 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -38,6 +38,7 @@ class CSVSuite extends QueryTest with SharedSQLContext with 
SQLTestUtils {
   private val carsAltFile = "cars-alternative.csv"
   private val carsUnbalancedQuotesFile = "cars-unbalanced-quotes.csv"
   private val carsNullFile = "cars-null.csv"
+  private val carsBlankColName = "cars-blank-column-name.csv"
   private val emptyFile = "empty.csv"
   private val commentsFile = "comments.csv"
   private val disableCommentsFile = "disable_comments.csv"
@@ -71,14 +72,14 @@ class CSVSuite extends QueryTest with SharedSQLContext with 
SQLTestUtils {
       if (withHeader) {
         assert(df.schema.fieldNames === Array("year", "make", "model", 
"comment", "blank"))
       } else {
-        assert(df.schema.fieldNames === Array("C0", "C1", "C2", "C3", "C4"))
+        assert(df.schema.fieldNames === Array("_c0", "_c1", "_c2", "_c3", 
"_c4"))
       }
     }
 
     if (checkValues) {
       val yearValues = List("2012", "1997", "2015")
       val actualYears = if (!withHeader) "year" :: yearValues else yearValues
-      val years = if (withHeader) df.select("year").collect() else 
df.select("C0").collect()
+      val years = if (withHeader) df.select("year").collect() else 
df.select("_c0").collect()
 
       years.zipWithIndex.foreach { case (year, index) =>
         if (checkTypes) {
@@ -224,6 +225,17 @@ class CSVSuite extends QueryTest with SharedSQLContext 
with SQLTestUtils {
     assert(cars.select("year").collect().size === 2)
   }
 
+  test("test for blank column names on read and select columns") {
+    val cars = spark.read
+      .format("csv")
+      .options(Map("header" -> "true", "inferSchema" -> "true"))
+      .load(testFile(carsBlankColName))
+
+    assert(cars.select("customer").collect().size == 2)
+    assert(cars.select("_c0").collect().size == 2)
+    assert(cars.select("_c1").collect().size == 2)
+  }
+
   test("test for FAILFAST parsing mode") {
     val exception = intercept[SparkException]{
       spark.read


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to