[hotfix][kafka-tests] Do not hide original exception in FlinkKafkaProducer011ITCase

This closes #5383.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/02ea5087
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/02ea5087
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/02ea5087

Branch: refs/heads/master
Commit: 02ea508736e06e08ff53921994b29ce8db0f5f50
Parents: 1c02e1a
Author: Piotr Nowojski <piotr.nowoj...@gmail.com>
Authored: Tue Jan 30 14:56:43 2018 +0100
Committer: Till Rohrmann <trohrm...@apache.org>
Committed: Mon Feb 12 11:59:30 2018 +0100

----------------------------------------------------------------------
 .../kafka/FlinkKafkaProducer011ITCase.java      | 35 ++++++++------------
 1 file changed, 13 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/02ea5087/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java
index 3c3c86a..ab3a2e2 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java
@@ -39,15 +39,15 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Optional;
 import java.util.Properties;
 import java.util.UUID;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
+import static org.apache.flink.util.ExceptionUtils.findThrowable;
 import static org.apache.flink.util.Preconditions.checkState;
-import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.fail;
 
 /**
  * IT cases for the {@link FlinkKafkaProducer011}.
@@ -98,7 +98,9 @@ public class FlinkKafkaProducer011ITCase extends KafkaTestBase {
                                throw new IllegalStateException("This should 
not be reached.");
                        }
                        catch (Exception ex) {
-                               
assertIsCausedBy(FlinkKafka011ErrorCode.PRODUCERS_POOL_EMPTY, ex);
+                               if 
(!isCausedBy(FlinkKafka011ErrorCode.PRODUCERS_POOL_EMPTY, ex)) {
+                                       throw ex;
+                               }
                        }
 
                        // Resume transactions before testHarness1 is being 
closed (in case of failures close() might not be called)
@@ -114,7 +116,9 @@ public class FlinkKafkaProducer011ITCase extends KafkaTestBase {
                }
                catch (Exception ex) {
                        // testHarness1 will be fenced off after creating and 
closing testHarness2
-                       assertIsCausedBy(ProducerFencedException.class, ex);
+                       if (!findThrowable(ex, 
ProducerFencedException.class).isPresent()) {
+                               throw ex;
+                       }
                }
        }
 
@@ -592,24 +596,11 @@ public class FlinkKafkaProducer011ITCase extends KafkaTestBase {
                return properties;
        }
 
-       private void assertIsCausedBy(Class<?> clazz, Throwable ex) {
-               for (int depth = 0; depth < 50 && ex != null; depth++) {
-                       if (clazz.isInstance(ex)) {
-                               return;
-                       }
-                       ex = ex.getCause();
-               }
-               fail(String.format("Exception [%s] was not caused by [%s]", ex, 
clazz));
-       }
-
-       private void assertIsCausedBy(FlinkKafka011ErrorCode expectedErrorCode, 
Throwable ex) {
-               for (int depth = 0; depth < 50 && ex != null; depth++) {
-                       if (ex instanceof FlinkKafka011Exception) {
-                               assertEquals(expectedErrorCode, 
((FlinkKafka011Exception) ex).getErrorCode());
-                               return;
-                       }
-                       ex = ex.getCause();
+       private boolean isCausedBy(FlinkKafka011ErrorCode expectedErrorCode, 
Throwable ex) {
+               Optional<FlinkKafka011Exception> cause = findThrowable(ex, 
FlinkKafka011Exception.class);
+               if (cause.isPresent()) {
+                       return 
cause.get().getErrorCode().equals(expectedErrorCode);
                }
-               fail(String.format("Exception [%s] was not caused by 
FlinkKafka011Exception[errorCode=%s]", ex, expectedErrorCode));
+               return false;
        }
 }

Reply via email to