This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.9 by this push:
     new 9ee42ec  [FLINK-14076] Ensure CheckpointException can be deserialized on JobManager
9ee42ec is described below

commit 9ee42ec6b009f1c4da7265851e23cfaa45f4c83a
Author: Jeff Martin <jmar...@palantir.com>
AuthorDate: Sun Sep 22 21:27:12 2019 -0700

    [FLINK-14076] Ensure CheckpointException can be deserialized on JobManager
    
    This closes #9742.
---
 .../cassandra/CassandraSinkBaseTest.java           |  5 +++-
 .../kafka/FlinkKafkaProducer011ITCase.java         |  7 +++--
 .../connectors/kafka/FlinkKafkaProducerITCase.java |  6 ++--
 .../java/org/apache/flink/util/ExceptionUtils.java | 32 ++++++++++++++++++++++
 .../runtime/checkpoint/CheckpointException.java    | 13 +++++++--
 .../sink/TwoPhaseCommitSinkFunctionTest.java       |  3 +-
 .../api/operators/AbstractStreamOperatorTest.java  |  4 +--
 .../apache/flink/streaming/util/ContentDump.java   |  2 +-
 8 files changed, 59 insertions(+), 13 deletions(-)

diff --git a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseTest.java b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseTest.java
index b4406ab..3ce9742 100644
--- a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseTest.java
+++ b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseTest.java
@@ -37,11 +37,13 @@ import org.junit.Test;
 import java.io.IOException;
 import java.time.Duration;
 import java.util.LinkedList;
+import java.util.Optional;
 import java.util.Queue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeoutException;
 
+import static org.apache.flink.util.ExceptionUtils.findSerializedThrowable;
 import static org.hamcrest.number.OrderingComparison.greaterThan;
 import static org.mockito.Mockito.mock;
 import static org.powermock.api.mockito.PowerMockito.when;
@@ -156,7 +158,8 @@ public class CassandraSinkBaseTest {
 
                                Assert.fail();
                        } catch (Exception e) {
-                               Assert.assertTrue(e.getCause() instanceof 
IOException);
+                               Optional<IOException> exCause = 
findSerializedThrowable(e, IOException.class, 
ClassLoader.getSystemClassLoader());
+                               Assert.assertTrue(exCause.isPresent());
                        }
                }
        }
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java
index 0932d42..b2d0a96 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java
@@ -49,7 +49,7 @@ import java.util.stream.IntStream;
 import static 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.Semantic;
 import static 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.Semantic.AT_LEAST_ONCE;
 import static 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.Semantic.EXACTLY_ONCE;
-import static org.apache.flink.util.ExceptionUtils.findThrowable;
+import static org.apache.flink.util.ExceptionUtils.findSerializedThrowable;
 import static org.apache.flink.util.Preconditions.checkState;
 import static org.hamcrest.Matchers.lessThan;
 import static org.junit.Assert.assertThat;
@@ -160,7 +160,7 @@ public class FlinkKafkaProducer011ITCase extends 
KafkaTestBaseWithFlink {
                }
                catch (Exception ex) {
                        // testHarness1 will be fenced off after creating and 
closing testHarness2
-                       if (!findThrowable(ex, 
ProducerFencedException.class).isPresent()) {
+                       if (!findSerializedThrowable(ex, 
ProducerFencedException.class, ClassLoader.getSystemClassLoader()).isPresent()) 
{
                                throw ex;
                        }
                }
@@ -664,7 +664,8 @@ public class FlinkKafkaProducer011ITCase extends 
KafkaTestBaseWithFlink {
        }
 
        private boolean isCausedBy(FlinkKafka011ErrorCode expectedErrorCode, 
Throwable ex) {
-               Optional<FlinkKafka011Exception> cause = findThrowable(ex, 
FlinkKafka011Exception.class);
+               // Extract the root cause kafka exception (if any) from the 
serialized throwable.
+               Optional<FlinkKafka011Exception> cause = 
findSerializedThrowable(ex, FlinkKafka011Exception.class, 
ClassLoader.getSystemClassLoader());
                if (cause.isPresent()) {
                        return 
cause.get().getErrorCode().equals(expectedErrorCode);
                }
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerITCase.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerITCase.java
index 1097fd6..d393819 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerITCase.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerITCase.java
@@ -45,7 +45,7 @@ import java.util.UUID;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
-import static org.apache.flink.util.ExceptionUtils.findThrowable;
+import static org.apache.flink.util.ExceptionUtils.findSerializedThrowable;
 import static org.apache.flink.util.Preconditions.checkState;
 import static org.hamcrest.Matchers.lessThan;
 import static org.junit.Assert.assertThat;
@@ -156,7 +156,7 @@ public class FlinkKafkaProducerITCase extends KafkaTestBase 
{
                }
                catch (Exception ex) {
                        // testHarness1 will be fenced off after creating and 
closing testHarness2
-                       if (!findThrowable(ex, 
ProducerFencedException.class).isPresent()) {
+                       if (!findSerializedThrowable(ex, 
ProducerFencedException.class, ClassLoader.getSystemClassLoader()).isPresent()) 
{
                                throw ex;
                        }
                }
@@ -662,7 +662,7 @@ public class FlinkKafkaProducerITCase extends KafkaTestBase 
{
        }
 
        private boolean isCausedBy(FlinkKafkaErrorCode expectedErrorCode, 
Throwable ex) {
-               Optional<FlinkKafkaException> cause = findThrowable(ex, 
FlinkKafkaException.class);
+               Optional<FlinkKafkaException> cause = 
findSerializedThrowable(ex, FlinkKafkaException.class, 
ClassLoader.getSystemClassLoader());
                if (cause.isPresent()) {
                        return 
cause.get().getErrorCode().equals(expectedErrorCode);
                }
diff --git a/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java b/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
index 721bf7f..ddd0276 100644
--- a/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
@@ -315,6 +315,38 @@ public final class ExceptionUtils {
        }
 
        /**
+        * Checks whether a throwable chain contains a specific type of exception and returns it. It deserializes
+        * any {@link SerializedThrowable} found in the chain using the provided {@link ClassLoader}.
+        *
+        * @param throwable the throwable chain to check; {@code null} yields an empty result.
+        * @param searchType the type of exception to search for in the chain; {@code null} yields an empty result.
+        * @param classLoader the class loader to use for deserializing {@link SerializedThrowable} instances.
+        * @return Optional throwable of the requested type if available, otherwise empty
+        */
+       public static <T extends Throwable> Optional<T> findSerializedThrowable(Throwable throwable, Class<T> searchType, ClassLoader classLoader) {
+               if (throwable == null || searchType == null) {
+                       return Optional.empty();
+               }
+
+               Throwable t = throwable;
+               while (t != null) {
+                       if (searchType.isAssignableFrom(t.getClass())) {
+                               return Optional.of(searchType.cast(t));
+                       } else if (t instanceof SerializedThrowable) {
+                               Throwable next = ((SerializedThrowable) t).deserializeError(classLoader);
+                               // instanceof (not t.getClass().isAssignableFrom(..)) so that supertypes such as a plain Throwable are never cast.
+                               // SerializedThrowable#deserializeError returns itself under some conditions (e.g., null cause); stop then to
+                               // avoid looping infinitely. This is ok: a search for SerializedThrowable already matched in the first branch.
+                               t = (next == t) ? null : next;
+                       } else {
+                               t = t.getCause();
+                       }
+               }
+
+               return Optional.empty();
+       }
+
+       /**
         * Checks whether a throwable chain contains a specific type of 
exception and returns it.
         *
         * @param throwable the throwable chain to check.
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointException.java
index c0bc2d1..7c8ab49 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointException.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointException.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.checkpoint;
 
 import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.SerializedThrowable;
 
 /**
  * Base class for checkpoint related exceptions.
@@ -40,12 +41,20 @@ public class CheckpointException extends Exception {
        }
 
        public CheckpointException(CheckpointFailureReason failureReason, 
Throwable cause) {
-               super(failureReason.message(), cause);
+               // Defensively replace the cause with a SerializedThrowable in 
case it's a user-defined exception
+               // that doesn't exist on the JobManager's default classpath.
+               super(
+                       failureReason.message(),
+                       cause == null ? null : new SerializedThrowable(cause));
                this.checkpointFailureReason = 
Preconditions.checkNotNull(failureReason);
        }
 
        public CheckpointException(String message, CheckpointFailureReason 
failureReason, Throwable cause) {
-               super(message + " Failure reason: " + failureReason.message(), 
cause);
+               // Defensively replace the cause with a SerializedThrowable in 
case it's a user-defined exception
+               // that doesn't exist on the JobManager's default classpath.
+               super(
+                       message + " Failure reason: " + failureReason.message(),
+                       cause == null ? null : new SerializedThrowable(cause));
                this.checkpointFailureReason = 
Preconditions.checkNotNull(failureReason);
        }
 
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java
index 2970b87..84c0104 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java
@@ -47,6 +47,7 @@ import java.util.UUID;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
 
+import static org.apache.flink.util.ExceptionUtils.findSerializedThrowable;
 import static org.hamcrest.CoreMatchers.containsString;
 import static org.hamcrest.CoreMatchers.hasItem;
 import static org.junit.Assert.assertEquals;
@@ -167,7 +168,7 @@ public class TwoPhaseCommitSinkFunctionTest {
                        harness.snapshot(2, 5);
                        fail("something should fail");
                } catch (Exception ex) {
-                       if (!(ex.getCause() instanceof 
ContentDump.NotWritableException)) {
+                       if (!findSerializedThrowable(ex, 
ContentDump.NotWritableException.class, 
ClassLoader.getSystemClassLoader()).isPresent()) {
                                throw ex;
                        }
                        // ignore
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
index 36fb867..f9d9aa5 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
@@ -558,7 +558,7 @@ public class AbstractStreamOperatorTest {
                                        new 
MemCheckpointStreamFactory(Integer.MAX_VALUE));
                        fail("Exception expected.");
                } catch (Exception e) {
-                       assertEquals(failingException, e.getCause());
+                       assertEquals(failingException.getMessage(), 
e.getCause().getMessage());
                }
        }
 
@@ -636,7 +636,7 @@ public class AbstractStreamOperatorTest {
                                        new 
MemCheckpointStreamFactory(Integer.MAX_VALUE));
                        fail("Exception expected.");
                } catch (Exception e) {
-                       assertEquals(failingException, e.getCause());
+                       assertEquals(failingException.getMessage(), 
e.getCause().getMessage());
                }
 
                // verify that the context has been closed, the operator 
snapshot result has been cancelled
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/ContentDump.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/ContentDump.java
index 903b237..5c1568e 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/ContentDump.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/ContentDump.java
@@ -124,7 +124,7 @@ public class ContentDump {
        /**
         * Exception thrown for an attempt to write into read-only {@link 
ContentDump}.
         */
-       public class NotWritableException extends RuntimeException {
+       public static class NotWritableException extends RuntimeException {
                public NotWritableException(String name) {
                        super(String.format("File [%s] is not writable", name));
                }

Reply via email to