[FLINK-5553] [network] keep the original throwable in 
PartitionRequestClientHandler

This way, when checking for a previous error in any input channel, we can throw
a meaningful exception instead of the unspecific
IllegalStateException("There has been an error in the channel.") thrown before.

Note that the original throwable (from an existing channel) may or may not(!)
have been printed by the InputGate yet. Any new input channel, however, did not
get the Throwable and must fail through the (now enhanced) fallback mechanism.

This closes #3299


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/af81bebd
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/af81bebd
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/af81bebd

Branch: refs/heads/master
Commit: af81bebd0fabc6390930689df131e72edab6995b
Parents: a91b6ff
Author: Nico Kruber <n...@data-artisans.com>
Authored: Mon Feb 13 16:30:59 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Feb 14 15:32:43 2017 +0100

----------------------------------------------------------------------
 .../netty/PartitionRequestClientHandler.java    | 27 +++++++++++++++-----
 .../netty/ClientTransportErrorHandlingTest.java |  3 ++-
 2 files changed, 22 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/af81bebd/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java
index 52775d4..9f80abc 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java
@@ -42,18 +42,15 @@ import java.util.ArrayDeque;
 import java.util.Queue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
-import static org.apache.flink.util.Preconditions.checkState;
-
 class PartitionRequestClientHandler extends ChannelInboundHandlerAdapter {
 
        private static final Logger LOG = 
LoggerFactory.getLogger(PartitionRequestClientHandler.class);
 
        private final ConcurrentMap<InputChannelID, RemoteInputChannel> 
inputChannels = new ConcurrentHashMap<InputChannelID, RemoteInputChannel>();
 
-       private final AtomicBoolean channelError = new AtomicBoolean(false);
+       private final AtomicReference<Throwable> channelError = new 
AtomicReference<Throwable>();
 
        private final BufferListenerTask bufferListener = new 
BufferListenerTask();
 
@@ -73,8 +70,8 @@ class PartitionRequestClientHandler extends 
ChannelInboundHandlerAdapter {
        // Input channel/receiver registration
        // 
------------------------------------------------------------------------
 
-       void addInputChannel(RemoteInputChannel listener) {
-               checkState(!channelError.get(), "There has been an error in the 
channel.");
+       void addInputChannel(RemoteInputChannel listener) throws IOException {
+               checkError();
 
                if (!inputChannels.containsKey(listener.getInputChannelId())) {
                        inputChannels.put(listener.getInputChannelId(), 
listener);
@@ -172,7 +169,7 @@ class PartitionRequestClientHandler extends 
ChannelInboundHandlerAdapter {
        }
 
        private void notifyAllChannelsOfErrorAndClose(Throwable cause) {
-               if (channelError.compareAndSet(false, true)) {
+               if (channelError.compareAndSet(null, cause)) {
                        try {
                                for (RemoteInputChannel inputChannel : 
inputChannels.values()) {
                                        inputChannel.onError(cause);
@@ -195,6 +192,22 @@ class PartitionRequestClientHandler extends 
ChannelInboundHandlerAdapter {
 
        // 
------------------------------------------------------------------------
 
+       /**
+        * Checks for an error and rethrows it if one was reported.
+        */
+       private void checkError() throws IOException {
+               final Throwable t = channelError.get();
+
+               if (t != null) {
+                       if (t instanceof IOException) {
+                               throw (IOException) t;
+                       }
+                       else {
+                               throw new IOException("There has been an error 
in the channel.", t);
+                       }
+               }
+       }
+
        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws 
Exception {
                super.channelReadComplete(ctx);

http://git-wip-us.apache.org/repos/asf/flink/blob/af81bebd/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ClientTransportErrorHandlingTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ClientTransportErrorHandlingTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ClientTransportErrorHandlingTest.java
index ab96d4a..22e7754 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ClientTransportErrorHandlingTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ClientTransportErrorHandlingTest.java
@@ -389,7 +389,8 @@ public class ClientTransportErrorHandlingTest {
                return new EmbeddedChannel(protocol.getClientChannelHandlers());
        }
 
-       private RemoteInputChannel 
addInputChannel(PartitionRequestClientHandler clientHandler) {
+       private RemoteInputChannel 
addInputChannel(PartitionRequestClientHandler clientHandler)
+               throws IOException {
                RemoteInputChannel rich = createRemoteInputChannel();
                clientHandler.addInputChannel(rich);
 

Reply via email to