[GitHub] spark pull request #21415: [SPARK-24244][SPARK-24368][SQL] Passing only requ...

2018-05-24 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/21415


---




[GitHub] spark pull request #21415: [SPARK-24244][SPARK-24368][SQL] Passing only requ...

2018-05-24 Thread MaxGekk
Github user MaxGekk commented on a diff in the pull request:

https://github.com/apache/spark/pull/21415#discussion_r190725111
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVBenchmarks.scala ---
@@ -74,7 +74,49 @@ object CSVBenchmarks {
     }
   }
 
+  def multiColumnsBenchmark(rowsNum: Int): Unit = {
+    val colsNum = 1000
+    val benchmark = new Benchmark(s"Wide rows with $colsNum columns", rowsNum)
+
+    withTempPath { path =>
+      val fields = Seq.tabulate(colsNum)(i => StructField(s"col$i", IntegerType))
+      val schema = StructType(fields)
+      val values = (0 until colsNum).map(i => i.toString).mkString(",")
+      val columnNames = schema.fieldNames
+
+      spark.range(rowsNum)
+        .select(Seq.tabulate(colsNum)(i => lit(i).as(s"col$i")): _*)
+        .write.option("header", true)
+        .csv(path.getAbsolutePath)
+
+      val ds = spark.read.schema(schema).csv(path.getAbsolutePath)
+
+      benchmark.addCase(s"Select $colsNum columns", 3) { _ =>
+        ds.select("*").filter((row: Row) => true).count()
+      }
+      val cols100 = columnNames.take(100).map(Column(_))
+      benchmark.addCase(s"Select 100 columns", 3) { _ =>
+        ds.select(cols100: _*).filter((row: Row) => true).count()
+      }
+      benchmark.addCase(s"Select one column", 3) { _ =>
+        ds.select($"col1").filter((row: Row) => true).count()
+      }
+
+      /*
+      Intel(R) Core(TM) i7-7920HQ CPU @ 3.10GHz
+
+      Wide rows with 1000 columns:       Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+      ------------------------------------------------------------------------------------------
+      Select 1000 columns                    76910 / 78065          0.0       76909.8       1.0X
+      Select 100 columns                     28625 / 32884          0.0       28625.1       2.7X
+      Select one column                      22498 / 22669          0.0       22497.8       3.4X
--- End diff --

sure, added `count()`
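
For reference, a minimal sketch of such a benchmark case, mirroring the `addCase` calls above (the exact committed variant may differ):

```scala
// Hypothetical extra case: count() needs no columns at all,
// so it should benefit the most from column pruning.
benchmark.addCase("count()", 3) { _ =>
  ds.count()
}
```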


---




[GitHub] spark pull request #21415: [SPARK-24244][SPARK-24368][SQL] Passing only requ...

2018-05-24 Thread MaxGekk
Github user MaxGekk commented on a diff in the pull request:

https://github.com/apache/spark/pull/21415#discussion_r190724995
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala ---
@@ -29,17 +29,20 @@ import org.apache.spark.sql.catalyst.util._
 class CSVOptions(
     @transient val parameters: CaseInsensitiveMap[String],
     defaultTimeZoneId: String,
-    defaultColumnNameOfCorruptRecord: String)
+    defaultColumnNameOfCorruptRecord: String,
+    val columnPruning: Boolean)
   extends Logging with Serializable {
 
   def this(
       parameters: Map[String, String],
       defaultTimeZoneId: String,
-      defaultColumnNameOfCorruptRecord: String = "") = {
+      defaultColumnNameOfCorruptRecord: String = "",
+      columnPruning: Boolean = false) = {
--- End diff --

removed


---




[GitHub] spark pull request #21415: [SPARK-24244][SPARK-24368][SQL] Passing only requ...

2018-05-24 Thread MaxGekk
Github user MaxGekk commented on a diff in the pull request:

https://github.com/apache/spark/pull/21415#discussion_r190724870
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala ---
@@ -1383,4 +1385,31 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te
 
     checkAnswer(ds, Seq(Row(""" "a" """)))
   }
+
+  test("SPARK-24244: Select a subset of all columns") {
+    withTempPath { path =>
+      import collection.JavaConverters._
+      val schema = new StructType()
+        .add("f1", IntegerType).add("f2", IntegerType).add("f3", IntegerType)
+        .add("f4", IntegerType).add("f5", IntegerType).add("f6", IntegerType)
+        .add("f7", IntegerType).add("f8", IntegerType).add("f9", IntegerType)
+        .add("f10", IntegerType).add("f11", IntegerType).add("f12", IntegerType)
+        .add("f13", IntegerType).add("f14", IntegerType).add("f15", IntegerType)
+
+      val odf = spark.createDataFrame(List(
+        Row(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15),
+        Row(-1, -2, -3, -4, -5, -6, -7, -8, -9, -10, -11, -12, -13, -14, -15)
+      ).asJava, schema)
+      odf.write.csv(path.getCanonicalPath)
+      val idf = spark.read
+        .schema(schema)
+        .csv(path.getCanonicalPath)
+        .select('f15, 'f10, 'f5)
--- End diff --

Added an assert for `count()`. In `CSVSuite`, there are a few tests with `count()` over malformed CSV files.
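
A sketch of what such an assert could look like at the end of the test above (hypothetical; the committed assertion may differ):

```scala
// idf selects f15, f10, f5 out of the 15 written columns.
assert(idf.count() == 2)
checkAnswer(idf, List(Row(15, 10, 5), Row(-15, -10, -5)))
```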


---




[GitHub] spark pull request #21415: [SPARK-24244][SPARK-24368][SQL] Passing only requ...

2018-05-24 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/21415#discussion_r190701039
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala ---
@@ -29,17 +29,20 @@ import org.apache.spark.sql.catalyst.util._
 class CSVOptions(
     @transient val parameters: CaseInsensitiveMap[String],
     defaultTimeZoneId: String,
-    defaultColumnNameOfCorruptRecord: String)
+    defaultColumnNameOfCorruptRecord: String,
+    val columnPruning: Boolean)
   extends Logging with Serializable {
 
   def this(
       parameters: Map[String, String],
       defaultTimeZoneId: String,
-      defaultColumnNameOfCorruptRecord: String = "") = {
+      defaultColumnNameOfCorruptRecord: String = "",
+      columnPruning: Boolean = false) = {
--- End diff --

Always enabling it is also not right. Can we remove the default?
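
A minimal sketch of the auxiliary constructor without the default (callers would then have to pass the flag, by name if they skip the corrupt-record default; the merged change may differ):

```scala
def this(
    parameters: Map[String, String],
    defaultTimeZoneId: String,
    defaultColumnNameOfCorruptRecord: String = "",
    columnPruning: Boolean) = {  // no default: every call site decides explicitly
  this(
    CaseInsensitiveMap(parameters),
    defaultTimeZoneId,
    defaultColumnNameOfCorruptRecord,
    columnPruning)
}
```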


---




[GitHub] spark pull request #21415: [SPARK-24244][SPARK-24368][SQL] Passing only requ...

2018-05-24 Thread MaxGekk
Github user MaxGekk commented on a diff in the pull request:

https://github.com/apache/spark/pull/21415#discussion_r190694499
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala ---
@@ -29,17 +29,20 @@ import org.apache.spark.sql.catalyst.util._
 class CSVOptions(
     @transient val parameters: CaseInsensitiveMap[String],
     defaultTimeZoneId: String,
-    defaultColumnNameOfCorruptRecord: String)
+    defaultColumnNameOfCorruptRecord: String,
+    val columnPruning: Boolean)
   extends Logging with Serializable {
 
   def this(
       parameters: Map[String, String],
       defaultTimeZoneId: String,
-      defaultColumnNameOfCorruptRecord: String = "") = {
+      defaultColumnNameOfCorruptRecord: String = "",
+      columnPruning: Boolean = false) = {
--- End diff --

The constructor with disabled `columnPruning` is called in the CSV writer and 30 times from test suites like `UnivocityParserSuite` and `CSVInferSchemaSuite`.

> We might lose the pruning opportunity if we call this constructor.

OK, I will enable it by default.


---




[GitHub] spark pull request #21415: [SPARK-24244][SPARK-24368][SQL] Passing only requ...

2018-05-24 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/21415#discussion_r190661257
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVBenchmarks.scala ---
@@ -74,7 +74,49 @@ object CSVBenchmarks {
     }
   }
 
+  def multiColumnsBenchmark(rowsNum: Int): Unit = {
+    val colsNum = 1000
+    val benchmark = new Benchmark(s"Wide rows with $colsNum columns", rowsNum)
+
+    withTempPath { path =>
+      val fields = Seq.tabulate(colsNum)(i => StructField(s"col$i", IntegerType))
+      val schema = StructType(fields)
+      val values = (0 until colsNum).map(i => i.toString).mkString(",")
+      val columnNames = schema.fieldNames
+
+      spark.range(rowsNum)
+        .select(Seq.tabulate(colsNum)(i => lit(i).as(s"col$i")): _*)
+        .write.option("header", true)
+        .csv(path.getAbsolutePath)
+
+      val ds = spark.read.schema(schema).csv(path.getAbsolutePath)
+
+      benchmark.addCase(s"Select $colsNum columns", 3) { _ =>
+        ds.select("*").filter((row: Row) => true).count()
+      }
+      val cols100 = columnNames.take(100).map(Column(_))
+      benchmark.addCase(s"Select 100 columns", 3) { _ =>
+        ds.select(cols100: _*).filter((row: Row) => true).count()
+      }
+      benchmark.addCase(s"Select one column", 3) { _ =>
+        ds.select($"col1").filter((row: Row) => true).count()
+      }
+
+      /*
+      Intel(R) Core(TM) i7-7920HQ CPU @ 3.10GHz
+
+      Wide rows with 1000 columns:       Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+      ------------------------------------------------------------------------------------------
+      Select 1000 columns                    76910 / 78065          0.0       76909.8       1.0X
+      Select 100 columns                     28625 / 32884          0.0       28625.1       2.7X
+      Select one column                      22498 / 22669          0.0       22497.8       3.4X
--- End diff --

`count(1)` too?


---




[GitHub] spark pull request #21415: [SPARK-24244][SPARK-24368][SQL] Passing only requ...

2018-05-24 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/21415#discussion_r190660661
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala ---
@@ -1383,4 +1385,31 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te
 
     checkAnswer(ds, Seq(Row(""" "a" """)))
   }
+
+  test("SPARK-24244: Select a subset of all columns") {
+    withTempPath { path =>
+      import collection.JavaConverters._
+      val schema = new StructType()
+        .add("f1", IntegerType).add("f2", IntegerType).add("f3", IntegerType)
+        .add("f4", IntegerType).add("f5", IntegerType).add("f6", IntegerType)
+        .add("f7", IntegerType).add("f8", IntegerType).add("f9", IntegerType)
+        .add("f10", IntegerType).add("f11", IntegerType).add("f12", IntegerType)
+        .add("f13", IntegerType).add("f14", IntegerType).add("f15", IntegerType)
+
+      val odf = spark.createDataFrame(List(
+        Row(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15),
+        Row(-1, -2, -3, -4, -5, -6, -7, -8, -9, -10, -11, -12, -13, -14, -15)
+      ).asJava, schema)
+      odf.write.csv(path.getCanonicalPath)
+      val idf = spark.read
+        .schema(schema)
+        .csv(path.getCanonicalPath)
+        .select('f15, 'f10, 'f5)
--- End diff --

Could you add an extreme test case? Try `count(1)` on CSV files; that means zero columns are required.
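
A sketch of such an extreme case, reusing the setup of the test above (hypothetical):

```scala
// Zero columns are required: count(1) needs no field values, so the
// parser may skip every column of each row.
val counted = spark.read.schema(schema).csv(path.getCanonicalPath)
  .selectExpr("count(1)")
assert(counted.first().getLong(0) == 2)
```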


---




[GitHub] spark pull request #21415: [SPARK-24244][SPARK-24368][SQL] Passing only requ...

2018-05-24 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/21415#discussion_r190660327
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala ---
@@ -29,17 +29,20 @@ import org.apache.spark.sql.catalyst.util._
 class CSVOptions(
     @transient val parameters: CaseInsensitiveMap[String],
     defaultTimeZoneId: String,
-    defaultColumnNameOfCorruptRecord: String)
+    defaultColumnNameOfCorruptRecord: String,
+    val columnPruning: Boolean)
   extends Logging with Serializable {
 
   def this(
       parameters: Map[String, String],
       defaultTimeZoneId: String,
-      defaultColumnNameOfCorruptRecord: String = "") = {
+      defaultColumnNameOfCorruptRecord: String = "",
+      columnPruning: Boolean = false) = {
--- End diff --

Let us not set a default value for `columnPruning`. We might lose the pruning opportunity if we call this constructor.
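
For instance, with the default in place, a hypothetical call site like this compiles but silently opts out of pruning:

```scala
// columnPruning falls back to false without any hint at the call site.
val options = new CSVOptions(Map("header" -> "true"), "UTC")
```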


---




[GitHub] spark pull request #21415: [SPARK-24244][SPARK-24368][SQL] Passing only requ...

2018-05-23 Thread MaxGekk
GitHub user MaxGekk opened a pull request:

https://github.com/apache/spark/pull/21415

[SPARK-24244][SPARK-24368][SQL] Passing only required columns to the CSV parser

## What changes were proposed in this pull request?

The uniVocity parser allows specifying only the required column names or indexes for [parsing](https://www.univocity.com/pages/parsers-tutorial), like:

```
// Here we select only the columns by their indexes.
// The parser just skips the values in other columns
parserSettings.selectIndexes(4, 0, 1);
CsvParser parser = new CsvParser(parserSettings);
```
In this PR, I propose to extract the indexes of the required columns from the required schema and pass them to the CSV parser. Benchmarks show the following improvements in parsing of 1000 columns:

```
Select 100 columns out of 1000: x1.76
Select 1 column out of 1000: x2
```
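
The core of the change, as a rough standalone sketch (illustrative names, not the exact code in this PR):

```scala
import com.univocity.parsers.csv.{CsvParser, CsvParserSettings}
import org.apache.spark.sql.types._

// Full schema of the file vs. the schema actually requested by the query.
val dataSchema = StructType(Seq.tabulate(5)(i => StructField(s"col$i", IntegerType)))
val requiredSchema = StructType(Seq(
  StructField("col4", IntegerType),
  StructField("col1", IntegerType)))

val settings = new CsvParserSettings()
if (requiredSchema.length < dataSchema.length) {
  // Map each required field to its index in the full schema and let
  // uniVocity skip everything else.
  val indexes = requiredSchema.map(f => dataSchema.fieldIndex(f.name))
  settings.selectIndexes(indexes.map(i => Integer.valueOf(i)): _*)
}
val parser = new CsvParser(settings)
// Only the selected columns are materialized (in selection order by default).
val tokens = parser.parseLine("10,11,12,13,14")  // Array("14", "11")
```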

**Note**: Compared to the current implementation, the changes can return different results for malformed rows in the `DROPMALFORMED` and `FAILFAST` modes if only a subset of all columns is requested. To restore the previous behavior, set `spark.sql.csv.parser.columnPruning.enabled` to `false`.
 
## How was this patch tested?

It was tested by a new test which selects 3 columns out of 15, by existing tests, and by new benchmarks.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/MaxGekk/spark-1 csv-column-pruning2

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21415.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #21415


commit 9cffa0fccc33552e8fce3580a9a665b022f5bf22
Author: Maxim Gekk 
Date:   2018-03-21T20:03:11Z

Adding tests for select only requested columns

commit fdbcbe3536aee04e6a84b72ac319726614416bc3
Author: Maxim Gekk 
Date:   2018-03-21T20:42:08Z

Select indexes of required columns only

commit 578f47b0f32a76caf6c9ede8763c9cf85a1c83e9
Author: Maxim Gekk 
Date:   2018-03-24T10:41:29Z

Fix the case when number of parsed fields are not matched to required schema

commit 0f942c308dca173dad8f421e893066b8c03d35a3
Author: Maxim Gekk 
Date:   2018-03-24T11:07:55Z

Using selectIndexes if required number of columns are less than its total number.

commit c4b11601e9c264729e141fff3dc653d868a7ad69
Author: Maxim Gekk 
Date:   2018-03-24T11:48:43Z

Fix the test: force to read all columns

commit 8cf6eab952d79628cb8ee2ff7b92dadae60ec686
Author: Maxim Gekk 
Date:   2018-04-06T20:55:35Z

Fix merging conflicts

commit 5b2f0b9d7346f927842bc1a2089a7299876f1894
Author: Maxim Gekk 
Date:   2018-04-29T11:52:08Z

Benchmarks for many columns

commit 6d1e902c0011e88dbafb65c4ad6e7431370ed12d
Author: Maxim Gekk 
Date:   2018-04-29T12:59:58Z

Make size of requiredSchema equals to amount of selected columns

commit 4525795f7337cbd081f569cd79d7f90cb58edbee
Author: Maxim Gekk 
Date:   2018-04-29T13:36:54Z

Removing selection of all columns

commit 8809cecf93d8e7a97eca827d9e8637a7eb5b2449
Author: Maxim Gekk 
Date:   2018-04-29T13:50:44Z

Updating benchmarks for select indexes

commit dc97ceb96185ed2eaa05fbe1aee8ecfe8ccb7e7d
Author: Maxim Gekk 
Date:   2018-05-05T19:19:17Z

Addressing Herman's review comments

commit 51b31483263e13cd85b19b3efea65188945eda99
Author: Maxim Gekk 
Date:   2018-05-10T18:39:38Z

Updated benchmark result for recent changes

commit e3958b1468b490b548574b53512f0d83850e6f6f
Author: Maxim Gekk 
Date:   2018-05-10T18:46:17Z

Add ticket number to test title

commit a4a0a549156a15011c33c7877a35f244d75b7a4f
Author: Maxim Gekk 
Date:   2018-05-10T19:02:24Z

Removing unnecessary benchmark

commit fa860157c982846524bd8f151daf8a2154117b34
Author: Maxim Gekk 
Date:   2018-05-13T18:49:49Z

Updating the migration guide

commit 15528d20a74904c14c58bf3ad54c9a552c519430
Author: Maxim Gekk 
Date:   2018-05-13T18:55:06Z

Moving some values back as it was.

commit f90daa7ea33d119be978c27de10978c2d6281e25
Author: Maxim Gekk 
Date:   2018-05-13T18:58:20Z

Renaming the test title

commit 4d9873d39277b9cbaee892957c06bfc2cb9a52f1
Author: Maxim Gekk 
Date:   2018-05-17T20:02:47Z

Improving of the migration guide

commit 7dcfc7a7664fcd5311cb352f0ea7a24b3cc1c639
Author: Maxim Gekk 
Date:   2018-05-17T20:12:49Z

Merge remote-tracking branch 'origin/master' into csv-column-pruning

# Conflicts:
#   sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVBenchmarks.scala
#