This is an automated email from the ASF dual-hosted git repository.

azagrebin pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 2c934ff2a42fe256c2a1174788cbe55c05e8e323
Author: Andrey Zagrebin <azagre...@apache.org>
AuthorDate: Sun Mar 15 14:26:05 2020 +0300

    [FLINK-16225] Improve handling of metaspace out-of-memory errors thrown in user code
    
    Improve the error message by explaining the possible causes and how to resolve them.
    In case of a metaspace OOM error, try a graceful TaskManager (TM) shutdown.
    
    This closes #11408.
---
 .../java/org/apache/flink/util/ExceptionUtils.java | 52 ++++++++++++++++++++++
 .../runtime/taskexecutor/TaskManagerRunner.java    |  8 +++-
 .../org/apache/flink/runtime/taskmanager/Task.java |  2 +
 3 files changed, 60 insertions(+), 2 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java 
b/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
index ddd0276..5fc1bfe 100644
--- a/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
@@ -25,6 +25,7 @@
 package org.apache.flink.util;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.util.function.RunnableWithException;
 
 import javax.annotation.Nullable;
@@ -48,6 +49,14 @@ public final class ExceptionUtils {
        /** The stringified representation of a null exception reference. */
        public static final String STRINGIFIED_NULL_EXCEPTION = "(null)";
 
+       private static final String TM_METASPACE_OOM_ERROR_MESSAGE = 
String.format(
+               "Metaspace. The metaspace out-of-memory error has occurred. 
This can mean two things: either the job requires " +
+                       "a larger size of JVM metaspace to load classes or 
there is a class loading leak. In the first case " +
+                       "'%s' configuration option should be increased. If the 
error persists (usually in cluster after " +
+                       "several job (re-)submissions) then there is probably a 
class loading leak which has to be " +
+                       "investigated and fixed. The task executor has to be 
shutdown...",
+               TaskManagerOptions.JVM_METASPACE.key());
+
        /**
         * Makes a string representation of the exception's stack trace, or 
"(null)", if the
         * exception is null.
@@ -110,6 +119,49 @@ public final class ExceptionUtils {
        }
 
        /**
+        * Generates new {@link OutOfMemoryError} with more detailed message.
+        *
+        * <p>This method improves error message for metaspace {@link 
OutOfMemoryError}.
+        * It adds description of possible causes and ways of resolution.
+        *
+        * @param exception The exception to enrich.
+        * @return either enriched exception if needed or the original one.
+        */
+       public static Throwable enrichTaskManagerOutOfMemoryError(Throwable 
exception) {
+               if (isMetaspaceOutOfMemoryError(exception)) {
+                       return changeOutOfMemoryErrorMessage(exception, 
TM_METASPACE_OOM_ERROR_MESSAGE);
+               }
+               return exception;
+       }
+
+       private static OutOfMemoryError changeOutOfMemoryErrorMessage(Throwable 
exception, String newMessage) {
+               Preconditions.checkArgument(exception instanceof 
OutOfMemoryError);
+               if (exception.getMessage().equals(newMessage)) {
+                       return (OutOfMemoryError) exception;
+               }
+               OutOfMemoryError newError = new OutOfMemoryError(newMessage);
+               newError.initCause(exception.getCause());
+               newError.setStackTrace(exception.getStackTrace());
+               return newError;
+       }
+
+       /**
+        * Checks whether the given exception indicates a JVM metaspace 
out-of-memory error.
+        *
+        * @param t The exception to check.
+        * @return True, if the exception is the metaspace {@link 
OutOfMemoryError}, false otherwise.
+        */
+       public static boolean isMetaspaceOutOfMemoryError(Throwable t) {
+               return isOutOfMemoryErrorWithMessageStartingWith(t, 
"Metaspace");
+       }
+
+       private static boolean 
isOutOfMemoryErrorWithMessageStartingWith(Throwable t, String prefix) {
+               // the exact matching of the class is checked to avoid matching 
any custom subclasses of OutOfMemoryError
+               // as we are interested in the original exceptions, generated 
by JVM.
+               return t.getClass() == OutOfMemoryError.class && t.getMessage() 
!= null && t.getMessage().startsWith(prefix);
+       }
+
+       /**
         * Rethrows the given {@code Throwable}, if it represents an error that 
is fatal to the JVM.
         * See {@link ExceptionUtils#isJvmFatalError(Throwable)} for a 
definition of fatal errors.
         *
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
index ca93e32..8ed4fe4 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
@@ -246,9 +246,13 @@ public class TaskManagerRunner implements 
FatalErrorHandler, AutoCloseableAsync
 
        @Override
        public void onFatalError(Throwable exception) {
-               LOG.error("Fatal error occurred while executing the 
TaskManager. Shutting it down...", exception);
+               Throwable enrichedException = 
ExceptionUtils.enrichTaskManagerOutOfMemoryError(exception);
+               LOG.error("Fatal error occurred while executing the 
TaskManager. Shutting it down...", enrichedException);
 
-               if (ExceptionUtils.isJvmFatalOrOutOfMemoryError(exception)) {
+               // In case of the Metaspace OutOfMemoryError, we expect that 
the graceful shutdown is possible,
+               // as it does not usually require more class loading to fail 
again with the Metaspace OutOfMemoryError.
+               if 
(ExceptionUtils.isJvmFatalOrOutOfMemoryError(enrichedException) &&
+                               
!ExceptionUtils.isMetaspaceOutOfMemoryError(enrichedException)) {
                        terminateJVM();
                } else {
                        closeAsync();
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index 3723f1a..689ada8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -741,6 +741,8 @@ public class Task implements Runnable, TaskSlotPayload, 
TaskActions, PartitionPr
                        // an exception was thrown as a side effect of 
cancelling
                        // 
----------------------------------------------------------------
 
+                       t = ExceptionUtils.enrichTaskManagerOutOfMemoryError(t);
+
                        try {
                                // check if the exception is unrecoverable
                                if (ExceptionUtils.isJvmFatalError(t) ||

Reply via email to