[GitHub] spark pull request #17395: [SPARK-20065][SS] Avoid to output empty parquet f...

2017-03-23 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/17395#discussion_r107822893
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala ---
@@ -292,7 +292,10 @@ object FileFormatWriter extends Logging {
    override def execute(iter: Iterator[InternalRow]): Set[String] = {
      var fileCounter = 0
      var recordsInFile: Long = 0L
-      newOutputWriter(fileCounter)
+      // Skip the empty partition to avoid creating a mass of 'empty' files.
+      if (iter.hasNext) {
+        newOutputWriter(fileCounter)
--- End diff --

Yes, I was thinking along those lines. I remember I made several attempts at 
the time but failed to come up with a good fix, and could not find the time to 
work on it further.

Another problem is that it might be a datasource-specific issue because, for 
example, ORC does not write out an empty DataFrame at all:

```
scala> spark.range(100).filter("id > 100").write.orc("/tmp/abc1")

scala> spark.read.orc("/tmp/abc1").show()
org.apache.spark.sql.AnalysisException: Unable to infer schema for ORC. It 
must be specified manually.;
  at 
org.apache.spark.sql.execution.datasources.DataSource$$anonfun$8.apply(DataSource.scala:182)
  at 
org.apache.spark.sql.execution.datasources.DataSource$$anonfun$8.apply(DataSource.scala:182)
```

This issue is described in 
https://issues.apache.org/jira/browse/SPARK-15474.

FWIW, I happened to see https://issues.apache.org/jira/browse/SPARK-15693 
around that time, and I felt we might be able to consolidate this issue with 
it, although that is only a rough idea.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #17395: [SPARK-20065][SS] Avoid to output empty parquet f...

2017-03-23 Thread uncleGen
Github user uncleGen commented on a diff in the pull request:

https://github.com/apache/spark/pull/17395#discussion_r107821138
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala ---
@@ -292,7 +292,10 @@ object FileFormatWriter extends Logging {
    override def execute(iter: Iterator[InternalRow]): Set[String] = {
      var fileCounter = 0
      var recordsInFile: Long = 0L
-      newOutputWriter(fileCounter)
+      // Skip the empty partition to avoid creating a mass of 'empty' files.
+      if (iter.hasNext) {
+        newOutputWriter(fileCounter)
--- End diff --

Thanks for the pointer. How about leaving just one empty file containing the 
metadata when the DataFrame has only empty partitions? Furthermore, could we 
leave just a single metadata file?





[GitHub] spark pull request #17395: [SPARK-20065][SS] Avoid to output empty parquet f...

2017-03-23 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/17395#discussion_r107652020
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala ---
@@ -292,7 +292,10 @@ object FileFormatWriter extends Logging {
    override def execute(iter: Iterator[InternalRow]): Set[String] = {
      var fileCounter = 0
      var recordsInFile: Long = 0L
-      newOutputWriter(fileCounter)
+      // Skip the empty partition to avoid creating a mass of 'empty' files.
+      if (iter.hasNext) {
+        newOutputWriter(fileCounter)
--- End diff --

See https://github.com/apache/spark/pull/12855, 
https://issues.apache.org/jira/browse/SPARK-10216 and 
https://issues.apache.org/jira/browse/SPARK-15393





[GitHub] spark pull request #17395: [SPARK-20065][SS] Avoid to output empty parquet f...

2017-03-23 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/17395#discussion_r107650990
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala ---
@@ -292,7 +292,10 @@ object FileFormatWriter extends Logging {
    override def execute(iter: Iterator[InternalRow]): Set[String] = {
      var fileCounter = 0
      var recordsInFile: Long = 0L
-      newOutputWriter(fileCounter)
+      // Skip the empty partition to avoid creating a mass of 'empty' files.
+      if (iter.hasNext) {
+        newOutputWriter(fileCounter)
--- End diff --

Reading empty data should be fine too; it should preserve the schema. I am 
pretty sure we want this case, because my earlier change was reverted due to 
the case above.





[GitHub] spark pull request #17395: [SPARK-20065][SS] Avoid to output empty parquet f...

2017-03-23 Thread uncleGen
Github user uncleGen commented on a diff in the pull request:

https://github.com/apache/spark/pull/17395#discussion_r107650070
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala ---
@@ -292,7 +292,10 @@ object FileFormatWriter extends Logging {
    override def execute(iter: Iterator[InternalRow]): Set[String] = {
      var fileCounter = 0
      var recordsInFile: Long = 0L
-      newOutputWriter(fileCounter)
+      // Skip the empty partition to avoid creating a mass of 'empty' files.
+      if (iter.hasNext) {
+        newOutputWriter(fileCounter)
--- End diff --

@HyukjinKwon IIUC, this case should fail as expected, as there is no output. 
Am I missing something?

```scala
spark.range(100).filter("id > 100").write.parquet("/tmp/abc")
spark.read.parquet("/tmp/abc").show()
```







[GitHub] spark pull request #17395: [SPARK-20065][SS] Avoid to output empty parquet f...

2017-03-23 Thread uncleGen
Github user uncleGen commented on a diff in the pull request:

https://github.com/apache/spark/pull/17395#discussion_r107637615
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala ---
@@ -292,7 +292,10 @@ object FileFormatWriter extends Logging {
    override def execute(iter: Iterator[InternalRow]): Set[String] = {
      var fileCounter = 0
      var recordsInFile: Long = 0L
-      newOutputWriter(fileCounter)
+      // Skip the empty partition to avoid creating a mass of 'empty' files.
+      if (iter.hasNext) {
+        newOutputWriter(fileCounter)
--- End diff --

Let me see how to cover this case





[GitHub] spark pull request #17395: [SPARK-20065][SS] Avoid to output empty parquet f...

2017-03-23 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/17395#discussion_r107611325
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala ---
@@ -292,7 +292,10 @@ object FileFormatWriter extends Logging {
    override def execute(iter: Iterator[InternalRow]): Set[String] = {
      var fileCounter = 0
      var recordsInFile: Long = 0L
-      newOutputWriter(fileCounter)
+      // Skip the empty partition to avoid creating a mass of 'empty' files.
+      if (iter.hasNext) {
+        newOutputWriter(fileCounter)
--- End diff --

I proposed a similar PR before, but it was reverted. In this case, Parquet 
would not write out the footer and schema information. Namely, this would 
break the case below:

```scala
spark.range(100).filter("id > 100").write.parquet("/tmp/abc")
spark.read.parquet("/tmp/abc").show()
```

To my knowledge, we don't have test cases for this.
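The trade-off can be modeled without Spark. Below is a minimal plain-Scala sketch (all names are hypothetical, not the actual `FileFormatWriter` API) of why skipping every empty partition loses the schema, and how emitting one schema-only file keeps reads working:

```scala
// Each written file carries the schema plus its rows, like a Parquet
// footer does. If no file is written at all, the schema is unrecoverable.
case class WrittenFile(schema: String, rows: Seq[String])

def writeAll(schema: String, partitions: Seq[Seq[String]]): Seq[WrittenFile] = {
  // Skip empty partitions, as the PR proposes.
  val files = partitions.filter(_.nonEmpty).map(rows => WrittenFile(schema, rows))
  // Fallback: when everything was empty, emit one schema-only file so a
  // later read can still infer the schema (the behavior debated above).
  if (files.isEmpty) Seq(WrittenFile(schema, Nil)) else files
}

// Stand-in for schema inference on read: it needs at least one file.
def inferSchema(files: Seq[WrittenFile]): Option[String] =
  files.headOption.map(_.schema)
```

Without the fallback, `inferSchema` would return `None` for a fully empty DataFrame, which is the `AnalysisException` case quoted earlier in the thread.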





[GitHub] spark pull request #17395: [SPARK-20065][SS] Avoid to output empty parquet f...

2017-03-23 Thread uncleGen
GitHub user uncleGen opened a pull request:

https://github.com/apache/spark/pull/17395

[SPARK-20065][SS] Avoid to output empty parquet files

## Problem Description

Reported by Silvio Fiorito

I've got a Kafka topic which I'm querying, running a windowed aggregation, 
with a 30 second watermark, 10 second trigger, writing out to Parquet with 
append output mode.

Every 10 second trigger generates a file, regardless of whether there was 
any data for that trigger, or whether any records were actually finalized by 
the watermark.

Is this expected behavior or should it not write out these empty files?

```scala
val df = spark.readStream.format("kafka")

val query = df
  .withWatermark("timestamp", "30 seconds")
  .groupBy(window($"timestamp", "10 seconds"))
  .count()
  .select(date_format($"window.start", "HH:mm:ss").as("time"), $"count")

query
  .writeStream
  .format("parquet")
  .option("checkpointLocation", aggChk)
  .trigger(ProcessingTime("10 seconds"))
  .outputMode("append")
  .start(aggPath)
```

As the query executes, do a file listing on "aggPath" and you'll see 339-byte 
files at a minimum, until we reach the first watermark and the initial batch 
is finalized. Even after that, though, whenever there are empty batches it 
will keep generating empty files on every trigger.

## What changes were proposed in this pull request?

Check whether the partition is empty, and skip empty partitions to avoid 
writing empty files.
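The guard can be sketched in plain Scala (hypothetical helper names; the real change is the `iter.hasNext` check in `FileFormatWriter.execute` shown in the diff):

```scala
// Only open an output writer when the partition iterator actually has
// rows, so an empty partition produces no file at all.
def writePartition(iter: Iterator[String]): Option[String] = {
  if (iter.hasNext) {
    val out = new StringBuilder          // stands in for newOutputWriter(...)
    iter.foreach(row => out.append(row).append('\n'))
    Some(out.toString)                   // a "file" was written
  } else {
    None                                 // empty partition: no writer, no file
  }
}
```

Note that, as the review comments point out, this sketch also skips writing any schema metadata for a fully empty DataFrame, which is the contentious part of the change.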

## How was this patch tested?

Jenkins


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/uncleGen/spark SPARK-20065

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/17395.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #17395


commit 86a7d2fa96e3134c1e64864eba81a3bebdedceea
Author: uncleGen 
Date:   2017-03-23T08:10:31Z

avoid to output empty parquet files



