Hi, Kenan.

Maybe you should set a distinct `client.id.prefix` for each source to avoid the conflict.
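
A minimal sketch of one way to pick those prefixes, assuming each source gets a prefix built from the group id and its topic. The class `ClientIdPrefixes`, the helper `forSource`, and the topic strings are hypothetical. `setClientIdPrefix` is the `KafkaSourceBuilder` method that sets `client.id.prefix`; it appears only in a comment so the sketch runs without Flink on the classpath.

```java
public class ClientIdPrefixes {

    // Hypothetical helper: combine group id and topic so that two sources
    // sharing one group id still end up with different client id prefixes.
    public static String forSource(String groupId, String topic) {
        return groupId + "-" + topic;
    }

    public static void main(String[] args) {
        String groupId = "wifialgogroup1";          // group id seen in the log
        String[] topics = {"topic-a", "topic-b"};   // placeholder topic names

        for (String topic : topics) {
            String prefix = forSource(groupId, topic);
            System.out.println(topic + " -> " + prefix);
            // In the job, each builder would then get its own prefix, e.g.:
            //   KafkaSource.<String>builder()
            //           .setTopics(topic)
            //           .setGroupId(groupId)
            //           .setClientIdPrefix(prefix)
            //           ...
            //           .build();
        }
    }
}
```

All sources can keep the same group id this way; only the client id prefix differs per source.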

Best,
Hang

On Mon, Jul 31, 2023 at 19:36, liu ron <ron9....@gmail.com> wrote:

> Hi, Kenan
>
> After studying the source code and searching Google for related
> information, I think this is caused by a duplicate client_id [1]. You
> can check whether other jobs (or other sources in this job) consume with
> the same group_id. Flink uses the group_id to assemble the client_id [2],
> so if the same group_id is already in use, the duplicate client_id is
> detected when the Kafka client registers its JMX MBean.
>
> [1]
> https://stackoverflow.com/questions/40880832/instancealreadyexistsexception-coming-from-kafka-consumer
> [2]
> https://github.com/apache/flink-connector-kafka/blob/79ae2d70499f81ce489911956c675354657dd44f/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.java#L466
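
The conflict described above can be reproduced with plain JMX, without Kafka or Flink. This is only a sketch under assumptions: the class `DuplicateClientIdDemo`, the `AppInfoView` bean, and the `demo.admin.client` domain are hypothetical stand-ins that mimic the MBean name in the log (`kafka.admin.client:type=app-info,id=...`). It shows that a second registration under the same id is rejected with `InstanceAlreadyExistsException`, while a different id registers fine.

```java
import java.lang.management.ManagementFactory;
import javax.management.InstanceAlreadyExistsException;
import javax.management.JMException;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import javax.management.StandardMBean;

public class DuplicateClientIdDemo {

    /** Hypothetical stand-in for the app-info bean a Kafka client registers. */
    public interface AppInfoView {
        String getClientId();
    }

    public static class AppInfo implements AppInfoView {
        private final String clientId;

        public AppInfo(String clientId) {
            this.clientId = clientId;
        }

        @Override
        public String getClientId() {
            return clientId;
        }
    }

    /**
     * Registers an MBean whose name embeds the client id.
     * Returns false if a bean with the same name already exists in this JVM.
     */
    public static boolean register(String clientId) {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        try {
            ObjectName name =
                    new ObjectName("demo.admin.client:type=app-info,id=" + clientId);
            server.registerMBean(
                    new StandardMBean(new AppInfo(clientId), AppInfoView.class), name);
            return true;
        } catch (InstanceAlreadyExistsException e) {
            // Same situation as the WARN in the log: the name is already taken.
            return false;
        } catch (JMException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // Two clients built from the same group id collide on the MBean name.
        System.out.println(register("wifialgogroup1-enumerator-admin-client"));
        System.out.println(register("wifialgogroup1-enumerator-admin-client"));
        // A different id (for example from a distinct prefix) does not collide.
        System.out.println(register("wifialgogroup1-other-enumerator-admin-client"));
    }
}
```

In the log the failed registration is reported at WARN level by `AppInfoParser`.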
>
> Best,
> Ron
>
> On Tue, Jul 25, 2023 at 21:48, Kenan Kılıçtepe <kkilict...@gmail.com> wrote:
>
>>
>> Any help with the exception below is appreciated.
>> My KafkaSource code is also below. The parallelism for this task is 16.
>>
>> KafkaSource<String> sourceStationsPeriodic = KafkaSource.<String>builder()
>>         .setBootstrapServers(parameter.get(KAFKA_SOURCE_STATIONS_BOOTLOADER_PROPERTY))
>>         .setTopics(parameter.get(KAFKA_TOPIC_READ_WIFI))
>>         .setGroupId(parameter.get(KAFKA_GROUP))
>>         .setStartingOffsets(OffsetsInitializer.latest())
>>         .setValueOnlyDeserializer(new SimpleStringSchema())
>>         .build();
>>
>> // Our Kafka Source
>> KafkaSource<String> sourceStationsWifiInterface = KafkaSource.<String>builder()
>>         .setBootstrapServers(parameter.get(KAFKA_SOURCE_STATIONS_BOOTLOADER_PROPERTY))
>>         .setTopics(parameter.get(KAFKA_TOPIC_READ_WIFI_INTERFACE))
>>         .setGroupId(parameter.get(KAFKA_GROUP))
>>         .setStartingOffsets(OffsetsInitializer.latest())
>>         .setValueOnlyDeserializer(new SimpleStringSchema())
>>         .build();
>>
>> KafkaSource<String> sourceTwinMessage = KafkaSource.<String>builder()
>>         .setBootstrapServers(parameter.get(KAFKA_SOURCE_STATIONS_BOOTLOADER_PROPERTY))
>>         .setTopics(parameter.get(KAFKA_TOPIC_READ_TWIN_MESSAGE))
>>         .setGroupId(parameter.get(KAFKA_GROUP))
>>         .setStartingOffsets(OffsetsInitializer.latest())
>>         .setValueOnlyDeserializer(new SimpleStringSchema())
>>         .build();
>>
>> KafkaSource<String> sourceStationsOnDemand = KafkaSource.<String>builder()
>>         .setBootstrapServers(parameter.get(KAFKA_SOURCE_STATIONS_BOOTLOADER_PROPERTY))
>>         .setTopics(parameter.get(KAFKA_TOPIC_READ_STATIONS_ON_DEMAND))
>>         .setGroupId(parameter.get(KAFKA_GROUP))
>>         .setStartingOffsets(OffsetsInitializer.latest())
>>         .setValueOnlyDeserializer(new SimpleStringSchema())
>>         .build();
>>
>> KafkaSource<String> sourceDeviceInfo = KafkaSource.<String>builder()
>>         .setBootstrapServers(parameter.get(KAFKA_SOURCE_STATIONS_BOOTLOADER_PROPERTY))
>>         .setTopics(parameter.get(KAFKA_TOPIC_READ_DEVICE_INFO))
>>         .setGroupId(parameter.get(KAFKA_GROUP))
>>         .setStartingOffsets(OffsetsInitializer.latest())
>>         .setValueOnlyDeserializer(new SimpleStringSchema())
>>         .build();
>>
>>
>>
>> 2023-07-23 07:06:24,927 WARN  org.apache.kafka.common.utils.AppInfoParser [] - Error registering AppInfo mbean
>> javax.management.InstanceAlreadyExistsException: kafka.admin.client:type=app-info,id=wifialgogroup1-enumerator-admin-client
>>         at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:436) ~[?:?]
>>         at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1855) ~[?:?]
>>         at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:955) ~[?:?]
>>         at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:890) ~[?:?]
>>         at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:320) ~[?:?]
>>         at com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522) ~[?:?]
>>         at org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:64) ~[blob_p-7f823076a9b41619d082270330273927b5c89588-6161d6df252e036e35c93e64446c8834:?]
>>         at org.apache.kafka.clients.admin.KafkaAdminClient.<init>(KafkaAdminClient.java:597) ~[blob_p-7f823076a9b41619d082270330273927b5c89588-6161d6df252e036e35c93e64446c8834:?]
>>         at org.apache.kafka.clients.admin.KafkaAdminClient.createInternal(KafkaAdminClient.java:539) ~[blob_p-7f823076a9b41619d082270330273927b5c89588-6161d6df252e036e35c93e64446c8834:?]
>>         at org.apache.kafka.clients.admin.KafkaAdminClient.createInternal(KafkaAdminClient.java:478) ~[blob_p-7f823076a9b41619d082270330273927b5c89588-6161d6df252e036e35c93e64446c8834:?]
>>         at org.apache.kafka.clients.admin.Admin.create(Admin.java:133) ~[blob_p-7f823076a9b41619d082270330273927b5c89588-6161d6df252e036e35c93e64446c8834:?]
>>         at org.apache.kafka.clients.admin.AdminClient.create(AdminClient.java:39) ~[blob_p-7f823076a9b41619d082270330273927b5c89588-6161d6df252e036e35c93e64446c8834:?]
>>         at org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.getKafkaAdminClient(KafkaSourceEnumerator.java:410) ~[blob_p-7f823076a9b41619d082270330273927b5c89588-6161d6df252e036e35c93e64446c8834:?]
>>         at org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.start(KafkaSourceEnumerator.java:151) ~[blob_p-7f823076a9b41619d082270330273927b5c89588-6161d6df252e036e35c93e64446c8834:?]
>>         at org.apache.flink.runtime.source.coordinator.SourceCoordinator.lambda$start$1(SourceCoordinator.java:225) ~[flink-dist-1.16.0.jar:1.16.0]
>>         at org.apache.flink.runtime.source.coordinator.SourceCoordinator.lambda$runInEventLoop$10(SourceCoordinator.java:449) ~[flink-dist-1.16.0.jar:1.16.0]
>>         at org.apache.flink.util.ThrowableCatchingRunnable.run(ThrowableCatchingRunnable.java:40) [flink-dist-1.16.0.jar:1.16.0]
>>         at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]
>>         at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
>>         at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) [?:?]
>>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
>>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
>>         at java.lang.Thread.run(Thread.java:829) [?:?]