jolshan commented on code in PR #17454:
URL: https://github.com/apache/kafka/pull/17454#discussion_r1801587174
##########
docs/design.html:
##########
@@ -290,24 +290,56 @@ <h3 class="anchor-heading"><a id="semantics" class="anchor-link"></a><a href="#s
     messages have a primary key and so the updates are idempotent (receiving the same message twice just overwrites a record with another copy of itself).
 </ol>
 <p>
-    So what about exactly once semantics (i.e. the thing you actually want)? When consuming from a Kafka topic and producing to another topic (as in a <a href="https://kafka.apache.org/documentation/streams">Kafka Streams</a>
-    application), we can leverage the new transactional producer capabilities in 0.11.0.0 that were mentioned above. The consumer's position is stored as a message in a topic, so we can write the offset to Kafka in the
-    same transaction as the output topics receiving the processed data. If the transaction is aborted, the consumer's position will revert to its old value and the produced data on the output topics will not be visible
-    to other consumers, depending on their "isolation level." In the default "read_uncommitted" isolation level, all messages are visible to consumers even if they were part of an aborted transaction,
-    but in "read_committed," the consumer will only return messages from transactions which were committed (and any messages which were not part of a transaction).
+    So what about exactly-once semantics? When consuming from a Kafka topic and producing to another topic (as in a <a href="https://kafka.apache.org/documentation/streams">Kafka Streams</a> application), we can
+    leverage the new transactional producer capabilities in 0.11.0.0 that were mentioned above. The consumer's position is stored as a message in an internal topic, so we can write the offset to Kafka in the
+    same transaction as the output topics receiving the processed data. If the transaction is aborted, the consumer's stored position will revert to its old value and the produced data on the output topics will not
+    be visible to other consumers, depending on their "isolation level." In the default "read_uncommitted" isolation level, all messages are visible to consumers even if they were part of an aborted transaction,
+    but in "read_committed" isolation level, the consumer will only return messages from transactions which were committed (and any messages which were not part of a transaction).
 <p>
     When writing to an external system, the limitation is in the need to coordinate the consumer's position with what is actually stored as output. The classic way of achieving this would be to introduce a two-phase
-    commit between the storage of the consumer position and the storage of the consumers output. But this can be handled more simply and generally by letting the consumer store its offset in the same place as
+    commit between the storage of the consumer position and the storage of the consumers output. This can be handled more simply and generally by letting the consumer store its offset in the same place as
     its output. This is better because many of the output systems a consumer might want to write to will not support a two-phase commit. As an example of this, consider a
     <a href="https://kafka.apache.org/documentation/#connect">Kafka Connect</a> connector which populates data in HDFS along with the offsets of the data it reads so that it is guaranteed that either data and
     offsets are both updated or neither is. We follow similar patterns for many other data systems which require these stronger semantics and for which the messages do not have a primary key to allow for
     deduplication.
 <p>
-    So effectively Kafka supports exactly-once delivery in <a href="https://kafka.apache.org/documentation/streams">Kafka Streams</a>, and the transactional producer/consumer can be used generally to provide
+    As a result, Kafka supports exactly-once delivery in <a href="https://kafka.apache.org/documentation/streams">Kafka Streams</a>, and the transactional producer/consumer can be used generally to provide
     exactly-once delivery when transferring and processing data between Kafka topics. Exactly-once delivery for other destination systems generally requires cooperation with such systems, but Kafka provides the
     offset which makes implementing this feasible (see also <a href="https://kafka.apache.org/documentation/#connect">Kafka Connect</a>). Otherwise, Kafka guarantees at-least-once delivery by default, and allows
     the user to implement at-most-once delivery by disabling retries on the producer and committing offsets in the consumer prior to processing a batch of messages.
-    <h3 class="anchor-heading"><a id="replication" class="anchor-link"></a><a href="#replication">4.7 Replication</a></h3>
+    <h3 class="anchor-heading"><a id="usingtransactions" class="anchor-link"></a><a href="#usingtransactions">4.7 Using Transactions</a></h3>
+    <p>
+    As mentioned above, the simplest way to get exactly-once semantics from Kafka is to use <a href="https://kafka.apache.org/documentation/streams">Kafka Streams</a>. However, it is also possible to achieve
+    the same transactional guarantees using the Kafka producer and consumer directly by using them in the same way as Kafka Streams does.
+    <p>
+    Kafka transactions are a bit different than transactions in other messaging systems. In Kafka, the consumer and producer are separate and it is only the producer which is transactional. It is however able to
+    make transactional updates to the consumer's position (confusingly called the "committed offset"), and it is this which gives the overall exactly-once behavior.
+    <p>
+    There are three key aspects to exactly-once processing using the producer and consumer, which match how Kafka Streams works.
+    <ol>
+        <li>The consumer uses partition assignment to ensure that it is the only consumer in the consumer group currently processing each partition.</li>
+        <li>The consumer uses read-committed isolation level to ensure that it does not consume records produced by transactions which aborted.</li>
+        <li>The producer uses transactions so that all of the records it produces, and any offsets it updates on behalf of the consumer, are performed atomically.</li>
+    </ol>
+    <p>
+    The consumer configuration must include <code>isolation.level=read_committed</code> and <code>enable.auto.commit=false</code>. The producer configuration must set <code>transactional.id</code>

Review Comment:
   It could also be a topic not written with transactions and in that case, read committed and read uncommitted have the same behavior. Mostly just calling out it isn't a requirement, though probably a good practice.
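For readers following this thread: below is a minimal sketch of the read-process-write loop the new section describes, built from the config keys named in the diff (`isolation.level=read_committed`, `enable.auto.commit=false`, `transactional.id`) and the standard Java client transaction API (`initTransactions`, `sendOffsetsToTransaction`, `commitTransaction`, `abortTransaction`). The topic names, group id, and transactional id are placeholders, not part of the PR.

```java
import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

public class TransactionalReadProcessWrite {
    public static void main(String[] args) {
        Properties consumerProps = new Properties();
        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "example-group");          // placeholder
        consumerProps.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"); // skip records from aborted transactions
        consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");       // offsets are committed inside the transaction
        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        Properties producerProps = new Properties();
        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        producerProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "example-txn-id"); // placeholder; stable per application instance
        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
             KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps)) {
            producer.initTransactions();                 // registers the transactional.id and fences zombie instances
            consumer.subscribe(List.of("input-topic"));  // placeholder topic

            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(200));
                if (records.isEmpty()) continue;

                producer.beginTransaction();
                try {
                    Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
                    for (ConsumerRecord<String, String> record : records) {
                        // Stand-in "processing": upper-case the value.
                        producer.send(new ProducerRecord<>("output-topic", record.key(),
                                record.value().toUpperCase()));
                        // The position to commit is the offset after the consumed record.
                        offsets.put(new TopicPartition(record.topic(), record.partition()),
                                new OffsetAndMetadata(record.offset() + 1));
                    }
                    // The consumer's position rides in the same transaction as the output
                    // records, so both become visible (or are discarded) together.
                    producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
                    producer.commitTransaction();
                } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
                    throw e;                     // fatal: another instance took over this transactional.id
                } catch (KafkaException e) {
                    producer.abortTransaction(); // outputs and offset updates are thrown away together;
                                                 // a real application would also rewind the consumer
                                                 // to the last committed offsets before retrying
                }
            }
        }
    }
}
```

And as jolshan notes above, if the input topic is not written transactionally, read_committed and read_uncommitted behave identically for it, so the isolation setting in this sketch is a safe default rather than a hard requirement.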