Thank you for pulling in Chesnay.

I haven't yet been able to confirm that the issue no longer occurs, as it's
difficult to reproduce. I do have two follow-up questions:

1/ If Kafka metrics are indeed the cause of the leak, is there a
workaround? We'd be interested in having these metrics available for
monitoring and alerting purposes.

2/ Do you have any tips on identifying/confirming where the leak is coming
from?
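
On 2/, one low-effort check that may help narrow this down is to log
class-loading and Metaspace statistics from inside the task manager JVM and
compare them across job restarts. Below is a minimal sketch using only the
JDK's `java.lang.management` API. The class name `MetaspaceProbe` and its
methods are hypothetical (not part of Flink), and the pool name "Metaspace"
is an assumption that holds on HotSpot JVMs.

```java
import java.lang.management.ClassLoadingMXBean;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryPoolMXBean;
import java.lang.management.MemoryUsage;

/** Hypothetical helper (not part of Flink) that reports class-loading statistics. */
public class MetaspaceProbe {

    /** Number of classes currently loaded in this JVM. */
    static int loadedClassCount() {
        return ManagementFactory.getClassLoadingMXBean().getLoadedClassCount();
    }

    /** Used bytes of the "Metaspace" pool, or -1 if no pool with that name exists. */
    static long metaspaceUsedBytes() {
        for (MemoryPoolMXBean pool : ManagementFactory.getMemoryPoolMXBeans()) {
            // Assumption: HotSpot names the pool exactly "Metaspace".
            if ("Metaspace".equals(pool.getName())) {
                MemoryUsage usage = pool.getUsage();
                return usage == null ? -1L : usage.getUsed();
            }
        }
        return -1L;
    }

    /** One-line summary suitable for periodic logging. */
    static String snapshot() {
        ClassLoadingMXBean cl = ManagementFactory.getClassLoadingMXBean();
        return String.format(
                "loaded=%d totalLoaded=%d unloaded=%d metaspaceUsedBytes=%d",
                cl.getLoadedClassCount(),
                cl.getTotalLoadedClassCount(),
                cl.getUnloadedClassCount(),
                metaspaceUsedBytes());
    }

    public static void main(String[] args) {
        System.out.println(snapshot());
    }
}
```

If the loaded class count and Metaspace usage keep climbing after each restart
while the unloaded count stays flat, that would be consistent with a
classloader leak, though it would not by itself identify the source.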

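On 1/, for reference, this is roughly how the metrics setting suggested
earlier in the thread could be passed through the consumer `Properties` while
testing. This is only a sketch: the class and method names are hypothetical,
`register.consumer.metrics` is the key quoted below for `KafkaSource`, and
`flink.disable-metrics` is my assumption for the equivalent
`FlinkKafkaConsumer` key, which should be checked against the connector docs.

```java
import java.util.Properties;

/** Hypothetical helper that builds consumer properties with metric forwarding disabled. */
public class KafkaMetricsConfig {

    static Properties consumerProperties(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        // Key quoted in this thread for KafkaSource.
        props.setProperty("register.consumer.metrics", "false");
        // Assumption: the corresponding key for the legacy FlinkKafkaConsumer.
        props.setProperty("flink.disable-metrics", "true");
        return props;
    }

    public static void main(String[] args) {
        // Placeholder values; a real job would use its own brokers and group id.
        Properties props = consumerProperties("localhost:9092", "example-group");
        // The properties would then be handed to the connector, e.g.
        // new FlinkKafkaConsumer<>("topic", new SimpleStringSchema(), props)
        props.forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

This only turns off the forwarding of Kafka metrics to help confirm or rule out
the leak; it is not a way to keep the metrics available for alerting.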


On Tue, Aug 24, 2021 at 3:48 AM Arvid Heise <ar...@apache.org> wrote:

> Hi Kevin,
>
> The metrics are exposed similarly, so I expect the same issues as they
> come from Kafka's Consumer API itself.
>
> I'll pull in @Chesnay Schepler <ches...@apache.org> who afaik debugged
> the leak a while ago.
>
> On Mon, Aug 23, 2021 at 9:24 PM Kevin Lam <kevin....@shopify.com> wrote:
>
>> Actually, we are using the `FlinkKafkaConsumer` [0] rather than
>> `KafkaSource`. Is there a way to disable the consumer metrics using
>> `FlinkKafkaConsumer`? Do you expect that to have the same Metaspace issue?
>>
>>
>> [0]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/api/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.html
>>
>> On Mon, Aug 23, 2021 at 2:55 PM Kevin Lam <kevin....@shopify.com> wrote:
>>
>>> Thanks Arvid! I will give this a try and report back.
>>>
>>> On Mon, Aug 23, 2021 at 11:07 AM Arvid Heise <ar...@apache.org> wrote:
>>>
>>>> Hi Kevin,
>>>>
>>>> "java.lang.OutOfMemoryError: Metaspace" indicates that too many classes
>>>> have been loaded. [1]
>>>> If you only see that after a while, it indicates a classloader leak. I
>>>> suspect that this is because of Kafka metrics. There have been some
>>>> reports of this in the past.
>>>> You can try to see what happens when you disable the forwarding of the
>>>> Kafka metrics with register.consumer.metrics: false [2].
>>>>
>>>> [1]
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/debugging/debugging_classloading/#unloading-of-dynamically-loaded-classes-in-user-code
>>>> [2]
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/datastream/kafka/#additional-properties
>>>>
>>>> On Tue, Aug 17, 2021 at 8:55 PM Kevin Lam <kevin....@shopify.com>
>>>> wrote:
>>>>
>>>>> Hi all,
>>>>>
>>>>> I'm occasionally observing an issue, which has been hard to reproduce,
>>>>> where task managers are not able to register with the Flink cluster. We
>>>>> provision only the number of task managers required to run a given
>>>>> application, and so the absence of any of the task managers causes the job
>>>>> to enter a crash loop where it fails to get the required task slots.
>>>>>
>>>>> The failure occurs after a job has been running for a while, and when
>>>>> there have been job and task manager restarts. We run in Kubernetes, so pod
>>>>> disruptions occur from time to time; however, we're running with the high
>>>>> availability setup [0].
>>>>>
>>>>> Has anyone encountered this before? Any suggestions?
>>>>>
>>>>> Below are some error messages pulled from the task managers failing to
>>>>> re-register.
>>>>>
>>>>> ```
>>>>> ] - Starting DefaultLeaderRetrievalService with
>>>>> KubernetesLeaderRetrievalDriver{configMapName='streaming-sales-model-staging-resourcemanager-leader'}.
>>>>> 2021-08-16 13:15:10,112 INFO
>>>>>  org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService [] -
>>>>> Starting DefaultLeaderElectionService with
>>>>> KubernetesLeaderElectionDriver{configMapName='streaming-sales-model-staging-resourcemanager-leader'}.
>>>>> 2021-08-16 13:15:10,205 INFO
>>>>>  org.apache.flink.kubernetes.kubeclient.resources.KubernetesLeaderElector
>>>>> [] - New leader elected 7f504af8-5f00-494e-8ecb-c3e67688bae8 for
>>>>> streaming-sales-model-staging-restserver-leader.
>>>>> 2021-08-16 13:15:10,205 INFO
>>>>>  org.apache.flink.kubernetes.kubeclient.resources.KubernetesLeaderElector
>>>>> [] - New leader elected 7f504af8-5f00-494e-8ecb-c3e67688bae8 for
>>>>> streaming-sales-model-staging-resourcemanager-leader.
>>>>> 2021-08-16 13:15:10,205 INFO
>>>>>  org.apache.flink.kubernetes.kubeclient.resources.KubernetesLeaderElector
>>>>> [] - New leader elected 7f504af8-5f00-494e-8ecb-c3e67688bae8 for
>>>>> streaming-sales-model-staging-dispatcher-leader.
>>>>> 2021-08-16 13:15:10,211 INFO
>>>>>  org.apache.flink.runtime.leaderretrieval.DefaultLeaderRetrievalService []
>>>>> - Starting DefaultLeaderRetrievalService with
>>>>> KubernetesLeaderRetrievalDriver{configMapName='streaming-sales-model-staging-dispatcher-leader'}.
>>>>> 2021-08-16 13:16:26,103 WARN
>>>>>  org.apache.flink.runtime.webmonitor.retriever.impl.RpcGatewayRetriever []
>>>>> - Error while retrieving the leader gateway. Retrying to connect to
>>>>> akka.tcp://flink@flink-jobmanager:6123/user/rpc/dispatcher_1.
>>>>> 2021-08-16 13:16:30,978 WARN
>>>>>  org.apache.flink.runtime.webmonitor.retriever.impl.RpcGatewayRetriever []
>>>>> - Error while retrieving the leader gateway. Retrying to connect to
>>>>> akka.tcp://flink@flink-jobmanager:6123/user/rpc/dispatcher_1.
>>>>> ```
>>>>>
>>>>> ```
>>>>> 2021-08-15 14:02:21,078 ERROR
>>>>> org.apache.kafka.common.utils.KafkaThread                    [] - Uncaught
>>>>> exception in thread 'kafka-producer-network-thread |
>>>>> trickle-producer-monorail_sales_facts_non_recent_v0_1-1629035259075':
>>>>> java.lang.NoClassDefFoundError:
>>>>> org/apache/kafka/clients/NetworkClient$1
>>>>>         at
>>>>> org.apache.kafka.clients.NetworkClient.processDisconnection(NetworkClient.java:748)
>>>>> ~[?:?]
>>>>>         at
>>>>> org.apache.kafka.clients.NetworkClient.handleDisconnections(NetworkClient.java:899)
>>>>> ~[?:?]
>>>>>         at
>>>>> org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:560) ~[?:?]
>>>>>         at
>>>>> org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:324)
>>>>> ~[?:?]
>>>>>         at
>>>>> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239)
>>>>> ~[?:?]
>>>>>         at java.lang.Thread.run(Unknown Source) [?:?]
>>>>> Caused by: java.lang.ClassNotFoundException:
>>>>> org.apache.kafka.clients.NetworkClient$1
>>>>>         at java.net.URLClassLoader.findClass(Unknown Source) ~[?:?]
>>>>>         at java.lang.ClassLoader.loadClass(Unknown Source) ~[?:?]
>>>>>         at
>>>>> org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:64)
>>>>> ~[flink-dist_2.12-1.13.1-shopify-3491d59.jar:1.13.1-shopify-3491d59]
>>>>>         at
>>>>> org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74)
>>>>> ~[flink-dist_2.12-1.13.1-shopify-3491d59.jar:1.13.1-shopify-3491d59]
>>>>>         at
>>>>> org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
>>>>> ~[flink-dist_2.12-1.13.1-shopify-3491d59.jar:1.13.1-shopify-3491d59]
>>>>>         at java.lang.ClassLoader.loadClass(Unknown Source) ~[?:?]
>>>>>         ... 6 more
>>>>> ```
>>>>>
>>>>> ```
>>>>> connection to [null] failed with java.net.ConnectException: Connection
>>>>> refused: flink-jobmanager/10.28.65.100:6123
>>>>> 2021-08-16 13:14:59,668 WARN  akka.remote.ReliableDeliverySupervisor
>>>>>                     [] - Association with remote system
>>>>> [akka.tcp://flink@flink-jobmanager:6123] has failed, address is now
>>>>> gated for [50] ms. Reason: [Association failed with
>>>>> [akka.tcp://flink@flink-jobmanager:6123]] Caused by:
>>>>> [java.net.ConnectException: Connection refused: flink-jobmanager/10.28.65.100:6123]
>>>>> 2021-08-16 13:14:59,669 INFO
>>>>>  org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Could
>>>>> not resolve ResourceManager address 
>>>>> akka.tcp://flink@flink-jobmanager:6123/user/rpc/resourcemanager_0,
>>>>> retrying in 10000 ms: Could not connect to rpc endpoint under address
>>>>> akka.tcp://flink@flink-jobmanager:6123/user/rpc/resourcemanager_0.
>>>>> ```
>>>>>
>>>>> ```
>>>>> 2021-08-15 16:55:13,222 ERROR
>>>>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.rejectedExecution
>>>>> [] - Failed to submit a listener notification task. Event loop shut down?
>>>>> java.lang.OutOfMemoryError: Metaspace. The metaspace out-of-memory
>>>>> error has occurred. This can mean two things: either the job requires a
>>>>> larger size of JVM metaspace to load classes or there is a class loading
>>>>> leak. In the first case 'taskmanager.memory.jvm-metaspace.size'
>>>>> configuration option should be increased. If the error persists (usually in
>>>>> cluster after several job (re-)submissions) then there is probably a class
>>>>> loading leak in user code or some of its dependencies which has to be
>>>>> investigated and fixed. The task executor has to be shutdown...
>>>>> ```
>>>>>
>>>>>
>>>>> [0]
>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/resource-providers/standalone/kubernetes/#high-availability-with-standalone-kubernetes
>>>>>
>>>>
