Hi, Can someone please help me to resolve the below issue while running flink job. Or provide me any doc/example which describe the exactly-once delivery guarantee semantics.
Thanks, Gopal. From: Gopal Chennupati (gchennup) <gchen...@cisco.com> Date: Friday, 27 October 2023 at 11:00 AM To: commun...@flink.apache.org <commun...@flink.apache.org>, u...@flink.apache.org <u...@flink.apache.org> Subject: Unable to achieve Flink kafka connector exactly once delivery semantics. Hi Team, I am trying to configure my kafka sink connector with “exactly-once” delivery guarantee, however it’s failing when I run the flink job with this configuration, here is the full exception stack trace from the job logs. [Source: SG-SGT-TransformerJob -> Map -> Sink: Writer -> Sink: Committer (5/10)#12] WARN org.apache.kafka.common.utils.AppInfoParser - Error registering AppInfo mbean javax.management.InstanceAlreadyExistsException: kafka.producer:type=app-info,id=producer-sgt-4-1 at java.management/com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:436) at java.management/com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1855) at java.management/com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:955) at java.management/com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:890) at java.management/com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:320) at java.management/com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522) at org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:64) at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:433) at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:289) at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:316) at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:301) at org.apache.flink.connector.kafka.sink.FlinkKafkaInternalProducer.<init>(FlinkKafkaInternalProducer.java:55) at org.apache.flink.connector.kafka.sink.KafkaWriter.getOrCreateTransactionalProducer(KafkaWriter.java:332) at org.apache.flink.connector.kafka.sink.TransactionAborter.abortTransactionOfSubtask(TransactionAborter.java:104) at org.apache.flink.connector.kafka.sink.TransactionAborter.abortTransactionsWithPrefix(TransactionAborter.java:82) at org.apache.flink.connector.kafka.sink.TransactionAborter.abortLingeringTransactions(TransactionAborter.java:66) at org.apache.flink.connector.kafka.sink.KafkaWriter.abortLingeringTransactions(KafkaWriter.java:295) at org.apache.flink.connector.kafka.sink.KafkaWriter.<init>(KafkaWriter.java:176) at org.apache.flink.connector.kafka.sink.KafkaSink.createWriter(KafkaSink.java:111) at org.apache.flink.connector.kafka.sink.KafkaSink.createWriter(KafkaSink.java:57) at org.apache.flink.streaming.runtime.operators.sink.StatefulSinkWriterStateHandler.createWriter(StatefulSinkWriterStateHandler.java:117) at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.initializeState(SinkWriterOperator.java:146) at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:122) at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:274) at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106) at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:734) at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55) at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:709) at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:675) at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952) at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:921) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562) at java.base/java.lang.Thread.run(Thread.java:834) And here is the producer configuration, KafkaSink<T> sink = KafkaSink .<T>builder() .setBootstrapServers(producerConfig.getProperty("bootstrap.servers")) .setKafkaProducerConfig(producerConfig) .setRecordSerializer(new GenericMessageSerialization<>(generic_key.class, generic_value.class, producerConfig.getProperty("topic"), producerConfig.getProperty("schema.registry.url"))) .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE) .setTransactionalIdPrefix("generic") .build(); Can someone please help me what is missing here or what additional steps we need to take to achieve this. Thanks, Gopal.