[hotfix][kafka-tests] Do not hide original exception in FlinkKafkaProducer011ITCase
This closes #5383. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/02ea5087 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/02ea5087 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/02ea5087 Branch: refs/heads/master Commit: 02ea508736e06e08ff53921994b29ce8db0f5f50 Parents: 1c02e1a Author: Piotr Nowojski <piotr.nowoj...@gmail.com> Authored: Tue Jan 30 14:56:43 2018 +0100 Committer: Till Rohrmann <trohrm...@apache.org> Committed: Mon Feb 12 11:59:30 2018 +0100 ---------------------------------------------------------------------- .../kafka/FlinkKafkaProducer011ITCase.java | 35 ++++++++------------ 1 file changed, 13 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/02ea5087/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java index 3c3c86a..ab3a2e2 100644 --- a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java +++ b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java @@ -39,15 +39,15 @@ import java.util.Arrays; import java.util.Collections; import java.util.Iterator; import java.util.List; +import java.util.Optional; import java.util.Properties; import java.util.UUID; import java.util.stream.Collectors; import java.util.stream.IntStream; +import static org.apache.flink.util.ExceptionUtils.findThrowable; import static org.apache.flink.util.Preconditions.checkState; -import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; -import static org.junit.Assert.fail; /** * IT cases for the {@link FlinkKafkaProducer011}. @@ -98,7 +98,9 @@ public class FlinkKafkaProducer011ITCase extends KafkaTestBase { throw new IllegalStateException("This should not be reached."); } catch (Exception ex) { - assertIsCausedBy(FlinkKafka011ErrorCode.PRODUCERS_POOL_EMPTY, ex); + if (!isCausedBy(FlinkKafka011ErrorCode.PRODUCERS_POOL_EMPTY, ex)) { + throw ex; + } } // Resume transactions before testHarness1 is being closed (in case of failures close() might not be called) @@ -114,7 +116,9 @@ public class FlinkKafkaProducer011ITCase extends KafkaTestBase { } catch (Exception ex) { // testHarness1 will be fenced off after creating and closing testHarness2 - assertIsCausedBy(ProducerFencedException.class, ex); + if (!findThrowable(ex, ProducerFencedException.class).isPresent()) { + throw ex; + } } } @@ -592,24 +596,11 @@ public class FlinkKafkaProducer011ITCase extends KafkaTestBase { return properties; } - private void assertIsCausedBy(Class<?> clazz, Throwable ex) { - for (int depth = 0; depth < 50 && ex != null; depth++) { - if (clazz.isInstance(ex)) { - return; - } - ex = ex.getCause(); - } - fail(String.format("Exception [%s] was not caused by [%s]", ex, clazz)); - } - - private void assertIsCausedBy(FlinkKafka011ErrorCode expectedErrorCode, Throwable ex) { - for (int depth = 0; depth < 50 && ex != null; depth++) { - if (ex instanceof FlinkKafka011Exception) { - assertEquals(expectedErrorCode, ((FlinkKafka011Exception) ex).getErrorCode()); - return; - } - ex = ex.getCause(); + private boolean isCausedBy(FlinkKafka011ErrorCode expectedErrorCode, Throwable ex) { + Optional<FlinkKafka011Exception> cause = findThrowable(ex, FlinkKafka011Exception.class); + if (cause.isPresent()) { + return cause.get().getErrorCode().equals(expectedErrorCode); } - fail(String.format("Exception [%s] was not caused by FlinkKafka011Exception[errorCode=%s]", ex, expectedErrorCode)); + return false; } }