You can keep the same transactional ID prefix if you are restarting the job as a continuation of what was running before. You need distinct prefixes for different jobs that will be running against the same Kafka brokers. I think of the transactional ID prefix as an application identifier.
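Concretely, here is a minimal sketch along those lines, reusing the API from your snippet. JOB_NAME, BOOTSTRAP_SERVERS, and SINK_TOPICS are placeholders, and the transaction.timeout.ms value is just an example; it must not exceed the broker's transaction.max.timeout.ms (15 minutes by default):

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;

String JOB_NAME = "kafka-cleanup-job";     // stable application identifier
String BOOTSTRAP_SERVERS = "broker:9092";  // placeholder
String SINK_TOPICS = "cleaned-events";     // placeholder

Properties kafkaProducerConfig = new Properties();
// For EXACTLY_ONCE the producer's transaction timeout must not exceed
// the broker's transaction.max.timeout.ms (default 900000 ms).
kafkaProducerConfig.setProperty("transaction.timeout.ms", "900000");

KafkaSink<String> sink = KafkaSink.<String>builder()
        .setBootstrapServers(BOOTSTRAP_SERVERS)
        .setKafkaProducerConfig(kafkaProducerConfig)
        // Keep the prefix stable across restarts of the same job; do NOT
        // append System.currentTimeMillis(). Use a different prefix only
        // for a different job running against the same brokers.
        .setTransactionalIdPrefix(JOB_NAME + "-transactional-id")
        .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                .setTopic(SINK_TOPICS)
                .setValueSerializationSchema(new SimpleStringSchema())
                .build())
        .setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
        .build();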
See [1] for a complete list of what needs to be done to achieve exactly-once with Kafka.

[1] https://www.docs.immerok.cloud/docs/cookbook/exactly-once-with-apache-kafka-and-apache-flink/

On Wed, Aug 17, 2022 at 1:02 AM kcz <573693...@qq.com> wrote:

> flink-1.14.4
> kafka-2.4.0
>
> I have a small question about setTransactionalIdPrefix: when I start the
> next job, can I reuse the last transaction ID, or does a new one need to
> be generated automatically? I just tested restarting from a checkpoint,
> and a new transaction ID was generated; this will not lead to data loss,
> right? I also don't know whether I can guarantee no data loss by
> generating a new transaction ID with System.currentTimeMillis(). I would
> appreciate some guidance; I need this feature in a production
> environment, but I don't know how to write it correctly. The job pushes
> cleaned-up data consumed from Kafka back to Kafka.
>
> KafkaSink<String> sink = KafkaSink.<String>builder()
>         .setBootstrapServers(BOOTSTRAP_SERVERS)
>         .setKafkaProducerConfig(kafkaProducerConfig)
>         .setTransactionalIdPrefix(JOB_NAME + System.currentTimeMillis() + "transactional.id")
>         .setRecordSerializer(KafkaRecordSerializationSchema.builder()
>                 .setTopic(SINK_TOPICS)
>                 .setValueSerializationSchema(new SimpleStringSchema())
>                 .build())
>         .setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
>         .build();