Github user ssimeonov commented on a diff in the pull request:
https://github.com/apache/spark/pull/16204#discussion_r91437338
--- Diff:
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
---
@@ -225,32 +228,50 @@ object FileFormatWriter extends Logging {
taskAttemptContext: TaskAttemptContext,
committer: FileCommitProtocol) extends ExecuteWriteTask {
- private[this] var outputWriter: OutputWriter = {
+ private[this] var currentWriter: OutputWriter = _
--- End diff --
Looking through the code, three things stand out:
1. `SingleDirectoryWriteTask` and `DynamicPartitionWriteTask` duplicate the
logic for managing and cleaning up the current writer.
2. Within each write task, `releaseResources()` and `newOutputWriter()`
duplicate the logic for releasing resources.
3. Write task state management is leaky because `executeTask()` has to call
`releaseResources()` explicitly. In addition, `releaseResources()` is called
twice when there are no exceptions but only once when `execute()` throws,
which is a bit confusing.
What about asking the base trait to do a bit more work and present a
stronger contract to its users, e.g.:
```scala
private trait ExecuteWriteTask {

  protected[this] var currentWriter: OutputWriter = null

  def execute(iterator: Iterator[InternalRow]): Set[String] = {
    try {
      executeImp(iterator)
    } finally {
      releaseResources()
    }
  }

  /**
   * Writes data out to files, and then returns the list of partition
   * strings written out.
   * The list of partitions is sent back to the driver and used to update
   * the catalog.
   */
  protected def executeImp(iterator: Iterator[InternalRow]): Set[String]

  protected def resetCurrentWriter(): Unit = {
    if (currentWriter != null) {
      currentWriter.close()
      currentWriter = null
    }
  }

  protected def releaseResources(): Unit = {
    resetCurrentWriter()
  }
}
```
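
To make the intended contract concrete, here is a minimal, Spark-free sketch
of how a subclass might use it. Everything in it is a stand-in invented for
illustration: `CountingWriter` replaces `OutputWriter`, rows are plain
`"key=value"` strings instead of `InternalRow`, and `WriteTask` and
`PartitionedTask` are hypothetical names, not classes in the PR.

```scala
import scala.collection.mutable

// Hypothetical stand-in for OutputWriter: counts how often close() is called.
final class CountingWriter(val path: String) {
  var closeCount = 0
  def write(row: String): Unit = {
    // Simulate a write failure for rows whose value is "boom".
    if (row.endsWith("=boom")) {
      throw new IllegalStateException(s"failed writing to $path")
    }
  }
  def close(): Unit = closeCount += 1
}

// Same shape as the proposed base trait, with simplified types.
trait WriteTask {
  protected var currentWriter: CountingWriter = null

  // The base trait owns cleanup, so callers never call releaseResources().
  final def execute(rows: Iterator[String]): Set[String] =
    try executeImp(rows) finally releaseResources()

  protected def executeImp(rows: Iterator[String]): Set[String]

  // Safe to call repeatedly: a writer is closed at most once.
  protected def resetCurrentWriter(): Unit = {
    if (currentWriter != null) {
      currentWriter.close()
      currentWriter = null
    }
  }

  protected def releaseResources(): Unit = resetCurrentWriter()
}

// Hypothetical subclass: opens one writer per run of rows sharing a key.
final class PartitionedTask extends WriteTask {
  val opened = mutable.ArrayBuffer.empty[CountingWriter]
  private var currentKey: String = null

  protected def executeImp(rows: Iterator[String]): Set[String] = {
    val partitions = mutable.Set.empty[String]
    rows.foreach { row =>
      val key = row.takeWhile(_ != '=')
      if (key != currentKey) {
        resetCurrentWriter() // close the previous writer before switching
        currentWriter = new CountingWriter(key)
        opened += currentWriter
        currentKey = key
        partitions += key
      }
      currentWriter.write(row)
    }
    partitions.toSet
  }
}
```

With this shape, the subclass only calls `resetCurrentWriter()` when it
switches writers, and the caller only calls `execute()`. Each writer should
be closed exactly once whether or not a write throws.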