Hi, Kenan.

Maybe you should set a distinct `client.id.prefix` on each KafkaSource to avoid the conflict.
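
For background, here is a minimal sketch of why a shared group id can lead to this warning and why a per-source prefix avoids it. It uses only the JDK's JMX API, not Kafka or Flink. The `demo.admin.client` domain, the `AppInfo` bean and the `clientIdPrefix` helper are hypothetical names for illustration only; the `-enumerator-admin-client` suffix is copied from the MBean name in your log.

```java
import java.lang.management.ManagementFactory;
import javax.management.InstanceAlreadyExistsException;
import javax.management.JMException;
import javax.management.MBeanServer;
import javax.management.ObjectName;

final class ClientIdPrefixDemo {

    // Standard MBean convention: the management interface is named <Class>MBean.
    public interface AppInfoMBean {
        String getId();
    }

    public static final class AppInfo implements AppInfoMBean {
        private final String id;

        public AppInfo(String id) {
            this.id = id;
        }

        @Override
        public String getId() {
            return id;
        }
    }

    // Hypothetical helper: derive one prefix per source from the shared group id.
    static String clientIdPrefix(String groupId, String sourceName) {
        return groupId + "-" + sourceName;
    }

    // Registers an app-info bean under a name built from the prefix.
    // Returns false if a bean with the same name is already registered in this JVM.
    static boolean register(String prefix) {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        String clientId = prefix + "-enumerator-admin-client";
        try {
            ObjectName name = new ObjectName("demo.admin.client:type=app-info,id=" + clientId);
            server.registerMBean(new AppInfo(clientId), name);
            return true;
        } catch (InstanceAlreadyExistsException e) {
            return false; // same name registered twice: the situation behind the warning
        } catch (JMException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        String group = "wifialgogroup1";
        // Two sources that both fall back to the group id as their prefix.
        System.out.println(register(group)); // first registration succeeds
        System.out.println(register(group)); // second one collides
        // Distinct prefixes per source give distinct bean names.
        System.out.println(register(clientIdPrefix(group, "stations-periodic")));
        System.out.println(register(clientIdPrefix(group, "wifi-interface")));
    }
}
```

In the job itself, the equivalent change would be to call `setClientIdPrefix(...)` with a different value on each `KafkaSource.builder()` (to my knowledge this is the builder method that sets `client.id.prefix`), while leaving `setGroupId(...)` as it is.
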

Best,
Hang

liu ron <ron9....@gmail.com> wrote on Mon, Jul 31, 2023 at 19:36:

> Hi, Kenan
>
> After studying the source code and searching Google for related
> information, I think this is caused by a duplicate client_id [1]. You
> can check whether other jobs are consuming this topic with the same
> group_id. Flink uses the group_id to assemble the client_id [2], so if
> other jobs already use the same group_id, the duplicated client_id will
> be detected on the Kafka side.
>
> [1]
> https://stackoverflow.com/questions/40880832/instancealreadyexistsexception-coming-from-kafka-consumer
> [2]
> https://github.com/apache/flink-connector-kafka/blob/79ae2d70499f81ce489911956c675354657dd44f/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.java#L466
>
> Best,
> Ron
>
> Kenan Kılıçtepe <kkilict...@gmail.com> wrote on Tue, Jul 25, 2023 at 21:48:
>
>> Any help with the exception below is appreciated.
>> My KafkaSource code is also below. The parallelism is 16 for this task.
>>
>> KafkaSource<String> sourceStationsPeriodic = KafkaSource.<String>builder()
>>         .setBootstrapServers(parameter.get(KAFKA_SOURCE_STATIONS_BOOTLOADER_PROPERTY))
>>         .setTopics(parameter.get(KAFKA_TOPIC_READ_WIFI))
>>         .setGroupId(parameter.get(KAFKA_GROUP))
>>         .setStartingOffsets(OffsetsInitializer.latest())
>>         .setValueOnlyDeserializer(new SimpleStringSchema())
>>         .build();
>>
>> // Our Kafka Source
>> KafkaSource<String> sourceStationsWifiInterface = KafkaSource.<String>builder()
>>         .setBootstrapServers(parameter.get(KAFKA_SOURCE_STATIONS_BOOTLOADER_PROPERTY))
>>         .setTopics(parameter.get(KAFKA_TOPIC_READ_WIFI_INTERFACE))
>>         .setGroupId(parameter.get(KAFKA_GROUP))
>>         .setStartingOffsets(OffsetsInitializer.latest())
>>         .setValueOnlyDeserializer(new SimpleStringSchema())
>>         .build();
>>
>> KafkaSource<String> sourceTwinMessage = KafkaSource.<String>builder()
>>         .setBootstrapServers(parameter.get(KAFKA_SOURCE_STATIONS_BOOTLOADER_PROPERTY))
>>         .setTopics(parameter.get(KAFKA_TOPIC_READ_TWIN_MESSAGE))
>>         .setGroupId(parameter.get(KAFKA_GROUP))
>>         .setStartingOffsets(OffsetsInitializer.latest())
>>         .setValueOnlyDeserializer(new SimpleStringSchema())
>>         .build();
>>
>> KafkaSource<String> sourceStationsOnDemand = KafkaSource.<String>builder()
>>         .setBootstrapServers(parameter.get(KAFKA_SOURCE_STATIONS_BOOTLOADER_PROPERTY))
>>         .setTopics(parameter.get(KAFKA_TOPIC_READ_STATIONS_ON_DEMAND))
>>         .setGroupId(parameter.get(KAFKA_GROUP))
>>         .setStartingOffsets(OffsetsInitializer.latest())
>>         .setValueOnlyDeserializer(new SimpleStringSchema())
>>         .build();
>>
>> KafkaSource<String> sourceDeviceInfo = KafkaSource.<String>builder()
>>         .setBootstrapServers(parameter.get(KAFKA_SOURCE_STATIONS_BOOTLOADER_PROPERTY))
>>         .setTopics(parameter.get(KAFKA_TOPIC_READ_DEVICE_INFO))
>>         .setGroupId(parameter.get(KAFKA_GROUP))
>>         .setStartingOffsets(OffsetsInitializer.latest())
>>         .setValueOnlyDeserializer(new SimpleStringSchema())
>>         .build();
>>
>> 2023-07-23 07:06:24,927 WARN org.apache.kafka.common.utils.AppInfoParser [] - Error registering AppInfo mbean
>> javax.management.InstanceAlreadyExistsException: kafka.admin.client:type=app-info,id=wifialgogroup1-enumerator-admin-client
>>     at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:436) ~[?:?]
>>     at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1855) ~[?:?]
>>     at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:955) ~[?:?]
>>     at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:890) ~[?:?]
>>     at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:320) ~[?:?]
>>     at com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522) ~[?:?]
>>     at org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:64) ~[blob_p-7f823076a9b41619d082270330273927b5c89588-6161d6df252e036e35c93e64446c8834:?]
>>     at org.apache.kafka.clients.admin.KafkaAdminClient.<init>(KafkaAdminClient.java:597) ~[blob_p-7f823076a9b41619d082270330273927b5c89588-6161d6df252e036e35c93e64446c8834:?]
>>     at org.apache.kafka.clients.admin.KafkaAdminClient.createInternal(KafkaAdminClient.java:539) ~[blob_p-7f823076a9b41619d082270330273927b5c89588-6161d6df252e036e35c93e64446c8834:?]
>>     at org.apache.kafka.clients.admin.KafkaAdminClient.createInternal(KafkaAdminClient.java:478) ~[blob_p-7f823076a9b41619d082270330273927b5c89588-6161d6df252e036e35c93e64446c8834:?]
>>     at org.apache.kafka.clients.admin.Admin.create(Admin.java:133) ~[blob_p-7f823076a9b41619d082270330273927b5c89588-6161d6df252e036e35c93e64446c8834:?]
>>     at org.apache.kafka.clients.admin.AdminClient.create(AdminClient.java:39) ~[blob_p-7f823076a9b41619d082270330273927b5c89588-6161d6df252e036e35c93e64446c8834:?]
>>     at org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.getKafkaAdminClient(KafkaSourceEnumerator.java:410) ~[blob_p-7f823076a9b41619d082270330273927b5c89588-6161d6df252e036e35c93e64446c8834:?]
>>     at org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.start(KafkaSourceEnumerator.java:151) ~[blob_p-7f823076a9b41619d082270330273927b5c89588-6161d6df252e036e35c93e64446c8834:?]
>>     at org.apache.flink.runtime.source.coordinator.SourceCoordinator.lambda$start$1(SourceCoordinator.java:225) ~[flink-dist-1.16.0.jar:1.16.0]
>>     at org.apache.flink.runtime.source.coordinator.SourceCoordinator.lambda$runInEventLoop$10(SourceCoordinator.java:449) ~[flink-dist-1.16.0.jar:1.16.0]
>>     at org.apache.flink.util.ThrowableCatchingRunnable.run(ThrowableCatchingRunnable.java:40) [flink-dist-1.16.0.jar:1.16.0]
>>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]
>>     at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
>>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) [?:?]
>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
>>     at java.lang.Thread.run(Thread.java:829) [?:?]