[FLINK-5553] [network] keep the original throwable in PartitionRequestClientHandler
This way, when checking for a previous error in any input channel, we can throw a meaningful exception instead of the unspecific IllegalStateException("There has been an error in the channel.") thrown before. Note that the original throwable (from an existing channel) may or may not(!) have been printed by the InputGate yet. Any new input channel, however, has not received the Throwable and must fail through the (now enhanced) fallback mechanism. This closes #3299 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/af81bebd Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/af81bebd Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/af81bebd Branch: refs/heads/master Commit: af81bebd0fabc6390930689df131e72edab6995b Parents: a91b6ff Author: Nico Kruber <n...@data-artisans.com> Authored: Mon Feb 13 16:30:59 2017 +0100 Committer: Stephan Ewen <se...@apache.org> Committed: Tue Feb 14 15:32:43 2017 +0100 ---------------------------------------------------------------------- .../netty/PartitionRequestClientHandler.java | 27 +++++++++++++++----- .../netty/ClientTransportErrorHandlingTest.java | 3 ++- 2 files changed, 22 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/af81bebd/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java index 52775d4..9f80abc 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java +++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java @@ -42,18 +42,15 @@ import java.util.ArrayDeque; import java.util.Queue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; -import static org.apache.flink.util.Preconditions.checkState; - class PartitionRequestClientHandler extends ChannelInboundHandlerAdapter { private static final Logger LOG = LoggerFactory.getLogger(PartitionRequestClientHandler.class); private final ConcurrentMap<InputChannelID, RemoteInputChannel> inputChannels = new ConcurrentHashMap<InputChannelID, RemoteInputChannel>(); - private final AtomicBoolean channelError = new AtomicBoolean(false); + private final AtomicReference<Throwable> channelError = new AtomicReference<Throwable>(); private final BufferListenerTask bufferListener = new BufferListenerTask(); @@ -73,8 +70,8 @@ class PartitionRequestClientHandler extends ChannelInboundHandlerAdapter { // Input channel/receiver registration // ------------------------------------------------------------------------ - void addInputChannel(RemoteInputChannel listener) { - checkState(!channelError.get(), "There has been an error in the channel."); + void addInputChannel(RemoteInputChannel listener) throws IOException { + checkError(); if (!inputChannels.containsKey(listener.getInputChannelId())) { inputChannels.put(listener.getInputChannelId(), listener); @@ -172,7 +169,7 @@ class PartitionRequestClientHandler extends ChannelInboundHandlerAdapter { } private void notifyAllChannelsOfErrorAndClose(Throwable cause) { - if (channelError.compareAndSet(false, true)) { + if (channelError.compareAndSet(null, cause)) { try { for (RemoteInputChannel inputChannel : inputChannels.values()) { inputChannel.onError(cause); @@ -195,6 +192,22 @@ class PartitionRequestClientHandler extends ChannelInboundHandlerAdapter { // 
------------------------------------------------------------------------ + /** + * Checks for an error and rethrows it if one was reported. + */ + private void checkError() throws IOException { + final Throwable t = channelError.get(); + + if (t != null) { + if (t instanceof IOException) { + throw (IOException) t; + } + else { + throw new IOException("There has been an error in the channel.", t); + } + } + } + @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { super.channelReadComplete(ctx); http://git-wip-us.apache.org/repos/asf/flink/blob/af81bebd/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ClientTransportErrorHandlingTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ClientTransportErrorHandlingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ClientTransportErrorHandlingTest.java index ab96d4a..22e7754 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ClientTransportErrorHandlingTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ClientTransportErrorHandlingTest.java @@ -389,7 +389,8 @@ public class ClientTransportErrorHandlingTest { return new EmbeddedChannel(protocol.getClientChannelHandlers()); } - private RemoteInputChannel addInputChannel(PartitionRequestClientHandler clientHandler) { + private RemoteInputChannel addInputChannel(PartitionRequestClientHandler clientHandler) + throws IOException { RemoteInputChannel rich = createRemoteInputChannel(); clientHandler.addInputChannel(rich);