Re: [DISCUSS] KIP-98: Exactly Once Delivery and Transactional Messaging
> The PID and sequence numbers are totally transparent to applications.

Now that you say it, the cwiki already makes that point pretty clear by the way the Producer API is (not) being changed. Sorry for taking your time on this.

In other words, at the point where messages enter the Kafka system for the first time, KIP-98 does not provide exactly-once guarantees across producer sessions. This all makes sense, and what I am trying to do is probably best tackled on the business layer (where it needs to be done anyway if one cares about end-to-end guarantees). So I will just resend all potentially unsent messages when a producer fails, and use a business-level id inside the message to de-duplicate messages further downstream. This also performs well (no additional persisting of seq-id information to disk). As de-duplication processing is stateful, a consumer might want to maintain (and persist to another topic) an x-hour "LRU business ids" cache, but that seems straightforward with Kafka.

Anyway, this is no longer of interest to KIP-98. Thanks for the clarification!

Cheers
Eugen

On 2017-01-31 04:39, Apurva Mehta wrote:

Eugen, moving your email to the main thread so that it doesn't get split.

The `transaction.app.id` is a prerequisite for using transactional APIs. And only messages wrapped inside transactions will enjoy idempotent guarantees across sessions, and even then only when they employ a consume-process-produce pattern.

Say I have a producer, producing messages into a topic, and I only want to guarantee that the producer cannot insert duplicates. In other words, there's no downstream consumer/processor to be worried about, which, when considering the correctness of the data only, is all I need from idempotent producers, as every message has a unique id (offset), so downstream processes can take care of exactly-once processing by any number of means.
(If you need transactional all-or-none behavior, which KIP-98 also addresses, that's of course a more complex story.)

I was under the impression that KIP-98 would fulfill the above requirement, i.e. the prevention of duplicate inserts of the same message into a topic per producer, without using transactions, and guaranteed across TCP connections to handle producer/broker crashes and network problems.

The KIP-98 idempotent producer solution only protects against duplicates in the stream when there are broker failures and network problems. For instance, suppose a producer writes a message, and the leader commits and replicates the message but dies before the acknowledgement is sent to the client. Today, the client will resend the message, which will be accepted by the new leader, hence causing duplicates. Also, the offsets of the duplicate messages in this case will be unique, so they can't be de-duped downstream based on the offset.

If the client application itself dies, it needs to know which messages were previously sent so that it doesn't resend them when it comes back online. The proposed solution to this situation is to use transactional APIs and the consume-process-produce pattern. If you do so, partially processed previous inputs will be discarded, and processing will resume from the last committed state.

In other words, producers where the `transaction.app.id` is specified will not enjoy idempotence across sessions unless their messages are transactional, i.e. the sends are wrapped between `beginTransaction`, `sendOffsets`, and `commitTransaction`.

From the KIP-98 wiki and the design document, I understand that AppIDs, PIDs, and sequence numbers are enforced regardless of whether the messages are wrapped in a transaction or not. Is that not so?

The PID and sequence numbers are totally transparent to applications. If you enable idempotent production, these will be created and managed by Kafka. AppIds only need to be specified if you use the four new transactional APIs.
This is enforced at runtime.
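The failure modes described above, and the business-level workaround Eugen settles on, can be sketched as a toy simulation. This is not Kafka code: `ToyPartitionLog`, `BusinessIdDeduplicator`, the PIDs 1000/1001 and the one-hour window are all hypothetical, and the broker side only models the idea that a retry carrying an already-seen (PID, sequence number) pair is dropped. In this toy, a restarted non-transactional producer gets a new PID with sequence numbers starting again at 0, so its resend lands at a new offset and only the downstream filter keyed on the business id catches it.

```python
from collections import OrderedDict
from typing import Dict, List, Tuple


class ToyPartitionLog:
    """Toy broker log that drops resends by (PID, sequence number).

    Assumption: the last sequence number per PID is replicated with the log,
    so a newly elected leader can recognise a retry. Gap checks are omitted.
    """

    def __init__(self) -> None:
        self.records: List[Tuple[int, int, str]] = []  # (pid, seq, business_id)
        self._last_seq: Dict[int, int] = {}

    def append(self, pid: int, seq: int, business_id: str) -> None:
        if seq <= self._last_seq.get(pid, -1):
            return  # retry of a send whose acknowledgement was lost: dropped
        self._last_seq[pid] = seq
        self.records.append((pid, seq, business_id))


class BusinessIdDeduplicator:
    """Hypothetical downstream filter remembering business ids for `window` time units."""

    def __init__(self, window: float) -> None:
        self.window = window
        self._seen: "OrderedDict[str, float]" = OrderedDict()  # id -> last seen, oldest first

    def accept(self, business_id: str, now: float) -> bool:
        # Evict ids not seen within the window; assumes `now` never decreases.
        while self._seen:
            oldest_id, last_seen = next(iter(self._seen.items()))
            if now - last_seen < self.window:
                break
            del self._seen[oldest_id]
        duplicate = business_id in self._seen
        self._seen[business_id] = now
        self._seen.move_to_end(business_id)  # most recently seen id goes last (LRU order)
        return not duplicate


log = ToyPartitionLog()
log.append(pid=1000, seq=0, business_id="order-42")  # leader commits, then dies before the ack
log.append(pid=1000, seq=0, business_id="order-42")  # retry to the new leader: dropped
log.append(pid=1001, seq=0, business_id="order-42")  # producer restarted with a new PID: kept

dedup = BusinessIdDeduplicator(window=3600.0)  # hypothetical one-hour window, in seconds
delivered = [bid for t, (_, _, bid) in enumerate(log.records) if dedup.accept(bid, now=float(t))]
print(len(log.records), delivered)  # 2 ['order-42']
```

The filter's state lives only in memory here; persisting it to another topic, as suggested above, is not shown. An `OrderedDict` keeps ids in last-seen order, so expired ids can be evicted cheaply from the front.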
Re: [DISCUSS] KIP-98: Exactly Once Delivery and Transactional Messaging
Thanks Apurva - replies inline.

On 2017-01-27 15:19, Apurva Mehta wrote:

Eugen, moving your email to the main thread so that it doesn't get split.

The `transaction.app.id` is a prerequisite for using transactional APIs. And only messages wrapped inside transactions will enjoy idempotent guarantees across sessions, and even then only when they employ a consume-process-produce pattern.

Say I have a producer, producing messages into a topic, and I only want to guarantee that the producer cannot insert duplicates. In other words, there's no downstream consumer/processor to be worried about, which, when considering the correctness of the data only, is all I need from idempotent producers, as every message has a unique id (offset), so downstream processes can take care of exactly-once processing by any number of means. (If you need transactional all-or-none behavior, which KIP-98 also addresses, that's of course a more complex story.)

I was under the impression that KIP-98 would fulfill the above requirement, i.e. the prevention of duplicate inserts of the same message into a topic per producer, without using transactions, and guaranteed across TCP connections to handle producer/broker crashes and network problems.

In other words, producers where the `transaction.app.id` is specified will not enjoy idempotence across sessions unless their messages are transactional, i.e. the sends are wrapped between `beginTransaction`, `sendOffsets`, and `commitTransaction`.

From the KIP-98 wiki and the design document, I understand that AppIDs, PIDs, and sequence numbers are enforced regardless of whether the messages are wrapped in a transaction or not. Is that not so?

Cheers,
Eugen

The comment about the heartbeat was just a passing comment about the fact that an AppId could expire if a producer doesn't use transactions for a long time. We don't plan to implement heartbeats in V1, though we might in the future.

Hope this clarified things.
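The consume-process-produce pattern referred to here can be illustrated with a toy model in which output records and the consumer position become visible together or not at all. `ToyTransactionalPipeline`, its batch size and the simulated crash are hypothetical; the comments only mark which step corresponds to `beginTransaction`, `sendOffsets` and `commitTransaction` in the thread, and nothing here calls a Kafka API.

```python
from typing import List


class ToyTransactionalPipeline:
    """Toy consume-process-produce loop where outputs and input offset commit atomically."""

    def __init__(self) -> None:
        self.committed_output: List[str] = []
        self.committed_offset = 0  # next input offset to process after a restart

    def run(self, inputs: List[str], batch_size: int, crash_after_sends: int = -1) -> None:
        """Process inputs from the last committed offset; optionally crash after N sends."""
        sends = 0
        offset = self.committed_offset
        while offset < len(inputs):
            pending: List[str] = []  # "beginTransaction": sends stay invisible until commit
            batch = inputs[offset:offset + batch_size]
            for value in batch:
                if sends == crash_after_sends:
                    return  # crash: the uncommitted sends in `pending` are discarded
                pending.append(value.upper())  # the "process" step
                sends += 1
            offset += len(batch)
            # "sendOffsets" + "commitTransaction": outputs and offset become visible together
            self.committed_output.extend(pending)
            self.committed_offset = offset


pipeline = ToyTransactionalPipeline()
inputs = ["a", "b", "c", "d", "e"]
pipeline.run(inputs, batch_size=2, crash_after_sends=3)  # dies midway through the second batch
print(pipeline.committed_output, pipeline.committed_offset)  # ['A', 'B'] 2
pipeline.run(inputs, batch_size=2)  # restart resumes from the last committed offset
print(pipeline.committed_output)  # ['A', 'B', 'C', 'D', 'E']
```

Without the atomic commit, "C" would already have been written when the process died and would be written a second time after the restart.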
Regards,
Apurva

On Wed, Jan 25, 2017 at 6:00 PM, Apurva Mehta wrote:

On Tue, Jan 17, 2017 at 6:17 PM, Apurva Mehta wrote:

Hi Jun,

Some answers inline.

109. Could you describe when Producer.send() will receive an UnrecognizedMessageException?

This exception will be thrown if the producer sends a sequence number which is greater than the sequence number expected by the broker (i.e. more than 1 greater than the previously sent sequence number). This can happen in two cases:

a) There is a bug in the producer where sequence numbers are incremented more than once per message, so the producer itself sends messages with gaps in sequence numbers.

b) The broker somehow lost a previous message. In a cluster configured for durability (i.e. no unclean leader elections, replication factor of 3, min.isr of 2, acks=all, etc.), this should not happen. So realistically, this exception will only be thrown in clusters configured for high availability where brokers could lose messages.

Becket raised the question if
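The broker-side check described in the answer to question 109 can be sketched as a three-way comparison against the last sequence number the broker appended for a PID. `check_sequence`, `SeqCheck` and `OutOfSequenceError` are hypothetical names (the last standing in for UnrecognizedMessageException), and treating every sequence number at or below the last one as a duplicate is a simplifying assumption of this sketch.

```python
from enum import Enum


class SeqCheck(Enum):
    DUPLICATE = "duplicate"  # already appended: drop the resend
    IN_ORDER = "in_order"    # exactly one greater than the last: append
    GAP = "gap"              # more than one greater: reject


class OutOfSequenceError(Exception):
    """Hypothetical stand-in for the UnrecognizedMessageException in question 109."""


def check_sequence(last_seq: int, seq: int) -> SeqCheck:
    """Classify an incoming sequence number against the last one appended for a PID."""
    if seq <= last_seq:
        return SeqCheck.DUPLICATE
    if seq == last_seq + 1:
        return SeqCheck.IN_ORDER
    return SeqCheck.GAP


def broker_append(last_seq: int, seq: int) -> int:
    """Return the broker's new last sequence number, or raise if there is a gap."""
    result = check_sequence(last_seq, seq)
    if result is SeqCheck.GAP:
        raise OutOfSequenceError(f"expected {last_seq + 1}, got {seq}")
    # A duplicate leaves the state unchanged; an in-order send advances it.
    return seq if result is SeqCheck.IN_ORDER else last_seq


print(check_sequence(last_seq=4, seq=4).name)  # DUPLICATE
print(check_sequence(last_seq=4, seq=5).name)  # IN_ORDER
print(check_sequence(last_seq=4, seq=6).name)  # GAP
```

In both cases (a) and (b) the broker observes the same thing, expecting 5 and receiving 6, so the check by itself cannot tell a producer bug from a lost message.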
Guaranteeing cross-session idempotency with KIP-98
KIP-98 says

> transaction.app.id: A unique and persistent way to identify a producer. This is used to ensure idempotency and to enable transaction recovery or rollback across producer sessions. This is optional: you will lose cross-session guarantees if this is blank.

which might suggest that a producer that does not use the transactional features, but does set the transaction.app.id, could get cross-session idempotency. But the design document "Exactly Once Delivery and Transactional Messaging in Kafka" rules that out:

> For the idempotent producer (i.e., producer that do not use transactional APIs), currently we do not make any cross-session guarantees in any case. In the future, we can extend this guarantee by having the producer to periodically send InitPIDRequest to the transaction coordinator to keep the AppID from expiring, which preserves the producer's zombie defence.

Until that point in the future, could my non-transactional producer send an InitPIDRequest once and then heartbeat via BeginTxnRequest/EndTxnRequest(ABORT) at intervals shorter than transaction.app.id.timeout.ms in order to guarantee cross-session idempotency? Or is that not guaranteed because "currently we do not make any cross-session guarantees in any case"? I know this would be an ugly hack. I guess that is also what the recently added "Producer HeartBeat" feature proposal would address, although it is described as preventing idle transactional producers from having their AppIds expire.

Related question: if KIP-98 does not make cross-session guarantees for idempotent producers, is the only improvement over the current idempotency situation the prevention of duplicate messages in case of a partition leader migration? Because if a broker fails or the publisher fails, KIP-98 does not seem to change the risk of dupes for non-transactional producers.

Btw: good job! Both on Kafka in general and on KIP-98 in particular.

Cheers
Eugen
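The timing condition in the heartbeat question can be checked with a small toy model. It assumes, hypothetically, that the coordinator expires an AppID once `timeout_ms` passes with no request from it, and whether expiry happens exactly at the timeout is also an assumption; `app_id_alive_at` and the 60-second value are illustrative only. It models nothing beyond the timing: per Apurva's replies in this thread, keeping the AppID alive does not by itself give non-transactional sends cross-session idempotence.

```python
from typing import Sequence


def app_id_alive_at(request_times_ms: Sequence[int], timeout_ms: int, check_at_ms: int) -> bool:
    """Return True if a toy coordinator would still hold the AppID at `check_at_ms`.

    Toy rule (assumption): the AppID expires once `timeout_ms` passes with no
    request from it and cannot come back afterwards. `request_times_ms` is sorted,
    starts with the InitPIDRequest, and contains no times after `check_at_ms`.
    """
    if not request_times_ms:
        return False
    for earlier, later in zip(request_times_ms, request_times_ms[1:]):
        if later - earlier >= timeout_ms:
            return False  # expired during the silent gap before `later`
    return check_at_ms - request_times_ms[-1] < timeout_ms


timeout = 60_000  # hypothetical transaction.app.id.timeout.ms value
every_50s = list(range(0, 300_001, 50_000))  # InitPIDRequest at 0, then a heartbeat every 50 s
every_70s = list(range(0, 300_001, 70_000))
print(app_id_alive_at(every_50s, timeout, check_at_ms=300_000))  # True
print(app_id_alive_at(every_70s, timeout, check_at_ms=300_000))  # False
```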