GitHub user cloud-fan commented on a diff in the pull request:
https://github.com/apache/spark/pull/21381#discussion_r189868899
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala ---
@@ -265,27 +226,24 @@ object FileFormatWriter extends Logging {
    val writeTask =
      if (sparkPartitionId != 0 && !iterator.hasNext) {
        // In case of empty job, leave first partition to save meta for file format like parquet.
-        new EmptyDirectoryWriteTask(description)
+        new EmptyDirectoryDataWriter(description, taskAttemptContext, committer)
      } else if (description.partitionColumns.isEmpty && description.bucketIdExpression.isEmpty) {
-        new SingleDirectoryWriteTask(description, taskAttemptContext, committer)
+        new SingleDirectoryDataWriter(description, taskAttemptContext, committer)
      } else {
-        new DynamicPartitionWriteTask(description, taskAttemptContext, committer)
+        new DynamicPartitionDataWriter(description, taskAttemptContext, committer)
      }
    try {
      Utils.tryWithSafeFinallyAndFailureCallbacks(block = {
        // Execute the task to write rows out and commit the task.
-        val summary = writeTask.execute(iterator)
-        writeTask.releaseResources()
-        WriteTaskResult(committer.commitTask(taskAttemptContext), summary)
-      })(catchBlock = {
-        // If there is an error, release resource and then abort the task
-        try {
-          writeTask.releaseResources()
-        } finally {
-          committer.abortTask(taskAttemptContext)
-          logError(s"Job $jobId aborted.")
+        for (row <- iterator) {
+          writeTask.write(row)
--- End diff ---
nit: for this kind of simple loop, we can just write
`iterator.foreach(writeTask.write)`
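
For reference, a minimal self-contained sketch of why the two forms are equivalent (the `DataWriter` trait and `writeAll` helper below are simplified stand-ins for illustration, not the real Spark types):

```scala
// Sketch only: a for loop over an Iterator desugars to a foreach call,
// and the closure can then be eta-reduced to a method value.
trait DataWriter[T] { def write(record: T): Unit }

object ForeachEquivalence {
  def writeAll(iterator: Iterator[String], writeTask: DataWriter[String]): Unit = {
    // The diff's form:   for (row <- iterator) { writeTask.write(row) }
    // desugars to:       iterator.foreach(row => writeTask.write(row))
    // which eta-reduces to the suggested one-liner:
    iterator.foreach(writeTask.write)
  }

  def main(args: Array[String]): Unit = {
    // Trivial usage: prints a, b, c, writing each record exactly once.
    val printer = new DataWriter[String] {
      def write(record: String): Unit = println(record)
    }
    writeAll(Iterator("a", "b", "c"), printer)
  }
}
```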
---