This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a commit to branch release-1.9 in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.9 by this push: new 9ee42ec [FLINK-14076] Ensure CheckpointException can be deserialized on JobManager 9ee42ec is described below commit 9ee42ec6b009f1c4da7265851e23cfaa45f4c83a Author: Jeff Martin <jmar...@palantir.com> AuthorDate: Sun Sep 22 21:27:12 2019 -0700 [FLINK-14076] Ensure CheckpointException can be deserialized on JobManager This closes #9742. --- .../cassandra/CassandraSinkBaseTest.java | 5 +++- .../kafka/FlinkKafkaProducer011ITCase.java | 7 +++-- .../connectors/kafka/FlinkKafkaProducerITCase.java | 6 ++-- .../java/org/apache/flink/util/ExceptionUtils.java | 32 ++++++++++++++++++++++ .../runtime/checkpoint/CheckpointException.java | 13 +++++++-- .../sink/TwoPhaseCommitSinkFunctionTest.java | 3 +- .../api/operators/AbstractStreamOperatorTest.java | 4 +-- .../apache/flink/streaming/util/ContentDump.java | 2 +- 8 files changed, 59 insertions(+), 13 deletions(-) diff --git a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseTest.java b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseTest.java index b4406ab..3ce9742 100644 --- a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseTest.java +++ b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseTest.java @@ -37,11 +37,13 @@ import org.junit.Test; import java.io.IOException; import java.time.Duration; import java.util.LinkedList; +import java.util.Optional; import java.util.Queue; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeoutException; +import static org.apache.flink.util.ExceptionUtils.findSerializedThrowable; import static org.hamcrest.number.OrderingComparison.greaterThan; import static org.mockito.Mockito.mock; import static org.powermock.api.mockito.PowerMockito.when; @@ -156,7 +158,8 @@ public class CassandraSinkBaseTest { Assert.fail(); } catch (Exception e) { - Assert.assertTrue(e.getCause() instanceof IOException); + Optional<IOException> exCause = findSerializedThrowable(e, IOException.class, ClassLoader.getSystemClassLoader()); + Assert.assertTrue(exCause.isPresent()); } } } diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java index 0932d42..b2d0a96 100644 --- a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java +++ b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java @@ -49,7 +49,7 @@ import java.util.stream.IntStream; import static org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.Semantic; import static org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.Semantic.AT_LEAST_ONCE; import static org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.Semantic.EXACTLY_ONCE; -import static org.apache.flink.util.ExceptionUtils.findThrowable; +import static org.apache.flink.util.ExceptionUtils.findSerializedThrowable; import static org.apache.flink.util.Preconditions.checkState; import static org.hamcrest.Matchers.lessThan; import static org.junit.Assert.assertThat; @@ -160,7 +160,7 @@ public class FlinkKafkaProducer011ITCase extends KafkaTestBaseWithFlink { } catch (Exception ex) { // testHarness1 will be fenced off after creating and closing testHarness2 - if (!findThrowable(ex, ProducerFencedException.class).isPresent()) { + if (!findSerializedThrowable(ex, ProducerFencedException.class, ClassLoader.getSystemClassLoader()).isPresent()) { throw ex; } } @@ -664,7 +664,8 @@ public class FlinkKafkaProducer011ITCase extends KafkaTestBaseWithFlink { } private boolean isCausedBy(FlinkKafka011ErrorCode expectedErrorCode, Throwable ex) { - Optional<FlinkKafka011Exception> cause = findThrowable(ex, FlinkKafka011Exception.class); + // Extract the root cause kafka exception (if any) from the serialized throwable. + Optional<FlinkKafka011Exception> cause = findSerializedThrowable(ex, FlinkKafka011Exception.class, ClassLoader.getSystemClassLoader()); if (cause.isPresent()) { return cause.get().getErrorCode().equals(expectedErrorCode); } diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerITCase.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerITCase.java index 1097fd6..d393819 100644 --- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerITCase.java +++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerITCase.java @@ -45,7 +45,7 @@ import java.util.UUID; import java.util.stream.Collectors; import java.util.stream.IntStream; -import static org.apache.flink.util.ExceptionUtils.findThrowable; +import static org.apache.flink.util.ExceptionUtils.findSerializedThrowable; import static org.apache.flink.util.Preconditions.checkState; import static org.hamcrest.Matchers.lessThan; import static org.junit.Assert.assertThat; @@ -156,7 +156,7 @@ public class FlinkKafkaProducerITCase extends KafkaTestBase { } catch (Exception ex) { // testHarness1 will be fenced off after creating and closing testHarness2 - if (!findThrowable(ex, ProducerFencedException.class).isPresent()) { + if (!findSerializedThrowable(ex, ProducerFencedException.class, ClassLoader.getSystemClassLoader()).isPresent()) { throw ex; } } @@ -662,7 +662,7 @@ public class FlinkKafkaProducerITCase extends KafkaTestBase { } private boolean isCausedBy(FlinkKafkaErrorCode expectedErrorCode, Throwable ex) { - Optional<FlinkKafkaException> cause = findThrowable(ex, FlinkKafkaException.class); + Optional<FlinkKafkaException> cause = findSerializedThrowable(ex, FlinkKafkaException.class, ClassLoader.getSystemClassLoader()); if (cause.isPresent()) { return cause.get().getErrorCode().equals(expectedErrorCode); } diff --git a/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java b/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java index 721bf7f..ddd0276 100644 --- a/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java +++ b/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java @@ -315,6 +315,38 @@ public final class ExceptionUtils { } /** + * Checks whether a throwable chain contains a specific type of exception and returns it. It deserializes + * any {@link SerializedThrowable} that are found using the provided {@link ClassLoader}. + * + * @param throwable the throwable chain to check. + * @param searchType the type of exception to search for in the chain. + * @param classLoader to use for deserialization. + * @return Optional throwable of the requested type if available, otherwise empty + */ + public static <T extends Throwable> Optional<T> findSerializedThrowable(Throwable throwable, Class<T> searchType, ClassLoader classLoader) { + if (throwable == null || searchType == null) { + return Optional.empty(); + } + + Throwable t = throwable; + while (t != null) { + if (searchType.isAssignableFrom(t.getClass())) { + return Optional.of(searchType.cast(t)); + } else if (t.getClass().isAssignableFrom(SerializedThrowable.class)) { + Throwable next = ((SerializedThrowable) t).deserializeError(classLoader); + // SerializedThrowable#deserializeError returns itself under some conditions (e.g., null cause). + // If that happens, exit to avoid looping infinitely. This is ok because if the user was searching + // for a SerializedThrowable, we would have returned it in the initial if condition. + t = (next == t) ? null : next; + } else { + t = t.getCause(); + } + } + + return Optional.empty(); + } + + /** * Checks whether a throwable chain contains a specific type of exception and returns it. * * @param throwable the throwable chain to check. diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointException.java index c0bc2d1..7c8ab49 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointException.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointException.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.checkpoint; import org.apache.flink.util.Preconditions; +import org.apache.flink.util.SerializedThrowable; /** * Base class for checkpoint related exceptions. @@ -40,12 +41,20 @@ public class CheckpointException extends Exception { } public CheckpointException(CheckpointFailureReason failureReason, Throwable cause) { - super(failureReason.message(), cause); + // Defensively replace the cause with a SerializedThrowable in case it's a user-defined exception + // that doesn't exist on the JobManager's default classpath. + super( + failureReason.message(), + cause == null ? null : new SerializedThrowable(cause)); this.checkpointFailureReason = Preconditions.checkNotNull(failureReason); } public CheckpointException(String message, CheckpointFailureReason failureReason, Throwable cause) { - super(message + " Failure reason: " + failureReason.message(), cause); + // Defensively replace the cause with a SerializedThrowable in case it's a user-defined exception + // that doesn't exist on the JobManager's default classpath. + super( + message + " Failure reason: " + failureReason.message(), + cause == null ? null : new SerializedThrowable(cause)); this.checkpointFailureReason = Preconditions.checkNotNull(failureReason); } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java index 2970b87..84c0104 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java @@ -47,6 +47,7 @@ import java.util.UUID; import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; +import static org.apache.flink.util.ExceptionUtils.findSerializedThrowable; import static org.hamcrest.CoreMatchers.containsString; import static org.hamcrest.CoreMatchers.hasItem; import static org.junit.Assert.assertEquals; @@ -167,7 +168,7 @@ public class TwoPhaseCommitSinkFunctionTest { harness.snapshot(2, 5); fail("something should fail"); } catch (Exception ex) { - if (!(ex.getCause() instanceof ContentDump.NotWritableException)) { + if (!findSerializedThrowable(ex, ContentDump.NotWritableException.class, ClassLoader.getSystemClassLoader()).isPresent()) { throw ex; } // ignore diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java index 36fb867..f9d9aa5 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java @@ -558,7 +558,7 @@ public class AbstractStreamOperatorTest { new MemCheckpointStreamFactory(Integer.MAX_VALUE)); fail("Exception expected."); } catch (Exception e) { - assertEquals(failingException, e.getCause()); + assertEquals(failingException.getMessage(), e.getCause().getMessage()); } } @@ -636,7 +636,7 @@ public class AbstractStreamOperatorTest { new MemCheckpointStreamFactory(Integer.MAX_VALUE)); fail("Exception expected."); } catch (Exception e) { - assertEquals(failingException, e.getCause()); + assertEquals(failingException.getMessage(), e.getCause().getMessage()); } // verify that the context has been closed, the operator snapshot result has been cancelled diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/ContentDump.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/ContentDump.java index 903b237..5c1568e 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/ContentDump.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/ContentDump.java @@ -124,7 +124,7 @@ public class ContentDump { /** * Exception thrown for an attempt to write into read-only {@link ContentDump}. */ - public class NotWritableException extends RuntimeException { + public static class NotWritableException extends RuntimeException { public NotWritableException(String name) { super(String.format("File [%s] is not writable", name)); }