squito commented on a change in pull request #25342:
[SPARK-28571][CORE][SHUFFLE] Use the shuffle writer plugin for the
SortShuffleWriter
URL: https://github.com/apache/spark/pull/25342#discussion_r318127279
##########
File path:
core/src/main/scala/org/apache/spark/shuffle/ShufflePartitionPairsWriter.scala
##########
@@ -60,17 +60,43 @@ private[spark] class ShufflePartitionPairsWriter(
}
override def close(): Unit = {
+ var thrownException: Option[IOException] = None
if (objOut != null) {
- // Closing objOut should propagate close to all inner layers
- objOut.close()
+ thrownException = tryCloseOrAddSuppressed(objOut, thrownException)
+ objOut = null
}
- objOut = null
- wrappedStream = null
- partitionStream = null
+
+ if (wrappedStream != null) {
+ thrownException = tryCloseOrAddSuppressed(wrappedStream, thrownException)
+ wrappedStream = null
+ }
+
+ if (partitionStream != null) {
+ thrownException = tryCloseOrAddSuppressed(partitionStream,
thrownException)
+ partitionStream = null
+ }
+
+ thrownException.foreach(throw _)
+
isOpen = false
updateBytesWritten()
}
+ private def tryCloseOrAddSuppressed(
+ closeable: Closeable, prevException: Option[IOException]):
Option[IOException] = {
+ var resolvedException = prevException
+ try {
+ closeable.close()
+ } catch {
+ case e: IOException =>
+ resolvedException = prevException.map(presentPrev => {
+ presentPrev.addSuppressed(e)
+ presentPrev
+ }).orElse(Some(e))
+ }
+ resolvedException
Review comment:
I can't think of anything wrong here, but seems safer to be using `finally`.
kind of a stretch, but if some (badly implemented) stream throws a
RuntimeException instead of an IOException you wouldn't clean up properly this
way. The nesting gets a bit ugly, but you could do this:
```scala
def closeIfNonNull[T <: Closeable](x: T): T = {
if (x != null) x.close()
null.asInstanceOf[T]
}
Utils.tryWithSafeFinally {
objOut = closeIfNonNull(objOut)
} {
// normally closing objOut would close the inner streams as well, but just
in case there was
// an error in initialization etc. we make sure we clean the other streams
up too
Utils.tryWithSafeFinally {
wrappedStream = closeIfNonNull(wrappedStream)
} {
partitionStream = closeIfNonNull(partitionStream)
}
}
```
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]