Hi Kevin,

The metrics are exposed in a similar way, so I expect the same issue, since they come from Kafka's Consumer API itself.
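For the legacy `FlinkKafkaConsumer` the switch lives in the consumer `Properties` rather than in a source builder option. Here is a rough, untested sketch of what I mean (the property should be `flink.disable-metrics`, i.e. `FlinkKafkaConsumerBase.KEY_DISABLE_METRICS`, if I remember correctly; broker, group and topic names are just placeholders):

```
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class KafkaConsumerWithoutMetricForwarding {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "kafka:9092"); // placeholder
        props.setProperty("group.id", "my-group");            // placeholder

        // Assumption: "flink.disable-metrics" (KEY_DISABLE_METRICS in FlinkKafkaConsumerBase)
        // stops the legacy consumer from registering the Kafka client metrics in Flink's
        // metric system; register.consumer.metrics=false is the KafkaSource counterpart.
        props.setProperty("flink.disable-metrics", "true");

        FlinkKafkaConsumer<String> consumer =
                new FlinkKafkaConsumer<>("my-topic", new SimpleStringSchema(), props); // placeholder topic

        env.addSource(consumer).print();
        env.execute("kafka-consumer-without-metric-forwarding");
    }
}
```

If that alone stops the Metaspace growth, it also narrows the leak down to the metric registration path.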
I'll pull in @Chesnay Schepler <ches...@apache.org> who afaik debugged the leak a while ago.

On Mon, Aug 23, 2021 at 9:24 PM Kevin Lam <kevin....@shopify.com> wrote:

> Actually, we are using the `FlinkKafkaConsumer` [0] rather than `KafkaSource`. Is there a way to disable the consumer metrics using `FlinkKafkaConsumer`? Do you expect that to have the same Metaspace issue?
>
> [0] https://ci.apache.org/projects/flink/flink-docs-release-1.13/api/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.html
>
> On Mon, Aug 23, 2021 at 2:55 PM Kevin Lam <kevin....@shopify.com> wrote:
>
>> Thanks Arvid! I will give this a try and report back.
>>
>> On Mon, Aug 23, 2021 at 11:07 AM Arvid Heise <ar...@apache.org> wrote:
>>
>>> Hi Kevin,
>>>
>>> "java.lang.OutOfMemoryError: Metaspace" indicates that too many classes have been loaded. [1]
>>> If you only see that after a while, it's indicating that there is a classloader leak. I suspect that this is because of Kafka metrics. There have been some reports in the past.
>>> You can try to see what happens when you disable the forwarding of the Kafka metrics with register.consumer.metrics: false [2].
>>>
>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/debugging/debugging_classloading/#unloading-of-dynamically-loaded-classes-in-user-code
>>> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/datastream/kafka/#additional-properties
>>>
>>> On Tue, Aug 17, 2021 at 8:55 PM Kevin Lam <kevin....@shopify.com> wrote:
>>>
>>>> Hi all,
>>>>
>>>> I'm observing an issue sometimes, and it's been hard to reproduce, where task managers are not able to register with the Flink cluster. We provision only the number of task managers required to run a given application, and so the absence of any of the task managers causes the job to enter a crash loop where it fails to get the required task slots.
>>>>
>>>> The failure occurs after a job has been running for a while, and when there have been job and task manager restarts. We run in kubernetes so pod disruptions occur from time to time, however we're running using the high availability setup [0]
>>>>
>>>> Has anyone encountered this before? Any suggestions?
>>>>
>>>> Below are some error messages pulled from the task managers failing to re-register.
>>>>
>>>> ```
>>>> ] - Starting DefaultLeaderRetrievalService with KubernetesLeaderRetrievalDriver{configMapName='streaming-sales-model-staging-resourcemanager-leader'}.
>>>> 2021-08-16 13:15:10,112 INFO org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService [] - Starting DefaultLeaderElectionService with KubernetesLeaderElectionDriver{configMapName='streaming-sales-model-staging-resourcemanager-leader'}.
>>>> 2021-08-16 13:15:10,205 INFO org.apache.flink.kubernetes.kubeclient.resources.KubernetesLeaderElector [] - New leader elected 7f504af8-5f00-494e-8ecb-c3e67688bae8 for streaming-sales-model-staging-restserver-leader.
>>>> 2021-08-16 13:15:10,205 INFO org.apache.flink.kubernetes.kubeclient.resources.KubernetesLeaderElector [] - New leader elected 7f504af8-5f00-494e-8ecb-c3e67688bae8 for streaming-sales-model-staging-resourcemanager-leader.
>>>> 2021-08-16 13:15:10,205 INFO org.apache.flink.kubernetes.kubeclient.resources.KubernetesLeaderElector [] - New leader elected 7f504af8-5f00-494e-8ecb-c3e67688bae8 for streaming-sales-model-staging-dispatcher-leader.
>>>> 2021-08-16 13:15:10,211 INFO org.apache.flink.runtime.leaderretrieval.DefaultLeaderRetrievalService [] - Starting DefaultLeaderRetrievalService with KubernetesLeaderRetrievalDriver{configMapName='streaming-sales-model-staging-dispatcher-leader'}.
>>>> 2021-08-16 13:16:26,103 WARN org.apache.flink.runtime.webmonitor.retriever.impl.RpcGatewayRetriever [] - Error while retrieving the leader gateway. Retrying to connect to akka.tcp://flink@flink-jobmanager:6123/user/rpc/dispatcher_1.
>>>> 2021-08-16 13:16:30,978 WARN org.apache.flink.runtime.webmonitor.retriever.impl.RpcGatewayRetriever [] - Error while retrieving the leader gateway. Retrying to connect to akka.tcp://flink@flink-jobmanager:6123/user/rpc/dispatcher_1.
>>>> ```
>>>>
>>>> ```
>>>> 2021-08-15 14:02:21,078 ERROR org.apache.kafka.common.utils.KafkaThread [] - Uncaught exception in thread 'kafka-producer-network-thread | trickle-producer-monorail_sales_facts_non_recent_v0_1-1629035259075':
>>>> java.lang.NoClassDefFoundError: org/apache/kafka/clients/NetworkClient$1
>>>>     at org.apache.kafka.clients.NetworkClient.processDisconnection(NetworkClient.java:748) ~[?:?]
>>>>     at org.apache.kafka.clients.NetworkClient.handleDisconnections(NetworkClient.java:899) ~[?:?]
>>>>     at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:560) ~[?:?]
>>>>     at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:324) ~[?:?]
>>>>     at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239) ~[?:?]
>>>>     at java.lang.Thread.run(Unknown Source) [?:?]
>>>> Caused by: java.lang.ClassNotFoundException: org.apache.kafka.clients.NetworkClient$1
>>>>     at java.net.URLClassLoader.findClass(Unknown Source) ~[?:?]
>>>>     at java.lang.ClassLoader.loadClass(Unknown Source) ~[?:?]
>>>>     at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:64) ~[flink-dist_2.12-1.13.1-shopify-3491d59.jar:1.13.1-shopify-3491d59]
>>>>     at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74) ~[flink-dist_2.12-1.13.1-shopify-3491d59.jar:1.13.1-shopify-3491d59]
>>>>     at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48) ~[flink-dist_2.12-1.13.1-shopify-3491d59.jar:1.13.1-shopify-3491d59]
>>>>     at java.lang.ClassLoader.loadClass(Unknown Source) ~[?:?]
>>>>     ... 6 more
>>>> ```
>>>>
>>>> ```
>>>> connection to [null] failed with java.net.ConnectException: Connection refused: flink-jobmanager/10.28.65.100:6123
>>>> 2021-08-16 13:14:59,668 WARN akka.remote.ReliableDeliverySupervisor [] - Association with remote system [akka.tcp://flink@flink-jobmanager:6123] has failed, address is now gated for [50] ms. Reason: [Association failed with [akka.tcp://flink@flink-jobmanager:6123]] Caused by: [java.net.ConnectException: Connection refused: flink-jobmanager/10.28.65.100:6123]
>>>> 2021-08-16 13:14:59,669 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Could not resolve ResourceManager address akka.tcp://flink@flink-jobmanager:6123/user/rpc/resourcemanager_0, retrying in 10000 ms: Could not connect to rpc endpoint under address akka.tcp://flink@flink-jobmanager:6123/user/rpc/resourcemanager_0.
>>>> ```
>>>>
>>>> ```
>>>> 2021-08-15 16:55:13,222 ERROR org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.rejectedExecution [] - Failed to submit a listener notification task. Event loop shut down?
>>>> java.lang.OutOfMemoryError: Metaspace. The metaspace out-of-memory error has occurred. This can mean two things: either the job requires a larger size of JVM metaspace to load classes or there is a class loading leak. In the first case 'taskmanager.memory.jvm-metaspace.size' configuration option should be increased. If the error persists (usually in cluster after several job (re-)submissions) then there is probably a class loading leak in user code or some of its dependencies which has to be investigated and fixed. The task executor has to be shutdown...
>>>> ```
>>>>
>>>> [0] https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/resource-providers/standalone/kubernetes/#high-availability-with-standalone-kubernetes
>>>>
>>>
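P.S. While you investigate the leak itself, you can buy some headroom by raising the metaspace limit that the error message points at. This only delays the OOM if there really is a classloader leak, so treat it as a stop-gap, not a fix; the value below is just an example for flink-conf.yaml:

```
# flink-conf.yaml -- example value only, pick a size that fits your job
taskmanager.memory.jvm-metaspace.size: 512m
```

The metric-forwarding experiment above is still the more interesting test, since it tells you whether the Kafka metrics are what keeps the user-code classloaders alive.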