[GitHub] spark pull request #23130: [SPARK-26161][SQL] Ignore empty files in load

2018-12-01 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/23130


---




[GitHub] spark pull request #23130: [SPARK-26161][SQL] Ignore empty files in load

2018-11-28 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23130#discussion_r237121289
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/sources/SaveLoadSuite.scala ---
@@ -142,4 +144,15 @@ class SaveLoadSuite extends DataSourceTest with SharedSQLContext with BeforeAndA
       assert(e.contains(s"Partition column `$unknown` not found in schema $schemaCatalog"))
     }
   }
+
+  test("skip empty files in non bucketed read") {
+    withTempDir { dir =>
+      val path = dir.getCanonicalPath
+      Files.write(Paths.get(path, "empty"), Array.empty[Byte])
+      Files.write(Paths.get(path, "notEmpty"), "a".getBytes(StandardCharsets.UTF_8))
+      val readback = spark.read.option("wholetext", true).text(path)
+
+      assert(readback.rdd.getNumPartitions === 1)
--- End diff --

Thanks for pointing it out. I think we are good here.


---




[GitHub] spark pull request #23130: [SPARK-26161][SQL] Ignore empty files in load

2018-11-28 Thread MaxGekk
Github user MaxGekk commented on a diff in the pull request:

https://github.com/apache/spark/pull/23130#discussion_r237085148
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/sources/SaveLoadSuite.scala ---
@@ -142,4 +144,15 @@ class SaveLoadSuite extends DataSourceTest with SharedSQLContext with BeforeAndA
       assert(e.contains(s"Partition column `$unknown` not found in schema $schemaCatalog"))
     }
   }
+
+  test("skip empty files in non bucketed read") {
+    withTempDir { dir =>
+      val path = dir.getCanonicalPath
+      Files.write(Paths.get(path, "empty"), Array.empty[Byte])
+      Files.write(Paths.get(path, "notEmpty"), "a".getBytes(StandardCharsets.UTF_8))
+      val readback = spark.read.option("wholetext", true).text(path)
+
+      assert(readback.rdd.getNumPartitions === 1)
--- End diff --

I think so, `wholetext` makes files non-splittable:

https://github.com/apache/spark/blob/46110a589f4e91cd7605c5a2c34c3db6b2635830/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala#L57

This guarantees (in the text datasource, at least) one file -> one partition.

> IIUC one partition can read multiple files.

Do you mean this code?
https://github.com/apache/spark/blob/8c6871828e3eb9fdb3bc665441a1aaf60b86b1e7/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala#L459-L464
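
To make this concrete, here is a minimal, self-contained sketch (not Spark's actual planner; all names are hypothetical) of why non-splittable files map one-to-one onto partitions once empty files are dropped:

```scala
object NonSplittablePlanSketch {
  // Stand-in for Hadoop's FileStatus: just a path and a length in bytes.
  final case class FileStatus(path: String, len: Long)

  // When a format reports files as non-splittable, each file that survives
  // the empty-file filter becomes exactly one split, hence one partition.
  def planNonSplittable(files: Seq[FileStatus]): Seq[FileStatus] =
    files.filter(_.len > 0)

  def main(args: Array[String]): Unit = {
    val files = Seq(FileStatus("empty", 0L), FileStatus("notEmpty", 1L))
    val splits = planNonSplittable(files)
    assert(splits.map(_.path) == Seq("notEmpty")) // one partition, for the non-empty file
    println(s"partitions = ${splits.size}")
  }
}
```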


---




[GitHub] spark pull request #23130: [SPARK-26161][SQL] Ignore empty files in load

2018-11-28 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23130#discussion_r237062653
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/sources/SaveLoadSuite.scala ---
@@ -142,4 +144,15 @@ class SaveLoadSuite extends DataSourceTest with SharedSQLContext with BeforeAndA
       assert(e.contains(s"Partition column `$unknown` not found in schema $schemaCatalog"))
     }
   }
+
+  test("skip empty files in non bucketed read") {
+    withTempDir { dir =>
+      val path = dir.getCanonicalPath
+      Files.write(Paths.get(path, "empty"), Array.empty[Byte])
+      Files.write(Paths.get(path, "notEmpty"), "a".getBytes(StandardCharsets.UTF_8))
+      val readback = spark.read.option("wholetext", true).text(path)
+
+      assert(readback.rdd.getNumPartitions === 1)
--- End diff --

Do you mean `wholetext` mode will force one partition per file?


---




[GitHub] spark pull request #23130: [SPARK-26161][SQL] Ignore empty files in load

2018-11-28 Thread MaxGekk
Github user MaxGekk commented on a diff in the pull request:

https://github.com/apache/spark/pull/23130#discussion_r237050065
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/sources/SaveLoadSuite.scala ---
@@ -142,4 +144,15 @@ class SaveLoadSuite extends DataSourceTest with SharedSQLContext with BeforeAndA
       assert(e.contains(s"Partition column `$unknown` not found in schema $schemaCatalog"))
     }
   }
+
+  test("skip empty files in non bucketed read") {
+    withTempDir { dir =>
+      val path = dir.getCanonicalPath
+      Files.write(Paths.get(path, "empty"), Array.empty[Byte])
+      Files.write(Paths.get(path, "notEmpty"), "a".getBytes(StandardCharsets.UTF_8))
+      val readback = spark.read.option("wholetext", true).text(path)
+
+      assert(readback.rdd.getNumPartitions === 1)
--- End diff --

> does this test fail without your change?

Yes, it does, due to `wholetext`.

> Is JSON the only data source that may return a row for an empty file?

We depend on the underlying parser here. I will check CSV and Text.
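
A quick way to check each source would be a spark-shell sketch along these lines (hedged: `spark` is a local SparkSession, the directory holds a single zero-byte file, and the explicit schema avoids failing schema inference on empty input):

```scala
import java.nio.file.{Files, Paths}
import org.apache.spark.sql.types.{StringType, StructType}

val dir = Files.createTempDirectory("empty-file-check").toString
Files.write(Paths.get(dir, "empty"), Array.empty[Byte])

val schema = new StructType().add("a", StringType)
// If a source's parser emits a row for an empty file, its count is non-zero.
println(spark.read.schema(schema).json(dir).count())
println(spark.read.schema(schema).csv(dir).count())
println(spark.read.text(dir).count())
```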


---




[GitHub] spark pull request #23130: [SPARK-26161][SQL] Ignore empty files in load

2018-11-28 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23130#discussion_r237045706
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/sources/SaveLoadSuite.scala ---
@@ -142,4 +144,15 @@ class SaveLoadSuite extends DataSourceTest with SharedSQLContext with BeforeAndA
       assert(e.contains(s"Partition column `$unknown` not found in schema $schemaCatalog"))
     }
   }
+
+  test("skip empty files in non bucketed read") {
+    withTempDir { dir =>
+      val path = dir.getCanonicalPath
+      Files.write(Paths.get(path, "empty"), Array.empty[Byte])
+      Files.write(Paths.get(path, "notEmpty"), "a".getBytes(StandardCharsets.UTF_8))
+      val readback = spark.read.option("wholetext", true).text(path)
+
+      assert(readback.rdd.getNumPartitions === 1)
--- End diff --

Does this test fail without your change? IIUC one partition can read multiple files. Is JSON the only data source that may return a row for an empty file?


---




[GitHub] spark pull request #23130: [SPARK-26161][SQL] Ignore empty files in load

2018-11-27 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/23130#discussion_r236721863
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/sources/SaveLoadSuite.scala ---
@@ -142,4 +143,15 @@ class SaveLoadSuite extends DataSourceTest with SharedSQLContext with BeforeAndA
       assert(e.contains(s"Partition column `$unknown` not found in schema $schemaCatalog"))
     }
   }
+
+  test("skip empty files in load") {
+    withTempDir { dir =>
+      val path = dir.getCanonicalPath
+      Files.write(Paths.get(path, "empty"), Array.empty[Byte])
+      Files.write(Paths.get(path, "notEmpty"), "a".getBytes)
+      val readback = spark.read.option("wholetext", true).text(path)
+
+      assert(readback.rdd.getNumPartitions == 1)
--- End diff --

I am just referring to `===` and the order of the args. I'm sure the test was right as-is in what it asserts.


---




[GitHub] spark pull request #23130: [SPARK-26161][SQL] Ignore empty files in load

2018-11-27 Thread MaxGekk
Github user MaxGekk commented on a diff in the pull request:

https://github.com/apache/spark/pull/23130#discussion_r236719477
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/sources/SaveLoadSuite.scala ---
@@ -142,4 +143,15 @@ class SaveLoadSuite extends DataSourceTest with SharedSQLContext with BeforeAndA
       assert(e.contains(s"Partition column `$unknown` not found in schema $schemaCatalog"))
     }
   }
+
+  test("skip empty files in load") {
+    withTempDir { dir =>
+      val path = dir.getCanonicalPath
+      Files.write(Paths.get(path, "empty"), Array.empty[Byte])
+      Files.write(Paths.get(path, "notEmpty"), "a".getBytes)
+      val readback = spark.read.option("wholetext", true).text(path)
+
+      assert(readback.rdd.getNumPartitions == 1)
--- End diff --

It seems the expected value should be on the right. I changed the order and got the following:
```scala
assert(123 === readback.rdd.getNumPartitions)
```
```
123 did not equal 1
ScalaTestFailureLocation: org.apache.spark.sql.sources.SaveLoadSuite at (SaveLoadSuite.scala:155)
Expected :1
Actual   :123
```

The current assert produces the correct message:
```scala
assert(readback.rdd.getNumPartitions == 123)
```
```
1 did not equal 123
ScalaTestFailureLocation: org.apache.spark.sql.sources.SaveLoadSuite at (SaveLoadSuite.scala:155)
Expected :123
Actual   :1
```



---




[GitHub] spark pull request #23130: [SPARK-26161][SQL] Ignore empty files in load

2018-11-27 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/23130#discussion_r236705916
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/sources/SaveLoadSuite.scala ---
@@ -142,4 +143,15 @@ class SaveLoadSuite extends DataSourceTest with SharedSQLContext with BeforeAndA
       assert(e.contains(s"Partition column `$unknown` not found in schema $schemaCatalog"))
     }
   }
+
+  test("skip empty files in load") {
+    withTempDir { dir =>
+      val path = dir.getCanonicalPath
+      Files.write(Paths.get(path, "empty"), Array.empty[Byte])
+      Files.write(Paths.get(path, "notEmpty"), "a".getBytes)
+      val readback = spark.read.option("wholetext", true).text(path)
+
+      assert(readback.rdd.getNumPartitions == 1)
--- End diff --

Do we need `1 === ...` to get the right assert message? It's tiny.


---




[GitHub] spark pull request #23130: [SPARK-26161][SQL] Ignore empty files in load

2018-11-27 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/23130#discussion_r236705666
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/sources/SaveLoadSuite.scala ---
@@ -142,4 +143,15 @@ class SaveLoadSuite extends DataSourceTest with SharedSQLContext with BeforeAndA
       assert(e.contains(s"Partition column `$unknown` not found in schema $schemaCatalog"))
     }
   }
+
+  test("skip empty files in load") {
+    withTempDir { dir =>
+      val path = dir.getCanonicalPath
+      Files.write(Paths.get(path, "empty"), Array.empty[Byte])
+      Files.write(Paths.get(path, "notEmpty"), "a".getBytes)
--- End diff --

Nit for consistency: `.getBytes(StandardCharsets.UTF_8)`


---




[GitHub] spark pull request #23130: [SPARK-26161][SQL] Ignore empty files in load

2018-11-26 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23130#discussion_r236515927
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala ---
@@ -388,7 +388,7 @@ case class FileSourceScanExec(
     logInfo(s"Planning with ${bucketSpec.numBuckets} buckets")
     val filesGroupedToBuckets =
       selectedPartitions.flatMap { p =>
-        p.files.map { f =>
+        p.files.filter(_.getLen > 0).map { f =>
--- End diff --

Yes, and the same change is also in `createNonBucketedReadRDD`.


---




[GitHub] spark pull request #23130: [SPARK-26161][SQL] Ignore empty files in load

2018-11-26 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/23130#discussion_r236361995
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala ---
@@ -388,7 +388,7 @@ case class FileSourceScanExec(
     logInfo(s"Planning with ${bucketSpec.numBuckets} buckets")
     val filesGroupedToBuckets =
       selectedPartitions.flatMap { p =>
-        p.files.map { f =>
+        p.files.filter(_.getLen > 0).map { f =>
--- End diff --

This `createBucketedReadRDD` is for the bucketed table, right?


---




[GitHub] spark pull request #23130: [SPARK-26161][SQL] Ignore empty files in load

2018-11-26 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/23130#discussion_r236233074
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala ---
@@ -388,7 +388,7 @@ case class FileSourceScanExec(
     logInfo(s"Planning with ${bucketSpec.numBuckets} buckets")
     val filesGroupedToBuckets =
       selectedPartitions.flatMap { p =>
-        p.files.map { f =>
+        p.files.filter(_.getLen > 0).map { f =>
--- End diff --

It's a non-critical path in terms of performance. Should be okay.


---




[GitHub] spark pull request #23130: [SPARK-26161][SQL] Ignore empty files in load

2018-11-26 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/23130#discussion_r236226380
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala ---
@@ -388,7 +388,7 @@ case class FileSourceScanExec(
     logInfo(s"Planning with ${bucketSpec.numBuckets} buckets")
     val filesGroupedToBuckets =
       selectedPartitions.flatMap { p =>
-        p.files.map { f =>
+        p.files.filter(_.getLen > 0).map { f =>
--- End diff --

I personally prefer filter + map as it's shorter and clearer. I don't know if one is faster; it's two transformations vs. having to return Some/None. For a Dataset operation I'd favor one operation, but this is just local Scala code.
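
For illustration, the two styles side by side in plain Scala (REPL-style; `FileEntry` is a hypothetical stand-in for the file status type):

```scala
final case class FileEntry(path: String, getLen: Long)
val files = Seq(FileEntry("empty", 0L), FileEntry("notEmpty", 5L))

// filter + map: two traversals, but the intent reads directly.
val viaFilterMap = files.filter(_.getLen > 0).map(_.path)

// flatMap returning Option: a single traversal, at the cost of
// allocating a Some/None per element.
val viaFlatMap = files.flatMap(f => if (f.getLen > 0) Some(f.path) else None)

assert(viaFilterMap == viaFlatMap) // List("notEmpty") either way
```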


---




[GitHub] spark pull request #23130: [SPARK-26161][SQL] Ignore empty files in load

2018-11-25 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23130#discussion_r236149203
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala ---
@@ -388,7 +388,7 @@ case class FileSourceScanExec(
     logInfo(s"Planning with ${bucketSpec.numBuckets} buckets")
     val filesGroupedToBuckets =
       selectedPartitions.flatMap { p =>
-        p.files.map { f =>
+        p.files.filter(_.getLen > 0).map { f =>
--- End diff --

Do you mean changing `filter...map...` to `flatMap`? I don't have a strong preference about it.

The updated test cases and the new test case are for this change.


---




[GitHub] spark pull request #23130: [SPARK-26161][SQL] Ignore empty files in load

2018-11-25 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/23130#discussion_r236135787
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala ---
@@ -388,7 +388,7 @@ case class FileSourceScanExec(
     logInfo(s"Planning with ${bucketSpec.numBuckets} buckets")
     val filesGroupedToBuckets =
       selectedPartitions.flatMap { p =>
-        p.files.map { f =>
+        p.files.filter(_.getLen > 0).map { f =>
--- End diff --

Do we have a test case for this line?


---




[GitHub] spark pull request #23130: [SPARK-26161][SQL] Ignore empty files in load

2018-11-25 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/23130#discussion_r236135647
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala ---
@@ -388,7 +388,7 @@ case class FileSourceScanExec(
     logInfo(s"Planning with ${bucketSpec.numBuckets} buckets")
     val filesGroupedToBuckets =
       selectedPartitions.flatMap { p =>
-        p.files.map { f =>
+        p.files.filter(_.getLen > 0).map { f =>
--- End diff --

Do the filtering inside the map?


---




[GitHub] spark pull request #23130: [SPARK-26161][SQL] Ignore empty files in load

2018-11-25 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/23130#discussion_r236077780
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/sources/SaveLoadSuite.scala ---
@@ -142,4 +143,15 @@ class SaveLoadSuite extends DataSourceTest with SharedSQLContext with BeforeAndA
       assert(e.contains(s"Partition column `$unknown` not found in schema $schemaCatalog"))
     }
   }
+
+  test("skip empty files in load") {
+    withTempDir { dir =>
+      val path = dir.getCanonicalPath
+      Files.write(Paths.get(path, "empty"), Array[Byte]())
--- End diff --

@MaxGekk, not a big deal but let's use `Array.empty[Byte]`.


---




[GitHub] spark pull request #23130: [SPARK-26161][SQL] Ignore empty files in load

2018-11-24 Thread MaxGekk
GitHub user MaxGekk opened a pull request:

https://github.com/apache/spark/pull/23130

[SPARK-26161][SQL] Ignore empty files in load

## What changes were proposed in this pull request?

In this PR, I propose filtering out all empty files inside `DataSourceScanExec` and excluding them from file splits. This should reduce the overhead of opening and reading files without any data, and as a consequence data sources will not produce empty partitions for such files.

## How was this patch tested?

Added a test which creates an empty and a non-empty file. If empty files are ignored in load, the Text datasource in `wholetext` mode must create only one partition, for the non-empty file.
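
As a sketch of the observable behavior (spark-shell, assuming a local SparkSession named `spark`; the temp directory is created on the fly):

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Paths}

val dir = Files.createTempDirectory("spark-26161").toString
Files.write(Paths.get(dir, "empty"), Array.empty[Byte])
Files.write(Paths.get(dir, "notEmpty"), "a".getBytes(StandardCharsets.UTF_8))

// wholetext makes files non-splittable, so each non-empty file becomes
// exactly one partition; with empty files ignored at planning time, the
// zero-byte file contributes no partition at all.
val readback = spark.read.option("wholetext", true).text(dir)
println(readback.rdd.getNumPartitions) // expected: 1
```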


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/MaxGekk/spark-1 ignore-empty-files

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/23130.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #23130


commit 36c64f0a8c47763dd99f0ed07cdf5a1813b0f2b7
Author: Maxim Gekk 
Date:   2018-11-24T17:29:25Z

Test checks empty files are not loaded

commit e428b83c82623f33e5b5e5073251d73d18a3c903
Author: Maxim Gekk 
Date:   2018-11-24T17:29:41Z

Fix json tests

commit 4212add0bcbcaaec5ee8e5483ea16d3e7929dcb6
Author: Maxim Gekk 
Date:   2018-11-24T17:34:24Z

Fix test

commit 2458882037349987396e8456799c451e80566442
Author: Maxim Gekk 
Date:   2018-11-24T17:36:06Z

Filtering out empty files

commit b200a50de58641d297381bec45687317bd21dfb7
Author: Maxim Gekk 
Date:   2018-11-24T17:53:45Z

Fix imports




---
