mjsax commented on code in PR #17454: URL: https://github.com/apache/kafka/pull/17454#discussion_r1807083423
########## docs/design.html: ########## @@ -254,32 +254,32 @@ <h3 class="anchor-heading"><a id="semantics" class="anchor-link"></a><a href="#s <i>At least once</i>—Messages are never lost but may be redelivered. </li> <li> - <i>Exactly once</i>—this is what people actually want, each message is delivered once and only once. + <i>Exactly once</i>—Each message is delivered once and only once. Review Comment: We should not say "delivered" (also further above) -- this term is too close to "delivery semantics at the network layer" -- we still retry internally, and _deliver_ a message more than once (this holds true for the idempotent producer, as well as for aborted TX which are retried; aborted data is just filtered out later). Btw: Further above we say: > let's discuss the semantic guarantees Kafka provides between producer and consumer This is also misleading, because EOS works for read-process-write, but not for upstream producer to downstream consumer. While the consumer is guarded against returning aborted messages to the app, it's still at-least-once, as there is no guarantee that a committed message is only read exactly once. ########## docs/design.html: ########## @@ -254,32 +254,32 @@ <h3 class="anchor-heading"><a id="semantics" class="anchor-link"></a><a href="#s <i>At least once</i>—Messages are never lost but may be redelivered. </li> <li> - <i>Exactly once</i>—this is what people actually want, each message is delivered once and only once. + <i>Exactly once</i>—Each message is delivered once and only once. </li> </ul> It's worth noting that this breaks down into two problems: the durability guarantees for publishing a message and the guarantees when consuming a message. <p> - Many systems claim to provide "exactly once" delivery semantics, but it is important to read the fine print, most of these claims are misleading (i.e. they don't translate to the case where consumers or producers + Many systems claim to provide "exactly-once" delivery semantics, but it is important to read the fine print, because sometimes these claims are misleading (i.e. they don't translate to the case where consumers or producers can fail, cases where there are multiple consumer processes, or cases where data written to disk can be lost). <p> - Kafka's semantics are straight-forward. When publishing a message we have a notion of the message being "committed" to the log. Once a published message is committed it will not be lost as long as one broker that - replicates the partition to which this message was written remains "alive". The definition of committed message, alive partition as well as a description of which types of failures we attempt to handle will be + Kafka's semantics are straightforward. When publishing a message we have a notion of the message being "committed" to the log. Once a published message is committed, it will not be lost as long as one broker that + replicates the partition to which this message was written remains "alive". The definition of committed message and alive partition as well as a description of which types of failures we attempt to handle will be described in more detail in the next section. For now let's assume a perfect, lossless broker and try to understand the guarantees to the producer and consumer. If a producer attempts to publish a message and - experiences a network error it cannot be sure if this error happened before or after the message was committed. This is similar to the semantics of inserting into a database table with an autogenerated key.
+ experiences a network error, it cannot be sure if this error happened before or after the message was committed. This is similar to the semantics of inserting into a database table with an autogenerated key. <p> Prior to 0.11.0.0, if a producer failed to receive a response indicating that a message was committed, it had little choice but to resend the message. This provides at-least-once delivery semantics since the message may be written to the log again during resending if the original request had in fact succeeded. Since 0.11.0.0, the Kafka producer also supports an idempotent delivery option which guarantees that resending Review Comment: > resending Might need to clarify the distinction between producer retries vs. app retries? ########## docs/design.html: ########## @@ -254,32 +254,32 @@ <h3 class="anchor-heading"><a id="semantics" class="anchor-link"></a><a href="#s <i>At least once</i>—Messages are never lost but may be redelivered. </li> <li> - <i>Exactly once</i>—this is what people actually want, each message is delivered once and only once. + <i>Exactly once</i>—Each message is delivered once and only once. </li> </ul> It's worth noting that this breaks down into two problems: the durability guarantees for publishing a message and the guarantees when consuming a message. <p> - Many systems claim to provide "exactly once" delivery semantics, but it is important to read the fine print, most of these claims are misleading (i.e. they don't translate to the case where consumers or producers + Many systems claim to provide "exactly-once" delivery semantics, but it is important to read the fine print, because sometimes these claims are misleading (i.e. they don't translate to the case where consumers or producers can fail, cases where there are multiple consumer processes, or cases where data written to disk can be lost). <p> - Kafka's semantics are straight-forward. When publishing a message we have a notion of the message being "committed" to the log. Once a published message is committed it will not be lost as long as one broker that - replicates the partition to which this message was written remains "alive". The definition of committed message, alive partition as well as a description of which types of failures we attempt to handle will be + Kafka's semantics are straightforward. When publishing a message we have a notion of the message being "committed" to the log. Once a published message is committed, it will not be lost as long as one broker that + replicates the partition to which this message was written remains "alive". The definition of committed message and alive partition as well as a description of which types of failures we attempt to handle will be described in more detail in the next section. For now let's assume a perfect, lossless broker and try to understand the guarantees to the producer and consumer. If a producer attempts to publish a message and - experiences a network error it cannot be sure if this error happened before or after the message was committed. This is similar to the semantics of inserting into a database table with an autogenerated key. + experiences a network error, it cannot be sure if this error happened before or after the message was committed. This is similar to the semantics of inserting into a database table with an autogenerated key. <p> Prior to 0.11.0.0, if a producer failed to receive a response indicating that a message was committed, it had little choice but to resend the message.
This provides at-least-once delivery semantics since the message may be written to the log again during resending if the original request had in fact succeeded. Since 0.11.0.0, the Kafka producer also supports an idempotent delivery option which guarantees that resending will not result in duplicate entries in the log. To achieve this, the broker assigns each producer an ID and deduplicates messages using a sequence number that is sent by the producer along with every message. - Also beginning with 0.11.0.0, the producer supports the ability to send messages to multiple topic partitions using transaction-like semantics: i.e. either all messages are successfully written or none of them are. + Also beginning with 0.11.0.0, the producer supports the ability to send messages to multiple topic partitions using transactional semantics, so that either all messages are successfully written or none of them are. The main use case for this is exactly-once processing between Kafka topics (described below). Review Comment: I would call it "exactly-once processing" -- it's just an "atomic write". ########## docs/design.html: ########## @@ -290,24 +290,58 @@ <h3 class="anchor-heading"><a id="semantics" class="anchor-link"></a><a href="#s messages have a primary key and so the updates are idempotent (receiving the same message twice just overwrites a record with another copy of itself). </ol> <p> - So what about exactly once semantics (i.e. the thing you actually want)? When consuming from a Kafka topic and producing to another topic (as in a <a href="https://kafka.apache.org/documentation/streams">Kafka Streams</a> - application), we can leverage the new transactional producer capabilities in 0.11.0.0 that were mentioned above. The consumer's position is stored as a message in a topic, so we can write the offset to Kafka in the - same transaction as the output topics receiving the processed data. If the transaction is aborted, the consumer's position will revert to its old value and the produced data on the output topics will not be visible - to other consumers, depending on their "isolation level." In the default "read_uncommitted" isolation level, all messages are visible to consumers even if they were part of an aborted transaction, - but in "read_committed," the consumer will only return messages from transactions which were committed (and any messages which were not part of a transaction). + So what about exactly-once semantics? When consuming from a Kafka topic and producing to another topic (as in a <a href="https://kafka.apache.org/documentation/streams">Kafka Streams</a> application), we can Review Comment: We are (correctly) switching from producer -> topic -> consumer to read-process-write here, but that's very subtle. I think we need to call the difference out at the very top and make it explicit. ########## docs/design.html: ########## @@ -254,32 +254,32 @@ <h3 class="anchor-heading"><a id="semantics" class="anchor-link"></a><a href="#s <i>At least once</i>—Messages are never lost but may be redelivered. </li> <li> - <i>Exactly once</i>—this is what people actually want, each message is delivered once and only once. + <i>Exactly once</i>—Each message is delivered once and only once. </li> </ul> It's worth noting that this breaks down into two problems: the durability guarantees for publishing a message and the guarantees when consuming a message. 
<p> - Many systems claim to provide "exactly once" delivery semantics, but it is important to read the fine print, most of these claims are misleading (i.e. they don't translate to the case where consumers or producers + Many systems claim to provide "exactly-once" delivery semantics, but it is important to read the fine print, because sometimes these claims are misleading (i.e. they don't translate to the case where consumers or producers can fail, cases where there are multiple consumer processes, or cases where data written to disk can be lost). <p> - Kafka's semantics are straight-forward. When publishing a message we have a notion of the message being "committed" to the log. Once a published message is committed it will not be lost as long as one broker that - replicates the partition to which this message was written remains "alive". The definition of committed message, alive partition as well as a description of which types of failures we attempt to handle will be + Kafka's semantics are straightforward. When publishing a message we have a notion of the message being "committed" to the log. Once a published message is committed, it will not be lost as long as one broker that Review Comment: > straightforward Sounds like a huge simplification... Also, for the non-transactional case, there is no such thing as "committed" -- seems we should clarify this better? Why not use "acknowledge" which would be the proper Kafka term? ########## docs/design.html: ########## @@ -290,24 +290,58 @@ <h3 class="anchor-heading"><a id="semantics" class="anchor-link"></a><a href="#s messages have a primary key and so the updates are idempotent (receiving the same message twice just overwrites a record with another copy of itself). </ol> <p> - So what about exactly once semantics (i.e. the thing you actually want)? When consuming from a Kafka topic and producing to another topic (as in a <a href="https://kafka.apache.org/documentation/streams">Kafka Streams</a> - application), we can leverage the new transactional producer capabilities in 0.11.0.0 that were mentioned above. The consumer's position is stored as a message in a topic, so we can write the offset to Kafka in the - same transaction as the output topics receiving the processed data. If the transaction is aborted, the consumer's position will revert to its old value and the produced data on the output topics will not be visible - to other consumers, depending on their "isolation level." In the default "read_uncommitted" isolation level, all messages are visible to consumers even if they were part of an aborted transaction, - but in "read_committed," the consumer will only return messages from transactions which were committed (and any messages which were not part of a transaction). + So what about exactly-once semantics? When consuming from a Kafka topic and producing to another topic (as in a <a href="https://kafka.apache.org/documentation/streams">Kafka Streams</a> application), we can + leverage the new transactional producer capabilities in 0.11.0.0 that were mentioned above. The consumer's position is stored as a message in an internal topic, so we can write the offset to Kafka in the + same transaction as the output topics receiving the processed data.
If the transaction is aborted, the consumer's stored position will revert to its old value (although the consumer has to refetch the + committed offset because it does not automatically rewind) and the produced data on the output topics will not be visible to other consumers, depending on their "isolation level". In the default + "read_uncommitted" isolation level, all messages are visible to consumers even if they were part of an aborted transaction, but in "read_committed" isolation level, the consumer will only return messages + from transactions which were committed (and any messages which were not part of a transaction). <p> When writing to an external system, the limitation is in the need to coordinate the consumer's position with what is actually stored as output. The classic way of achieving this would be to introduce a two-phase - commit between the storage of the consumer position and the storage of the consumers output. But this can be handled more simply and generally by letting the consumer store its offset in the same place as + commit between the storage of the consumer position and the storage of the consumers output. This can be handled more simply and generally by letting the consumer store its offset in the same place as its output. This is better because many of the output systems a consumer might want to write to will not support a two-phase commit. As an example of this, consider a <a href="https://kafka.apache.org/documentation/#connect">Kafka Connect</a> connector which populates data in HDFS along with the offsets of the data it reads so that it is guaranteed that either data and offsets are both updated or neither is. We follow similar patterns for many other data systems which require these stronger semantics and for which the messages do not have a primary key to allow for deduplication. <p> - So effectively Kafka supports exactly-once delivery in <a href="https://kafka.apache.org/documentation/streams">Kafka Streams</a>, and the transactional producer/consumer can be used generally to provide + As a result, Kafka supports exactly-once delivery in <a href="https://kafka.apache.org/documentation/streams">Kafka Streams</a>, and the transactional producer/consumer can be used generally to provide exactly-once delivery when transferring and processing data between Kafka topics. Exactly-once delivery for other destination systems generally requires cooperation with such systems, but Kafka provides the offset which makes implementing this feasible (see also <a href="https://kafka.apache.org/documentation/#connect">Kafka Connect</a>). Otherwise, Kafka guarantees at-least-once delivery by default, and allows Review Comment: ```suggestion primitives which makes implementing this feasible (see also <a href="https://kafka.apache.org/documentation/#connect">Kafka Connect</a>). Otherwise, Kafka guarantees at-least-once delivery by default, and allows ``` ########## docs/design.html: ########## @@ -254,32 +254,32 @@ <h3 class="anchor-heading"><a id="semantics" class="anchor-link"></a><a href="#s <i>At least once</i>—Messages are never lost but may be redelivered. </li> <li> - <i>Exactly once</i>—this is what people actually want, each message is delivered once and only once. + <i>Exactly once</i>—Each message is delivered once and only once. </li> </ul> It's worth noting that this breaks down into two problems: the durability guarantees for publishing a message and the guarantees when consuming a message. 
<p> - Many systems claim to provide "exactly once" delivery semantics, but it is important to read the fine print, most of these claims are misleading (i.e. they don't translate to the case where consumers or producers + Many systems claim to provide "exactly-once" delivery semantics, but it is important to read the fine print, because sometimes these claims are misleading (i.e. they don't translate to the case where consumers or producers can fail, cases where there are multiple consumer processes, or cases where data written to disk can be lost). <p> - Kafka's semantics are straight-forward. When publishing a message we have a notion of the message being "committed" to the log. Once a published message is committed it will not be lost as long as one broker that - replicates the partition to which this message was written remains "alive". The definition of committed message, alive partition as well as a description of which types of failures we attempt to handle will be + Kafka's semantics are straightforward. When publishing a message we have a notion of the message being "committed" to the log. Once a published message is committed, it will not be lost as long as one broker that + replicates the partition to which this message was written remains "alive". The definition of committed message and alive partition as well as a description of which types of failures we attempt to handle will be described in more detail in the next section. For now let's assume a perfect, lossless broker and try to understand the guarantees to the producer and consumer. If a producer attempts to publish a message and - experiences a network error it cannot be sure if this error happened before or after the message was committed. This is similar to the semantics of inserting into a database table with an autogenerated key. + experiences a network error, it cannot be sure if this error happened before or after the message was committed. This is similar to the semantics of inserting into a database table with an autogenerated key. <p> Prior to 0.11.0.0, if a producer failed to receive a response indicating that a message was committed, it had little choice but to resend the message. This provides at-least-once delivery semantics since the message may be written to the log again during resending if the original request had in fact succeeded. Since 0.11.0.0, the Kafka producer also supports an idempotent delivery option which guarantees that resending will not result in duplicate entries in the log. To achieve this, the broker assigns each producer an ID and deduplicates messages using a sequence number that is sent by the producer along with every message. - Also beginning with 0.11.0.0, the producer supports the ability to send messages to multiple topic partitions using transaction-like semantics: i.e. either all messages are successfully written or none of them are. + Also beginning with 0.11.0.0, the producer supports the ability to send messages to multiple topic partitions using transactional semantics, so that either all messages are successfully written or none of them are. The main use case for this is exactly-once processing between Kafka topics (described below). <p> - Not all use cases require such strong guarantees. For uses which are latency sensitive we allow the producer to specify the durability level it desires. If the producer specifies that it wants to wait on the message - being committed this can take on the order of 10 ms. 
However the producer can also specify that it wants to perform the send completely asynchronously or that it wants to wait only until the leader (but not + Not all use cases require such strong guarantees. For uses which are latency-sensitive, we allow the producer to specify the durability level it desires. If the producer specifies that it wants to wait on the message + being committed, this can take on the order of 10 ms. However the producer can also specify that it wants to perform the send completely asynchronously or that it wants to wait only until the leader (but not necessarily the followers) have the message. Review Comment: Seems we are jumping back and forth between different things here. ########## docs/design.html: ########## @@ -290,24 +290,58 @@ <h3 class="anchor-heading"><a id="semantics" class="anchor-link"></a><a href="#s messages have a primary key and so the updates are idempotent (receiving the same message twice just overwrites a record with another copy of itself). </ol> <p> - So what about exactly once semantics (i.e. the thing you actually want)? When consuming from a Kafka topic and producing to another topic (as in a <a href="https://kafka.apache.org/documentation/streams">Kafka Streams</a> - application), we can leverage the new transactional producer capabilities in 0.11.0.0 that were mentioned above. The consumer's position is stored as a message in a topic, so we can write the offset to Kafka in the - same transaction as the output topics receiving the processed data. If the transaction is aborted, the consumer's position will revert to its old value and the produced data on the output topics will not be visible - to other consumers, depending on their "isolation level." In the default "read_uncommitted" isolation level, all messages are visible to consumers even if they were part of an aborted transaction, - but in "read_committed," the consumer will only return messages from transactions which were committed (and any messages which were not part of a transaction). + So what about exactly-once semantics? When consuming from a Kafka topic and producing to another topic (as in a <a href="https://kafka.apache.org/documentation/streams">Kafka Streams</a> application), we can + leverage the new transactional producer capabilities in 0.11.0.0 that were mentioned above. The consumer's position is stored as a message in an internal topic, so we can write the offset to Kafka in the + same transaction as the output topics receiving the processed data. If the transaction is aborted, the consumer's stored position will revert to its old value (although the consumer has to refetch the + committed offset because it does not automatically rewind) and the produced data on the output topics will not be visible to other consumers, depending on their "isolation level". In the default + "read_uncommitted" isolation level, all messages are visible to consumers even if they were part of an aborted transaction, but in "read_committed" isolation level, the consumer will only return messages + from transactions which were committed (and any messages which were not part of a transaction). Review Comment: If we consider a single read-process-write pattern, the consumer's isolation level does not really matter too much? Of course, if the input topic is transactional, having the consumer in read-committed mode is desirable, but from the POV of how EOS works, it seems somewhat orthogonal? (Or do I start to split hairs...?)
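Not part of the proposed doc text, but as a reference point for this thread, here is a minimal sketch of the read-process-write pattern being described, using the plain Java clients. The topic names (`input`, `output`), group id, transactional id and bootstrap address are placeholders, and error handling is reduced to the abort-and-rewind case; fatal errors such as `ProducerFencedException` would need to close the producer instead.

```java
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class ReadProcessWriteSketch {
    public static void main(String[] args) {
        Properties cp = new Properties();
        cp.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        cp.put(ConsumerConfig.GROUP_ID_CONFIG, "copier-group");
        cp.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"); // only return committed transactional data
        cp.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");       // offsets are committed via the producer
        cp.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        cp.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        Properties pp = new Properties();
        pp.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        pp.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "copier-tx-1");   // enables transactions, fences older instances
        pp.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        pp.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(cp);
             KafkaProducer<String, String> producer = new KafkaProducer<>(pp)) {
            consumer.subscribe(Collections.singletonList("input"));
            producer.initTransactions();
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                if (records.isEmpty()) continue;
                producer.beginTransaction();
                try {
                    Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
                    for (ConsumerRecord<String, String> record : records) {
                        // "process": here the record is simply copied to the output topic
                        producer.send(new ProducerRecord<>("output", record.key(), record.value()));
                        offsets.put(new TopicPartition(record.topic(), record.partition()),
                                    new OffsetAndMetadata(record.offset() + 1));
                    }
                    // The consumed offsets are committed in the same transaction as the output records,
                    // so either both become visible or neither does.
                    producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
                    producer.commitTransaction();
                } catch (KafkaException e) {
                    // Simplified: abortable errors only; fatal errors require closing the producer.
                    producer.abortTransaction();
                    // Aborting does not rewind the consumer's in-memory position,
                    // so seek back to the last committed offsets before reprocessing.
                    for (TopicPartition tp : consumer.assignment()) {
                        OffsetAndMetadata committed = consumer.committed(Collections.singleton(tp)).get(tp);
                        consumer.seek(tp, committed == null ? 0L : committed.offset());
                    }
                }
            }
        }
    }
}
```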
########## docs/design.html: ########## @@ -254,32 +254,32 @@ <h3 class="anchor-heading"><a id="semantics" class="anchor-link"></a><a href="#s <i>At least once</i>—Messages are never lost but may be redelivered. </li> <li> - <i>Exactly once</i>—this is what people actually want, each message is delivered once and only once. + <i>Exactly once</i>—Each message is delivered once and only once. </li> </ul> It's worth noting that this breaks down into two problems: the durability guarantees for publishing a message and the guarantees when consuming a message. <p> - Many systems claim to provide "exactly once" delivery semantics, but it is important to read the fine print, most of these claims are misleading (i.e. they don't translate to the case where consumers or producers + Many systems claim to provide "exactly-once" delivery semantics, but it is important to read the fine print, because sometimes these claims are misleading (i.e. they don't translate to the case where consumers or producers can fail, cases where there are multiple consumer processes, or cases where data written to disk can be lost). <p> - Kafka's semantics are straight-forward. When publishing a message we have a notion of the message being "committed" to the log. Once a published message is committed it will not be lost as long as one broker that - replicates the partition to which this message was written remains "alive". The definition of committed message, alive partition as well as a description of which types of failures we attempt to handle will be + Kafka's semantics are straightforward. When publishing a message we have a notion of the message being "committed" to the log. Once a published message is committed, it will not be lost as long as one broker that + replicates the partition to which this message was written remains "alive". The definition of committed message and alive partition as well as a description of which types of failures we attempt to handle will be described in more detail in the next section. For now let's assume a perfect, lossless broker and try to understand the guarantees to the producer and consumer. If a producer attempts to publish a message and - experiences a network error it cannot be sure if this error happened before or after the message was committed. This is similar to the semantics of inserting into a database table with an autogenerated key. + experiences a network error, it cannot be sure if this error happened before or after the message was committed. This is similar to the semantics of inserting into a database table with an autogenerated key. <p> Prior to 0.11.0.0, if a producer failed to receive a response indicating that a message was committed, it had little choice but to resend the message. This provides at-least-once delivery semantics since the message may be written to the log again during resending if the original request had in fact succeeded. Since 0.11.0.0, the Kafka producer also supports an idempotent delivery option which guarantees that resending will not result in duplicate entries in the log. To achieve this, the broker assigns each producer an ID and deduplicates messages using a sequence number that is sent by the producer along with every message. - Also beginning with 0.11.0.0, the producer supports the ability to send messages to multiple topic partitions using transaction-like semantics: i.e. either all messages are successfully written or none of them are. 
+ Also beginning with 0.11.0.0, the producer supports the ability to send messages to multiple topic partitions using transactional semantics, so that either all messages are successfully written or none of them are. The main use case for this is exactly-once processing between Kafka topics (described below). <p> - Not all use cases require such strong guarantees. For uses which are latency sensitive we allow the producer to specify the durability level it desires. If the producer specifies that it wants to wait on the message - being committed this can take on the order of 10 ms. However the producer can also specify that it wants to perform the send completely asynchronously or that it wants to wait only until the leader (but not + Not all use cases require such strong guarantees. For uses which are latency-sensitive, we allow the producer to specify the durability level it desires. If the producer specifies that it wants to wait on the message Review Comment: ```suggestion Not all use cases require such strong guarantees. For use cases which are latency-sensitive, we allow the producer to specify the durability level it desires. If the producer specifies that it wants to wait on the message ``` ########## docs/design.html: ########## @@ -290,24 +290,58 @@ <h3 class="anchor-heading"><a id="semantics" class="anchor-link"></a><a href="#s messages have a primary key and so the updates are idempotent (receiving the same message twice just overwrites a record with another copy of itself). </ol> <p> - So what about exactly once semantics (i.e. the thing you actually want)? When consuming from a Kafka topic and producing to another topic (as in a <a href="https://kafka.apache.org/documentation/streams">Kafka Streams</a> - application), we can leverage the new transactional producer capabilities in 0.11.0.0 that were mentioned above. The consumer's position is stored as a message in a topic, so we can write the offset to Kafka in the - same transaction as the output topics receiving the processed data. If the transaction is aborted, the consumer's position will revert to its old value and the produced data on the output topics will not be visible - to other consumers, depending on their "isolation level." In the default "read_uncommitted" isolation level, all messages are visible to consumers even if they were part of an aborted transaction, - but in "read_committed," the consumer will only return messages from transactions which were committed (and any messages which were not part of a transaction). + So what about exactly-once semantics? When consuming from a Kafka topic and producing to another topic (as in a <a href="https://kafka.apache.org/documentation/streams">Kafka Streams</a> application), we can + leverage the new transactional producer capabilities in 0.11.0.0 that were mentioned above. The consumer's position is stored as a message in an internal topic, so we can write the offset to Kafka in the + same transaction as the output topics receiving the processed data. If the transaction is aborted, the consumer's stored position will revert to its old value (although the consumer has to refetch the + committed offset because it does not automatically rewind) and the produced data on the output topics will not be visible to other consumers, depending on their "isolation level". 
In the default + "read_uncommitted" isolation level, all messages are visible to consumers even if they were part of an aborted transaction, but in "read_committed" isolation level, the consumer will only return messages + from transactions which were committed (and any messages which were not part of a transaction). <p> When writing to an external system, the limitation is in the need to coordinate the consumer's position with what is actually stored as output. The classic way of achieving this would be to introduce a two-phase - commit between the storage of the consumer position and the storage of the consumers output. But this can be handled more simply and generally by letting the consumer store its offset in the same place as + commit between the storage of the consumer position and the storage of the consumers output. This can be handled more simply and generally by letting the consumer store its offset in the same place as its output. This is better because many of the output systems a consumer might want to write to will not support a two-phase commit. As an example of this, consider a <a href="https://kafka.apache.org/documentation/#connect">Kafka Connect</a> connector which populates data in HDFS along with the offsets of the data it reads so that it is guaranteed that either data and offsets are both updated or neither is. We follow similar patterns for many other data systems which require these stronger semantics and for which the messages do not have a primary key to allow for deduplication. <p> - So effectively Kafka supports exactly-once delivery in <a href="https://kafka.apache.org/documentation/streams">Kafka Streams</a>, and the transactional producer/consumer can be used generally to provide + As a result, Kafka supports exactly-once delivery in <a href="https://kafka.apache.org/documentation/streams">Kafka Streams</a>, and the transactional producer/consumer can be used generally to provide exactly-once delivery when transferring and processing data between Kafka topics. Exactly-once delivery for other destination systems generally requires cooperation with such systems, but Kafka provides the offset which makes implementing this feasible (see also <a href="https://kafka.apache.org/documentation/#connect">Kafka Connect</a>). Otherwise, Kafka guarantees at-least-once delivery by default, and allows the user to implement at-most-once delivery by disabling retries on the producer and committing offsets in the consumer prior to processing a batch of messages. - <h3 class="anchor-heading"><a id="replication" class="anchor-link"></a><a href="#replication">4.7 Replication</a></h3> + <h3 class="anchor-heading"><a id="usingtransactions" class="anchor-link"></a><a href="#usingtransactions">4.7 Using Transactions</a></h3> + <p> + As mentioned above, the simplest way to get exactly-once semantics from Kafka is to use <a href="https://kafka.apache.org/documentation/streams">Kafka Streams</a>. However, it is also possible to achieve + the same transactional guarantees using the Kafka producer and consumer directly by using them in the same way as Kafka Streams does. + <p> + Kafka transactions are a bit different than transactions in other messaging systems. In Kafka, the consumer and producer are separate and it is only the producer which is transactional. It is however able to + make transactional updates to the consumer's position (confusingly called the "committed offset"), and it is this which gives the overall exactly-once behavior. 
+ <p> + There are three key aspects to exactly-once processing using the producer and consumer, which match how Kafka Streams works. + <ol> + <li>The consumer uses partition assignment to ensure that it is the only consumer in the consumer group currently processing each partition.</li> + <li>The consumer uses read-committed isolation level to ensure that it does not consume records produced by transactions which aborted.</li> + <li>The producer uses transactions so that all of the records it produces, and any offsets it updates on behalf of the consumer, are performed atomically.</li> + </ol> + <p> + The consumer configuration must include <code>isolation.level=read_committed</code> and <code>enable.auto.commit=false</code>. The producer configuration must set <code>transactional.id</code> + to the name of the transactional ID to be used, which configures the producer for transactional delivery and also makes sure that a restarted application causes any in-flight transaction from + the previous instance to abort. Only the producer has the <code>transactional.id</code> configuration. Strictly speaking, the consumer doesn't have to use read_committed isolation level, but if + it does not, it will see records from aborted transactions and also open transactions which have not yet completed, which seems undesirable if trying to achieve exactly-once. + <p> + Here's an example of a <a href="https://github.com/apache/kafka/blob/trunk/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java">transactional message copier</a> + which uses these principles. It uses a <code>KafkaConsumer</code> to consume records from one topic and a <code>KafkaProducer</code> to produce records to another topic. It uses transactions to ensure + that there is no duplication or loss of records as they are copied. + <p> + It is important to handle exceptions and aborted transactions correctly. Any records written by the transational producer will be marked as being part of the transactions, and then when the + transaction commits or aborts, transaction marker records are written to indicate the outcome of the transaction. This is how the read-committed consumer does not see records from aborted + transactions. However, in the event of a transaction abort, the application's in-memory state and in particular the current position of the consumer must be reset explicitly so that it can Review Comment: ```suggestion transactions. However, in the event of a transaction abort, the application's state and in particular the current position of the consumer must be reset explicitly so that it can ``` ########## docs/design.html: ########## @@ -290,24 +290,58 @@ <h3 class="anchor-heading"><a id="semantics" class="anchor-link"></a><a href="#s messages have a primary key and so the updates are idempotent (receiving the same message twice just overwrites a record with another copy of itself). </ol> <p> - So what about exactly once semantics (i.e. the thing you actually want)? When consuming from a Kafka topic and producing to another topic (as in a <a href="https://kafka.apache.org/documentation/streams">Kafka Streams</a> - application), we can leverage the new transactional producer capabilities in 0.11.0.0 that were mentioned above. The consumer's position is stored as a message in a topic, so we can write the offset to Kafka in the - same transaction as the output topics receiving the processed data. 
If the transaction is aborted, the consumer's position will revert to its old value and the produced data on the output topics will not be visible - to other consumers, depending on their "isolation level." In the default "read_uncommitted" isolation level, all messages are visible to consumers even if they were part of an aborted transaction, - but in "read_committed," the consumer will only return messages from transactions which were committed (and any messages which were not part of a transaction). + So what about exactly-once semantics? When consuming from a Kafka topic and producing to another topic (as in a <a href="https://kafka.apache.org/documentation/streams">Kafka Streams</a> application), we can + leverage the new transactional producer capabilities in 0.11.0.0 that were mentioned above. The consumer's position is stored as a message in an internal topic, so we can write the offset to Kafka in the + same transaction as the output topics receiving the processed data. If the transaction is aborted, the consumer's stored position will revert to its old value (although the consumer has to refetch the + committed offset because it does not automatically rewind) and the produced data on the output topics will not be visible to other consumers, depending on their "isolation level". In the default + "read_uncommitted" isolation level, all messages are visible to consumers even if they were part of an aborted transaction, but in "read_committed" isolation level, the consumer will only return messages + from transactions which were committed (and any messages which were not part of a transaction). <p> When writing to an external system, the limitation is in the need to coordinate the consumer's position with what is actually stored as output. The classic way of achieving this would be to introduce a two-phase - commit between the storage of the consumer position and the storage of the consumers output. But this can be handled more simply and generally by letting the consumer store its offset in the same place as + commit between the storage of the consumer position and the storage of the consumers output. This can be handled more simply and generally by letting the consumer store its offset in the same place as its output. This is better because many of the output systems a consumer might want to write to will not support a two-phase commit. As an example of this, consider a <a href="https://kafka.apache.org/documentation/#connect">Kafka Connect</a> connector which populates data in HDFS along with the offsets of the data it reads so that it is guaranteed that either data and offsets are both updated or neither is. We follow similar patterns for many other data systems which require these stronger semantics and for which the messages do not have a primary key to allow for deduplication. 
<p> - So effectively Kafka supports exactly-once delivery in <a href="https://kafka.apache.org/documentation/streams">Kafka Streams</a>, and the transactional producer/consumer can be used generally to provide + As a result, Kafka supports exactly-once delivery in <a href="https://kafka.apache.org/documentation/streams">Kafka Streams</a>, and the transactional producer/consumer can be used generally to provide Review Comment: ```suggestion As a result, Kafka supports exactly-once processing semantics in <a href="https://kafka.apache.org/documentation/streams">Kafka Streams</a>, and the transactional producer can be used generally to provide ``` ########## docs/design.html: ########## @@ -254,32 +254,32 @@ <h3 class="anchor-heading"><a id="semantics" class="anchor-link"></a><a href="#s <i>At least once</i>—Messages are never lost but may be redelivered. </li> <li> - <i>Exactly once</i>—this is what people actually want, each message is delivered once and only once. + <i>Exactly once</i>—Each message is delivered once and only once. </li> </ul> It's worth noting that this breaks down into two problems: the durability guarantees for publishing a message and the guarantees when consuming a message. <p> - Many systems claim to provide "exactly once" delivery semantics, but it is important to read the fine print, most of these claims are misleading (i.e. they don't translate to the case where consumers or producers + Many systems claim to provide "exactly-once" delivery semantics, but it is important to read the fine print, because sometimes these claims are misleading (i.e. they don't translate to the case where consumers or producers can fail, cases where there are multiple consumer processes, or cases where data written to disk can be lost). <p> - Kafka's semantics are straight-forward. When publishing a message we have a notion of the message being "committed" to the log. Once a published message is committed it will not be lost as long as one broker that - replicates the partition to which this message was written remains "alive". The definition of committed message, alive partition as well as a description of which types of failures we attempt to handle will be + Kafka's semantics are straightforward. When publishing a message we have a notion of the message being "committed" to the log. Once a published message is committed, it will not be lost as long as one broker that + replicates the partition to which this message was written remains "alive". The definition of committed message and alive partition as well as a description of which types of failures we attempt to handle will be described in more detail in the next section. For now let's assume a perfect, lossless broker and try to understand the guarantees to the producer and consumer. If a producer attempts to publish a message and - experiences a network error it cannot be sure if this error happened before or after the message was committed. This is similar to the semantics of inserting into a database table with an autogenerated key. + experiences a network error, it cannot be sure if this error happened before or after the message was committed. This is similar to the semantics of inserting into a database table with an autogenerated key. <p> Prior to 0.11.0.0, if a producer failed to receive a response indicating that a message was committed, it had little choice but to resend the message. 
This provides at-least-once delivery semantics since the message may be written to the log again during resending if the original request had in fact succeeded. Since 0.11.0.0, the Kafka producer also supports an idempotent delivery option which guarantees that resending will not result in duplicate entries in the log. To achieve this, the broker assigns each producer an ID and deduplicates messages using a sequence number that is sent by the producer along with every message. - Also beginning with 0.11.0.0, the producer supports the ability to send messages to multiple topic partitions using transaction-like semantics: i.e. either all messages are successfully written or none of them are. + Also beginning with 0.11.0.0, the producer supports the ability to send messages to multiple topic partitions using transactional semantics, so that either all messages are successfully written or none of them are. The main use case for this is exactly-once processing between Kafka topics (described below). <p> - Not all use cases require such strong guarantees. For uses which are latency sensitive we allow the producer to specify the durability level it desires. If the producer specifies that it wants to wait on the message - being committed this can take on the order of 10 ms. However the producer can also specify that it wants to perform the send completely asynchronously or that it wants to wait only until the leader (but not + Not all use cases require such strong guarantees. For uses which are latency-sensitive, we allow the producer to specify the durability level it desires. If the producer specifies that it wants to wait on the message Review Comment: What is a "durability level" -- are we talking about `acks=...` here? ########## docs/design.html: ########## @@ -290,24 +290,56 @@ <h3 class="anchor-heading"><a id="semantics" class="anchor-link"></a><a href="#s messages have a primary key and so the updates are idempotent (receiving the same message twice just overwrites a record with another copy of itself). </ol> <p> - So what about exactly once semantics (i.e. the thing you actually want)? When consuming from a Kafka topic and producing to another topic (as in a <a href="https://kafka.apache.org/documentation/streams">Kafka Streams</a> - application), we can leverage the new transactional producer capabilities in 0.11.0.0 that were mentioned above. The consumer's position is stored as a message in a topic, so we can write the offset to Kafka in the - same transaction as the output topics receiving the processed data. If the transaction is aborted, the consumer's position will revert to its old value and the produced data on the output topics will not be visible - to other consumers, depending on their "isolation level." In the default "read_uncommitted" isolation level, all messages are visible to consumers even if they were part of an aborted transaction, - but in "read_committed," the consumer will only return messages from transactions which were committed (and any messages which were not part of a transaction). + So what about exactly-once semantics? When consuming from a Kafka topic and producing to another topic (as in a <a href="https://kafka.apache.org/documentation/streams">Kafka Streams</a> application), we can + leverage the new transactional producer capabilities in 0.11.0.0 that were mentioned above. The consumer's position is stored as a message in an internal topic, so we can write the offset to Kafka in the + same transaction as the output topics receiving the processed data. 
If the transaction is aborted, the consumer's stored position will revert to its old value and the produced data on the output topics will not + be visible to other consumers, depending on their "isolation level." In the default "read_uncommitted" isolation level, all messages are visible to consumers even if they were part of an aborted transaction, + but in "read_committed" isolation level, the consumer will only return messages from transactions which were committed (and any messages which were not part of a transaction). <p> When writing to an external system, the limitation is in the need to coordinate the consumer's position with what is actually stored as output. The classic way of achieving this would be to introduce a two-phase - commit between the storage of the consumer position and the storage of the consumers output. But this can be handled more simply and generally by letting the consumer store its offset in the same place as + commit between the storage of the consumer position and the storage of the consumers output. This can be handled more simply and generally by letting the consumer store its offset in the same place as its output. This is better because many of the output systems a consumer might want to write to will not support a two-phase commit. As an example of this, consider a <a href="https://kafka.apache.org/documentation/#connect">Kafka Connect</a> connector which populates data in HDFS along with the offsets of the data it reads so that it is guaranteed that either data and offsets are both updated or neither is. We follow similar patterns for many other data systems which require these stronger semantics and for which the messages do not have a primary key to allow for deduplication. <p> - So effectively Kafka supports exactly-once delivery in <a href="https://kafka.apache.org/documentation/streams">Kafka Streams</a>, and the transactional producer/consumer can be used generally to provide + As a result, Kafka supports exactly-once delivery in <a href="https://kafka.apache.org/documentation/streams">Kafka Streams</a>, and the transactional producer/consumer can be used generally to provide exactly-once delivery when transferring and processing data between Kafka topics. Exactly-once delivery for other destination systems generally requires cooperation with such systems, but Kafka provides the offset which makes implementing this feasible (see also <a href="https://kafka.apache.org/documentation/#connect">Kafka Connect</a>). Otherwise, Kafka guarantees at-least-once delivery by default, and allows the user to implement at-most-once delivery by disabling retries on the producer and committing offsets in the consumer prior to processing a batch of messages. - <h3 class="anchor-heading"><a id="replication" class="anchor-link"></a><a href="#replication">4.7 Replication</a></h3> + <h3 class="anchor-heading"><a id="usingtransactions" class="anchor-link"></a><a href="#usingtransactions">4.7 Using Transactions</a></h3> + <p> + As mentioned above, the simplest way to get exactly-once semantics from Kafka is to use <a href="https://kafka.apache.org/documentation/streams">Kafka Streams</a>. However, it is also possible to achieve + the same transactional guarantees using the Kafka producer and consumer directly by using them in the same way as Kafka Streams does. + <p> + Kafka transactions are a bit different than transactions in other messaging systems. In Kafka, the consumer and producer are separate and it is only the producer which is transactional. 
It is however able to + make transactional updates to the consumer's position (confusingly called the "committed offset"), and it is this which gives the overall exactly-once behavior. + <p> + There are three key aspects to exactly-once processing using the producer and consumer, which match how Kafka Streams works. + <ol> + <li>The consumer uses partition assignment to ensure that it is the only consumer in the consumer group currently processing each partition.</li> + <li>The consumer uses read-committed isolation level to ensure that it does not consume records produced by transactions which aborted.</li> + <li>The producer uses transactions so that all of the records it produces, and any offsets it updates on behalf of the consumer, are performed atomically.</li> + </ol> + <p> + The consumer configuration must include <code>isolation.level=read_committed</code> and <code>enable.auto.commit=false</code>. The producer configuration must set <code>transactional.id</code> Review Comment: Seems, I did not split hairs above :) ########## docs/design.html: ########## @@ -290,24 +290,58 @@ <h3 class="anchor-heading"><a id="semantics" class="anchor-link"></a><a href="#s messages have a primary key and so the updates are idempotent (receiving the same message twice just overwrites a record with another copy of itself). </ol> <p> - So what about exactly once semantics (i.e. the thing you actually want)? When consuming from a Kafka topic and producing to another topic (as in a <a href="https://kafka.apache.org/documentation/streams">Kafka Streams</a> - application), we can leverage the new transactional producer capabilities in 0.11.0.0 that were mentioned above. The consumer's position is stored as a message in a topic, so we can write the offset to Kafka in the - same transaction as the output topics receiving the processed data. If the transaction is aborted, the consumer's position will revert to its old value and the produced data on the output topics will not be visible - to other consumers, depending on their "isolation level." In the default "read_uncommitted" isolation level, all messages are visible to consumers even if they were part of an aborted transaction, - but in "read_committed," the consumer will only return messages from transactions which were committed (and any messages which were not part of a transaction). + So what about exactly-once semantics? When consuming from a Kafka topic and producing to another topic (as in a <a href="https://kafka.apache.org/documentation/streams">Kafka Streams</a> application), we can + leverage the new transactional producer capabilities in 0.11.0.0 that were mentioned above. The consumer's position is stored as a message in an internal topic, so we can write the offset to Kafka in the + same transaction as the output topics receiving the processed data. If the transaction is aborted, the consumer's stored position will revert to its old value (although the consumer has to refetch the + committed offset because it does not automatically rewind) and the produced data on the output topics will not be visible to other consumers, depending on their "isolation level". In the default + "read_uncommitted" isolation level, all messages are visible to consumers even if they were part of an aborted transaction, but in "read_committed" isolation level, the consumer will only return messages + from transactions which were committed (and any messages which were not part of a transaction). 
<p> When writing to an external system, the limitation is in the need to coordinate the consumer's position with what is actually stored as output. The classic way of achieving this would be to introduce a two-phase - commit between the storage of the consumer position and the storage of the consumers output. But this can be handled more simply and generally by letting the consumer store its offset in the same place as + commit between the storage of the consumer position and the storage of the consumers output. This can be handled more simply and generally by letting the consumer store its offset in the same place as its output. This is better because many of the output systems a consumer might want to write to will not support a two-phase commit. As an example of this, consider a <a href="https://kafka.apache.org/documentation/#connect">Kafka Connect</a> connector which populates data in HDFS along with the offsets of the data it reads so that it is guaranteed that either data and offsets are both updated or neither is. We follow similar patterns for many other data systems which require these stronger semantics and for which the messages do not have a primary key to allow for deduplication. <p> - So effectively Kafka supports exactly-once delivery in <a href="https://kafka.apache.org/documentation/streams">Kafka Streams</a>, and the transactional producer/consumer can be used generally to provide + As a result, Kafka supports exactly-once delivery in <a href="https://kafka.apache.org/documentation/streams">Kafka Streams</a>, and the transactional producer/consumer can be used generally to provide Review Comment: There is no such thing as a "transactional consumer"? Or do you mean a consumer in "read-committed" mode? ########## docs/design.html: ########## @@ -290,24 +290,58 @@ <h3 class="anchor-heading"><a id="semantics" class="anchor-link"></a><a href="#s messages have a primary key and so the updates are idempotent (receiving the same message twice just overwrites a record with another copy of itself). </ol> <p> - So what about exactly once semantics (i.e. the thing you actually want)? When consuming from a Kafka topic and producing to another topic (as in a <a href="https://kafka.apache.org/documentation/streams">Kafka Streams</a> - application), we can leverage the new transactional producer capabilities in 0.11.0.0 that were mentioned above. The consumer's position is stored as a message in a topic, so we can write the offset to Kafka in the - same transaction as the output topics receiving the processed data. If the transaction is aborted, the consumer's position will revert to its old value and the produced data on the output topics will not be visible - to other consumers, depending on their "isolation level." In the default "read_uncommitted" isolation level, all messages are visible to consumers even if they were part of an aborted transaction, - but in "read_committed," the consumer will only return messages from transactions which were committed (and any messages which were not part of a transaction). + So what about exactly-once semantics? When consuming from a Kafka topic and producing to another topic (as in a <a href="https://kafka.apache.org/documentation/streams">Kafka Streams</a> application), we can + leverage the new transactional producer capabilities in 0.11.0.0 that were mentioned above. 
The consumer's position is stored as a message in an internal topic, so we can write the offset to Kafka in the + same transaction as the output topics receiving the processed data. If the transaction is aborted, the consumer's stored position will revert to its old value (although the consumer has to refetch the + committed offset because it does not automatically rewind) and the produced data on the output topics will not be visible to other consumers, depending on their "isolation level". In the default + "read_uncommitted" isolation level, all messages are visible to consumers even if they were part of an aborted transaction, but in "read_committed" isolation level, the consumer will only return messages + from transactions which were committed (and any messages which were not part of a transaction). <p> When writing to an external system, the limitation is in the need to coordinate the consumer's position with what is actually stored as output. The classic way of achieving this would be to introduce a two-phase - commit between the storage of the consumer position and the storage of the consumers output. But this can be handled more simply and generally by letting the consumer store its offset in the same place as + commit between the storage of the consumer position and the storage of the consumers output. This can be handled more simply and generally by letting the consumer store its offset in the same place as its output. This is better because many of the output systems a consumer might want to write to will not support a two-phase commit. As an example of this, consider a <a href="https://kafka.apache.org/documentation/#connect">Kafka Connect</a> connector which populates data in HDFS along with the offsets of the data it reads so that it is guaranteed that either data and offsets are both updated or neither is. We follow similar patterns for many other data systems which require these stronger semantics and for which the messages do not have a primary key to allow for deduplication. <p> - So effectively Kafka supports exactly-once delivery in <a href="https://kafka.apache.org/documentation/streams">Kafka Streams</a>, and the transactional producer/consumer can be used generally to provide + As a result, Kafka supports exactly-once delivery in <a href="https://kafka.apache.org/documentation/streams">Kafka Streams</a>, and the transactional producer/consumer can be used generally to provide exactly-once delivery when transferring and processing data between Kafka topics. Exactly-once delivery for other destination systems generally requires cooperation with such systems, but Kafka provides the offset which makes implementing this feasible (see also <a href="https://kafka.apache.org/documentation/#connect">Kafka Connect</a>). Otherwise, Kafka guarantees at-least-once delivery by default, and allows the user to implement at-most-once delivery by disabling retries on the producer and committing offsets in the consumer prior to processing a batch of messages. - <h3 class="anchor-heading"><a id="replication" class="anchor-link"></a><a href="#replication">4.7 Replication</a></h3> + <h3 class="anchor-heading"><a id="usingtransactions" class="anchor-link"></a><a href="#usingtransactions">4.7 Using Transactions</a></h3> + <p> + As mentioned above, the simplest way to get exactly-once semantics from Kafka is to use <a href="https://kafka.apache.org/documentation/streams">Kafka Streams</a>. 
However, it is also possible to achieve + the same transactional guarantees using the Kafka producer and consumer directly by using them in the same way as Kafka Streams does. + <p> + Kafka transactions are a bit different than transactions in other messaging systems. In Kafka, the consumer and producer are separate and it is only the producer which is transactional. It is however able to + make transactional updates to the consumer's position (confusingly called the "committed offset"), and it is this which gives the overall exactly-once behavior. + <p> + There are three key aspects to exactly-once processing using the producer and consumer, which match how Kafka Streams works. + <ol> + <li>The consumer uses partition assignment to ensure that it is the only consumer in the consumer group currently processing each partition.</li> + <li>The consumer uses read-committed isolation level to ensure that it does not consume records produced by transactions which aborted.</li> + <li>The producer uses transactions so that all of the records it produces, and any offsets it updates on behalf of the consumer, are performed atomically.</li> + </ol> + <p> + The consumer configuration must include <code>isolation.level=read_committed</code> and <code>enable.auto.commit=false</code>. The producer configuration must set <code>transactional.id</code> + to the name of the transactional ID to be used, which configures the producer for transactional delivery and also makes sure that a restarted application causes any in-flight transaction from + the previous instance to abort. Only the producer has the <code>transactional.id</code> configuration. Strictly speaking, the consumer doesn't have to use read_committed isolation level, but if + it does not, it will see records from aborted transactions and also open transactions which have not yet completed, which seems undesirable if trying to achieve exactly-once. + <p> + Here's an example of a <a href="https://github.com/apache/kafka/blob/trunk/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java">transactional message copier</a> Review Comment: This example seems only to work correctly if the `useGroupMetadata` option is enabled... w/o it, EOS cannot be guaranteed. ########## docs/design.html: ########## @@ -290,24 +290,58 @@ <h3 class="anchor-heading"><a id="semantics" class="anchor-link"></a><a href="#s messages have a primary key and so the updates are idempotent (receiving the same message twice just overwrites a record with another copy of itself). </ol> <p> - So what about exactly once semantics (i.e. the thing you actually want)? When consuming from a Kafka topic and producing to another topic (as in a <a href="https://kafka.apache.org/documentation/streams">Kafka Streams</a> - application), we can leverage the new transactional producer capabilities in 0.11.0.0 that were mentioned above. The consumer's position is stored as a message in a topic, so we can write the offset to Kafka in the - same transaction as the output topics receiving the processed data. If the transaction is aborted, the consumer's position will revert to its old value and the produced data on the output topics will not be visible - to other consumers, depending on their "isolation level."
In the default "read_uncommitted" isolation level, all messages are visible to consumers even if they were part of an aborted transaction, - but in "read_committed," the consumer will only return messages from transactions which were committed (and any messages which were not part of a transaction). + So what about exactly-once semantics? When consuming from a Kafka topic and producing to another topic (as in a <a href="https://kafka.apache.org/documentation/streams">Kafka Streams</a> application), we can + leverage the new transactional producer capabilities in 0.11.0.0 that were mentioned above. The consumer's position is stored as a message in an internal topic, so we can write the offset to Kafka in the + same transaction as the output topics receiving the processed data. If the transaction is aborted, the consumer's stored position will revert to its old value (although the consumer has to refetch the + committed offset because it does not automatically rewind) and the produced data on the output topics will not be visible to other consumers, depending on their "isolation level". In the default + "read_uncommitted" isolation level, all messages are visible to consumers even if they were part of an aborted transaction, but in "read_committed" isolation level, the consumer will only return messages + from transactions which were committed (and any messages which were not part of a transaction). <p> When writing to an external system, the limitation is in the need to coordinate the consumer's position with what is actually stored as output. The classic way of achieving this would be to introduce a two-phase - commit between the storage of the consumer position and the storage of the consumers output. But this can be handled more simply and generally by letting the consumer store its offset in the same place as + commit between the storage of the consumer position and the storage of the consumers output. This can be handled more simply and generally by letting the consumer store its offset in the same place as its output. This is better because many of the output systems a consumer might want to write to will not support a two-phase commit. As an example of this, consider a <a href="https://kafka.apache.org/documentation/#connect">Kafka Connect</a> connector which populates data in HDFS along with the offsets of the data it reads so that it is guaranteed that either data and offsets are both updated or neither is. We follow similar patterns for many other data systems which require these stronger semantics and for which the messages do not have a primary key to allow for deduplication. <p> - So effectively Kafka supports exactly-once delivery in <a href="https://kafka.apache.org/documentation/streams">Kafka Streams</a>, and the transactional producer/consumer can be used generally to provide + As a result, Kafka supports exactly-once delivery in <a href="https://kafka.apache.org/documentation/streams">Kafka Streams</a>, and the transactional producer/consumer can be used generally to provide exactly-once delivery when transferring and processing data between Kafka topics. Exactly-once delivery for other destination systems generally requires cooperation with such systems, but Kafka provides the Review Comment: > when transferring and processing data between Kafka topics What does this mean? Sounds like read-process-write, ie, what KS does. 
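To make the read-process-write shape concrete (which is what "transferring and processing data between Kafka topics" amounts to), here is a hedged Java sketch of a single-threaded copy loop in the spirit of the proposed 4.7 section. Topic names and the placeholder settings are illustrative, this is not the actual TransactionalMessageCopier code, and error handling is simplified (fatal errors such as ProducerFencedException would require closing the producer rather than aborting):

```java
import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

public class ReadProcessWriteLoop {

    public static void main(String[] args) {
        // Illustrative settings only; broker address, group, and transactional.id are placeholders.
        Properties cp = new Properties();
        cp.put("bootstrap.servers", "localhost:9092");
        cp.put("group.id", "copy-group");
        cp.put("isolation.level", "read_committed");
        cp.put("enable.auto.commit", "false");
        cp.put("key.deserializer", StringDeserializer.class.getName());
        cp.put("value.deserializer", StringDeserializer.class.getName());

        Properties pp = new Properties();
        pp.put("bootstrap.servers", "localhost:9092");
        pp.put("transactional.id", "copy-app-1");
        pp.put("key.serializer", StringSerializer.class.getName());
        pp.put("value.serializer", StringSerializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(cp);
             KafkaProducer<String, String> producer = new KafkaProducer<>(pp)) {

            consumer.subscribe(List.of("input-topic"));
            producer.initTransactions(); // aborts any in-flight transaction of a previous instance

            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                if (records.isEmpty()) {
                    continue;
                }
                producer.beginTransaction();
                try {
                    Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
                    for (ConsumerRecord<String, String> record : records) {
                        // "Processing" is a plain copy here; a real application would transform the value.
                        producer.send(new ProducerRecord<>("output-topic", record.key(), record.value()));
                        offsets.put(new TopicPartition(record.topic(), record.partition()),
                                new OffsetAndMetadata(record.offset() + 1));
                    }
                    // Commit the consumed offsets in the same transaction as the produced records.
                    // Passing the group metadata lets the group coordinator fence stale group members.
                    producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
                    producer.commitTransaction();
                } catch (KafkaException e) {
                    // Simplified: abort so neither output nor offsets become visible, then rewind the
                    // consumer to the last committed offsets, since it does not rewind automatically.
                    producer.abortTransaction();
                    for (TopicPartition tp : consumer.assignment()) {
                        OffsetAndMetadata committed = consumer.committed(Set.of(tp)).get(tp);
                        consumer.seek(tp, committed == null ? 0L : committed.offset());
                    }
                }
            }
        }
    }
}
```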
########## docs/design.html: ########## @@ -254,32 +254,32 @@ <h3 class="anchor-heading"><a id="semantics" class="anchor-link"></a><a href="#s <i>At least once</i>—Messages are never lost but may be redelivered. </li> <li> - <i>Exactly once</i>—this is what people actually want, each message is delivered once and only once. + <i>Exactly once</i>—Each message is delivered once and only once. </li> </ul> It's worth noting that this breaks down into two problems: the durability guarantees for publishing a message and the guarantees when consuming a message. <p> - Many systems claim to provide "exactly once" delivery semantics, but it is important to read the fine print, most of these claims are misleading (i.e. they don't translate to the case where consumers or producers + Many systems claim to provide "exactly-once" delivery semantics, but it is important to read the fine print, because sometimes these claims are misleading (i.e. they don't translate to the case where consumers or producers can fail, cases where there are multiple consumer processes, or cases where data written to disk can be lost). <p> - Kafka's semantics are straight-forward. When publishing a message we have a notion of the message being "committed" to the log. Once a published message is committed it will not be lost as long as one broker that - replicates the partition to which this message was written remains "alive". The definition of committed message, alive partition as well as a description of which types of failures we attempt to handle will be + Kafka's semantics are straightforward. When publishing a message we have a notion of the message being "committed" to the log. Once a published message is committed, it will not be lost as long as one broker that + replicates the partition to which this message was written remains "alive". The definition of committed message and alive partition as well as a description of which types of failures we attempt to handle will be described in more detail in the next section. For now let's assume a perfect, lossless broker and try to understand the guarantees to the producer and consumer. If a producer attempts to publish a message and - experiences a network error it cannot be sure if this error happened before or after the message was committed. This is similar to the semantics of inserting into a database table with an autogenerated key. + experiences a network error, it cannot be sure if this error happened before or after the message was committed. This is similar to the semantics of inserting into a database table with an autogenerated key. <p> Prior to 0.11.0.0, if a producer failed to receive a response indicating that a message was committed, it had little choice but to resend the message. This provides at-least-once delivery semantics since the message may be written to the log again during resending if the original request had in fact succeeded. Since 0.11.0.0, the Kafka producer also supports an idempotent delivery option which guarantees that resending will not result in duplicate entries in the log. To achieve this, the broker assigns each producer an ID and deduplicates messages using a sequence number that is sent by the producer along with every message. - Also beginning with 0.11.0.0, the producer supports the ability to send messages to multiple topic partitions using transaction-like semantics: i.e. either all messages are successfully written or none of them are. 
+ Also beginning with 0.11.0.0, the producer supports the ability to send messages to multiple topic partitions using transactional semantics, so that either all messages are successfully written or none of them are. Review Comment: I like to describe TX as "multi-message multi-topic/partition atomic write". Should we add something like this? ########## docs/design.html: ########## @@ -290,24 +290,58 @@ <h3 class="anchor-heading"><a id="semantics" class="anchor-link"></a><a href="#s messages have a primary key and so the updates are idempotent (receiving the same message twice just overwrites a record with another copy of itself). </ol> <p> - So what about exactly once semantics (i.e. the thing you actually want)? When consuming from a Kafka topic and producing to another topic (as in a <a href="https://kafka.apache.org/documentation/streams">Kafka Streams</a> - application), we can leverage the new transactional producer capabilities in 0.11.0.0 that were mentioned above. The consumer's position is stored as a message in a topic, so we can write the offset to Kafka in the - same transaction as the output topics receiving the processed data. If the transaction is aborted, the consumer's position will revert to its old value and the produced data on the output topics will not be visible - to other consumers, depending on their "isolation level." In the default "read_uncommitted" isolation level, all messages are visible to consumers even if they were part of an aborted transaction, - but in "read_committed," the consumer will only return messages from transactions which were committed (and any messages which were not part of a transaction). + So what about exactly-once semantics? When consuming from a Kafka topic and producing to another topic (as in a <a href="https://kafka.apache.org/documentation/streams">Kafka Streams</a> application), we can + leverage the new transactional producer capabilities in 0.11.0.0 that were mentioned above. The consumer's position is stored as a message in an internal topic, so we can write the offset to Kafka in the + same transaction as the output topics receiving the processed data. If the transaction is aborted, the consumer's stored position will revert to its old value (although the consumer has to refetch the + committed offset because it does not automatically rewind) and the produced data on the output topics will not be visible to other consumers, depending on their "isolation level". In the default + "read_uncommitted" isolation level, all messages are visible to consumers even if they were part of an aborted transaction, but in "read_committed" isolation level, the consumer will only return messages + from transactions which were committed (and any messages which were not part of a transaction). <p> When writing to an external system, the limitation is in the need to coordinate the consumer's position with what is actually stored as output. The classic way of achieving this would be to introduce a two-phase - commit between the storage of the consumer position and the storage of the consumers output. But this can be handled more simply and generally by letting the consumer store its offset in the same place as + commit between the storage of the consumer position and the storage of the consumers output. This can be handled more simply and generally by letting the consumer store its offset in the same place as its output. This is better because many of the output systems a consumer might want to write to will not support a two-phase commit. 
As an example of this, consider a <a href="https://kafka.apache.org/documentation/#connect">Kafka Connect</a> connector which populates data in HDFS along with the offsets of the data it reads so that it is guaranteed that either data and offsets are both updated or neither is. We follow similar patterns for many other data systems which require these stronger semantics and for which the messages do not have a primary key to allow for deduplication. <p> - So effectively Kafka supports exactly-once delivery in <a href="https://kafka.apache.org/documentation/streams">Kafka Streams</a>, and the transactional producer/consumer can be used generally to provide + As a result, Kafka supports exactly-once delivery in <a href="https://kafka.apache.org/documentation/streams">Kafka Streams</a>, and the transactional producer/consumer can be used generally to provide exactly-once delivery when transferring and processing data between Kafka topics. Exactly-once delivery for other destination systems generally requires cooperation with such systems, but Kafka provides the offset which makes implementing this feasible (see also <a href="https://kafka.apache.org/documentation/#connect">Kafka Connect</a>). Otherwise, Kafka guarantees at-least-once delivery by default, and allows the user to implement at-most-once delivery by disabling retries on the producer and committing offsets in the consumer prior to processing a batch of messages. - <h3 class="anchor-heading"><a id="replication" class="anchor-link"></a><a href="#replication">4.7 Replication</a></h3> + <h3 class="anchor-heading"><a id="usingtransactions" class="anchor-link"></a><a href="#usingtransactions">4.7 Using Transactions</a></h3> + <p> + As mentioned above, the simplest way to get exactly-once semantics from Kafka is to use <a href="https://kafka.apache.org/documentation/streams">Kafka Streams</a>. However, it is also possible to achieve + the same transactional guarantees using the Kafka producer and consumer directly by using them in the same way as Kafka Streams does. + <p> + Kafka transactions are a bit different than transactions in other messaging systems. In Kafka, the consumer and producer are separate and it is only the producer which is transactional. It is however able to + make transactional updates to the consumer's position (confusingly called the "committed offset"), and it is this which gives the overall exactly-once behavior. + <p> + There are three key aspects to exactly-once processing using the producer and consumer, which match how Kafka Streams works. + <ol> + <li>The consumer uses partition assignment to ensure that it is the only consumer in the consumer group currently processing each partition.</li> + <li>The consumer uses read-committed isolation level to ensure that it does not consume records produced by transactions which aborted.</li> + <li>The producer uses transactions so that all of the records it produces, and any offsets it updates on behalf of the consumer, are performed atomically.</li> + </ol> + <p> + The consumer configuration must include <code>isolation.level=read_committed</code> and <code>enable.auto.commit=false</code>. The producer configuration must set <code>transactional.id</code> + to the name of the transactional ID to be used, which configures the producer for transactional delivery and also makes sure that a restarted application causes any in-flight transaction from + the previous instance to abort. Only the producer has the <code>transactional.id</code> configuration. 
Strictly speaking, the consumer doesn't have to use read_committed isolation level, but if Review Comment: There is actually much more to it... For KS, to do proper fencing in combination with rebalancing, we needed to use a producer-per-task. This is very expensive. For EOSv2, we moved to a producer-per-thread model (which is basically what is described here), for which fencing does not happen on the producer side any longer (it might, but we don't rely on it); instead, we actually generate random `transactional.id`s and rely on a built-in "fencing" mechanism inside the ConsumerGroupCoordinator (in combination with broker-side transaction timeouts...). Not sure how detailed we want to be here, but the bottom line is: this description might be over-simplified.
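To make the contrast in this comment a bit more tangible, here is a hedged Java sketch (identifiers and id formats are illustrative, not the exact ones Kafka Streams uses) of the two fencing patterns: producer-side fencing keyed on a stable transactional.id versus coordinator-side fencing enabled by committing offsets with the consumer's group metadata (KIP-447):

```java
import java.util.Map;
import java.util.UUID;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.TopicPartition;

public final class FencingPatterns {

    // Older pattern: one producer per input partition, with a transactional.id derived from the
    // input partition. Fencing happens on the producer side: when a restarted instance calls
    // initTransactions() with the same id, the epoch is bumped and the old instance is fenced.
    static String perInputPartitionTransactionalId(String appId, String topic, int partition) {
        return appId + "-" + topic + "-" + partition;
    }

    // Newer pattern (roughly what the comment describes for EOSv2): one producer per process or
    // thread, with an arbitrary (even random) transactional.id. Producer-side fencing is not
    // relied on; instead, offsets are committed with the consumer's group metadata, so the group
    // coordinator rejects commits from instances whose group generation is stale.
    static String perProcessTransactionalId(String appId) {
        return appId + "-" + UUID.randomUUID();
    }

    static void commitOffsetsWithinTransaction(KafkaProducer<byte[], byte[]> producer,
                                               KafkaConsumer<byte[], byte[]> consumer,
                                               Map<TopicPartition, OffsetAndMetadata> offsets) {
        // The group-metadata overload is the piece that enables coordinator-side fencing.
        producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
    }
}
```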