Actually it might be better to create another ticket, FLINK-8093 was mainly complaining about the JMX bean collision when there are multiple tasks running in the same TM.
Jiangjie (Becket) Qin On Tue, Mar 17, 2020 at 6:33 PM Becket Qin <becket....@gmail.com> wrote: > Hi Till, > > It looks FLINK-8093 <https://issues.apache.org/jira/browse/FLINK-8093> reports > the same issue, although the reported information is not exactly correct, > as this should not cause the producer to fail. I'll take care of the ticket. > > Thanks, > > Jiangjie (Becket) Qin > > On Tue, Mar 17, 2020 at 6:00 PM Till Rohrmann <trohrm...@apache.org> > wrote: > >> @Becket do we already have a JIRA ticket to track this effort? >> >> Cheers, >> Till >> >> On Mon, Mar 16, 2020 at 4:07 AM Becket Qin <becket....@gmail.com> wrote: >> >>> Hi Sidney, >>> >>> The WARN logging you saw was from the AbstractPartitionDiscoverer which >>> is created by FlinkKafkaConsumer itself. It has an internal consumer which >>> shares the client.id of the actual consumer fetching data. This is a >>> bug that we should fix. >>> >>> As Rong said, this won't affect the normal operation of the consumer. It >>> is just an AppInfo MBean for reporting some information. There might be >>> some slight impact on the accuracy of the consumer metrics, but should be >>> almost ignorable because the partition discoverer is quite inactive >>> compared with the actual consumer. >>> >>> Thanks, >>> >>> Jiangjie (Becket) Qin >>> >>> On Mon, Mar 16, 2020 at 12:44 AM Rong Rong <walter...@gmail.com> wrote: >>> >>>> We also had seen this issue before running Flink apps in a shared >>>> cluster environment. >>>> >>>> Basically, Kafka is trying to register a JMX MBean[1] for application >>>> monitoring. >>>> This is only a WARN suggesting that you are registering more than one >>>> MBean with the same client id "consumer-1", it should not affect your >>>> normal application behavior. >>>> >>>> This is most likely occurring if you have more than one Kafka consumer >>>> within the same JVM, are you using a session cluster[2]? can you share more >>>> on your application configuration including parallelism and slot configs? >>>> Also based on the log, you are not configuring the "client.id" >>>> correctly. which config key are you using? could you also share your fill >>>> Kafka properties map? >>>> >>>> >>>> -- >>>> Rong >>>> >>>> [1] https://docs.oracle.com/javase/tutorial/jmx/mbeans/standard.html >>>> [2] >>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/deployment/yarn_setup.html#flink-yarn-session >>>> >>>> On Sun, Mar 15, 2020 at 8:28 AM Sidney Feiner < >>>> sidney.fei...@startapp.com> wrote: >>>> >>>>> Hey, >>>>> I've been using Flink for a while now without any problems when >>>>> running apps with a FlinkKafkaConsumer. >>>>> All my apps have the same overall logic (consume from kafka -> >>>>> transform event -> write to file) and the only way they differ from each >>>>> other is the topic they read (remaining kafka config remains identical) >>>>> and >>>>> the way they transform the event. >>>>> But suddenly, I've been starting to get the following error: >>>>> >>>>> >>>>> 2020-03-15 12:13:56,911 WARN >>>>> org.apache.kafka.common.utils.AppInfoParser - Error >>>>> registering AppInfo mbean >>>>> javax.management.InstanceAlreadyExistsException: >>>>> kafka.consumer:type=app-info,id=consumer-1 >>>>> at >>>>> com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437) >>>>> at >>>>> com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898) >>>>> >>>>> at >>>>> com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966) >>>>> >>>>> at >>>>> com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900) >>>>> >>>>> at >>>>> com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324) >>>>> >>>>> at >>>>> com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522) >>>>> >>>>> at >>>>> org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:62) >>>>> >>>>> at >>>>> org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:805) >>>>> >>>>> at >>>>> org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:659) >>>>> >>>>> at >>>>> org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:639) >>>>> >>>>> at >>>>> org.apache.flink.streaming.connectors.kafka.internal.KafkaPartitionDiscoverer.initializeConnections(KafkaPartitionDiscoverer.java:58) >>>>> >>>>> at >>>>> org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer.open(AbstractPartitionDiscoverer.java:94) >>>>> >>>>> at >>>>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.open(FlinkKafkaConsumerBase.java:505) >>>>> >>>>> at >>>>> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36) >>>>> >>>>> at >>>>> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102) >>>>> >>>>> at >>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:552) >>>>> >>>>> at >>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:416) >>>>> >>>>> at >>>>> org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705) >>>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530) >>>>> at java.lang.Thread.run(Thread.java:748) >>>>> >>>>> >>>>> I've tried setting the "client.id" on my consumer to a random UUID, >>>>> making sure I don't have any duplicates but that didn't help either. >>>>> Any idea what could be causing this? >>>>> >>>>> Thanks 🙂 >>>>> >>>>> *Sidney Feiner* */* Data Platform Developer >>>>> M: +972.528197720 */* Skype: sidney.feiner.startapp >>>>> >>>>> [image: emailsignature] >>>>> >>>>>