There's a super rough guide in the wiki: https://cwiki.apache.org/confluence/display/FLINK/Debugging+ClassLoader+leaks

The gist of it is that you first want to verify that a ChildFirstClassLoader is being leaked (i.e., run a few jobs, cancel them, trigger garbage collection, take a heap dump, and check that ChildFirstClassLoader instances are still there). Then investigate which GC roots (typically threads) are still referencing that classloader, and figure out where those threads came from.
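
As a rough complement to the heap dump, the "which threads still reference the classloader" step can be approximated from inside the running JVM. The sketch below is only an illustration: the class and method names are made up, it only inspects each thread's context classloader (a thread can also hold a classloader through other references, which only the heap dump will show), and it has to run inside the TaskManager JVM to be of any use.

```java
import java.util.ArrayList;
import java.util.List;

public class ClassLoaderThreadScan {

    /**
     * Returns the names of live threads whose context classloader's class name
     * contains the given fragment (for example "ChildFirstClassLoader").
     */
    static List<String> threadsWithContextClassLoader(String classNameFragment) {
        List<String> matches = new ArrayList<>();
        // getAllStackTraces() is a convenient way to enumerate all live threads.
        for (Thread thread : Thread.getAllStackTraces().keySet()) {
            ClassLoader loader = thread.getContextClassLoader();
            if (loader != null && loader.getClass().getName().contains(classNameFragment)) {
                matches.add(thread.getName());
            }
        }
        return matches;
    }

    public static void main(String[] args) {
        // Default fragment is the classloader named in this thread.
        String fragment = args.length > 0 ? args[0] : "ChildFirstClassLoader";
        for (String name : threadsWithContextClassLoader(fragment)) {
            System.out.println("thread still bound to " + fragment + ": " + name);
        }
    }
}
```

If threads show up here after all jobs have been cancelled, their names are usually a good hint at which library started them.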

On 24/08/2021 15:43, Kevin Lam wrote:
Thank you for pulling in Chesnay.

I haven't been able to confirm the issue doesn't happen yet, as I've found it difficult to reproduce easily. I did have follow-up questions:

1/ If Kafka metrics are indeed the cause of the leak, is there a workaround? We'd be interested in having these metrics available for monitoring and alerting purposes.

2/ Do you have any tips on identifying/confirming where the leak is coming from?



On Tue, Aug 24, 2021 at 3:48 AM Arvid Heise <ar...@apache.org> wrote:

    Hi Kevin,

    The metrics are exposed similarly, so I expect the same issues as
    they come from Kafka's Consumer API itself.

    I'll pull in @Chesnay Schepler, who afaik debugged the leak a
    while ago.

    On Mon, Aug 23, 2021 at 9:24 PM Kevin Lam
    <kevin....@shopify.com> wrote:

        Actually, we are using the `FlinkKafkaConsumer` [0] rather
        than `KafkaSource`. Is there a way to disable the consumer
        metrics using `FlinkKafkaConsumer`? Do you expect that to have
        the same Metaspace issue?


        [0] https://ci.apache.org/projects/flink/flink-docs-release-1.13/api/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.html

        On Mon, Aug 23, 2021 at 2:55 PM Kevin Lam
        <kevin....@shopify.com> wrote:

            Thanks Arvid! I will give this a try and report back.

            On Mon, Aug 23, 2021 at 11:07 AM Arvid Heise
            <ar...@apache.org> wrote:

                Hi Kevin,

                "java.lang.OutOfMemoryError: Metaspace" indicates that
                too many classes have been loaded. [1]
                If you only see that after a while, it indicates a
                classloader leak. I suspect that this is because of
                Kafka metrics. There have been some reports in the
                past.
                You can try to see what happens when you disable the
                forwarding of the Kafka metrics with
                `register.consumer.metrics: false` [2].

                [1] https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/debugging/debugging_classloading/#unloading-of-dynamically-loaded-classes-in-user-code
                [2] https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/datastream/kafka/#additional-properties
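
A minimal sketch of how the setting above might be passed along with the other consumer properties. The class name, method name, broker address and group id are placeholders. `register.consumer.metrics` is the key quoted above for `KafkaSource`; `flink.disable-metrics` is, to my knowledge, the corresponding key read by the legacy `FlinkKafkaConsumer`, but treat that as an assumption and check it against the connector version in use.

```java
import java.util.Properties;

public class KafkaMetricsConfig {

    /** Builds consumer properties with the forwarding of Kafka metrics to Flink turned off. */
    static Properties consumerProperties(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        // Key quoted in this thread for KafkaSource.
        props.setProperty("register.consumer.metrics", "false");
        // Assumed equivalent for the legacy FlinkKafkaConsumer; verify for your version.
        props.setProperty("flink.disable-metrics", "true");
        return props;
    }

    public static void main(String[] args) {
        // Placeholder broker address and group id.
        Properties props = consumerProperties("localhost:9092", "example-group");
        // These properties would then be handed to the connector, e.g.
        // new FlinkKafkaConsumer<>(topic, deserializationSchema, props).
        props.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

Setting both keys should be harmless, since a key the client does not recognise is normally just logged as unused, but it is cleaner to keep only the one that matches the connector class actually in use.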

                On Tue, Aug 17, 2021 at 8:55 PM Kevin Lam
                <kevin....@shopify.com> wrote:

                    Hi all,

                    I'm occasionally observing an issue, which has been
                    hard to reproduce, where task managers are not
                    able to register with the Flink cluster. We
                    provision only the number of task managers
                    required to run a given application, and so the
                    absence of any of the task managers causes the job
                    to enter a crash loop where it fails to get the
                    required task slots.

                    The failure occurs after a job has been running
                    for a while, and when there have been job and task
                    manager restarts. We run in Kubernetes, so pod
                    disruptions occur from time to time; however, we're
                    running with the high availability setup [0].

                    Has anyone encountered this before? Any suggestions?

                    Below are some error messages pulled from the task
                    managers failing to re-register.

                    ```
                    ] - Starting DefaultLeaderRetrievalService with KubernetesLeaderRetrievalDriver{configMapName='streaming-sales-model-staging-resourcemanager-leader'}.
                    2021-08-16 13:15:10,112 INFO  org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService [] - Starting DefaultLeaderElectionService with KubernetesLeaderElectionDriver{configMapName='streaming-sales-model-staging-resourcemanager-leader'}.
                    2021-08-16 13:15:10,205 INFO  org.apache.flink.kubernetes.kubeclient.resources.KubernetesLeaderElector [] - New leader elected 7f504af8-5f00-494e-8ecb-c3e67688bae8 for streaming-sales-model-staging-restserver-leader.
                    2021-08-16 13:15:10,205 INFO  org.apache.flink.kubernetes.kubeclient.resources.KubernetesLeaderElector [] - New leader elected 7f504af8-5f00-494e-8ecb-c3e67688bae8 for streaming-sales-model-staging-resourcemanager-leader.
                    2021-08-16 13:15:10,205 INFO  org.apache.flink.kubernetes.kubeclient.resources.KubernetesLeaderElector [] - New leader elected 7f504af8-5f00-494e-8ecb-c3e67688bae8 for streaming-sales-model-staging-dispatcher-leader.
                    2021-08-16 13:15:10,211 INFO  org.apache.flink.runtime.leaderretrieval.DefaultLeaderRetrievalService [] - Starting DefaultLeaderRetrievalService with KubernetesLeaderRetrievalDriver{configMapName='streaming-sales-model-staging-dispatcher-leader'}.
                    2021-08-16 13:16:26,103 WARN  org.apache.flink.runtime.webmonitor.retriever.impl.RpcGatewayRetriever [] - Error while retrieving the leader gateway. Retrying to connect to akka.tcp://flink@flink-jobmanager:6123/user/rpc/dispatcher_1.
                    2021-08-16 13:16:30,978 WARN  org.apache.flink.runtime.webmonitor.retriever.impl.RpcGatewayRetriever [] - Error while retrieving the leader gateway. Retrying to connect to akka.tcp://flink@flink-jobmanager:6123/user/rpc/dispatcher_1.
                    ```

                    ```
                    2021-08-15 14:02:21,078 ERROR org.apache.kafka.common.utils.KafkaThread [] - Uncaught exception in thread 'kafka-producer-network-thread | trickle-producer-monorail_sales_facts_non_recent_v0_1-1629035259075':
                    java.lang.NoClassDefFoundError: org/apache/kafka/clients/NetworkClient$1
                            at org.apache.kafka.clients.NetworkClient.processDisconnection(NetworkClient.java:748) ~[?:?]
                            at org.apache.kafka.clients.NetworkClient.handleDisconnections(NetworkClient.java:899) ~[?:?]
                            at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:560) ~[?:?]
                            at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:324) ~[?:?]
                            at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239) ~[?:?]
                            at java.lang.Thread.run(Unknown Source) [?:?]
                    Caused by: java.lang.ClassNotFoundException: org.apache.kafka.clients.NetworkClient$1
                            at java.net.URLClassLoader.findClass(Unknown Source) ~[?:?]
                            at java.lang.ClassLoader.loadClass(Unknown Source) ~[?:?]
                            at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:64) ~[flink-dist_2.12-1.13.1-shopify-3491d59.jar:1.13.1-shopify-3491d59]
                            at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74) ~[flink-dist_2.12-1.13.1-shopify-3491d59.jar:1.13.1-shopify-3491d59]
                            at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48) ~[flink-dist_2.12-1.13.1-shopify-3491d59.jar:1.13.1-shopify-3491d59]
                            at java.lang.ClassLoader.loadClass(Unknown Source) ~[?:?]
                            ... 6 more
                    ```

                    ```
                    connection to [null] failed with java.net.ConnectException: Connection refused: flink-jobmanager/10.28.65.100:6123
                    2021-08-16 13:14:59,668 WARN  akka.remote.ReliableDeliverySupervisor [] - Association with remote system [akka.tcp://flink@flink-jobmanager:6123] has failed, address is now gated for [50] ms. Reason: [Association failed with [akka.tcp://flink@flink-jobmanager:6123]] Caused by: [java.net.ConnectException: Connection refused: flink-jobmanager/10.28.65.100:6123]
                    2021-08-16 13:14:59,669 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Could not resolve ResourceManager address akka.tcp://flink@flink-jobmanager:6123/user/rpc/resourcemanager_0, retrying in 10000 ms: Could not connect to rpc endpoint under address akka.tcp://flink@flink-jobmanager:6123/user/rpc/resourcemanager_0.
                    ```

                    ```
                    2021-08-15 16:55:13,222 ERROR org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.rejectedExecution [] - Failed to submit a listener notification task. Event loop shut down?
                    java.lang.OutOfMemoryError: Metaspace. The metaspace out-of-memory error has occurred. This can mean two things: either the job requires a larger size of JVM metaspace to load classes or there is a class loading leak. In the first case 'taskmanager.memory.jvm-metaspace.size' configuration option should be increased. If the error persists (usually in cluster after several job (re-)submissions) then there is probably a class loading leak in user code or some of its dependencies which has to be investigated and fixed. The task executor has to be shutdown...
                    ```
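
The error message above names two possible causes: metaspace that is simply too small, or a class loading leak. One way to tell them apart might be to watch metaspace usage and class counts across job resubmissions. If usage keeps climbing while the unloaded-class count stays flat, a leak is the more likely explanation. The sketch below uses only the standard `java.lang.management` API. The class and method names are made up, the pool name "Metaspace" is what HotSpot JVMs report and is an assumption for other JVMs, and the code would need to run inside the TaskManager JVM (or the same values be read over JMX) to say anything about Flink.

```java
import java.lang.management.ClassLoadingMXBean;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryPoolMXBean;
import java.lang.management.MemoryUsage;

public class MetaspaceSnapshot {

    /** Bytes used by the memory pool named "Metaspace", or -1 if no such pool is exposed. */
    static long metaspaceUsedBytes() {
        for (MemoryPoolMXBean pool : ManagementFactory.getMemoryPoolMXBeans()) {
            if ("Metaspace".equals(pool.getName())) {
                MemoryUsage usage = pool.getUsage();
                return usage != null ? usage.getUsed() : -1L;
            }
        }
        return -1L;
    }

    /** One-line summary of metaspace usage and class loading counters. */
    static String snapshot() {
        ClassLoadingMXBean classes = ManagementFactory.getClassLoadingMXBean();
        return String.format(
                "metaspaceUsedBytes=%d loadedClasses=%d unloadedClasses=%d",
                metaspaceUsedBytes(),
                classes.getLoadedClassCount(),
                classes.getUnloadedClassCount());
    }

    public static void main(String[] args) {
        // Compare snapshots taken before and after a few job (re-)submissions.
        System.out.println(snapshot());
    }
}
```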


                    [0] https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/resource-providers/standalone/kubernetes/#high-availability-with-standalone-kubernetes

