spark git commit: [SPARK-10216][SQL] Avoid creating empty files during overwriting with group by query
Repository: spark Updated Branches: refs/heads/branch-2.0 adc1c2685 -> af37bdd3a [SPARK-10216][SQL] Avoid creating empty files during overwriting with group by query ## What changes were proposed in this pull request? Currently, `INSERT INTO` with `GROUP BY` query tries to make at least 200 files (default value of `spark.sql.shuffle.partition`), which results in lots of empty files. This PR makes it avoid creating empty files during overwriting into Hive table and in internal data sources with group by query. This checks whether the given partition has data in it or not and creates/writes file only when it actually has data. ## How was this patch tested? Unittests in `InsertIntoHiveTableSuite` and `HadoopFsRelationTest`. Closes #8411 Author: hyukjinkwon Author: Keuntae Park Closes #12855 from HyukjinKwon/pr/8411. (cherry picked from commit 8d05a7a98bdbd3ce7c81d273e05a375877ebe68f) Signed-off-by: Michael Armbrust Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/af37bdd3 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/af37bdd3 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/af37bdd3 Branch: refs/heads/branch-2.0 Commit: af37bdd3a7cee5206f98b3a2ba9113e71b53a2f4 Parents: adc1c26 Author: hyukjinkwon Authored: Tue May 17 11:18:51 2016 -0700 Committer: Michael Armbrust Committed: Tue May 17 11:21:06 2016 -0700 -- .../execution/datasources/WriterContainer.scala | 221 ++- .../spark/sql/hive/hiveWriterContainers.scala | 24 +- .../sql/hive/InsertIntoHiveTableSuite.scala | 41 +++- .../sql/sources/HadoopFsRelationTest.scala | 22 +- 4 files changed, 182 insertions(+), 126 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/af37bdd3/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala index 3b064a5..7e12bbb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala @@ -239,48 +239,50 @@ private[sql] class DefaultWriterContainer( extends BaseWriterContainer(relation, job, isAppend) { def writeRows(taskContext: TaskContext, iterator: Iterator[InternalRow]): Unit = { -executorSideSetup(taskContext) -val configuration = taskAttemptContext.getConfiguration -configuration.set("spark.sql.sources.output.path", outputPath) -var writer = newOutputWriter(getWorkPath) -writer.initConverter(dataSchema) - -// If anything below fails, we should abort the task. -try { - Utils.tryWithSafeFinallyAndFailureCallbacks { -while (iterator.hasNext) { - val internalRow = iterator.next() - writer.writeInternal(internalRow) -} -commitTask() - }(catchBlock = abortTask()) -} catch { - case t: Throwable => -throw new SparkException("Task failed while writing rows", t) -} +if (iterator.hasNext) { + executorSideSetup(taskContext) + val configuration = taskAttemptContext.getConfiguration + configuration.set("spark.sql.sources.output.path", outputPath) + var writer = newOutputWriter(getWorkPath) + writer.initConverter(dataSchema) -def commitTask(): Unit = { + // If anything below fails, we should abort the task. try { -if (writer != null) { - writer.close() - writer = null -} -super.commitTask() +Utils.tryWithSafeFinallyAndFailureCallbacks { + while (iterator.hasNext) { +val internalRow = iterator.next() +writer.writeInternal(internalRow) + } + commitTask() +}(catchBlock = abortTask()) } catch { -case cause: Throwable => - // This exception will be handled in `InsertIntoHadoopFsRelation.insert$writeRows`, and - // will cause `abortTask()` to be invoked. 
- throw new RuntimeException("Failed to commit task", cause) +case t: Throwable => + throw new SparkException("Task failed while writing rows", t) } -} -def abortTask(): Unit = { - try { -if (writer != null) { - writer.close() + def commitTask(): Unit = { +try { + if (writer != null) { +writer.close() +writer = null + } + super.commitTask() +
spark git commit: [SPARK-10216][SQL] Avoid creating empty files during overwriting with group by query
Repository: spark Updated Branches: refs/heads/master 20a89478e -> 8d05a7a98 [SPARK-10216][SQL] Avoid creating empty files during overwriting with group by query ## What changes were proposed in this pull request? Currently, `INSERT INTO` with `GROUP BY` query tries to make at least 200 files (default value of `spark.sql.shuffle.partition`), which results in lots of empty files. This PR makes it avoid creating empty files during overwriting into Hive table and in internal data sources with group by query. This checks whether the given partition has data in it or not and creates/writes file only when it actually has data. ## How was this patch tested? Unittests in `InsertIntoHiveTableSuite` and `HadoopFsRelationTest`. Closes #8411 Author: hyukjinkwon Author: Keuntae Park Closes #12855 from HyukjinKwon/pr/8411. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8d05a7a9 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8d05a7a9 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8d05a7a9 Branch: refs/heads/master Commit: 8d05a7a98bdbd3ce7c81d273e05a375877ebe68f Parents: 20a8947 Author: hyukjinkwon Authored: Tue May 17 11:18:51 2016 -0700 Committer: Michael Armbrust Committed: Tue May 17 11:18:51 2016 -0700 -- .../execution/datasources/WriterContainer.scala | 221 ++- .../spark/sql/hive/hiveWriterContainers.scala | 24 +- .../sql/hive/InsertIntoHiveTableSuite.scala | 41 +++- .../sql/sources/HadoopFsRelationTest.scala | 22 +- 4 files changed, 182 insertions(+), 126 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/8d05a7a9/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala index 3b064a5..7e12bbb 100644 --- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala @@ -239,48 +239,50 @@ private[sql] class DefaultWriterContainer( extends BaseWriterContainer(relation, job, isAppend) { def writeRows(taskContext: TaskContext, iterator: Iterator[InternalRow]): Unit = { -executorSideSetup(taskContext) -val configuration = taskAttemptContext.getConfiguration -configuration.set("spark.sql.sources.output.path", outputPath) -var writer = newOutputWriter(getWorkPath) -writer.initConverter(dataSchema) - -// If anything below fails, we should abort the task. -try { - Utils.tryWithSafeFinallyAndFailureCallbacks { -while (iterator.hasNext) { - val internalRow = iterator.next() - writer.writeInternal(internalRow) -} -commitTask() - }(catchBlock = abortTask()) -} catch { - case t: Throwable => -throw new SparkException("Task failed while writing rows", t) -} +if (iterator.hasNext) { + executorSideSetup(taskContext) + val configuration = taskAttemptContext.getConfiguration + configuration.set("spark.sql.sources.output.path", outputPath) + var writer = newOutputWriter(getWorkPath) + writer.initConverter(dataSchema) -def commitTask(): Unit = { + // If anything below fails, we should abort the task. try { -if (writer != null) { - writer.close() - writer = null -} -super.commitTask() +Utils.tryWithSafeFinallyAndFailureCallbacks { + while (iterator.hasNext) { +val internalRow = iterator.next() +writer.writeInternal(internalRow) + } + commitTask() +}(catchBlock = abortTask()) } catch { -case cause: Throwable => - // This exception will be handled in `InsertIntoHadoopFsRelation.insert$writeRows`, and - // will cause `abortTask()` to be invoked. 
- throw new RuntimeException("Failed to commit task", cause) +case t: Throwable => + throw new SparkException("Task failed while writing rows", t) } -} -def abortTask(): Unit = { - try { -if (writer != null) { - writer.close() + def commitTask(): Unit = { +try { + if (writer != null) { +writer.close() +writer = null + } + super.commitTask() +} catch { + case cause: Throwable => +// This exception will be handled in