Hi,
Can someone please help me resolve the issue below when running my Flink job?
Alternatively, could you point me to a doc/example that describes the
exactly-once delivery guarantee semantics?

Thanks,
Gopal.

From: Gopal Chennupati (gchennup) <gchen...@cisco.com>
Date: Friday, 27 October 2023 at 11:00 AM
To: commun...@flink.apache.org <commun...@flink.apache.org>, u...@flink.apache.org <u...@flink.apache.org>
Subject: Unable to achieve Flink Kafka connector exactly-once delivery semantics.
Hi Team,

I am trying to configure my Kafka sink connector with the "exactly-once"
delivery guarantee; however, the Flink job fails when I run it with this
configuration. Here is the full exception stack trace from the job logs:


[Source: SG-SGT-TransformerJob -> Map -> Sink: Writer -> Sink: Committer (5/10)#12] WARN org.apache.kafka.common.utils.AppInfoParser - Error registering AppInfo mbean
javax.management.InstanceAlreadyExistsException: kafka.producer:type=app-info,id=producer-sgt-4-1
      at java.management/com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:436)
      at java.management/com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1855)
      at java.management/com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:955)
      at java.management/com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:890)
      at java.management/com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:320)
      at java.management/com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522)
      at org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:64)
      at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:433)
      at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:289)
      at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:316)
      at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:301)
      at org.apache.flink.connector.kafka.sink.FlinkKafkaInternalProducer.<init>(FlinkKafkaInternalProducer.java:55)
      at org.apache.flink.connector.kafka.sink.KafkaWriter.getOrCreateTransactionalProducer(KafkaWriter.java:332)
      at org.apache.flink.connector.kafka.sink.TransactionAborter.abortTransactionOfSubtask(TransactionAborter.java:104)
      at org.apache.flink.connector.kafka.sink.TransactionAborter.abortTransactionsWithPrefix(TransactionAborter.java:82)
      at org.apache.flink.connector.kafka.sink.TransactionAborter.abortLingeringTransactions(TransactionAborter.java:66)
      at org.apache.flink.connector.kafka.sink.KafkaWriter.abortLingeringTransactions(KafkaWriter.java:295)
      at org.apache.flink.connector.kafka.sink.KafkaWriter.<init>(KafkaWriter.java:176)
      at org.apache.flink.connector.kafka.sink.KafkaSink.createWriter(KafkaSink.java:111)
      at org.apache.flink.connector.kafka.sink.KafkaSink.createWriter(KafkaSink.java:57)
      at org.apache.flink.streaming.runtime.operators.sink.StatefulSinkWriterStateHandler.createWriter(StatefulSinkWriterStateHandler.java:117)
      at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.initializeState(SinkWriterOperator.java:146)
      at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:122)
      at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:274)
      at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
      at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:734)
      at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
      at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:709)
      at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:675)
      at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952)
      at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:921)
      at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745)
      at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
      at java.base/java.lang.Thread.run(Thread.java:834)


And here is the producer configuration:

KafkaSink<T> sink = KafkaSink.<T>builder()
        .setBootstrapServers(producerConfig.getProperty("bootstrap.servers"))
        .setKafkaProducerConfig(producerConfig)
        .setRecordSerializer(new GenericMessageSerialization<>(
                generic_key.class,
                generic_value.class,
                producerConfig.getProperty("topic"),
                producerConfig.getProperty("schema.registry.url")))
        .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
        .setTransactionalIdPrefix("generic")
        .build();
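For reference, the producerConfig properties used above are loaded roughly along these lines. This is only a minimal sketch with illustrative values: the broker address, topic, and registry URL are placeholders, and the transaction.timeout.ms value assumes the broker's default transaction.max.timeout.ms of 15 minutes (for EXACTLY_ONCE, the producer's transaction timeout must not exceed the broker-side maximum).

```java
import java.util.Properties;

public class ProducerConfigSketch {
    // Builds the producer properties handed to the KafkaSink builder.
    // All values below are placeholders for illustration only.
    public static Properties buildProducerConfig() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "broker-1:9092");
        props.setProperty("topic", "generic-topic");
        props.setProperty("schema.registry.url", "http://schema-registry:8081");
        // Exactly-once requires this to be no larger than the broker's
        // transaction.max.timeout.ms (900000 ms = 15 minutes by default).
        props.setProperty("transaction.timeout.ms", "900000");
        return props;
    }
}
```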

Can someone please help me understand what is missing here, or what
additional steps we need to take to achieve this?

Thanks,
Gopal.
