We also had seen this issue before running Flink apps in a shared cluster
environment.

Basically, Kafka is trying to register a JMX MBean[1] for application
monitoring.
This is only a WARN suggesting that you are registering more than one MBean
with the same client id "consumer-1", it should not affect your normal
application behavior.

This is most likely occurring if you have more than one Kafka consumer
within the same JVM, are you using a session cluster[2]? can you share more
on your application configuration including parallelism and slot configs?
Also based on the log, you are not configuring the "client.id" correctly.
which config key are you using? could you also share your fill Kafka
properties map?


--
Rong

[1] https://docs.oracle.com/javase/tutorial/jmx/mbeans/standard.html
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/deployment/yarn_setup.html#flink-yarn-session

On Sun, Mar 15, 2020 at 8:28 AM Sidney Feiner <sidney.fei...@startapp.com>
wrote:

> Hey,
> I've been using Flink for a while now without any problems when running
> apps with a FlinkKafkaConsumer.
> All my apps have the same overall logic (consume from kafka -> transform
> event -> write to file) and the only way they differ from each other is the
> topic they read (remaining kafka config remains identical) and the way they
> transform the event.
> But suddenly, I've been starting to get the following error:
>
>
> 2020-03-15 12:13:56,911 WARN  org.apache.kafka.common.utils.AppInfoParser
>                   - Error registering AppInfo mbean
> javax.management.InstanceAlreadyExistsException:
> kafka.consumer:type=app-info,id=consumer-1
>        at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437)
>        at
> com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898)
>
>        at
> com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966)
>
>        at
> com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900)
>
>        at
> com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324)
>
>        at
> com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522)
>
>        at
> org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:62)
>
>        at
> org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:805)
>
>        at
> org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:659)
>
>        at
> org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:639)
>
>        at
> org.apache.flink.streaming.connectors.kafka.internal.KafkaPartitionDiscoverer.initializeConnections(KafkaPartitionDiscoverer.java:58)
>
>        at
> org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer.open(AbstractPartitionDiscoverer.java:94)
>
>        at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.open(FlinkKafkaConsumerBase.java:505)
>
>        at
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>
>        at
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
>
>        at
> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:552)
>
>        at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:416)
>
>        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>        at java.lang.Thread.run(Thread.java:748)
>
>
> I've tried setting the "client.id" on my consumer to a random UUID,
> making sure I don't have any duplicates but that didn't help either.
> Any idea what could be causing this?
>
> Thanks 🙂
>
> *Sidney Feiner* */* Data Platform Developer
> M: +972.528197720 */* Skype: sidney.feiner.startapp
>
> [image: emailsignature]
>
>

Reply via email to