[GitHub] spark pull request #21415: [SPARK-24244][SPARK-24368][SQL] Passing only requ...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/21415
[GitHub] spark pull request #21415: [SPARK-24244][SPARK-24368][SQL] Passing only requ...
Github user MaxGekk commented on a diff in the pull request: https://github.com/apache/spark/pull/21415#discussion_r190725111

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVBenchmarks.scala ---
@@ -74,7 +74,49 @@ object CSVBenchmarks {
     }
   }

+  def multiColumnsBenchmark(rowsNum: Int): Unit = {
+    val colsNum = 1000
+    val benchmark = new Benchmark(s"Wide rows with $colsNum columns", rowsNum)
+
+    withTempPath { path =>
+      val fields = Seq.tabulate(colsNum)(i => StructField(s"col$i", IntegerType))
+      val schema = StructType(fields)
+      val values = (0 until colsNum).map(i => i.toString).mkString(",")
+      val columnNames = schema.fieldNames
+
+      spark.range(rowsNum)
+        .select(Seq.tabulate(colsNum)(i => lit(i).as(s"col$i")): _*)
+        .write.option("header", true)
+        .csv(path.getAbsolutePath)
+
+      val ds = spark.read.schema(schema).csv(path.getAbsolutePath)
+
+      benchmark.addCase(s"Select $colsNum columns", 3) { _ =>
+        ds.select("*").filter((row: Row) => true).count()
+      }
+      val cols100 = columnNames.take(100).map(Column(_))
+      benchmark.addCase(s"Select 100 columns", 3) { _ =>
+        ds.select(cols100: _*).filter((row: Row) => true).count()
+      }
+      benchmark.addCase(s"Select one column", 3) { _ =>
+        ds.select($"col1").filter((row: Row) => true).count()
+      }
+
+      /*
+      Intel(R) Core(TM) i7-7920HQ CPU @ 3.10GHz
+
+      Wide rows with 1000 columns:    Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+      ----------------------------------------------------------------------------------------
+      Select 1000 columns                 76910 / 78065          0.0       76909.8       1.0X
+      Select 100 columns                  28625 / 32884          0.0       28625.1       2.7X
+      Select one column                   22498 / 22669          0.0       22497.8       3.4X
--- End diff --

sure, added `count()`
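Given the exchange above, a minimal sketch of what the added `count()` benchmark case could look like (the case name and body are assumptions in the diff's style, not the committed code):

```
      benchmark.addCase(s"count()", 3) { _ =>
        // count() requires zero columns, so a pruning parser can skip every field.
        ds.count()
      }
```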
[GitHub] spark pull request #21415: [SPARK-24244][SPARK-24368][SQL] Passing only requ...
Github user MaxGekk commented on a diff in the pull request: https://github.com/apache/spark/pull/21415#discussion_r190724995

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala ---
@@ -29,17 +29,20 @@ import org.apache.spark.sql.catalyst.util._
 class CSVOptions(
     @transient val parameters: CaseInsensitiveMap[String],
     defaultTimeZoneId: String,
-    defaultColumnNameOfCorruptRecord: String)
+    defaultColumnNameOfCorruptRecord: String,
+    val columnPruning: Boolean)
   extends Logging with Serializable {

   def this(
     parameters: Map[String, String],
     defaultTimeZoneId: String,
-    defaultColumnNameOfCorruptRecord: String = "") = {
+    defaultColumnNameOfCorruptRecord: String = "",
+    columnPruning: Boolean = false) = {
--- End diff --

removed
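With the default removed, every call site has to choose explicitly. A hypothetical call site, assuming the flag is derived from the `spark.sql.csv.parser.columnPruning.enabled` setting mentioned in the PR description (the config lookup and surrounding names here are illustrative, not the committed wiring):

```
// Sketch: read the flag from the session's SQL conf and pass it explicitly.
val conf = sparkSession.sessionState.conf
val options = new CSVOptions(
  parameters,
  conf.sessionLocalTimeZone,
  conf.columnNameOfCorruptRecord,
  columnPruning = conf.getConfString(
    "spark.sql.csv.parser.columnPruning.enabled", "true").toBoolean)
```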
[GitHub] spark pull request #21415: [SPARK-24244][SPARK-24368][SQL] Passing only requ...
Github user MaxGekk commented on a diff in the pull request: https://github.com/apache/spark/pull/21415#discussion_r190724870

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala ---
@@ -1383,4 +1385,31 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te
     checkAnswer(ds, Seq(Row(""" "a" """)))
   }
+
+  test("SPARK-24244: Select a subset of all columns") {
+    withTempPath { path =>
+      import collection.JavaConverters._
+      val schema = new StructType()
+        .add("f1", IntegerType).add("f2", IntegerType).add("f3", IntegerType)
+        .add("f4", IntegerType).add("f5", IntegerType).add("f6", IntegerType)
+        .add("f7", IntegerType).add("f8", IntegerType).add("f9", IntegerType)
+        .add("f10", IntegerType).add("f11", IntegerType).add("f12", IntegerType)
+        .add("f13", IntegerType).add("f14", IntegerType).add("f15", IntegerType)
+
+      val odf = spark.createDataFrame(List(
+        Row(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15),
+        Row(-1, -2, -3, -4, -5, -6, -7, -8, -9, -10, -11, -12, -13, -14, -15)
+      ).asJava, schema)
+      odf.write.csv(path.getCanonicalPath)
+      val idf = spark.read
+        .schema(schema)
+        .csv(path.getCanonicalPath)
+        .select('f15, 'f10, 'f5)
--- End diff --

added an assert for `count()`. In the `CSVSuite`, there are a few tests with `count()` over malformed csv files.
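One possible shape for that assertion (a sketch, not necessarily the exact committed test): `count()` requires zero columns, so with pruning enabled the parser can skip every field and still count both input rows.

```
      // The source DataFrame `odf` above has exactly two rows.
      assert(idf.count() == 2)
```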
[GitHub] spark pull request #21415: [SPARK-24244][SPARK-24368][SQL] Passing only requ...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/21415#discussion_r190701039

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala ---
@@ -29,17 +29,20 @@ import org.apache.spark.sql.catalyst.util._
 class CSVOptions(
     @transient val parameters: CaseInsensitiveMap[String],
     defaultTimeZoneId: String,
-    defaultColumnNameOfCorruptRecord: String)
+    defaultColumnNameOfCorruptRecord: String,
+    val columnPruning: Boolean)
   extends Logging with Serializable {

   def this(
     parameters: Map[String, String],
     defaultTimeZoneId: String,
-    defaultColumnNameOfCorruptRecord: String = "") = {
+    defaultColumnNameOfCorruptRecord: String = "",
+    columnPruning: Boolean = false) = {
--- End diff --

Always enabling it is also not right. Can we remove the default?
[GitHub] spark pull request #21415: [SPARK-24244][SPARK-24368][SQL] Passing only requ...
Github user MaxGekk commented on a diff in the pull request: https://github.com/apache/spark/pull/21415#discussion_r190694499

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala ---
@@ -29,17 +29,20 @@ import org.apache.spark.sql.catalyst.util._
 class CSVOptions(
     @transient val parameters: CaseInsensitiveMap[String],
     defaultTimeZoneId: String,
-    defaultColumnNameOfCorruptRecord: String)
+    defaultColumnNameOfCorruptRecord: String,
+    val columnPruning: Boolean)
   extends Logging with Serializable {

   def this(
     parameters: Map[String, String],
     defaultTimeZoneId: String,
-    defaultColumnNameOfCorruptRecord: String = "") = {
+    defaultColumnNameOfCorruptRecord: String = "",
+    columnPruning: Boolean = false) = {
--- End diff --

The constructor with disabled `columnPruning` is called in the CSV writer and 30 times from test suites like `UnivocityParserSuite` and `CSVInferSchemaSuite`.

> We might lose the pruning opportunity if we call this constructor.

ok. I will enable it by default.
[GitHub] spark pull request #21415: [SPARK-24244][SPARK-24368][SQL] Passing only requ...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/21415#discussion_r190661257

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVBenchmarks.scala ---
@@ -74,7 +74,49 @@ object CSVBenchmarks {
     }
   }

+  def multiColumnsBenchmark(rowsNum: Int): Unit = {
+    val colsNum = 1000
+    val benchmark = new Benchmark(s"Wide rows with $colsNum columns", rowsNum)
+
+    withTempPath { path =>
+      val fields = Seq.tabulate(colsNum)(i => StructField(s"col$i", IntegerType))
+      val schema = StructType(fields)
+      val values = (0 until colsNum).map(i => i.toString).mkString(",")
+      val columnNames = schema.fieldNames
+
+      spark.range(rowsNum)
+        .select(Seq.tabulate(colsNum)(i => lit(i).as(s"col$i")): _*)
+        .write.option("header", true)
+        .csv(path.getAbsolutePath)
+
+      val ds = spark.read.schema(schema).csv(path.getAbsolutePath)
+
+      benchmark.addCase(s"Select $colsNum columns", 3) { _ =>
+        ds.select("*").filter((row: Row) => true).count()
+      }
+      val cols100 = columnNames.take(100).map(Column(_))
+      benchmark.addCase(s"Select 100 columns", 3) { _ =>
+        ds.select(cols100: _*).filter((row: Row) => true).count()
+      }
+      benchmark.addCase(s"Select one column", 3) { _ =>
+        ds.select($"col1").filter((row: Row) => true).count()
+      }
+
+      /*
+      Intel(R) Core(TM) i7-7920HQ CPU @ 3.10GHz
+
+      Wide rows with 1000 columns:    Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+      ----------------------------------------------------------------------------------------
+      Select 1000 columns                 76910 / 78065          0.0       76909.8       1.0X
+      Select 100 columns                  28625 / 32884          0.0       28625.1       2.7X
+      Select one column                   22498 / 22669          0.0       22497.8       3.4X
--- End diff --

count(1) too?
[GitHub] spark pull request #21415: [SPARK-24244][SPARK-24368][SQL] Passing only requ...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/21415#discussion_r190660661

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala ---
@@ -1383,4 +1385,31 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te
     checkAnswer(ds, Seq(Row(""" "a" """)))
   }
+
+  test("SPARK-24244: Select a subset of all columns") {
+    withTempPath { path =>
+      import collection.JavaConverters._
+      val schema = new StructType()
+        .add("f1", IntegerType).add("f2", IntegerType).add("f3", IntegerType)
+        .add("f4", IntegerType).add("f5", IntegerType).add("f6", IntegerType)
+        .add("f7", IntegerType).add("f8", IntegerType).add("f9", IntegerType)
+        .add("f10", IntegerType).add("f11", IntegerType).add("f12", IntegerType)
+        .add("f13", IntegerType).add("f14", IntegerType).add("f15", IntegerType)
+
+      val odf = spark.createDataFrame(List(
+        Row(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15),
+        Row(-1, -2, -3, -4, -5, -6, -7, -8, -9, -10, -11, -12, -13, -14, -15)
+      ).asJava, schema)
+      odf.write.csv(path.getCanonicalPath)
+      val idf = spark.read
+        .schema(schema)
+        .csv(path.getCanonicalPath)
+        .select('f15, 'f10, 'f5)
--- End diff --

Could you add an extreme test case? Try `count(1)` on csv files? That means zero columns are required.
[GitHub] spark pull request #21415: [SPARK-24244][SPARK-24368][SQL] Passing only requ...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/21415#discussion_r190660327

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala ---
@@ -29,17 +29,20 @@ import org.apache.spark.sql.catalyst.util._
 class CSVOptions(
     @transient val parameters: CaseInsensitiveMap[String],
     defaultTimeZoneId: String,
-    defaultColumnNameOfCorruptRecord: String)
+    defaultColumnNameOfCorruptRecord: String,
+    val columnPruning: Boolean)
   extends Logging with Serializable {

   def this(
     parameters: Map[String, String],
     defaultTimeZoneId: String,
-    defaultColumnNameOfCorruptRecord: String = "") = {
+    defaultColumnNameOfCorruptRecord: String = "",
+    columnPruning: Boolean = false) = {
--- End diff --

Let's not set a default value for `columnPruning`. We might lose the pruning opportunity if we call this constructor.
[GitHub] spark pull request #21415: [SPARK-24244][SPARK-24368][SQL] Passing only requ...
GitHub user MaxGekk opened a pull request: https://github.com/apache/spark/pull/21415

[SPARK-24244][SPARK-24368][SQL] Passing only required columns to the CSV parser

## What changes were proposed in this pull request?

The uniVocity parser allows specifying only the required column names or indexes for [parsing](https://www.univocity.com/pages/parsers-tutorial), for example:

```
// Here we select only the columns by their indexes.
// The parser just skips the values in other columns
parserSettings.selectIndexes(4, 0, 1);
CsvParser parser = new CsvParser(parserSettings);
```

In this PR, I propose to extract the indexes of the required columns from the required schema and pass them to the CSV parser (a sketch of the idea appears after the commit list below). Benchmarks show the following speedups when parsing 1000 columns:

```
Select 100 columns out of 1000: x1.76
Select 1 column out of 1000: x2
```

**Note**: Compared to the current implementation, the changes can return different results for malformed rows in the `DROPMALFORMED` and `FAILFAST` modes if only a subset of all columns is requested. To keep the previous behavior, set `spark.sql.csv.parser.columnPruning.enabled` to `false`.

## How was this patch tested?

It was tested by a new test which selects 3 columns out of 15, by the existing tests, and by new benchmarks.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/MaxGekk/spark-1 csv-column-pruning2

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/21415.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #21415

commit 9cffa0fccc33552e8fce3580a9a665b022f5bf22
Author: Maxim Gekk
Date: 2018-03-21T20:03:11Z

    Adding tests for select only requested columns

commit fdbcbe3536aee04e6a84b72ac319726614416bc3
Author: Maxim Gekk
Date: 2018-03-21T20:42:08Z

    Select indexes of required columns only

commit 578f47b0f32a76caf6c9ede8763c9cf85a1c83e9
Author: Maxim Gekk
Date: 2018-03-24T10:41:29Z

    Fix the case when number of parsed fields are not matched to required schema

commit 0f942c308dca173dad8f421e893066b8c03d35a3
Author: Maxim Gekk
Date: 2018-03-24T11:07:55Z

    Using selectIndexes if required number of columns are less than its total number.
commit c4b11601e9c264729e141fff3dc653d868a7ad69
Author: Maxim Gekk
Date: 2018-03-24T11:48:43Z

    Fix the test: force to read all columns

commit 8cf6eab952d79628cb8ee2ff7b92dadae60ec686
Author: Maxim Gekk
Date: 2018-04-06T20:55:35Z

    Fix merging conflicts

commit 5b2f0b9d7346f927842bc1a2089a7299876f1894
Author: Maxim Gekk
Date: 2018-04-29T11:52:08Z

    Benchmarks for many columns

commit 6d1e902c0011e88dbafb65c4ad6e7431370ed12d
Author: Maxim Gekk
Date: 2018-04-29T12:59:58Z

    Make size of requiredSchema equals to amount of selected columns

commit 4525795f7337cbd081f569cd79d7f90cb58edbee
Author: Maxim Gekk
Date: 2018-04-29T13:36:54Z

    Removing selection of all columns

commit 8809cecf93d8e7a97eca827d9e8637a7eb5b2449
Author: Maxim Gekk
Date: 2018-04-29T13:50:44Z

    Updating benchmarks for select indexes

commit dc97ceb96185ed2eaa05fbe1aee8ecfe8ccb7e7d
Author: Maxim Gekk
Date: 2018-05-05T19:19:17Z

    Addressing Herman's review comments

commit 51b31483263e13cd85b19b3efea65188945eda99
Author: Maxim Gekk
Date: 2018-05-10T18:39:38Z

    Updated benchmark result for recent changes

commit e3958b1468b490b548574b53512f0d83850e6f6f
Author: Maxim Gekk
Date: 2018-05-10T18:46:17Z

    Add ticket number to test title

commit a4a0a549156a15011c33c7877a35f244d75b7a4f
Author: Maxim Gekk
Date: 2018-05-10T19:02:24Z

    Removing unnecessary benchmark

commit fa860157c982846524bd8f151daf8a2154117b34
Author: Maxim Gekk
Date: 2018-05-13T18:49:49Z

    Updating the migration guide

commit 15528d20a74904c14c58bf3ad54c9a552c519430
Author: Maxim Gekk
Date: 2018-05-13T18:55:06Z

    Moving some values back as it was.

commit f90daa7ea33d119be978c27de10978c2d6281e25
Author: Maxim Gekk
Date: 2018-05-13T18:58:20Z

    Renaming the test title

commit 4d9873d39277b9cbaee892957c06bfc2cb9a52f1
Author: Maxim Gekk
Date: 2018-05-17T20:02:47Z

    Improving of the migration guide

commit 7dcfc7a7664fcd5311cb352f0ea7a24b3cc1c639
Author: Maxim Gekk
Date: 2018-05-17T20:12:49Z

    Merge remote-tracking branch 'origin/master' into csv-column-pruning

    # Conflicts:
    #	sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVBenchmarks.scala
    #
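As referenced in the PR description above, a minimal sketch of the index-selection idea (the method and parameter names `selectRequiredColumns`, `dataSchema`, and `requiredSchema` are illustrative; this is a sketch of the approach, not the committed implementation):

```
import com.univocity.parsers.csv.CsvParserSettings
import org.apache.spark.sql.types.StructType

// Map each required field to its position in the full data schema and tell
// uniVocity to parse only those positions; values in all other columns are
// skipped by the parser itself instead of being materialized and discarded.
def selectRequiredColumns(
    settings: CsvParserSettings,
    dataSchema: StructType,
    requiredSchema: StructType): Unit = {
  if (requiredSchema.length < dataSchema.length) {
    val indexes = requiredSchema.map(field => dataSchema.fieldIndex(field.name))
    settings.selectIndexes(indexes.map(Integer.valueOf(_)): _*)
  }
}
```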