I would recommend reading the Kafka Streams KIP about EOS: https://cwiki.apache.org/confluence/display/KAFKA/KIP-129%3A+Streams+Exactly-Once+Semantics

Fencing is the most critical part of the implementation. Kafka Streams basically uses a dedicated `transactional.id` per input topic-partition. Hence, if a rebalance happens and a partition is re-assigned, it is ensured that only one "instance" of a consumer-producer pair can commit transactions successfully: the "new producer" uses the same associated `transactional.id` as the "original producer" and thereby fences it out.
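To make the fencing mechanics concrete, here is a rough sketch with the plain producer API (the "my-app" prefix and the `part` variable are placeholders of mine, not what Streams uses internally):

    // One producer per assigned input partition. The transactional.id is
    // derived from the partition, so it is stable across instances/restarts.
    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-app-" + part); // e.g. "my-app-TOPIC_A-0"

    KafkaProducer<String, String> producer = new KafkaProducer<>(props);
    // initTransactions() bumps the epoch for this transactional.id on the
    // broker; any older producer registered under the same id is fenced, and
    // its next transactional call fails with a ProducerFencedException.
    producer.initTransactions();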
There is actually a KIP in progress that will make using transactions simpler, as it basically improves fencing: https://cwiki.apache.org/confluence/display/KAFKA/KIP-447%3A+Producer+scalability+for+exactly+once+semantics

But I agree with Alex: if you can, it's recommended to use the Streams API, which solves those problems for you. There is no need to re-implement it from scratch.
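To illustrate, the whole consume-transform-produce program from this thread boils down to a few lines in the DSL. This is just a sketch, with `transform()` standing in for your own logic:

    import java.util.Properties;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;

    public class CopyApp {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-copy-app");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
            props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
            // This single setting enables transactions, transactional offset
            // commits, and producer fencing:
            props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);

            StreamsBuilder builder = new StreamsBuilder();
            builder.<String, String>stream("TOPIC_A") // subscribe
                   .mapValues(CopyApp::transform)     // transform
                   .to("TOPIC_B");                    // produce

            new KafkaStreams(builder.build(), props).start();
        }

        // Stand-in for the actual transformation logic.
        private static String transform(String value) {
            return value;
        }
    }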
Hope this helps.

-Matthias

On 10/30/19 8:30 PM, Alex Brekken wrote:
> Sergi, have you looked at using the Kafka Streams API? If you're just
> consuming from a topic, transforming the data, then writing it out to a
> sink topic, you can probably do that relatively easily using the Streams
> DSL. No need to manually subscribe, poll, or commit offsets. Then if you
> want exactly once guarantees, you just set the processing.guarantee
> property to "exactly_once" and you're all set. However, since it sounds
> like your application isn't stateful, I think your only concern is with
> producing duplicate messages to the sink topic, right? (You don't have
> any internal state that could get messed up in the event of crashes,
> network failures, etc.) Do you have control over who/what consumes the
> sink topic? If so, can you make that consumer tolerant of duplicate
> messages? Exactly once works well in my experience, but there is
> overhead involved, so only use it if you need it. :)
>
> Alex
>
> On Wed, Oct 30, 2019 at 10:04 PM Kidong Lee <[email protected]> wrote:
>
>> Hi,
>>
>> It may not be for your case, but I have implemented an example of a
>> Kafka transaction:
>>
>> https://medium.com/@mykidong/kafka-transaction-56f022af1b0c
>>
>> In this example, offsets are saved to an external DB.
>>
>> - Kidong
>>
>> On Thu, Oct 31, 2019 at 11:39 AM, Sergi Vladykin <[email protected]> wrote:
>>
>>> Ok, so what is the advice? Not to use Kafka transactions ever, because
>>> they are unusable in real life?
>>> Can you please provide a recipe for how to make it work in the simple
>>> scenario: no databases, just two topics, no admin actions.
>>>
>>> Sergi
>>>
>>> On Wed, Oct 30, 2019 at 22:39, Jörn Franke <[email protected]> wrote:
>>>
>>>> Please note that for exactly once it is not sufficient to simply set
>>>> an option. The producer and consumer must individually make sure that
>>>> they only process each message once. For instance, the consumer can
>>>> crash and then re-process already consumed messages, or the producer
>>>> might crash and write the same message twice to a database, etc.
>>>> All of these things might also happen due to a backup and restore, or
>>>> through a manual admin action.
>>>> Those are not "edge" scenarios. In operations they can happen quite
>>>> often, especially in a containerized infrastructure.
>>>> You have to consider this for all messaging solutions (not only
>>>> Kafka) in your technical design.
>>>>
>>>>> On 30.10.2019 at 20:30, Sergi Vladykin <[email protected]> wrote:
>>>>>
>>>>> Hi!
>>>>>
>>>>> I am investigating the possibilities of "exactly once" Kafka
>>>>> transactions for the consume-transform-produce pattern. As far as I
>>>>> understand, the logic must be the following (in pseudo-code):
>>>>>
>>>>> var cons = createKafkaConsumer(MY_CONSUMER_GROUP_ID);
>>>>> cons.subscribe(TOPIC_A);
>>>>> for (;;) {
>>>>>     var recs = cons.poll();
>>>>>     for (var part : recs.partitions()) {
>>>>>         var partRecs = recs.records(part);
>>>>>         var prod = getOrCreateProducerForPart(MY_TX_ID_PREFIX + part);
>>>>>         prod.beginTransaction();
>>>>>         sendAllRecs(prod, TOPIC_B, partRecs);
>>>>>         prod.sendOffsetsToTransaction(
>>>>>             singletonMap(part, lastRecOffset(partRecs) + 1),
>>>>>             MY_CONSUMER_GROUP_ID);
>>>>>         prod.commitTransaction();
>>>>>     }
>>>>> }
>>>>>
>>>>> Is this the right approach?
>>>>>
>>>>> Because it looks to me like there is a possible race here, and the
>>>>> same record from topic A can be committed to topic B more than once:
>>>>> if rebalancing happens after our thread polled a record and before
>>>>> creating a producer, then another thread will read and commit the
>>>>> same record; after that our thread will wake up, create a producer
>>>>> (and fence the other one) and successfully commit the same record a
>>>>> second time.
>>>>>
>>>>> Can anyone please explain how to do "exactly once" in Kafka right?
>>>>> Examples would be very helpful.
>>>>>
>>>>> Sergi
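P.S. Regarding the race described in the original message below: the important detail is to create the producer and call initTransactions() eagerly when the partition is assigned (e.g., in a ConsumerRebalanceListener), not lazily after poll(). A zombie thread then only ever holds its old, already-fenced producer, so its commitTransaction() throws a ProducerFencedException and the whole transaction (output records plus offsets) aborts instead of committing a duplicate. A rough sketch, with error handling mostly omitted and `producerForPart()`/`removeProducerForPart()` as assumed helpers around such a listener:

    for (;;) {
        ConsumerRecords<String, String> recs = cons.poll(Duration.ofMillis(100));
        for (TopicPartition part : recs.partitions()) {
            List<ConsumerRecord<String, String>> partRecs = recs.records(part);
            // Created (and initTransactions()'d) when 'part' was assigned:
            Producer<String, String> prod = producerForPart(part);
            try {
                prod.beginTransaction();
                for (ConsumerRecord<String, String> rec : partRecs)
                    prod.send(new ProducerRecord<>("TOPIC_B", rec.key(), rec.value()));
                long nextOffset = partRecs.get(partRecs.size() - 1).offset() + 1;
                // Commit the input offsets as part of the same transaction:
                prod.sendOffsetsToTransaction(
                        Collections.singletonMap(part, new OffsetAndMetadata(nextOffset)),
                        MY_CONSUMER_GROUP_ID);
                prod.commitTransaction();
            } catch (ProducerFencedException e) {
                // Another instance took over this partition and fenced us. Our
                // transaction did NOT commit, so nothing is duplicated; clean
                // up and let the next poll() handle the rebalance.
                prod.close();
                removeProducerForPart(part);
            }
        }
    }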