spark git commit: [SPARK-10216][SQL] Avoid creating empty files during overwriting with group by query

2016-05-17 Thread marmbrus
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 adc1c2685 -> af37bdd3a


[SPARK-10216][SQL] Avoid creating empty files during overwriting with group by 
query

## What changes were proposed in this pull request?

Currently, an `INSERT INTO` with a `GROUP BY` query tries to create at least 200 files
(the default value of `spark.sql.shuffle.partitions`), which results in lots of
empty files.

This PR avoids creating empty files when overwriting into a Hive table or into the
internal data sources with a group-by query.

It checks whether the given partition actually has data and creates/writes a file
only when it does; a short sketch of this pattern follows below.
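
To make the idea concrete, here is a minimal, self-contained sketch of the pattern in plain Scala (the object and method names are made up for illustration; this is not the actual `WriterContainer` code): the output file is opened only after the partition iterator is confirmed to be non-empty.

```scala
import java.io.{File, PrintWriter}

// Simplified sketch: open the output file only when the partition has rows,
// so empty partitions never leave empty files behind.
object NonEmptyPartitionWriter {
  def writePartition(rows: Iterator[String], outputFile: File): Unit = {
    if (rows.hasNext) {
      val writer = new PrintWriter(outputFile)  // the file is created only here
      try {
        rows.foreach(row => writer.println(row))
      } finally {
        writer.close()
      }
    }
    // else: no rows, so no file is created for this partition
  }

  def main(args: Array[String]): Unit = {
    writePartition(Iterator("a", "b"), new File("part-00000"))  // writes a file
    writePartition(Iterator.empty, new File("part-00001"))      // creates nothing
  }
}
```

The actual patch applies the same `iterator.hasNext` guard before `executorSideSetup` and `newOutputWriter` in the writer containers, as the diff below shows.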

## How was this patch tested?

Unit tests in `InsertIntoHiveTableSuite` and `HadoopFsRelationTest`.

Closes #8411

Author: hyukjinkwon 
Author: Keuntae Park 

Closes #12855 from HyukjinKwon/pr/8411.

(cherry picked from commit 8d05a7a98bdbd3ce7c81d273e05a375877ebe68f)
Signed-off-by: Michael Armbrust 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/af37bdd3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/af37bdd3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/af37bdd3

Branch: refs/heads/branch-2.0
Commit: af37bdd3a7cee5206f98b3a2ba9113e71b53a2f4
Parents: adc1c26
Author: hyukjinkwon 
Authored: Tue May 17 11:18:51 2016 -0700
Committer: Michael Armbrust 
Committed: Tue May 17 11:21:06 2016 -0700

--
 .../execution/datasources/WriterContainer.scala | 221 ++-
 .../spark/sql/hive/hiveWriterContainers.scala   |  24 +-
 .../sql/hive/InsertIntoHiveTableSuite.scala     |  41 +++-
 .../sql/sources/HadoopFsRelationTest.scala      |  22 +-
 4 files changed, 182 insertions(+), 126 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/af37bdd3/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
index 3b064a5..7e12bbb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
@@ -239,48 +239,50 @@ private[sql] class DefaultWriterContainer(
   extends BaseWriterContainer(relation, job, isAppend) {
 
   def writeRows(taskContext: TaskContext, iterator: Iterator[InternalRow]): Unit = {
-    executorSideSetup(taskContext)
-    val configuration = taskAttemptContext.getConfiguration
-    configuration.set("spark.sql.sources.output.path", outputPath)
-    var writer = newOutputWriter(getWorkPath)
-    writer.initConverter(dataSchema)
-
-    // If anything below fails, we should abort the task.
-    try {
-      Utils.tryWithSafeFinallyAndFailureCallbacks {
-        while (iterator.hasNext) {
-          val internalRow = iterator.next()
-          writer.writeInternal(internalRow)
-        }
-        commitTask()
-      }(catchBlock = abortTask())
-    } catch {
-      case t: Throwable =>
-        throw new SparkException("Task failed while writing rows", t)
-    }
+    if (iterator.hasNext) {
+      executorSideSetup(taskContext)
+      val configuration = taskAttemptContext.getConfiguration
+      configuration.set("spark.sql.sources.output.path", outputPath)
+      var writer = newOutputWriter(getWorkPath)
+      writer.initConverter(dataSchema)
 
-    def commitTask(): Unit = {
+      // If anything below fails, we should abort the task.
       try {
-        if (writer != null) {
-          writer.close()
-          writer = null
-        }
-        super.commitTask()
+        Utils.tryWithSafeFinallyAndFailureCallbacks {
+          while (iterator.hasNext) {
+            val internalRow = iterator.next()
+            writer.writeInternal(internalRow)
+          }
+          commitTask()
+        }(catchBlock = abortTask())
       } catch {
-        case cause: Throwable =>
-          // This exception will be handled in `InsertIntoHadoopFsRelation.insert$writeRows`, and
-          // will cause `abortTask()` to be invoked.
-          throw new RuntimeException("Failed to commit task", cause)
+        case t: Throwable =>
+          throw new SparkException("Task failed while writing rows", t)
       }
-    }
 
-    def abortTask(): Unit = {
-      try {
-        if (writer != null) {
-          writer.close()
+      def commitTask(): Unit = {
+        try {
+          if (writer != null) {
+            writer.close()
+            writer = null
+          }
+          super.commitTask()

spark git commit: [SPARK-10216][SQL] Avoid creating empty files during overwriting with group by query

2016-05-17 Thread marmbrus
Repository: spark
Updated Branches:
  refs/heads/master 20a89478e -> 8d05a7a98


[SPARK-10216][SQL] Avoid creating empty files during overwriting with group by 
query

## What changes were proposed in this pull request?

Currently, an `INSERT INTO` with a `GROUP BY` query tries to create at least 200 files
(the default value of `spark.sql.shuffle.partitions`), which results in lots of
empty files.

This PR avoids creating empty files when overwriting into a Hive table or into the
internal data sources with a group-by query.

It checks whether the given partition actually has data and creates/writes a file
only when it does; see the illustration below.
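
For context, here is a hedged illustration of the scenario in user code (the output path, app name, and local master are assumptions for the example, not part of this patch): the aggregation below has only three distinct keys, yet it still shuffles into `spark.sql.shuffle.partitions` (200 by default) partitions, and before this change overwriting its result produced one part file per shuffle partition, most of them empty.

```scala
import org.apache.spark.sql.SparkSession

object GroupByOverwriteExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("GroupByOverwriteExample")
      .master("local[2]")  // local mode, purely for the illustration
      .getOrCreate()
    import spark.implicits._

    // Ten input rows collapse to three groups after the aggregation.
    val counts = spark.range(0, 10)
      .withColumn("key", $"id" % 3)
      .groupBy("key")
      .count()

    // The group-by result is still spread over the default 200 shuffle
    // partitions; with this patch, the empty ones no longer produce files.
    counts.write.mode("overwrite").parquet("/tmp/groupby-overwrite-example")

    spark.stop()
  }
}
```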

## How was this patch tested?

Unit tests in `InsertIntoHiveTableSuite` and `HadoopFsRelationTest`.

Closes #8411

Author: hyukjinkwon 
Author: Keuntae Park 

Closes #12855 from HyukjinKwon/pr/8411.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8d05a7a9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8d05a7a9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8d05a7a9

Branch: refs/heads/master
Commit: 8d05a7a98bdbd3ce7c81d273e05a375877ebe68f
Parents: 20a8947
Author: hyukjinkwon 
Authored: Tue May 17 11:18:51 2016 -0700
Committer: Michael Armbrust 
Committed: Tue May 17 11:18:51 2016 -0700

--
 .../execution/datasources/WriterContainer.scala | 221 ++-
 .../spark/sql/hive/hiveWriterContainers.scala   |  24 +-
 .../sql/hive/InsertIntoHiveTableSuite.scala     |  41 +++-
 .../sql/sources/HadoopFsRelationTest.scala      |  22 +-
 4 files changed, 182 insertions(+), 126 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/8d05a7a9/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
index 3b064a5..7e12bbb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
@@ -239,48 +239,50 @@ private[sql] class DefaultWriterContainer(
   extends BaseWriterContainer(relation, job, isAppend) {
 
   def writeRows(taskContext: TaskContext, iterator: Iterator[InternalRow]): Unit = {
-    executorSideSetup(taskContext)
-    val configuration = taskAttemptContext.getConfiguration
-    configuration.set("spark.sql.sources.output.path", outputPath)
-    var writer = newOutputWriter(getWorkPath)
-    writer.initConverter(dataSchema)
-
-    // If anything below fails, we should abort the task.
-    try {
-      Utils.tryWithSafeFinallyAndFailureCallbacks {
-        while (iterator.hasNext) {
-          val internalRow = iterator.next()
-          writer.writeInternal(internalRow)
-        }
-        commitTask()
-      }(catchBlock = abortTask())
-    } catch {
-      case t: Throwable =>
-        throw new SparkException("Task failed while writing rows", t)
-    }
+    if (iterator.hasNext) {
+      executorSideSetup(taskContext)
+      val configuration = taskAttemptContext.getConfiguration
+      configuration.set("spark.sql.sources.output.path", outputPath)
+      var writer = newOutputWriter(getWorkPath)
+      writer.initConverter(dataSchema)
 
-    def commitTask(): Unit = {
+      // If anything below fails, we should abort the task.
       try {
-        if (writer != null) {
-          writer.close()
-          writer = null
-        }
-        super.commitTask()
+        Utils.tryWithSafeFinallyAndFailureCallbacks {
+          while (iterator.hasNext) {
+            val internalRow = iterator.next()
+            writer.writeInternal(internalRow)
+          }
+          commitTask()
+        }(catchBlock = abortTask())
       } catch {
-        case cause: Throwable =>
-          // This exception will be handled in `InsertIntoHadoopFsRelation.insert$writeRows`, and
-          // will cause `abortTask()` to be invoked.
-          throw new RuntimeException("Failed to commit task", cause)
+        case t: Throwable =>
+          throw new SparkException("Task failed while writing rows", t)
       }
-    }
 
-    def abortTask(): Unit = {
-      try {
-        if (writer != null) {
-          writer.close()
+      def commitTask(): Unit = {
+        try {
+          if (writer != null) {
+            writer.close()
+            writer = null
+          }
+          super.commitTask()
+        } catch {
+          case cause: Throwable =>
+            // This exception will be handled in