GitHub user JoshRosen commented on a diff in the pull request:
https://github.com/apache/spark/pull/21346#discussion_r191935821
--- Diff:
common/network-common/src/main/java/org/apache/spark/network/client/StreamInterceptor.java
---
@@ -50,16 +52,22 @@
@Override
public void exceptionCaught(Throwable cause) throws Exception {
- handler.deactivateStream();
+ deactivateStream();
callback.onFailure(streamId, cause);
}
@Override
public void channelInactive() throws Exception {
- handler.deactivateStream();
+ deactivateStream();
callback.onFailure(streamId, new ClosedChannelException());
}
+ private void deactivateStream() {
+ if (handler instanceof TransportResponseHandler) {
--- End diff --
Why don't we need to do this for `TransportRequestHandler`?
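
For context on the question, here is a minimal, self-contained sketch of the `instanceof`-guarded pattern that the diff introduces. Every class name here (`DeactivateSketch`, `MessageHandler`, `ResponseHandler`, `RequestHandler`, `Interceptor`) is a hypothetical stand-in, not Spark's actual code. The premise that only the response-side handler tracks stream state is an assumption made for illustration; the diff shown above does not confirm it.

```java
public class DeactivateSketch {
  // Hypothetical common supertype for both handler kinds.
  interface MessageHandler {}

  // Stand-in for a response-side handler that tracks whether a stream is active.
  static class ResponseHandler implements MessageHandler {
    boolean streamActive = true;

    void deactivateStream() {
      streamActive = false;
    }
  }

  // Stand-in for a request-side handler. ASSUMPTION: it keeps no per-stream
  // state, so there is nothing to deactivate on failure.
  static class RequestHandler implements MessageHandler {}

  // Stand-in for the interceptor: it only knows the common supertype.
  static class Interceptor {
    private final MessageHandler handler;

    Interceptor(MessageHandler handler) {
      this.handler = handler;
    }

    // Deactivates the stream only when the handler type supports it.
    // Returns true if a deactivation was actually performed.
    boolean deactivateStream() {
      if (handler instanceof ResponseHandler) {
        ((ResponseHandler) handler).deactivateStream();
        return true;
      }
      return false; // any other handler type: nothing to clean up in this sketch
    }
  }

  public static void main(String[] args) {
    ResponseHandler response = new ResponseHandler();
    System.out.println(new Interceptor(response).deactivateStream());
    System.out.println(new Interceptor(new RequestHandler()).deactivateStream());
  }
}
```

If the request-side handler did hold per-stream state, the guard would silently skip its cleanup, which is the case the question above is probing.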
---