Github user JoshRosen commented on a diff in the pull request:
https://github.com/apache/spark/pull/20179#discussion_r160312383
--- Diff: core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
---
@@ -332,15 +332,17 @@ private[netty] class NettyRpcEnv(
val pipe = Pipe.open()
val source = new FileDownloadChannel(pipe.source())
+ var exceptionThrown = true
try {
val client = downloadClient(parsedUri.getHost(), parsedUri.getPort())
val callback = new FileDownloadCallback(pipe.sink(), source, client)
client.stream(parsedUri.getPath(), callback)
- } catch {
- case e: Exception =>
+ exceptionThrown = false
+ } finally {
+ if (exceptionThrown) {
pipe.sink().close()
--- End diff --
To preserve the same behavior, I think we'd need to do
```scala
try {
// code
} catch {
case t: Throwable =>
pipe.sink().close()
source.close()
throw t
}
```
to ensure that we propagate the original `Throwable`.
It could be clearer (and safer, in the case of exceptions thrown from
`close()` calls), to use
[`Utils.tryWithSafeFinallyAndFailureCallbacks`](https://github.com/apache/spark/blob/849043ce1d28a976659278d29368da0799329db8/core/src/main/scala/org/apache/spark/util/Utils.scala#L1407)
here, so let me make that change.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]