@Becket do we already have a JIRA ticket to track this effort?

Cheers,
Till

On Mon, Mar 16, 2020 at 4:07 AM Becket Qin <becket....@gmail.com> wrote:

> Hi Sidney,
>
> The WARN logging you saw came from the AbstractPartitionDiscoverer, which is
> created by the FlinkKafkaConsumer itself. It has an internal consumer that
> shares the client.id of the actual consumer fetching data. This is a bug
> that we should fix.
>
> As Rong said, this won't affect the normal operation of the consumer. It
> is just an AppInfo MBean for reporting some information. There might be a
> slight impact on the accuracy of the consumer metrics, but it should be
> negligible because the partition discoverer is quite inactive compared
> with the actual consumer.
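>
> For illustration, one possible shape of the fix could be to give the
> discoverer's internal consumer its own client.id. A sketch only; the names
> below are hypothetical, not an actual patch:
>
>     // Hypothetical sketch inside KafkaPartitionDiscoverer.initializeConnections():
>     // copy the user's properties and derive a distinct client.id so the
>     // discoverer's AppInfo MBean name cannot collide with the fetcher's.
>     Properties discovererProps = new Properties();
>     discovererProps.putAll(kafkaProperties);
>     String baseClientId =
>         kafkaProperties.getProperty("client.id", "flink-kafka-consumer");
>     discovererProps.setProperty("client.id", baseClientId + "-partition-discoverer");
>     this.kafkaConsumer = new KafkaConsumer<>(discovererProps);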
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Mon, Mar 16, 2020 at 12:44 AM Rong Rong <walter...@gmail.com> wrote:
>
>> We have also seen this issue before when running Flink apps in a shared
>> cluster environment.
>>
>> Basically, Kafka is trying to register a JMX MBean[1] for application
>> monitoring.
>> This is only a WARN indicating that you are registering more than one
>> MBean with the same client id "consumer-1"; it should not affect your
>> normal application behavior.
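>>
>> The warning itself is easy to reproduce outside Kafka: registering two
>> MBeans under the same ObjectName throws InstanceAlreadyExistsException,
>> which the Kafka client catches and logs as this WARN. A self-contained
>> sketch (the class names are made up for the demo):
>>
>>     import java.lang.management.ManagementFactory;
>>     import javax.management.MBeanServer;
>>     import javax.management.ObjectName;
>>
>>     public class MBeanClashDemo {
>>         // Standard MBean: a public interface named <Impl>MBean
>>         public interface AppInfoMBean { String getId(); }
>>         public static class AppInfo implements AppInfoMBean {
>>             public String getId() { return "consumer-1"; }
>>         }
>>
>>         public static void main(String[] args) throws Exception {
>>             MBeanServer server = ManagementFactory.getPlatformMBeanServer();
>>             ObjectName name =
>>                 new ObjectName("kafka.consumer:type=app-info,id=consumer-1");
>>             server.registerMBean(new AppInfo(), name);
>>             // Second registration under the identical name fails:
>>             server.registerMBean(new AppInfo(), name); // InstanceAlreadyExistsException
>>         }
>>     }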
>>
>> This most likely occurs when you have more than one Kafka consumer within
>> the same JVM. Are you using a session cluster[2]? Can you share more about
>> your application configuration, including parallelism and slot configs?
>> Also, based on the log, the "client.id" is not being configured correctly.
>> Which config key are you using? Could you also share your full Kafka
>> properties map?
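>>
>> For reference, the standard key is plain "client.id" (i.e.
>> ConsumerConfig.CLIENT_ID_CONFIG); a minimal sketch of setting it, with
>> placeholder values:
>>
>>     Properties props = new Properties();
>>     props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092"); // placeholder
>>     // "client.id" == ConsumerConfig.CLIENT_ID_CONFIG; make it unique per consumer
>>     props.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "my-app-consumer");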
>>
>>
>> --
>> Rong
>>
>> [1] https://docs.oracle.com/javase/tutorial/jmx/mbeans/standard.html
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/deployment/yarn_setup.html#flink-yarn-session
>>
>> On Sun, Mar 15, 2020 at 8:28 AM Sidney Feiner <sidney.fei...@startapp.com>
>> wrote:
>>
>>> Hey,
>>> I've been using Flink for a while now without any problems when running
>>> apps with a FlinkKafkaConsumer.
>>> All my apps share the same overall logic (consume from Kafka -> transform
>>> event -> write to file); they differ only in the topic they read (the rest
>>> of the Kafka config is identical) and in the way they transform the event.
>>> But suddenly, I've started getting the following error:
>>>
>>>
>>> 2020-03-15 12:13:56,911 WARN  org.apache.kafka.common.utils.AppInfoParser  - Error registering AppInfo mbean
>>> javax.management.InstanceAlreadyExistsException: kafka.consumer:type=app-info,id=consumer-1
>>>        at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437)
>>>        at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898)
>>>        at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966)
>>>        at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900)
>>>        at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324)
>>>        at com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522)
>>>        at org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:62)
>>>        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:805)
>>>        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:659)
>>>        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:639)
>>>        at org.apache.flink.streaming.connectors.kafka.internal.KafkaPartitionDiscoverer.initializeConnections(KafkaPartitionDiscoverer.java:58)
>>>        at org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer.open(AbstractPartitionDiscoverer.java:94)
>>>        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.open(FlinkKafkaConsumerBase.java:505)
>>>        at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>>>        at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
>>>        at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:552)
>>>        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:416)
>>>        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>>>        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>>>        at java.lang.Thread.run(Thread.java:748)
>>>
>>>
>>> I've tried setting the "client.id" on my consumer to a random UUID,
>>> making sure I don't have any duplicates, but that didn't help either.
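>>> Roughly, this is what I tried (simplified; the rest of my Kafka config
>>> is omitted):
>>>
>>>     Properties props = new Properties();
>>>     // ... bootstrap servers, group id, etc. (unchanged from my other apps) ...
>>>     props.setProperty("client.id", UUID.randomUUID().toString());
>>>     FlinkKafkaConsumer<String> consumer =
>>>         new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), props);
>>>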
>>> Any idea what could be causing this?
>>>
>>> Thanks 🙂
>>>
>>> Sidney Feiner / Data Platform Developer
>>> M: +972.528197720 / Skype: sidney.feiner.startapp
>>>
