[GitHub] spark pull request #23130: [SPARK-26161][SQL] Ignore empty files in load
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/23130

---
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23130: [SPARK-26161][SQL] Ignore empty files in load
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23130#discussion_r237121289

    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/sources/SaveLoadSuite.scala ---
    @@ -142,4 +144,15 @@ class SaveLoadSuite extends DataSourceTest with SharedSQLContext with BeforeAndA
           assert(e.contains(s"Partition column `$unknown` not found in schema $schemaCatalog"))
         }
       }
    +
    +  test("skip empty files in non bucketed read") {
    +    withTempDir { dir =>
    +      val path = dir.getCanonicalPath
    +      Files.write(Paths.get(path, "empty"), Array.empty[Byte])
    +      Files.write(Paths.get(path, "notEmpty"), "a".getBytes(StandardCharsets.UTF_8))
    +      val readback = spark.read.option("wholetext", true).text(path)
    +
    +      assert(readback.rdd.getNumPartitions === 1)
    --- End diff --

thanks for pointing it out, I think we are good here.
[GitHub] spark pull request #23130: [SPARK-26161][SQL] Ignore empty files in load
Github user MaxGekk commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23130#discussion_r237085148

    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/sources/SaveLoadSuite.scala ---
    @@ -142,4 +144,15 @@ class SaveLoadSuite extends DataSourceTest with SharedSQLContext with BeforeAndA
           assert(e.contains(s"Partition column `$unknown` not found in schema $schemaCatalog"))
         }
       }
    +
    +  test("skip empty files in non bucketed read") {
    +    withTempDir { dir =>
    +      val path = dir.getCanonicalPath
    +      Files.write(Paths.get(path, "empty"), Array.empty[Byte])
    +      Files.write(Paths.get(path, "notEmpty"), "a".getBytes(StandardCharsets.UTF_8))
    +      val readback = spark.read.option("wholetext", true).text(path)
    +
    +      assert(readback.rdd.getNumPartitions === 1)
    --- End diff --

I think so, `wholetext` makes files not splittable: https://github.com/apache/spark/blob/46110a589f4e91cd7605c5a2c34c3db6b2635830/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala#L57

This can guarantee (in text datasources at least) one file -> one partition.

> IIUC one partition can read multiple files.

Do you mean this code? https://github.com/apache/spark/blob/8c6871828e3eb9fdb3bc665441a1aaf60b86b1e7/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala#L459-L464
[GitHub] spark pull request #23130: [SPARK-26161][SQL] Ignore empty files in load
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23130#discussion_r237062653

    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/sources/SaveLoadSuite.scala ---
    @@ -142,4 +144,15 @@ class SaveLoadSuite extends DataSourceTest with SharedSQLContext with BeforeAndA
           assert(e.contains(s"Partition column `$unknown` not found in schema $schemaCatalog"))
         }
       }
    +
    +  test("skip empty files in non bucketed read") {
    +    withTempDir { dir =>
    +      val path = dir.getCanonicalPath
    +      Files.write(Paths.get(path, "empty"), Array.empty[Byte])
    +      Files.write(Paths.get(path, "notEmpty"), "a".getBytes(StandardCharsets.UTF_8))
    +      val readback = spark.read.option("wholetext", true).text(path)
    +
    +      assert(readback.rdd.getNumPartitions === 1)
    --- End diff --

Do you mean that `wholetext` mode forces the creation of one partition per file?
[GitHub] spark pull request #23130: [SPARK-26161][SQL] Ignore empty files in load
Github user MaxGekk commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23130#discussion_r237050065

    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/sources/SaveLoadSuite.scala ---
    @@ -142,4 +144,15 @@ class SaveLoadSuite extends DataSourceTest with SharedSQLContext with BeforeAndA
           assert(e.contains(s"Partition column `$unknown` not found in schema $schemaCatalog"))
         }
       }
    +
    +  test("skip empty files in non bucketed read") {
    +    withTempDir { dir =>
    +      val path = dir.getCanonicalPath
    +      Files.write(Paths.get(path, "empty"), Array.empty[Byte])
    +      Files.write(Paths.get(path, "notEmpty"), "a".getBytes(StandardCharsets.UTF_8))
    +      val readback = spark.read.option("wholetext", true).text(path)
    +
    +      assert(readback.rdd.getNumPartitions === 1)
    --- End diff --

> does this test fail without your change?

Yes, it does, due to `wholetext`.

> Is JSON the only data source that may return a row for empty file?

We depend on the underlying parser here. I will check CSV and Text.
[GitHub] spark pull request #23130: [SPARK-26161][SQL] Ignore empty files in load
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23130#discussion_r237045706

    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/sources/SaveLoadSuite.scala ---
    @@ -142,4 +144,15 @@ class SaveLoadSuite extends DataSourceTest with SharedSQLContext with BeforeAndA
           assert(e.contains(s"Partition column `$unknown` not found in schema $schemaCatalog"))
         }
       }
    +
    +  test("skip empty files in non bucketed read") {
    +    withTempDir { dir =>
    +      val path = dir.getCanonicalPath
    +      Files.write(Paths.get(path, "empty"), Array.empty[Byte])
    +      Files.write(Paths.get(path, "notEmpty"), "a".getBytes(StandardCharsets.UTF_8))
    +      val readback = spark.read.option("wholetext", true).text(path)
    +
    +      assert(readback.rdd.getNumPartitions === 1)
    --- End diff --

does this test fail without your change? IIUC one partition can read multiple files.

Is JSON the only data source that may return a row for empty file?
[GitHub] spark pull request #23130: [SPARK-26161][SQL] Ignore empty files in load
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23130#discussion_r236721863

    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/sources/SaveLoadSuite.scala ---
    @@ -142,4 +143,15 @@ class SaveLoadSuite extends DataSourceTest with SharedSQLContext with BeforeAndA
           assert(e.contains(s"Partition column `$unknown` not found in schema $schemaCatalog"))
         }
       }
    +
    +  test("skip empty files in load") {
    +    withTempDir { dir =>
    +      val path = dir.getCanonicalPath
    +      Files.write(Paths.get(path, "empty"), Array.empty[Byte])
    +      Files.write(Paths.get(path, "notEmpty"), "a".getBytes)
    +      val readback = spark.read.option("wholetext", true).text(path)
    +
    +      assert(readback.rdd.getNumPartitions == 1)
    --- End diff --

I am just referring to `===` and the order of args. I'm sure the test was right as-is in what it asserts.
[GitHub] spark pull request #23130: [SPARK-26161][SQL] Ignore empty files in load
Github user MaxGekk commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23130#discussion_r236719477

    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/sources/SaveLoadSuite.scala ---
    @@ -142,4 +143,15 @@ class SaveLoadSuite extends DataSourceTest with SharedSQLContext with BeforeAndA
           assert(e.contains(s"Partition column `$unknown` not found in schema $schemaCatalog"))
         }
       }
    +
    +  test("skip empty files in load") {
    +    withTempDir { dir =>
    +      val path = dir.getCanonicalPath
    +      Files.write(Paths.get(path, "empty"), Array.empty[Byte])
    +      Files.write(Paths.get(path, "notEmpty"), "a".getBytes)
    +      val readback = spark.read.option("wholetext", true).text(path)
    +
    +      assert(readback.rdd.getNumPartitions == 1)
    --- End diff --

It seems the expected value should be on the right. I changed the order and got the following:
```scala
assert(123 === readback.rdd.getNumPartitions)
```
```
123 did not equal 1
ScalaTestFailureLocation: org.apache.spark.sql.sources.SaveLoadSuite at (SaveLoadSuite.scala:155)
Expected :1
Actual :123
```
The current assert triggers the correct message:
```scala
assert(readback.rdd.getNumPartitions == 123)
```
```
1 did not equal 123
ScalaTestFailureLocation: org.apache.spark.sql.sources.SaveLoadSuite at (SaveLoadSuite.scala:155)
Expected :123
Actual :1
```
[GitHub] spark pull request #23130: [SPARK-26161][SQL] Ignore empty files in load
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23130#discussion_r236705916

    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/sources/SaveLoadSuite.scala ---
    @@ -142,4 +143,15 @@ class SaveLoadSuite extends DataSourceTest with SharedSQLContext with BeforeAndA
           assert(e.contains(s"Partition column `$unknown` not found in schema $schemaCatalog"))
         }
       }
    +
    +  test("skip empty files in load") {
    +    withTempDir { dir =>
    +      val path = dir.getCanonicalPath
    +      Files.write(Paths.get(path, "empty"), Array.empty[Byte])
    +      Files.write(Paths.get(path, "notEmpty"), "a".getBytes)
    +      val readback = spark.read.option("wholetext", true).text(path)
    +
    +      assert(readback.rdd.getNumPartitions == 1)
    --- End diff --

Do we need `1 === ...` to get the right assert message? It's tiny.
[GitHub] spark pull request #23130: [SPARK-26161][SQL] Ignore empty files in load
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23130#discussion_r236705666

    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/sources/SaveLoadSuite.scala ---
    @@ -142,4 +143,15 @@ class SaveLoadSuite extends DataSourceTest with SharedSQLContext with BeforeAndA
           assert(e.contains(s"Partition column `$unknown` not found in schema $schemaCatalog"))
         }
       }
    +
    +  test("skip empty files in load") {
    +    withTempDir { dir =>
    +      val path = dir.getCanonicalPath
    +      Files.write(Paths.get(path, "empty"), Array.empty[Byte])
    +      Files.write(Paths.get(path, "notEmpty"), "a".getBytes)
    --- End diff --

Nit for consistency: `.getBytes(StandardCharsets.UTF_8)`
[GitHub] spark pull request #23130: [SPARK-26161][SQL] Ignore empty files in load
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23130#discussion_r236515927

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala ---
    @@ -388,7 +388,7 @@ case class FileSourceScanExec(
         logInfo(s"Planning with ${bucketSpec.numBuckets} buckets")
         val filesGroupedToBuckets =
           selectedPartitions.flatMap { p =>
    -        p.files.map { f =>
    +        p.files.filter(_.getLen > 0).map { f =>
    --- End diff --

yes, and the same change is also in `createNonBucketedReadRDD`
[GitHub] spark pull request #23130: [SPARK-26161][SQL] Ignore empty files in load
Github user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23130#discussion_r236361995

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala ---
    @@ -388,7 +388,7 @@ case class FileSourceScanExec(
         logInfo(s"Planning with ${bucketSpec.numBuckets} buckets")
         val filesGroupedToBuckets =
           selectedPartitions.flatMap { p =>
    -        p.files.map { f =>
    +        p.files.filter(_.getLen > 0).map { f =>
    --- End diff --

This `createBucketedReadRDD` is for the bucket table, right?
[GitHub] spark pull request #23130: [SPARK-26161][SQL] Ignore empty files in load
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23130#discussion_r236233074

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala ---
    @@ -388,7 +388,7 @@ case class FileSourceScanExec(
         logInfo(s"Planning with ${bucketSpec.numBuckets} buckets")
         val filesGroupedToBuckets =
           selectedPartitions.flatMap { p =>
    -        p.files.map { f =>
    +        p.files.filter(_.getLen > 0).map { f =>
    --- End diff --

It's a non-critical path in terms of performance. Should be okay.
[GitHub] spark pull request #23130: [SPARK-26161][SQL] Ignore empty files in load
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23130#discussion_r236226380

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala ---
    @@ -388,7 +388,7 @@ case class FileSourceScanExec(
         logInfo(s"Planning with ${bucketSpec.numBuckets} buckets")
         val filesGroupedToBuckets =
           selectedPartitions.flatMap { p =>
    -        p.files.map { f =>
    +        p.files.filter(_.getLen > 0).map { f =>
    --- End diff --

I personally prefer filter + map as it's shorter and clearer. I don't know if one is faster; two transformations vs having to return Some/None. For a Dataset operation I'd favor one operation, but this is just local Scala code.
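The trade-off in this comment can be sketched in plain Scala. This is only an illustration, not the code from the PR: `FileEntry` is a hypothetical stand-in for the file status objects that `FileSourceScanExec` iterates over, and `toSplit` is a made-up helper. Both variants produce the same result on a local collection; they differ only in how they read.

```scala
// Hypothetical stand-in for a file status object; this is not a Spark class.
final case class FileEntry(path: String, len: Long)

object FilterVsFlatMap {
  // Made-up helper that turns a file into a (path, length) pair.
  def toSplit(f: FileEntry): (String, Long) = (f.path, f.len)

  // Variant 1: two transformations, the shape used in the diff above.
  def withFilterMap(files: Seq[FileEntry]): Seq[(String, Long)] =
    files.filter(_.len > 0).map(toSplit)

  // Variant 2: one flatMap that returns Some for kept files and None otherwise.
  def withFlatMap(files: Seq[FileEntry]): Seq[(String, Long)] =
    files.flatMap(f => if (f.len > 0) Some(toSplit(f)) else None)
}
```

On a small local `Seq` like the per-partition file list, the difference is mostly about readability, which matches the preference stated in the comment.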
[GitHub] spark pull request #23130: [SPARK-26161][SQL] Ignore empty files in load
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23130#discussion_r236149203

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala ---
    @@ -388,7 +388,7 @@ case class FileSourceScanExec(
         logInfo(s"Planning with ${bucketSpec.numBuckets} buckets")
         val filesGroupedToBuckets =
           selectedPartitions.flatMap { p =>
    -        p.files.map { f =>
    +        p.files.filter(_.getLen > 0).map { f =>
    --- End diff --

do you mean changing `filter...map...` to `flatMap`? I don't have a strong preference about it.

The updated test cases and the new test case are for this change.
[GitHub] spark pull request #23130: [SPARK-26161][SQL] Ignore empty files in load
Github user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23130#discussion_r236135787

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala ---
    @@ -388,7 +388,7 @@ case class FileSourceScanExec(
         logInfo(s"Planning with ${bucketSpec.numBuckets} buckets")
         val filesGroupedToBuckets =
           selectedPartitions.flatMap { p =>
    -        p.files.map { f =>
    +        p.files.filter(_.getLen > 0).map { f =>
    --- End diff --

Do we have a test case for this line?
[GitHub] spark pull request #23130: [SPARK-26161][SQL] Ignore empty files in load
Github user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23130#discussion_r236135647

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala ---
    @@ -388,7 +388,7 @@ case class FileSourceScanExec(
         logInfo(s"Planning with ${bucketSpec.numBuckets} buckets")
         val filesGroupedToBuckets =
           selectedPartitions.flatMap { p =>
    -        p.files.map { f =>
    +        p.files.filter(_.getLen > 0).map { f =>
    --- End diff --

do the filtering inside the map?
[GitHub] spark pull request #23130: [SPARK-26161][SQL] Ignore empty files in load
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23130#discussion_r236077780

    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/sources/SaveLoadSuite.scala ---
    @@ -142,4 +143,15 @@ class SaveLoadSuite extends DataSourceTest with SharedSQLContext with BeforeAndA
           assert(e.contains(s"Partition column `$unknown` not found in schema $schemaCatalog"))
         }
       }
    +
    +  test("skip empty files in load") {
    +    withTempDir { dir =>
    +      val path = dir.getCanonicalPath
    +      Files.write(Paths.get(path, "empty"), Array[Byte]())
    --- End diff --

@MaxGekk, not a big deal but let's use `Array.empty[Byte]`.
[GitHub] spark pull request #23130: [SPARK-26161][SQL] Ignore empty files in load
GitHub user MaxGekk opened a pull request:

    https://github.com/apache/spark/pull/23130

    [SPARK-26161][SQL] Ignore empty files in load

    ## What changes were proposed in this pull request?

    In the PR, I propose filtering out all empty files inside of `DataSourceScanExec` and excluding them from file splits. This should reduce the overhead of opening and reading files without any data, and as a consequence datasources will not produce empty partitions for such files.

    ## How was this patch tested?

    Added a test which creates an empty file and a non-empty file. If empty files are ignored in load, the Text datasource in `wholetext` mode must create only one partition, for the non-empty file.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/MaxGekk/spark-1 ignore-empty-files

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/23130.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #23130

commit 36c64f0a8c47763dd99f0ed07cdf5a1813b0f2b7
Author: Maxim Gekk
Date: 2018-11-24T17:29:25Z

    Test checks empty files are not loaded

commit e428b83c82623f33e5b5e5073251d73d18a3c903
Author: Maxim Gekk
Date: 2018-11-24T17:29:41Z

    Fix json tests

commit 4212add0bcbcaaec5ee8e5483ea16d3e7929dcb6
Author: Maxim Gekk
Date: 2018-11-24T17:34:24Z

    Fix test

commit 2458882037349987396e8456799c451e80566442
Author: Maxim Gekk
Date: 2018-11-24T17:36:06Z

    Filtering out empty files

commit b200a50de58641d297381bec45687317bd21dfb7
Author: Maxim Gekk
Date: 2018-11-24T17:53:45Z

    Fix imports
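The idea behind the proposal can be sketched without Spark, using only the JDK file API. This is a rough illustration under stated assumptions, not the PR's implementation: `SkipEmptyFiles`, `nonEmptyFiles`, and `demo` are hypothetical names, and the length check on `java.io.File` only mirrors the `_.getLen > 0` filter shown in the diff. The directory layout matches the one the new test creates (one empty file, one file holding a single byte).

```scala
import java.io.File
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path}

// Hypothetical helper object; none of these names come from Spark.
object SkipEmptyFiles {
  // Returns the regular files directly under `dir` whose length is greater than zero.
  def nonEmptyFiles(dir: Path): Seq[File] = {
    // listFiles() returns null when `dir` is not a directory, so wrap it in Option.
    val listed = Option(dir.toFile.listFiles()).map(_.toSeq).getOrElse(Seq.empty[File])
    listed.filter(f => f.isFile && f.length() > 0)
  }

  // Creates one empty and one non-empty file in a fresh temporary directory
  // and returns the names of the files that survive the filter.
  // The temporary directory is left in place; clean-up is omitted for brevity.
  def demo(): Seq[String] = {
    val dir = Files.createTempDirectory("skip-empty")
    Files.write(dir.resolve("empty"), Array.empty[Byte])
    Files.write(dir.resolve("notEmpty"), "a".getBytes(StandardCharsets.UTF_8))
    nonEmptyFiles(dir).map(_.getName)
  }
}
```

Calling `SkipEmptyFiles.demo()` should yield only the `notEmpty` entry, which is the plain-file analogue of the single partition the test expects.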