Hey Kevin, setting the uid is not needed for exactly-once guarantees. It matters when you want to restore operator state manually from a savepoint: the uid is how Flink matches the saved state back to the operators of the job you resubmit.
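To make the recovery argument concrete, here is a toy model in plain Scala. It is not Flink code, and the names Checkpoint, CheckpointReplayDemo and run are made up for this sketch. It assumes a checkpoint stores the source offset and the operator state together, so rolling back to it and replaying gives the same result as a run without failure. That is also why the transformer state is not lost on an automatic restart.

```scala
// Toy model only: a "checkpoint" holds the source offset and the operator state together.
final case class Checkpoint(offset: Int, sum: Long)

object CheckpointReplayDemo {
  // Processes records in [from.offset, upTo), starting from the checkpointed state.
  def run(records: Vector[Int], from: Checkpoint, upTo: Int): Checkpoint = {
    val sum = records.slice(from.offset, upTo).foldLeft(from.sum)(_ + _)
    Checkpoint(upTo, sum)
  }

  def main(args: Array[String]): Unit = {
    val records = Vector(1, 2, 3, 4, 5)

    // A checkpoint completes after the first three records.
    val cp = run(records, Checkpoint(0, 0L), 3)

    // The job crashes somewhere after that. Work done since the checkpoint is discarded,
    // and the job restarts from the checkpointed offset AND the checkpointed state.
    val recovered = run(records, cp, records.length)

    // Reference: the same input processed without any failure.
    val noFailure = run(records, Checkpoint(0, 0L), records.length)
    assert(recovered == noFailure)

    // Restoring only the offset while dropping the state would give a wrong result.
    val stateLost = run(records, Checkpoint(cp.offset, 0L), records.length)
    assert(stateLost != noFailure)

    println(s"recovered=$recovered, stateLost=$stateLost")
  }
}
```

The last case is what the question describes (offsets replayed, transformer state gone). Recovery from a checkpoint avoids it because offsets and state are restored from the same snapshot.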
This blog post (there are probably many more explaining this) could be helpful to understand how checkpointing ensures exactly-once despite failures:
https://www.ververica.com/blog/how-apache-flink-manages-kafka-consumer-offsets#:~:text=The%20Kafka%20consumer%20in%20Apache,offsets%20in%20all%20Kafka%20partitions.&text=Flink's%20checkpoint%20mechanism%20ensures%20that,on%20the%20same%20input%20data

On Tue, Oct 20, 2020 at 10:28 PM Kevin Kwon <fsw0...@gmail.com> wrote:

> Hi team
>
> I'm subscribing 2 topics from Kafka Consumer, joining them and publishing
> back to a new topic via KafkaProducer (with Exactly Once semantic)
>
> As it's highly recommended to set uid for each operator, I'm curious how
> this works. For example,
>
> val topicASource = env
>   .addSource(topicAConsumer)
>   .uid("topicAConsumer")
>
> val topicBSource = env
>   .addSource(topicAConsumer)
>   .uid("topicAConsumer")
>
> val result = joinstream(env, topicASource, topicBSource)
>   .uid("transformer")
>
> val topicCSink = result
>   .addSink(topicCProducer)
>   .uid("topicCProducer")
>
>
> in this code, is it necessary to set the UID of the transformer? If the
> consumer offset is not committed until it finally gets published to sink,
> will consumers replaying from offset from previous
> checkpoint guarantee exactly once? even though transformer state is lost
> when restarting?