Re: [DISCUSS] KIP-27 - Conditional Publish
This is a very nice summary of the consistency / correctness issues possible with a commit log.

(assuming it's publishing asynchronously and in an open loop)

It's perhaps already clear to folks here, but -- if you *don't* do that, and instead only send one batch of messages at a time and check the result, you don't have the interleaving issue. (Of course, that means you give up pipelining batches...)

On Aug 10, 2015 2:46 PM, Flavio Junqueira f...@apache.org wrote:

I've been trying to understand what is being proposed in this KIP, and I've put down some notes, with some feedback from Ben, that I wanted to share. I'm not really following the flow of the thread, since I've read a few sources to get to this, and I apologize for that. Here is how I see it at a high level. There are really two problems being discussed in the context of this KIP:

1. Single writer with failover
2. Consistent logs

Single writer with failover

The idea is that at any time there must be at most one publisher active. To get high availability, we can't rely on a single process to be such a publisher, and consequently we need the failover part: if the current active publisher crashes, then another publisher takes over and becomes active. One important issue with scenarios like this is that during transitions from one active publisher to another, there could be races, and two publishers could end up interleaving messages in a topic/partition/key.

Why is this interleaving bad? This is really application specific, but one general way of seeing it is that only one process has the authoritative application state needed to generate messages to publish. Transitioning from one active publisher to another typically requires recovering state or performing some kind of coordination. If no such recovery is required, then we are essentially in the multi-writer space. The commit log use case is a general one mentioned in the KIP description.
Consistent logs

"Consistent logs" might not be the best term here, but I'm using it to describe the need for the messages in a topic/partition/key to consistently reflect the state of the application. For example, some applications might be OK with a published sequence: A B B C (e.g., value = 10), in the case where the messages are idempotent operations, but others might really require: A B C (e.g., value += 10) if they aren't idempotent operations. Order and gaps are also an issue, so some applications might be OK with: A C B (e.g., value += x), and skipping B altogether might be OK if B has no side effects (e.g., the operation associated with B has failed).

Putting things together

The current KIP-27 proposal seems to do a good job of providing a consistent log in the absence of concurrency. It enables publishers to re-publish messages without duplication, which is one requirement for exactly-once semantics. Gaps need to be handled by the publisher. For example, if the publisher publishes A B C (assuming it's publishing asynchronously and in an open loop), it could have A succeed but not B and C. In this case, it needs to redo the publish of B and C. It could also have B fail and C succeed, in which case the publisher repeats B and C.

A really nice feature of the current proposal is that it is a simple primitive that enables the implementation of publishers with different delivery guarantees. It doesn't seem to be well suited to the first problem of implementing a single writer with failover, however. It allows runs in which two producers interleave messages, because the mechanism focuses on a single message. The single writer might not even care about duplicates and gaps, depending on the application, but it might care that there aren't two publishers interleaving messages in the Kafka log. A typical way of dealing with these cases is to use a token associated with a lease to fence off the other publishers.
For example, to demote an active publisher, another publisher could invoke a demote call and have the ISR leader replace the token. The lease of the token could be granted directly by ZooKeeper or via the ISR leader. The condition to publish a message or a batch could be a combination of token verification and an offset check.

-Flavio

On 10 Aug 2015, at 00:15, Jun Rao j...@confluent.io wrote:

Couple of other things.

A. In the discussion, we talked about getting the latest high watermark from the broker. Currently, the high watermark in a partition can go back a bit for a short period of time during a leader change. So, the high watermark returned by the getOffset api is not 100% accurate. There is a jira (KAFKA-2334) to track this issue.

B. The proposal in the wiki is to put the expected offset in every message, even when the messages are compressed. With Jiangjie's proposal of relative offsets, the expected offset can probably only be set at the shallow compressed-message level. We will need to think this through.
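Flavio's combined condition -- token verification plus an offset check -- can be sketched against a toy in-memory log. This is a hypothetical illustration of the idea, not a real Kafka API; the names FencedLog, demote, and publish are invented here:

```python
# Toy broker-side check combining a lease token (fencing) with the KIP-27
# expected-offset check. Purely illustrative; not a real Kafka interface.
class FencedLog:
    def __init__(self):
        self.entries = []
        self.token = 0  # current writer's token, bumped on each demotion

    def demote(self):
        """A new publisher takes over: replace the token, fencing the old one."""
        self.token += 1
        return self.token

    def publish(self, msgs, token, expected_offset):
        if token != self.token:
            return False  # stale publisher is fenced off
        if expected_offset != len(self.entries):
            return False  # offset check failed
        self.entries.extend(msgs)
        return True

log = FencedLog()
t1 = log.token
assert log.publish(["a", "b"], t1, 0)   # active writer succeeds
t2 = log.demote()                       # failover: a new writer takes over
assert not log.publish(["c"], t1, 2)    # old writer is rejected despite a valid offset
assert log.publish(["c"], t2, 2)        # new writer continues the log
```

The point of combining the two checks is that the offset check alone cannot distinguish *which* of two racing publishers should win, while the token alone cannot detect duplicates from the same publisher.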
Re: [DISCUSS] KIP-27 - Conditional Publish
I've been trying to understand what is being proposed in this KIP, and I've put down some notes, with some feedback from Ben, that I wanted to share. I'm not really following the flow of the thread, since I've read a few sources to get to this, and I apologize for that. Here is how I see it at a high level. There are really two problems being discussed in the context of this KIP:

1. Single writer with failover
2. Consistent logs

Single writer with failover

The idea is that at any time there must be at most one publisher active. To get high availability, we can't rely on a single process to be such a publisher, and consequently we need the failover part: if the current active publisher crashes, then another publisher takes over and becomes active. One important issue with scenarios like this is that during transitions from one active publisher to another, there could be races, and two publishers could end up interleaving messages in a topic/partition/key.

Why is this interleaving bad? This is really application specific, but one general way of seeing it is that only one process has the authoritative application state needed to generate messages to publish. Transitioning from one active publisher to another typically requires recovering state or performing some kind of coordination. If no such recovery is required, then we are essentially in the multi-writer space. The commit log use case is a general one mentioned in the KIP description.

Consistent logs

"Consistent logs" might not be the best term here, but I'm using it to describe the need for the messages in a topic/partition/key to consistently reflect the state of the application. For example, some applications might be OK with a published sequence: A B B C (e.g., value = 10), in the case where the messages are idempotent operations, but others might really require: A B C (e.g., value += 10) if they aren't idempotent operations.
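Flavio's idempotence distinction can be shown in a few lines: a duplicated message is harmless when each message sets an absolute value, but corrupts state when it applies a delta. (Toy illustration; the lambdas stand in for the "value = 10" and "value += 10" operations above.)

```python
# Why the duplicated sequence A B B C is fine for idempotent operations
# (value = 10) but not for non-idempotent ones (value += 10).
def apply_all(ops, value=0):
    for op in ops:
        value = op(value)
    return value

set_10 = lambda v: 10      # idempotent: 'value = 10'
add_10 = lambda v: v + 10  # not idempotent: 'value += 10'

assert apply_all([set_10, set_10]) == apply_all([set_10])  # duplicate is harmless
assert apply_all([add_10, add_10]) != apply_all([add_10])  # duplicate double-applies
```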
Order and gaps are also an issue, so some applications might be OK with: A C B (e.g., value += x), and skipping B altogether might be OK if B has no side effects (e.g., the operation associated with B has failed).

Putting things together

The current KIP-27 proposal seems to do a good job of providing a consistent log in the absence of concurrency. It enables publishers to re-publish messages without duplication, which is one requirement for exactly-once semantics. Gaps need to be handled by the publisher. For example, if the publisher publishes A B C (assuming it's publishing asynchronously and in an open loop), it could have A succeed but not B and C. In this case, it needs to redo the publish of B and C. It could also have B fail and C succeed, in which case the publisher repeats B and C.

A really nice feature of the current proposal is that it is a simple primitive that enables the implementation of publishers with different delivery guarantees. It doesn't seem to be well suited to the first problem of implementing a single writer with failover, however. It allows runs in which two producers interleave messages, because the mechanism focuses on a single message. The single writer might not even care about duplicates and gaps, depending on the application, but it might care that there aren't two publishers interleaving messages in the Kafka log. A typical way of dealing with these cases is to use a token associated with a lease to fence off the other publishers. For example, to demote an active publisher, another publisher could invoke a demote call and have the ISR leader replace the token. The lease of the token could be granted directly by ZooKeeper or via the ISR leader. The condition to publish a message or a batch could be a combination of token verification and an offset check.

-Flavio

On 10 Aug 2015, at 00:15, Jun Rao j...@confluent.io wrote:

Couple of other things.

A. In the discussion, we talked about getting the latest high watermark from the broker. Currently, the high watermark in a partition can go back a bit for a short period of time during a leader change. So, the high watermark returned by the getOffset api is not 100% accurate. There is a jira (KAFKA-2334) to track this issue.

B. The proposal in the wiki is to put the expected offset in every message, even when the messages are compressed. With Jiangjie's proposal of relative offsets, the expected offset can probably only be set at the shallow compressed-message level. We will need to think this through. Thanks, Jun

On Tue, Aug 4, 2015 at 3:05 PM, Jiangjie Qin j...@linkedin.com.invalid wrote:

Jun, I see. So this only applies to uncompressed messages. Maybe that is fine, given most users will probably turn on compression? I think the first approach is the more general one, but from the application's point of view it might be harder to implement. I am wondering whether it is easier for the application to simply have one producer per partition and hash messages to producers. In that case,
Re: [DISCUSS] KIP-27 - Conditional Publish
Couple of other things.

A. In the discussion, we talked about getting the latest high watermark from the broker. Currently, the high watermark in a partition can go back a bit for a short period of time during a leader change. So, the high watermark returned by the getOffset api is not 100% accurate. There is a jira (KAFKA-2334) to track this issue.

B. The proposal in the wiki is to put the expected offset in every message, even when the messages are compressed. With Jiangjie's proposal of relative offsets, the expected offset can probably only be set at the shallow compressed-message level. We will need to think this through. Thanks, Jun

On Tue, Aug 4, 2015 at 3:05 PM, Jiangjie Qin j...@linkedin.com.invalid wrote:

Jun, I see. So this only applies to uncompressed messages. Maybe that is fine, given most users will probably turn on compression? I think the first approach is the more general one, but from the application's point of view it might be harder to implement. I am wondering whether it is easier for the application to simply have one producer per partition and hash messages to producers. In that case, we can use the second approach but still have multiple producers. The downside might be a potentially larger memory footprint? We might also need to think about fault tolerance a little bit more. Thanks, Jiangjie (Becket) Qin

On Mon, Aug 3, 2015 at 9:55 PM, Jun Rao j...@confluent.io wrote:

A couple of thoughts on the commit log use case. Suppose that we want to maintain multiple replicas of a K/V store backed by a shared Kafka topic/partition as a commit log. There are two possible ways to use Kafka as a commit log.

1. The first approach allows multiple producers to publish to Kafka. Each replica of the data store keeps reading from a Kafka topic/partition to refresh the replica's view. Every time a replica gets an update to a key from a client, it combines the update and the current value of the key in its view and generates a post-value. It then does a conditional publish to Kafka with the post-value. The update is successful if the conditional publish succeeds. Otherwise, the replica has to recompute the post-value (potentially after the replica's view is refreshed) and retry the conditional publish.

A potential issue with this approach arises when there is a transient failure during publishing to Kafka (e.g., the leader of the partition changes). When this happens, the conditional publish will get an error. The replica doesn't know whether the publish actually succeeded or not. If we just blindly retry, it may not give the correct behavior (e.g., we could be applying +1 twice). So, I'm not sure if conditional publish by itself is enough for this approach.

2. The second approach allows only a single producer to publish to Kafka. We somehow elect one of the replicas to be the master that handles all updates. Normally, we don't need conditional publish since there is a single producer. Conditional publish can potentially be used to deal with duplicates. If the master encounters the same transient failure as above, it can get the latest offset from the Kafka topic/partition to see whether the publish actually succeeded, since it's the only producer.

A potential issue here is handling the zombie master problem: if the master has a soft failure and another master is elected, we need to prevent the old master from publishing new data to Kafka. So, for this approach to work properly, we need some kind of single-writer support in addition to conditional publish.

Jiangjie, the issue with partial commit is the following. Say we have a batch of 10 uncompressed messages sent to the leader.
The followers had only fetched the first 5 messages when the leader died. In this case, we committed only 5 of the 10 messages. Thanks, Jun

On Tue, Jul 28, 2015 at 1:16 AM, Daniel Schierbeck daniel.schierb...@gmail.com wrote:

Jiangjie: I think giving users the possibility of defining a custom policy for handling rejections is a good idea. For instance, this will allow Kafka to act as an event store in an Event Sourcing application. If the event(s) are rejected by the store, the original command may need to be re-validated against the new state.

On Tue, Jul 28, 2015 at 1:27 AM, Jiangjie Qin j...@linkedin.com.invalid wrote:

@Ewen, good point about batching. Yes, it would be tricky if we want to do a per-key conditional produce. My understanding is that the prerequisites of this KIP are: 1. A single producer for each
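Jun's first approach -- conditional publish plus recompute-and-retry -- can be sketched against a toy in-memory log. This is an illustrative stand-in, not the Kafka client API; Log, publish, and update are invented names:

```python
# Toy version of approach 1: each replica recomputes the post-value from the
# log and retries the conditional publish on failure.
class Log:
    def __init__(self):
        self.entries = []  # list of (key, post_value) pairs

    def publish(self, msg, expected_offset):
        if expected_offset != len(self.entries):
            return False  # another replica won the race
        self.entries.append(msg)
        return True

def update(log, key, delta):
    while True:
        view = {}  # refresh the replica's view by replaying the log
        for k, v in log.entries:
            view[k] = v
        post_value = view.get(key, 0) + delta
        if log.publish((key, post_value), len(log.entries)):
            return post_value
        # else: conditional publish failed; loop recomputes from the new log

log = Log()
assert update(log, "x", 1) == 1
assert update(log, "x", 1) == 2          # post-value recomputed from the log
assert not log.publish(("x", 99), 0)     # a stale expected offset is rejected
```

Note that this sketch sidesteps exactly the hazard Jun raises: if publish can return an ambiguous error even though the append landed, the recompute step may re-apply the same delta.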
Re: [DISCUSS] KIP-27 - Conditional Publish
This is a great summary of the commit log options. Some additional comments:

1. One way to handle a transient failure is to treat it the same way as a conditional publish failure: recompute the post-value before retrying. (I believe this is enough to make this approach work correctly.)

2. You can think of 2 as an 'optimization' of 1: the election mechanism is there to ensure that conditional publish failures happen very rarely. When there are no conflicts, the conditional publish is essentially free.

I guess I think of the zombie master problem like this: given some window of time where two nodes both think they are the master, conditional publish is enough to ensure that only one of the two will successfully publish. However, it's not enough to ensure that the 'new' master is the successful one. This might cause the leadership transition to happen a bit later than it would otherwise, but it doesn't seem to actually impact correctness.

On Tue, Aug 4, 2015 at 12:55 AM, Jun Rao j...@confluent.io wrote:

A couple of thoughts on the commit log use case. Suppose that we want to maintain multiple replicas of a K/V store backed by a shared Kafka topic/partition as a commit log. There are two possible ways to use Kafka as a commit log.

1. The first approach allows multiple producers to publish to Kafka. Each replica of the data store keeps reading from a Kafka topic/partition to refresh the replica's view. Every time a replica gets an update to a key from a client, it combines the update and the current value of the key in its view and generates a post-value. It then does a conditional publish to Kafka with the post-value. The update is successful if the conditional publish succeeds. Otherwise, the replica has to recompute the post-value (potentially after the replica's view is refreshed) and retry the conditional publish.

A potential issue with this approach arises when there is a transient failure during publishing to Kafka (e.g., the leader of the partition changes). When this happens, the conditional publish will get an error. The replica doesn't know whether the publish actually succeeded or not. If we just blindly retry, it may not give the correct behavior (e.g., we could be applying +1 twice). So, I'm not sure if conditional publish by itself is enough for this approach.

2. The second approach allows only a single producer to publish to Kafka. We somehow elect one of the replicas to be the master that handles all updates. Normally, we don't need conditional publish since there is a single producer. Conditional publish can potentially be used to deal with duplicates. If the master encounters the same transient failure as above, it can get the latest offset from the Kafka topic/partition to see whether the publish actually succeeded, since it's the only producer.

A potential issue here is handling the zombie master problem: if the master has a soft failure and another master is elected, we need to prevent the old master from publishing new data to Kafka. So, for this approach to work properly, we need some kind of single-writer support in addition to conditional publish.

Jiangjie, the issue with partial commit is the following. Say we have a batch of 10 uncompressed messages sent to the leader. The followers had only fetched the first 5 messages when the leader died. In this case, we committed only 5 of the 10 messages. Thanks, Jun

On Tue, Jul 28, 2015 at 1:16 AM, Daniel Schierbeck daniel.schierb...@gmail.com wrote:

Jiangjie: I think giving users the possibility of defining a custom policy for handling rejections is a good idea. For instance, this will allow Kafka to act as an event store in an Event Sourcing application. If the event(s) are rejected by the store, the original command may need to be re-validated against the new state.

On Tue, Jul 28, 2015 at 1:27 AM, Jiangjie Qin j...@linkedin.com.invalid wrote:

@Ewen, good point about batching. Yes, it would be tricky if we want to do a per-key conditional produce.
My understanding is that the prerequisites of this KIP are:

1. A single producer for each partition.
2. acks=-1, max.in.flight.requests.per.connection=1, retries=SOME_BIG_NUMBER

The major problem it tries to solve is exactly-once produce, i.e., eliminating duplicates on the producer side. In that case, a batch will be considered atomic. The only way a batch can be rejected is if it has already been appended, so the producer should just move on. It looks to me like even a transient multiple-producer scenario will cause issues, because the user needs to think about what to do if a request is rejected, and the answer varies for different use cases. Thanks, Jiangjie (Becket) Qin

On Sun, Jul 26, 2015 at 11:54 AM, Ben Kirwin b...@kirw.in wrote:

So I had another look at the 'Idempotent Producer' proposal this afternoon, and made a few notes on how I think they compare; if I've made any mistakes, I'd be delighted if someone with more context on the idempotent producer design would correct me.
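Written out as Java-client producer property names, Jiangjie's prerequisite 2 would look roughly like the following. (The property keys are from the Kafka producer config reference -- note the actual key is max.in.flight.requests.per.connection -- and the retry count is an arbitrary stand-in for SOME_BIG_NUMBER.)

```python
# The conservative settings Jiangjie lists, keyed by the Java producer's
# property names. The retries value is a placeholder for SOME_BIG_NUMBER.
strict_producer_config = {
    "acks": "-1",                                  # wait for the full ISR to acknowledge
    "max.in.flight.requests.per.connection": "1",  # no pipelining, so retries can't reorder
    "retries": "2147483647",                       # keep retrying through transient failures
}

# With pipelining disabled, a retried batch cannot jump ahead of an
# in-flight one -- the property Jiangjie's guarantee relies on.
assert strict_producer_config["max.in.flight.requests.per.connection"] == "1"
```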
Re: [DISCUSS] KIP-27 - Conditional Publish
Jiangjie -- it seems to me that, while there are cases where you want conservative producer settings like you suggest, there are others enabled by this KIP where pipelining and retries are not an issue. As a toy example, I've adapted the producer performance test to behave as an idempotent producer here:

https://github.com/bkirwi/kafka/blob/conditional-publish/tools/src/main/java/org/apache/kafka/clients/tools/IdempotentProducerPerformance.java

This appends the numbers 1-N in order to some partition. If you've set up the server to do the offset checks, it's possible to run multiple instances of this producer without reordering messages or adding duplicates. For this kind of application, pipelining turns out to be safe... it might result in more failed messages if there's a conflict, but it won't hurt correctness.

On Mon, Jul 27, 2015 at 7:26 PM, Jiangjie Qin j...@linkedin.com.invalid wrote:

@Ewen, good point about batching. Yes, it would be tricky if we want to do a per-key conditional produce. My understanding is that the prerequisites of this KIP are:

1. A single producer for each partition.
2. acks=-1, max.in.flight.requests.per.connection=1, retries=SOME_BIG_NUMBER

The major problem it tries to solve is exactly-once produce, i.e., eliminating duplicates on the producer side. In that case, a batch will be considered atomic. The only way a batch can be rejected is if it has already been appended, so the producer should just move on. It looks to me like even a transient multiple-producer scenario will cause issues, because the user needs to think about what to do if a request is rejected, and the answer varies for different use cases. Thanks, Jiangjie (Becket) Qin

On Sun, Jul 26, 2015 at 11:54 AM, Ben Kirwin b...@kirw.in wrote:

So I had another look at the 'Idempotent Producer' proposal this afternoon, and made a few notes on how I think they compare; if I've made any mistakes, I'd be delighted if someone with more context on the idempotent producer design would correct me.

As a first intuition, you can think of the 'conditional publish' proposal as the special case of the 'idempotent producer' idea where there's just a single producer per partition. The key observation here is: if there's only one producer, you can conflate the 'sequence number' and the expected offset. The conditional publish proposal uses existing Kafka offset APIs for roughly the same things as the idempotent producer proposal uses sequence numbers for -- eg. instead of having a lease-PID API that returns the current sequence number, we can use the existing offset API to retrieve the upcoming offset.

Both proposals attempt to deal with the situation where there are transiently multiple publishers for the same partition (and PID). The idempotent producer setup tracks a generation id for each PID and discards any writes with a generation id smaller than the latest value. Conditional publish is 'first write wins' -- and instead of dropping duplicates on the server, it returns an error to the client. The duplicate-handling behaviour (dropping vs. erroring) has some interesting consequences:

- If all producers are producing the same stream of messages, silently dropping duplicates on the server is more convenient. (Suppose we have a batch of messages 0-9, and the high-water mark on the server is 7. The idempotent producer, as I read it, would append 7-9 to the partition and return success; meanwhile, conditional publish would fail the entire batch.)

- If producers might be writing different streams of messages, the proposed behaviour of the idempotent producer is probably worse -- since it can silently interleave messages from two different producers. This can be a problem for some commit-log style use cases, since it can transform a valid series of operations into an invalid one.

- Given the error-on-duplicate behaviour, it's possible to implement deduplication on the client.
(Sketch: if a publish returns an error for some partition, fetch the upcoming offset / sequence number for that partition, and discard all messages with a smaller offset on the client before republishing.) I think this makes the erroring behaviour more general, though deduplicating on the server saves a roundtrip or two at conflict time.

I'm less clear about the behaviour of the generation id, or what happens when (say) two producers with the same generation id are spun up at the same time. I'd be interested in hearing other folks' comments on this.

Ewen: I'm not sure I understand the questions well enough to answer properly, but some quick notes:

- I don't think it makes sense to assign an expected offset without already having assigned a partition. If the producer code is doing the partition assignment, it should probably do the offset assignment too... or we could just let application code handle both.

- I'm not aware of any case where reassigning offsets to messages automatically after an offset
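Ben's client-side dedup sketch can be rendered as a toy implementation (in-memory log standing in for Kafka; all names invented). It assumes, per his first bullet, that every producer is publishing the same stream of messages starting at the same base offset:

```python
# Client-side dedup on top of conditional publish: on a conflict, fetch the
# upcoming offset and drop the prefix of the batch that was already appended.
class Log:
    def __init__(self):
        self.entries = []

    def upcoming_offset(self):
        return len(self.entries)

    def publish(self, batch, expected_offset):
        if expected_offset != len(self.entries):
            return False
        self.entries.extend(batch)
        return True

def publish_dedup(log, batch, base_offset):
    """Publish batch so message i lands at base_offset + i, skipping any
    prefix the log already contains. Assumes all producers send the same
    stream (otherwise the 'duplicate' prefix could differ)."""
    while True:
        if log.publish(batch, base_offset):
            return
        upcoming = log.upcoming_offset()
        already = upcoming - base_offset  # messages some other producer wrote
        if already >= len(batch):
            return  # the whole batch was already appended by someone else
        batch, base_offset = batch[already:], upcoming

log = Log()
log.publish(["a", "b"], 0)               # another producer got here first
publish_dedup(log, ["a", "b", "c", "d"], 0)
assert log.entries == ["a", "b", "c", "d"]   # no duplicates, no gaps
```

Retrying the same call is then a no-op, which is the "extra roundtrip at conflict time" Ben mentions.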
Re: [DISCUSS] KIP-27 - Conditional Publish
Jun, I see. So this only applies to uncompressed messages. Maybe that is fine, given most users will probably turn on compression? I think the first approach is the more general one, but from the application's point of view it might be harder to implement. I am wondering whether it is easier for the application to simply have one producer per partition and hash messages to producers. In that case, we can use the second approach but still have multiple producers. The downside might be a potentially larger memory footprint? We might also need to think about fault tolerance a little bit more.

Ben, I agree that when everything goes fine, having pipelining turned on is probably fine. But if we take leader migration, brokers going down, messages appended to the leader but not the followers, etc. into consideration, it is not clear to me how conditional publish will still provide its guarantee without enforcing those strict settings. Thanks, Jiangjie (Becket) Qin

On Mon, Aug 3, 2015 at 9:55 PM, Jun Rao j...@confluent.io wrote:

A couple of thoughts on the commit log use case. Suppose that we want to maintain multiple replicas of a K/V store backed by a shared Kafka topic/partition as a commit log. There are two possible ways to use Kafka as a commit log.

1. The first approach allows multiple producers to publish to Kafka. Each replica of the data store keeps reading from a Kafka topic/partition to refresh the replica's view. Every time a replica gets an update to a key from a client, it combines the update and the current value of the key in its view and generates a post-value. It then does a conditional publish to Kafka with the post-value. The update is successful if the conditional publish succeeds. Otherwise, the replica has to recompute the post-value (potentially after the replica's view is refreshed) and retry the conditional publish.

A potential issue with this approach arises when there is a transient failure during publishing to Kafka (e.g., the leader of the partition changes). When this happens, the conditional publish will get an error. The replica doesn't know whether the publish actually succeeded or not. If we just blindly retry, it may not give the correct behavior (e.g., we could be applying +1 twice). So, I'm not sure if conditional publish by itself is enough for this approach.

2. The second approach allows only a single producer to publish to Kafka. We somehow elect one of the replicas to be the master that handles all updates. Normally, we don't need conditional publish since there is a single producer. Conditional publish can potentially be used to deal with duplicates. If the master encounters the same transient failure as above, it can get the latest offset from the Kafka topic/partition to see whether the publish actually succeeded, since it's the only producer.

A potential issue here is handling the zombie master problem: if the master has a soft failure and another master is elected, we need to prevent the old master from publishing new data to Kafka. So, for this approach to work properly, we need some kind of single-writer support in addition to conditional publish.

Jiangjie, the issue with partial commit is the following. Say we have a batch of 10 uncompressed messages sent to the leader. The followers had only fetched the first 5 messages when the leader died. In this case, we committed only 5 of the 10 messages. Thanks, Jun

On Tue, Jul 28, 2015 at 1:16 AM, Daniel Schierbeck daniel.schierb...@gmail.com wrote:

Jiangjie: I think giving users the possibility of defining a custom policy for handling rejections is a good idea. For instance, this will allow Kafka to act as an event store in an Event Sourcing application. If the event(s) are rejected by the store, the original command may need to be re-validated against the new state.

On Tue, Jul 28, 2015 at 1:27 AM, Jiangjie Qin j...@linkedin.com.invalid wrote:

@Ewen, good point about batching. Yes, it would be tricky if we want to do a per-key conditional produce.
My understanding is that the prerequisites of this KIP are:

1. A single producer for each partition.
2. acks=-1, max.in.flight.requests.per.connection=1, retries=SOME_BIG_NUMBER

The major problem it tries to solve is exactly-once produce, i.e., eliminating duplicates on the producer side. In that case, a batch will be considered atomic. The only way a batch can be rejected is if it has already been appended, so the producer should just move on. It looks to me like even a transient multiple-producer scenario will cause issues, because the user needs to think about what to do if a request is rejected, and the answer varies for different use cases. Thanks, Jiangjie (Becket) Qin

On Sun, Jul 26, 2015 at 11:54 AM, Ben Kirwin b...@kirw.in wrote:

So I had another look at the 'Idempotent Producer' proposal this afternoon, and made a few notes on how I think they compare; if I've made any mistakes, I'd be delighted if someone with more context on the idempotent producer design would correct me.
Re: [DISCUSS] KIP-27 - Conditional Publish
Hey Jun,

Yeah, I think Ben is right; both these cases are covered. The algorithm is something like:

    while (true) {
        v = get_local_val(key)
        v' = modify(v)
        try {
            log_to_kafka(v')
            put_local_val(key, v')
            break
        } catch (CasFailureException e) {
            warn("optimistic lock failure")
        }
    }

What I have yet to see is a complete design that would make use of this. I think we have two use cases this might (or might not) cover, a distributed log-centric data system and an event-sourced app. Both are actually kind of the same. I think what I haven't really seen is a working-through of the details. I think it is important to think through this use case end-to-end: How is concurrency handled? What about other errors? What about request pipelining? How would queries work? Basically, someone should write out notes on how to implement a key-value store with a Kafka commit log, assuming the existence of this feature and making use of something like RocksDB. I think that would uncover any problems that remain. -Jay

On Tue, Aug 4, 2015 at 12:46 AM, Ben Kirwin b...@kirw.in wrote:

This is a great summary of the commit log options. Some additional comments:

1. One way to handle a transient failure is to treat it the same way as a conditional publish failure: recompute the post-value before retrying. (I believe this is enough to make this approach work correctly.)

2. You can think of 2 as an 'optimization' of 1: the election mechanism is there to ensure that conditional publish failures happen very rarely. When there are no conflicts, the conditional publish is essentially free.

I guess I think of the zombie master problem like this: given some window of time where two nodes both think they are the master, conditional publish is enough to ensure that only one of the two will successfully publish. However, it's not enough to ensure that the 'new' master is the successful one. This might cause the leadership transition to happen a bit later than it would otherwise, but it doesn't seem to actually impact correctness.

On Tue, Aug 4, 2015 at 12:55 AM, Jun Rao j...@confluent.io wrote:

A couple of thoughts on the commit log use case. Suppose that we want to maintain multiple replicas of a K/V store backed by a shared Kafka topic/partition as a commit log. There are two possible ways to use Kafka as a commit log.

1. The first approach allows multiple producers to publish to Kafka. Each replica of the data store keeps reading from a Kafka topic/partition to refresh the replica's view. Every time a replica gets an update to a key from a client, it combines the update and the current value of the key in its view and generates a post-value. It then does a conditional publish to Kafka with the post-value. The update is successful if the conditional publish succeeds. Otherwise, the replica has to recompute the post-value (potentially after the replica's view is refreshed) and retry the conditional publish.

A potential issue with this approach arises when there is a transient failure during publishing to Kafka (e.g., the leader of the partition changes). When this happens, the conditional publish will get an error. The replica doesn't know whether the publish actually succeeded or not. If we just blindly retry, it may not give the correct behavior (e.g., we could be applying +1 twice). So, I'm not sure if conditional publish by itself is enough for this approach.

2. The second approach allows only a single producer to publish to Kafka. We somehow elect one of the replicas to be the master that handles all updates. Normally, we don't need conditional publish since there is a single producer. Conditional publish can potentially be used to deal with duplicates. If the master encounters the same transient failure as above, it can get the latest offset from the Kafka topic/partition to see whether the publish actually succeeded, since it's the only producer.
A potential issue here is to handle the zombie master problem: if the master has a soft failure and another master is elected, we need to prevent the old master from publishing new data to Kafka. So, for this approach to work properly, we need some kind of single-writer support in addition to conditional publish. Jiangjie, The issue with partial commit is the following. Say we have a batch of 10 uncompressed messages sent to the leader. The followers only fetched the first 5 messages and then the leader dies. In this case, we only committed 5 out of the 10 messages. Thanks, Jun

On Tue, Jul 28, 2015 at 1:16 AM, Daniel Schierbeck daniel.schierb...@gmail.com wrote: Jiangjie: I think giving users the possibility of defining a custom policy for handling rejections is a good idea. For instance, this will allow Kafka to act as an event store in an Event Sourcing application. If the event(s) are rejected by the store, the original command may
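Jay's read-modify-write loop above can be sketched concretely against a toy in-memory log that rejects writes when the expected offset has moved. Everything here (`ConditionalLog`, `CasFailure`, the replica dicts) is invented for illustration; a real client would use the expected-offset field proposed in the KIP.

```python
# Toy model of Jay's optimistic-concurrency loop. All names below are
# illustrative; nothing here is part of the actual KIP-27 API.

class CasFailure(Exception):
    pass

class ConditionalLog:
    """In-memory stand-in for a partition with conditional publish."""
    def __init__(self):
        self.entries = []

    def publish(self, entry, expected_offset):
        # The append succeeds only if expected_offset matches the log end.
        if expected_offset != len(self.entries):
            raise CasFailure("log has moved past the expected offset")
        self.entries.append(entry)

def update(log, replica, key, modify):
    """Retry loop: read local value, modify, conditionally publish."""
    while True:
        expected = replica["offset"]             # offset our view reflects
        v_new = modify(replica["state"].get(key, 0))
        try:
            log.publish((key, v_new), expected)
            replica["state"][key] = v_new
            replica["offset"] = expected + 1
            return v_new
        except CasFailure:
            # Optimistic lock failure: consume the missed entries, retry.
            for k, v in log.entries[replica["offset"]:]:
                replica["state"][k] = v
            replica["offset"] = len(log.entries)

log = ConditionalLog()
a = {"state": {}, "offset": 0}
b = {"state": {}, "offset": 0}
update(log, a, "counter", lambda v: v + 1)    # a publishes 1 at offset 0
update(log, b, "counter", lambda v: v + 10)   # b conflicts, refreshes, retries
print(b["state"]["counter"])                  # 11
```

Note that replica `b`'s first attempt fails its expected-offset check, so it refreshes its view from the log and re-applies the modification to the latest value, which is the safety property the loop is after.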
Re: [DISCUSS] KIP-27 - Conditional Publish
Jun, You are right that Jay's example omits the consuming part, and the local state should be maintained only from consumed messages so that all replicas are eventually consistent. Assuming that, the beauty of conditional push is that it is safe for clients to do this independently. It guarantees that a successful update is always based on the most recent value. I think the fact that no master is needed is the greatest advantage. I think there shouldn't be any automatic retry. Application logic should handle the case where a conditional push fails; what to do on failure is very application-dependent. In Jay's example, it looks as though the code keeps modifying and pushing until it succeeds, but I don't think that's what Jay meant. modify(v) may throw an exception if the application decides to give up the update. Regarding the transient failure, isn't it reasonable to assume an application can tell success/failure from the data itself? Anyway, this KIP is not trying to solve the transient failure problem. That should be left to the idempotent producer.

On Tue, Aug 4, 2015 at 9:42 AM, Jun Rao j...@confluent.io wrote: Jay, The code that you put there seems to be designed for the second case where there is a master. In the first case when there is no master, updates can happen from multiple replicas. Therefore, to maintain the local view, each replica can't just update the local view using only the updates sent to it directly. Instead, it has to read from the Kafka log to see updates from other replicas. Then the question is what happens when you get a LeaderNotAvailableException/IOException when calling log_to_kafka(v'). That call may or may not have succeeded. In the first case, it's hard for a replica to figure that out since there are other producers to the Kafka log. If you just retry, you may have modified the value incorrectly. For example, the conditional publish may have actually succeeded and the local view will at some point reflect that change. 
By retrying, we will be modifying the local view again (e.g., applying +1 twice to a value). I agree that it would be good to see a more detailed end-to-end design to determine how conditional publish can be used and how much of the problem it solves. Thanks, Jun

On Tue, Aug 4, 2015 at 9:18 AM, Jay Kreps j...@confluent.io wrote: Hey Jun, Yeah I think Ben is right, both these cases are covered. The algorithm is something like:

    while (true) {
      v = get_local_val(key)
      v' = modify(v)
      try {
        log_to_kafka(v')
        put_local_val(key, v')
        break
      } catch (CasFailureException e) {
        warn("optimistic lock failure")
      }
    }

What I have yet to see is a complete design that would make use of this. I think we have two use cases this might (or might not) cover: a distributed log-centric data system and an event-sourced app. Both are actually kind of the same. I think what I haven't really seen is a working-through of the details. I think it is important to think through this use case end-to-end, i.e.: how is concurrency handled? What about other errors? What about request pipelining? How would queries work? Basically, someone should write out notes on how to implement a key-value store with a Kafka commit log, assuming the existence of this feature and making use of something like RocksDB. I think that would uncover any problems that remain. -Jay

On Tue, Aug 4, 2015 at 12:46 AM, Ben Kirwin b...@kirw.in wrote: This is a great summary of the commit log options. Some additional comments: 1. One way to handle a transient failure is to treat it the same way as a conditional publish failure: recompute the post-value before retrying. (I believe this is enough to make this approach work correctly.) 2. You can think of 2 as an 'optimization' of 1: the election mechanism is there to ensure that the conditional publish failures happen very rarely. When there are no conflicts, the conditional publish is essentially free. 
I guess I think of the zombie master problem like this: given some window of time where two nodes both think they are the master, conditional publish is enough to ensure that only one of the two will successfully publish. However, it's not enough to ensure that the 'new' master is the successful one. This might cause the leadership transition to happen a bit later than it would otherwise, but it doesn't seem to actually impact correctness. On Tue, Aug 4, 2015 at 12:55 AM, Jun Rao j...@confluent.io wrote: A couple of thoughts on the commit log use case. Suppose that we want to maintain multiple replicas of a K/V store backed by a shared Kafka topic/partition as a commit log. There are two possible ways to use Kafka as a commit log. 1. The first approach allows multiple producers to publish to Kafka. Each replica of the data store keeps reading from a Kafka topic/partition to
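Jun's "applying +1 twice" hazard is easy to reproduce in a toy model: a write that lands while its acknowledgement is lost, followed by a naive refresh-and-reapply, increments the value twice. This is a sketch of the failure mode only (the log is just a Python list of post-values, all names invented); it takes no position on whether Ben's recompute-before-retry suffices in general.

```python
# Toy reproduction of the blind-retry double-apply: the write lands,
# but the client sees an error, refreshes its view, and applies +1 again.

log = [5]   # current committed post-value for the key is 5

def publish_ack_lost(log, value, expected_offset):
    # The broker appends the value, but the response never reaches us.
    if expected_offset == len(log):
        log.append(value)
    raise IOError("response lost in transit")

try:
    # Client applies +1 to the latest value it knows (5 -> 6).
    publish_ack_lost(log, log[-1] + 1, expected_offset=len(log))
except IOError:
    # Ambiguous outcome. Naive recovery: refresh the view (which now
    # already contains our 6) and apply the +1 operation again.
    refreshed = log[-1]        # 6 -- our own write, but we can't tell
    log.append(refreshed + 1)  # publishes 7: the +1 has applied twice

print(log)  # [5, 6, 7] -- the final value should have been 6
```

Note that the conditional-publish check cannot catch this on its own: by the time the client retries, its expected offset is current again, so the erroneous second +1 is a perfectly valid conditional publish.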
Re: [DISCUSS] KIP-27 - Conditional Publish
Jay, The code that you put there seems to be designed for the second case where there is a master. In the first case when there is no master, updates can happen from multiple replicas. Therefore, to maintain the local view, each replica can't just update the local view using only the updates sent to it directly. Instead, it has to read from the Kafka log to see updates from other replicas. Then the question is what happens when you get a LeaderNotAvailableException/IOException when calling log_to_kafka(v'). That call may or may not have succeeded. In the first case, it's hard for a replica to figure that out since there are other producers to the Kafka log. If you just retry, you may have modified the value incorrectly. For example, the conditional publish may have actually succeeded and the local view will at some point reflect that change. By retrying, we will be modifying the local view again (e.g., applying +1 twice to a value). I agree that it would be good to see a more detailed end-to-end design to determine how conditional publish can be used and how much of the problem it solves. Thanks, Jun

On Tue, Aug 4, 2015 at 9:18 AM, Jay Kreps j...@confluent.io wrote: Hey Jun, Yeah I think Ben is right, both these cases are covered. The algorithm is something like:

    while (true) {
      v = get_local_val(key)
      v' = modify(v)
      try {
        log_to_kafka(v')
        put_local_val(key, v')
        break
      } catch (CasFailureException e) {
        warn("optimistic lock failure")
      }
    }

What I have yet to see is a complete design that would make use of this. I think we have two use cases this might (or might not) cover: a distributed log-centric data system and an event-sourced app. Both are actually kind of the same. I think what I haven't really seen is a working-through of the details. I think it is important to think through this use case end-to-end, i.e.: how is concurrency handled? What about other errors? What about request pipelining? How would queries work? 
Basically someone should write out notes on how to implement a key-value store with a kafka commit log assuming the existence of this feature and making use of something like RocksDB. I think that would uncover any problems that remain. -Jay On Tue, Aug 4, 2015 at 12:46 AM, Ben Kirwin b...@kirw.in wrote: This is a great summary of the commit log options. Some additional comments: 1. One way to handle a transient failure is to treat it the same way as a conditional publish failure: recompute the post-value before retrying. (I believe this is enough to make this approach work correctly.) 2. You can think of 2 as an 'optimization' of 1: the election mechanism is there to ensure that the conditional publish failures happen very rarely. When there are no conflicts, the conditional publish is essentially free. I guess I think of the zombie master problem like this: given some window of time where two nodes both think they are the master, conditional publish is enough to ensure that only one of the two will successfully publish. However, it's not enough to ensure that the 'new' master is the successful one. This might cause the leadership transition to happen a bit later than it would otherwise, but it doesn't seem to actually impact correctness. On Tue, Aug 4, 2015 at 12:55 AM, Jun Rao j...@confluent.io wrote: A couple of thoughts on the commit log use case. Suppose that we want to maintain multiple replicas of a K/V store backed by a shared Kafka topic/partition as a commit log. There are two possible ways to use Kafka as a commit log. 1. The first approach allows multiple producers to publish to Kafka. Each replica of the data store keeps reading from a Kafka topic/partition to refresh the replica's view. Every time a replica gets an update to a key from a client, it combines the update and the current value of the key in its view and generates a post-value. It then does a conditional publish to Kafka with the post-value. 
The update is successful if the conditional publish succeeds. Otherwise, the replica has to recompute the post-value (potentially after the replica's view is refreshed) and retry the conditional publish. A potential issue with this approach is when there is a transient failure during publishing to Kafka (e.g., the leader of the partition changes). When this happens, the conditional publish will get an error. The replica doesn't know whether the publish actually succeeded or not. If we just blindly retry, it may not give the correct behavior (e.g., we could be applying +1 twice). So, not sure if conditional publish itself is enough for this approach. 2. The second approach allows only a single producer to publish to Kafka. We somehow elect one of the replicas to be the master that handles all updates. Normally, we don't need conditional publish since there is a single producer. Conditional
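With a single producer, Jun's point that the ambiguity can be resolved by checking the log afterwards looks roughly like this sketch. The log class and method names are invented; a real client would use the existing offset API against the actual partition.

```python
# Sketch: a sole producer resolves an ambiguous publish by re-reading
# the log end offset. All names are invented for illustration.

class Log:
    def __init__(self):
        self.entries = []
    def end_offset(self):
        return len(self.entries)
    def publish(self, value, expected_offset):
        if expected_offset != len(self.entries):
            raise RuntimeError("offset mismatch")
        self.entries.append(value)

def publish_ack_lost(log, value, expected_offset):
    log.publish(value, expected_offset)  # the write lands...
    raise IOError("...but the ack is lost")

log = Log()
expected = log.end_offset()              # 0
try:
    publish_ack_lost(log, "update-1", expected)
except IOError:
    # Since we are the only producer, any advance of the end offset
    # past `expected` can only be our own write: no retry needed.
    if log.end_offset() == expected:
        log.publish("update-1", expected)  # genuinely failed: retry

print(log.entries)  # ['update-1'], exactly once
```

This disambiguation is exactly what breaks down in the multi-producer case: another producer may have advanced the offset, so an advanced end offset no longer proves our own write succeeded.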
Re: [DISCUSS] KIP-27 - Conditional Publish
A couple of thoughts on the commit log use case. Suppose that we want to maintain multiple replicas of a K/V store backed by a shared Kafka topic/partition as a commit log. There are two possible ways to use Kafka as a commit log. 1. The first approach allows multiple producers to publish to Kafka. Each replica of the data store keeps reading from a Kafka topic/partition to refresh the replica's view. Every time a replica gets an update to a key from a client, it combines the update and the current value of the key in its view and generates a post-value. It then does a conditional publish to Kafka with the post-value. The update is successful if the conditional publish succeeds. Otherwise, the replica has to recompute the post-value (potentially after the replica's view is refreshed) and retry the conditional publish. A potential issue with this approach is when there is a transient failure during publishing to Kafka (e.g., the leader of the partition changes). When this happens, the conditional publish will get an error. The replica doesn't know whether the publish actually succeeded or not. If we just blindly retry, it may not give the correct behavior (e.g., we could be applying +1 twice). So, not sure if conditional publish itself is enough for this approach. 2. The second approach allows only a single producer to publish to Kafka. We somehow elect one of the replicas to be the master that handles all updates. Normally, we don't need conditional publish since there is a single producer. Conditional publish can potentially be used to deal with duplicates. If the master encounters the same transient failure as the above, it can get the latest offset from the Kafka topic/partition to see if the publish actually succeeded or not since it's the only producer. A potential issue here is to handle the zombie master problem: if the master has a soft failure and another master is elected, we need to prevent the old master from publishing new data to Kafka. 
So, for this approach to work properly, we need some kind of single-writer support in addition to conditional publish. Jiangjie, The issue with partial commit is the following. Say we have a batch of 10 uncompressed messages sent to the leader. The followers only fetched the first 5 messages and then the leader dies. In this case, we only committed 5 out of the 10 messages. Thanks, Jun

On Tue, Jul 28, 2015 at 1:16 AM, Daniel Schierbeck daniel.schierb...@gmail.com wrote: Jiangjie: I think giving users the possibility of defining a custom policy for handling rejections is a good idea. For instance, this will allow Kafka to act as an event store in an Event Sourcing application. If the event(s) are rejected by the store, the original command may need to be re-validated against the new state.

On Tue, Jul 28, 2015 at 1:27 AM Jiangjie Qin j...@linkedin.com.invalid wrote: @Ewen, good point about batching. Yes, it would be tricky if we want to do a per-key conditional produce. My understanding is that the prerequisites of this KIP are: 1. A single producer for each partition. 2. Acks=-1, max.in.flight.request.per.connection=1, retries=SOME_BIG_NUMBER. The major problem it tries to solve is exactly-once produce, i.e. eliminating duplicates on the producer side. In that case, a batch is considered atomic: the only way a batch can be rejected should be that it was already appended, so the producer should just move on. It looks to me like even a transient multiple-producer scenario will cause issues, because users need to think about what to do if a request is rejected, and the answer varies across use cases. Thanks, Jiangjie (Becket) Qin

On Sun, Jul 26, 2015 at 11:54 AM, Ben Kirwin b...@kirw.in wrote: So I had another look at the 'Idempotent Producer' proposal this afternoon, and made a few notes on how I think they compare; if I've made any mistakes, I'd be delighted if someone with more context on the idempotent producer design would correct me. 
As a first intuition, you can think of the 'conditional publish' proposal as the special case of the 'idempotent producer' idea, where there's just a single producer per-partition. The key observation here is: if there's only one producer, you can conflate the 'sequence number' and the expected offset. The conditional publish proposal uses existing Kafka offset APIs for roughly the same things as the idempotent producer proposal uses sequence numbers for -- eg. instead of having a lease PID API that returns the current sequence number, we can use the existing 'offset API' to retrieve the upcoming offset. Both proposals attempt to deal with the situation where there are transiently multiple publishers for the same partition (and PID). The idempotent producer setup tracks a generation id for each pid, and discards any writes with a generation id smaller than the latest value. Conditional publish is
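Ben's observation that conditional publish guarantees only one of two would-be masters can successfully publish, including his caveat that the winner need not be the newer node, can be sketched with the same toy log (invented names, not the KIP API):

```python
# Sketch of "first write wins" between two nodes that both believe
# they are the master. Log and names are invented for illustration.

class Log:
    def __init__(self):
        self.entries = []
    def end_offset(self):
        return len(self.entries)
    def publish(self, value, expected_offset):
        if expected_offset != len(self.entries):
            raise RuntimeError("expected-offset mismatch")
        self.entries.append(value)

log = Log()
offset = log.end_offset()        # both masters observe offset 0

log.publish("from-old-master", offset)       # old master wins the race
fenced = False
try:
    log.publish("from-new-master", offset)   # new master's write is rejected
except RuntimeError:
    fenced = True

print(log.entries)  # ['from-old-master']
```

Only one of the two writes lands, so the log stays consistent; but as Ben notes, nothing ensures the *new* master is the one that wins, which may delay the leadership transition without breaking correctness.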
Re: [DISCUSS] KIP-27 - Conditional Publish
Jiangjie: I think giving users the possibility of defining a custom policy for handling rejections is a good idea. For instance, this will allow Kafka to act as an event store in an Event Sourcing application. If the event(s) are rejected by the store, the original command may need to be re-validated against the new state.

On Tue, Jul 28, 2015 at 1:27 AM Jiangjie Qin j...@linkedin.com.invalid wrote: @Ewen, good point about batching. Yes, it would be tricky if we want to do a per-key conditional produce. My understanding is that the prerequisites of this KIP are: 1. A single producer for each partition. 2. Acks=-1, max.in.flight.request.per.connection=1, retries=SOME_BIG_NUMBER. The major problem it tries to solve is exactly-once produce, i.e. eliminating duplicates on the producer side. In that case, a batch is considered atomic: the only way a batch can be rejected should be that it was already appended, so the producer should just move on. It looks to me like even a transient multiple-producer scenario will cause issues, because users need to think about what to do if a request is rejected, and the answer varies across use cases. Thanks, Jiangjie (Becket) Qin

On Sun, Jul 26, 2015 at 11:54 AM, Ben Kirwin b...@kirw.in wrote: So I had another look at the 'Idempotent Producer' proposal this afternoon, and made a few notes on how I think they compare; if I've made any mistakes, I'd be delighted if someone with more context on the idempotent producer design would correct me. As a first intuition, you can think of the 'conditional publish' proposal as the special case of the 'idempotent producer' idea, where there's just a single producer per-partition. The key observation here is: if there's only one producer, you can conflate the 'sequence number' and the expected offset. The conditional publish proposal uses existing Kafka offset APIs for roughly the same things as the idempotent producer proposal uses sequence numbers for -- eg. 
instead of having a lease PID API that returns the current sequence number, we can use the existing 'offset API' to retrieve the upcoming offset. Both proposals attempt to deal with the situation where there are transiently multiple publishers for the same partition (and PID). The idempotent producer setup tracks a generation id for each pid, and discards any writes with a generation id smaller than the latest value. Conditional publish is 'first write wins' -- and instead of dropping duplicates on the server, it returns an error to the client. The duplicate-handling behaviour (dropping vs. erroring) has some interesting consequences: - If all producers are producing the same stream of messages, silently dropping duplicates on the server is more convenient. (Suppose we have a batch of messages 0-9, and the high-water mark on the server is 7. Idempotent producer, as I read it, would append 7-9 to the partition and return success; meanwhile, conditional publish would fail the entire batch.) - If producers might be writing different streams of messages, the proposed behaviour of the idempotent producer is probably worse -- since it can silently interleave messages from two different producers. This can be a problem for some commit-log style use-cases, since it can transform a valid series of operations into an invalid one. - Given the error-on-duplicate behaviour, it's possible to implement deduplication on the client. (Sketch: if a publish returns an error for some partition, fetch the upcoming offset / sequence number for that partition, and discard all messages with a smaller offset on the client before republishing.) I think this makes the erroring behaviour more general, though deduplicating saves a roundtrip or two at conflict time. I'm less clear about the behaviour of the generation id, or what happens when (say) two producers with the same generation id are spun up at the same time. I'd be interested in hearing other folks' comments on this. 
Ewen: I'm not sure I understand the questions well enough to answer properly, but some quick notes: - I don't think it makes sense to assign an expected offset without already having assigned a partition. If the producer code is doing the partition assignment, it should probably do the offset assignment too... or we could just let application code handle both. - I'm not aware of any case where reassigning offsets to messages automatically after an offset mismatch makes sense: in the cases we've discussed, it seems like either it's safe to drop duplicates, or we want to handle the error at the application level. I'm going to try to come up with an idempotent-producer-type example that works with the draft patch in the next few days, so hopefully we'll have something more concrete to discuss. Otherwise -- if you have a clear idea of how eg. sequence number assignment would work in the
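Ben's parenthetical dedup sketch (on a conditional-publish error, fetch the upcoming offset and discard already-committed messages before republishing) might look like this in miniature. It assumes the committed prefix was written by this same producer with the same message stream; the log API names are invented.

```python
# Sketch of client-side deduplication on conditional-publish failure.
# Log and its methods are invented stand-ins, not the KIP-27 API.

class Log:
    def __init__(self):
        self.entries = []
    def upcoming_offset(self):
        return len(self.entries)
    def publish_batch(self, messages, base_offset):
        if base_offset != len(self.entries):
            raise RuntimeError("offset mismatch")
        self.entries.extend(messages)

def publish_with_dedup(log, messages, base_offset):
    try:
        log.publish_batch(messages, base_offset)
    except RuntimeError:
        # Fetch the upcoming offset; messages before it already landed.
        committed = log.upcoming_offset()
        remaining = messages[committed - base_offset:]
        if remaining:
            log.publish_batch(remaining, committed)

log = Log()
log.publish_batch(["m0", "m1"], 0)        # first two messages landed earlier
publish_with_dedup(log, ["m0", "m1", "m2"], 0)  # republish of the full batch
print(log.entries)  # ['m0', 'm1', 'm2'] -- no duplicates
```

As Ben says, this makes the error-on-duplicate behaviour strictly more general than server-side dropping, at the cost of a roundtrip or two when a conflict actually occurs.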
Re: [DISCUSS] KIP-27 - Conditional Publish
@Ewen, good point about batching. Yes, it would be tricky if we want to do a per-key conditional produce. My understanding is that the prerequisites of this KIP are: 1. A single producer for each partition. 2. Acks=-1, max.in.flight.request.per.connection=1, retries=SOME_BIG_NUMBER. The major problem it tries to solve is exactly-once produce, i.e. eliminating duplicates on the producer side. In that case, a batch is considered atomic: the only way a batch can be rejected should be that it was already appended, so the producer should just move on. It looks to me like even a transient multiple-producer scenario will cause issues, because users need to think about what to do if a request is rejected, and the answer varies across use cases. Thanks, Jiangjie (Becket) Qin

On Sun, Jul 26, 2015 at 11:54 AM, Ben Kirwin b...@kirw.in wrote: So I had another look at the 'Idempotent Producer' proposal this afternoon, and made a few notes on how I think they compare; if I've made any mistakes, I'd be delighted if someone with more context on the idempotent producer design would correct me. As a first intuition, you can think of the 'conditional publish' proposal as the special case of the 'idempotent producer' idea, where there's just a single producer per-partition. The key observation here is: if there's only one producer, you can conflate the 'sequence number' and the expected offset. The conditional publish proposal uses existing Kafka offset APIs for roughly the same things as the idempotent producer proposal uses sequence numbers for -- eg. instead of having a lease PID API that returns the current sequence number, we can use the existing 'offset API' to retrieve the upcoming offset. Both proposals attempt to deal with the situation where there are transiently multiple publishers for the same partition (and PID). The idempotent producer setup tracks a generation id for each pid, and discards any writes with a generation id smaller than the latest value. 
Conditional publish is 'first write wins' -- and instead of dropping duplicates on the server, it returns an error to the client. The duplicate-handling behaviour (dropping vs. erroring) has some interesting consequences: - If all producers are producing the same stream of messages, silently dropping duplicates on the server is more convenient. (Suppose we have a batch of messages 0-9, and the high-water mark on the server is 7. Idempotent producer, as I read it, would append 7-9 to the partition and return success; meanwhile, conditional publish would fail the entire batch.) - If producers might be writing different streams of messages, the proposed behaviour of the idempotent producer is probably worse -- since it can silently interleave messages from two different producers. This can be a problem for some commit-log style use-cases, since it can transform a valid series of operations into an invalid one. - Given the error-on-duplicate behaviour, it's possible to implement deduplication on the client. (Sketch: if a publish returns an error for some partition, fetch the upcoming offset / sequence number for that partition, and discard all messages with a smaller offset on the client before republishing.) I think this makes the erroring behaviour more general, though deduplicating saves a roundtrip or two at conflict time. I'm less clear about the behaviour of the generation id, or what happens when (say) two producers with the same generation id are spun up at the same time. I'd be interested in hearing other folks' comments on this. Ewen: I'm not sure I understand the questions well enough to answer properly, but some quick notes: - I don't think it makes sense to assign an expected offset without already having assigned a partition. If the producer code is doing the partition assignment, it should probably do the offset assignment too... or we could just let application code handle both. 
- I'm not aware of any case where reassigning offsets to messages automatically after an offset mismatch makes sense: in the cases we've discussed, it seems like either it's safe to drop duplicates, or we want to handle the error at the application level. I'm going to try to come up with an idempotent-producer-type example that works with the draft patch in the next few days, so hopefully we'll have something more concrete to discuss. Otherwise -- if you have a clear idea of how eg. sequence number assignment would work in the idempotent-producer proposal, we could probably translate that over to get the equivalent for the conditional publish API.

On Fri, Jul 24, 2015 at 2:16 AM, Ewen Cheslack-Postava e...@confluent.io wrote: @Becket - for compressed batches, I think this just works out given the KIP as described. Without the change you're referring to, it still only makes sense to batch messages with this KIP if all the expected offsets are sequential (else some messages are guaranteed to
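Jiangjie's prerequisites, spelled out as producer configuration: these are standard Kafka producer settings (acks="all" is the string form of acks=-1), with an arbitrary large value standing in for SOME_BIG_NUMBER.

```python
# Jiangjie's prerequisite settings written as a producer config dict.
# The retries value is an arbitrary stand-in for "SOME_BIG_NUMBER".

producer_config = {
    "acks": "all",                               # acks=-1: wait for the full ISR
    "max.in.flight.requests.per.connection": 1,  # no pipelined requests
    "retries": 2**31 - 1,                        # retry (effectively) forever
}

for key, value in sorted(producer_config.items()):
    print(f"{key}={value}")
```

The single in-flight request is what makes a rejected batch unambiguous under this scheme: with no pipelining, the only batch that can be rejected is the one just sent, and the only reason should be that it was already appended.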
Re: [DISCUSS] KIP-27 - Conditional Publish
So I had another look at the 'Idempotent Producer' proposal this afternoon, and made a few notes on how I think they compare; if I've made any mistakes, I'd be delighted if someone with more context on the idempotent producer design would correct me. As a first intuition, you can think of the 'conditional publish' proposal as the special case of the 'idempotent producer' idea, where there's just a single producer per-partition. The key observation here is: if there's only one producer, you can conflate the 'sequence number' and the expected offset. The conditional publish proposal uses existing Kafka offset APIs for roughly the same things as the idempotent producer proposal uses sequence numbers for -- eg. instead of having a lease PID API that returns the current sequence number, we can use the existing 'offset API' to retrieve the upcoming offset. Both proposals attempt to deal with the situation where there are transiently multiple publishers for the same partition (and PID). The idempotent producer setup tracks a generation id for each pid, and discards any writes with a generation id smaller than the latest value. Conditional publish is 'first write wins' -- and instead of dropping duplicates on the server, it returns an error to the client. The duplicate-handling behaviour (dropping vs. erroring) has some interesting consequences: - If all producers are producing the same stream of messages, silently dropping duplicates on the server is more convenient. (Suppose we have a batch of messages 0-9, and the high-water mark on the server is 7. Idempotent producer, as I read it, would append 7-9 to the partition and return success; meanwhile, conditional publish would fail the entire batch.) - If producers might be writing different streams of messages, the proposed behaviour of the idempotent producer is probably worse -- since it can silently interleave messages from two different producers. 
This can be a problem for some commit-log style use-cases, since it can transform a valid series of operations into an invalid one. - Given the error-on-duplicate behaviour, it's possible to implement deduplication on the client. (Sketch: if a publish returns an error for some partition, fetch the upcoming offset / sequence number for that partition, and discard all messages with a smaller offset on the client before republishing.) I think this makes the erroring behaviour more general, though deduplicating saves a roundtrip or two at conflict time. I'm less clear about the behaviour of the generation id, or what happens when (say) two producers with the same generation id are spun up at the same time. I'd be interested in hearing other folks' comments on this. Ewen: I'm not sure I understand the questions well enough to answer properly, but some quick notes: - I don't think it makes sense to assign an expected offset without already having assigned a partition. If the producer code is doing the partition assignment, it should probably do the offset assignment too... or we could just let application code handle both. - I'm not aware of any case where reassigning offsets to messages automatically after an offset mismatch makes sense: in the cases we've discussed, it seems like either it's safe to drop duplicates, or we want to handle the error at the application level. I'm going to try to come up with an idempotent-producer-type example that works with the draft patch in the next few days, so hopefully we'll have something more concrete to discuss. Otherwise -- if you have a clear idea of how eg. sequence number assignment would work in the idempotent-producer proposal, we could probably translate that over to get the equivalent for the conditional publish API.

On Fri, Jul 24, 2015 at 2:16 AM, Ewen Cheslack-Postava e...@confluent.io wrote: @Becket - for compressed batches, I think this just works out given the KIP as described. 
Without the change you're referring to, it still only makes sense to batch messages with this KIP if all the expected offsets are sequential (else some messages are guaranteed to fail). I think that probably just works out, but raises an issue I brought up on the KIP call. Batching can be a bit weird with this proposal. If you try to write key A and key B, the second operation is dependent on the first. Which means to make an effective client for this, we need to keep track of per-partition offsets so we can set expected offsets properly. For example, if A was expected to publish at offset 10, then if B was published to the same partition, we need to make sure it's marked as expected offset 11 (assuming no subpartition high water marks). We either need to have the application keep track of this itself and set the offsets, which requires that it know about how keys map to partitions, or the client needs to manage this process. But if the client manages it, I think the client gets quite a bit more complicated. If the produce request containing A fails, what happens to
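The bookkeeping Ewen describes (A expected at offset 10 forces B, batched to the same partition, to be expected at 11) amounts to the client tracking a per-partition next-expected offset, roughly as in this sketch. `BatchingClient` and its fields are invented for illustration.

```python
# Sketch of per-partition expected-offset bookkeeping in a batching
# client for a single partition. All names are invented.

class BatchingClient:
    def __init__(self, upcoming_offset):
        self.next_expected = upcoming_offset  # fetched via the offset API
        self.batch = []

    def send(self, key, value):
        # Each queued message claims the next expected offset in turn,
        # so a message queued after A is marked with A's offset + 1.
        self.batch.append((key, value, self.next_expected))
        self.next_expected += 1

client = BatchingClient(upcoming_offset=10)
client.send("A", "a-value")
client.send("B", "b-value")
print([offset for _, _, offset in client.batch])  # [10, 11]
```

Ewen's open question is visible in this sketch: if the request carrying A fails, B's precomputed expected offset of 11 is stale too, and the client must either recompute it or fail B along with A.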
Re: [DISCUSS] KIP-27 - Conditional Publish
@Becket - for compressed batches, I think this just works out given the KIP as described. Without the change you're referring to, it still only makes sense to batch messages with this KIP if all the expected offsets are sequential (else some messages are guaranteed to fail). I think that probably just works out, but raises an issue I brought up on the KIP call. Batching can be a bit weird with this proposal. If you try to write key A and key B, the second operation is dependent on the first. Which means to make an effective client for this, we need to keep track of per-partition offsets so we can set expected offsets properly. For example, if A was expected to publish at offset 10, then if B was published to the same partition, we need to make sure it's marked as expected offset 11 (assuming no subpartition high water marks). We either need to have the application keep track of this itself and set the offsets, which requires that it know about how keys map to partitions, or the client needs to manage this process. But if the client manages it, I think the client gets quite a bit more complicated. If the produce request containing A fails, what happens to B? Are there retries that somehow update the expected offset, or do we just give up since we know it's always going to fail with the expected offset that was automatically assigned to it? One way to handle this is to use Yasuhiro's idea of increasing the granularity of high watermarks using subpartitions. But I guess my question is: if one producer client is writing many keys, and some of those keys are produced to the same partition, and those messages are batched, what happens? Do we end up with lots of failed messages? Or do we have complicated logic in the producer to figure out what the right expected offset for each message is? Or do they all share the same base expected offset as in the compressed case, in which case they all share the same fate and subpartitioning doesn't help? 
Or is there a simpler solution I'm just not seeing? Maybe this just disables batching entirely, and throughput isn't an issue in these cases? Sorry, I know that's probably not entirely clear, but that's because I'm very uncertain of how batching works with this KIP.

On how this relates to other proposals: I think it might also be helpful to get an overview of all the proposals for relevant modifications to producers/produce requests, since many of these proposals are possibly alternatives (though some may not be mutually exclusive). Many people don't have all the context from the past couple of years of the project. Are there any other relevant wikis or docs besides the following?

https://cwiki.apache.org/confluence/display/KAFKA/Idempotent+Producer
https://cwiki.apache.org/confluence/display/KAFKA/Transactional+Messaging+in+Kafka
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Enriched+Message+Metadata

-Ewen

On Wed, Jul 22, 2015 at 11:18 AM, Gwen Shapira gshap...@cloudera.com wrote: Tangent: I think we should complete the move of the Produce / Fetch RPC to the client libraries before we add more revisions to this protocol.

On Wed, Jul 22, 2015 at 11:02 AM, Jiangjie Qin j...@linkedin.com.invalid wrote: I missed yesterday's KIP hangout. I'm currently working on another KIP for enriched metadata of messages. Guozhang has already created a wiki page before ( https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Enriched+Message+Metadata ). We plan to fill the relative offset into the offset field in the batch sent by the producer to avoid broker-side re-compression. The message offset would become batch base offset + relative offset. I guess maybe the expected offset in KIP-27 can only be set for the base offset? Would that affect certain use cases? For Jun's comments, I am not sure I completely get it. I think the producer only sends one batch per partition in a request, so either that batch is appended or not. Why would a batch be partially committed?
Thanks, Jiangjie (Becket) Qin On Tue, Jul 21, 2015 at 10:42 AM, Ben Kirwin b...@kirw.in wrote: That's a fair point. I've added some imagined job logic to the KIP, so we can make sure the proposal stays in sync with the usages we're discussing. (The logic is just a quick sketch for now -- I expect I'll need to elaborate it as we get into more detail, or to address other concerns...) On Tue, Jul 21, 2015 at 11:45 AM, Jun Rao j...@confluent.io wrote: For 1, yes, when there is a transient leader change, it's guaranteed that a prefix of the messages in a request will be committed. However, it seems that the client needs to know what subset of messages are committed in order to resume the sending. Then the question is how. As Flavio indicated, for the use cases that you listed, it would be useful to figure out the exact logic by using this feature. For example, in the partition K/V store example, when we fail over to a new writer to the
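The per-partition bookkeeping Ewen describes above -- where a batched message B must get the expected offset immediately after message A's -- can be sketched as a small client-side helper. This is purely illustrative: KIP-27 defines no such client component, and all names here are invented for the sketch.

```python
# Hypothetical client-side tracker for per-partition expected offsets.
# When several conditionally-published messages for one partition are
# batched, each message's expected offset must follow the previous one.
class ExpectedOffsetTracker:
    def __init__(self):
        self._next = {}  # partition -> next expected offset

    def assign(self, partition, base_offset=None):
        """Return the expected offset for the next message on `partition`.

        `base_offset` seeds the tracker the first time a partition is
        seen (e.g. from the last observed high watermark).
        """
        if partition not in self._next:
            if base_offset is None:
                raise ValueError("unknown partition; seed with base_offset")
            self._next[partition] = base_offset
        expected = self._next[partition]
        self._next[partition] += 1
        return expected

tracker = ExpectedOffsetTracker()
a = tracker.assign("topic-0", base_offset=10)  # message A -> expected 10
b = tracker.assign("topic-0")                  # message B -> expected 11
```

Note that this is exactly the coupling Ewen worries about: if the request carrying A fails, every offset handed out after it is stale.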
Re: [DISCUSS] KIP-27 - Conditional Publish
Tangent: I think we should complete the move of Produce / Fetch RPC to the client libraries before we add more revisions to this protocol. On Wed, Jul 22, 2015 at 11:02 AM, Jiangjie Qin j...@linkedin.com.invalid wrote: I missed yesterday's KIP hangout. I'm currently working on another KIP for enriched metadata of messages. Guozhang has already created a wiki page before ( https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Enriched+Message+Metadata). We plan to fill the relative offset to the offset field in the batch sent by producer to avoid broker side re-compression. The message offset would become batch base offset + relative offset. I guess maybe the expected offset in KIP-27 can be only set for base offset? Would that affect certain use cases? For Jun's comments, I am not sure I completely get it. I think the producer only sends one batch per partition in a request. So either that batch is appended or not. Why a batch would be partially committed? Thanks, Jiangjie (Becket) Qin On Tue, Jul 21, 2015 at 10:42 AM, Ben Kirwin b...@kirw.in wrote: That's a fair point. I've added some imagined job logic to the KIP, so we can make sure the proposal stays in sync with the usages we're discussing. (The logic is just a quick sketch for now -- I expect I'll need to elaborate it as we get into more detail, or to address other concerns...) On Tue, Jul 21, 2015 at 11:45 AM, Jun Rao j...@confluent.io wrote: For 1, yes, when there is a transient leader change, it's guaranteed that a prefix of the messages in a request will be committed. However, it seems that the client needs to know what subset of messages are committed in order to resume the sending. Then the question is how. As Flavio indicated, for the use cases that you listed, it would be useful to figure out the exact logic by using this feature. 
For example, in the partition K/V store example, when we fail over to a new writer to the commit log, the zombie writer can publish new messages to the log after the new writer takes over, but before it publishes any message. We probably need to outline how this case can be handled properly. Thanks, Jun On Mon, Jul 20, 2015 at 12:05 PM, Ben Kirwin b...@kirw.in wrote: Hi Jun, Thanks for the close reading! Responses inline. Thanks for the write-up. The single producer use case you mentioned makes sense. It would be useful to include that in the KIP wiki. Great -- I'll make sure that the wiki is clear about this. 1. What happens when the leader of the partition changes in the middle of a produce request? In this case, the producer client is not sure whether the request succeeds or not. If there is only a single message in the request, the producer can just resend the request. If it sees an OffsetMismatch error, it knows that the previous send actually succeeded and can proceed with the next write. This is nice since it not only allows the producer to proceed during transient failures in the broker, it also avoids duplicates during producer resend. One caveat is when there are multiple messages in the same partition in a produce request. The issue is that in our current replication protocol, it's possible for some, but not all messages in the request to be committed. This makes resend a bit harder to deal with since on receiving an OffsetMismatch error, it's not clear which messages have been committed. One possibility is to expect that compression is enabled, in which case multiple messages are compressed into a single message. I was thinking that another possibility is for the broker to return the current high watermark when sending an OffsetMismatch error. Based on this info, the producer can resend the subset of messages that have not been committed. However, this may not work in a compacted topic since there can be holes in the offset. 
This is an excellent question. It's my understanding that at least a *prefix* of messages will be committed (right?) -- which seems to be enough for many cases. I'll try to come up with a more concrete answer here.

2. Is this feature only intended to be used with ack = all? The client doesn't get the offset with ack = 0. With ack = 1, it's possible for a previously acked message to be lost during leader transition, which will make the client logic more complicated.

It's true that acks = 0 doesn't seem to be particularly useful; in all the cases I've come across, the client eventually wants to know about the mismatch error. However, it seems like there are some cases where acks = 1 would be fine -- e.g. in a bulk load of a fixed dataset, losing messages during a leader transition just means you need to rewind / restart the load, which is not especially catastrophic. For many other interesting cases, acks = all is probably preferable.

3. How does the producer client know
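Jun's suggestion quoted in this message -- have the broker return its current high watermark alongside the OffsetMismatch error, so the producer can resend only the uncommitted suffix of a batch -- might look roughly like this on the producer side. Purely a sketch; the real error payload and function names here are not part of the current proposal.

```python
# Given a batch whose first message was sent with `expected_base`, and
# the broker's report that only offsets below `high_watermark` were
# committed, compute what (if anything) needs to be resent.
def resend_suffix(batch, expected_base, high_watermark):
    committed = high_watermark - expected_base
    if committed <= 0:
        return expected_base, batch           # nothing committed: resend all
    if committed >= len(batch):
        return high_watermark, []             # everything committed
    return high_watermark, batch[committed:]  # resend the uncommitted tail

# Batch of 3 sent at expected offset 100; only the first 2 got committed.
base, remaining = resend_suffix(["A", "B", "C"], 100, 102)
```

As Jun notes, this arithmetic breaks down on a compacted topic, where the offset sequence can have holes.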
Re: [DISCUSS] KIP-27 - Conditional Publish
I missed yesterday's KIP hangout. I'm currently working on another KIP for enriched metadata of messages. Guozhang has already created a wiki page before ( https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Enriched+Message+Metadata ). We plan to fill the relative offset into the offset field in the batch sent by the producer to avoid broker-side re-compression. The message offset would become batch base offset + relative offset. I guess maybe the expected offset in KIP-27 can only be set for the base offset? Would that affect certain use cases?

For Jun's comments, I am not sure I completely get it. I think the producer only sends one batch per partition in a request, so either that batch is appended or not. Why would a batch be partially committed?

Thanks, Jiangjie (Becket) Qin

On Tue, Jul 21, 2015 at 10:42 AM, Ben Kirwin b...@kirw.in wrote: That's a fair point. I've added some imagined job logic to the KIP, so we can make sure the proposal stays in sync with the usages we're discussing. (The logic is just a quick sketch for now -- I expect I'll need to elaborate it as we get into more detail, or to address other concerns...)

On Tue, Jul 21, 2015 at 11:45 AM, Jun Rao j...@confluent.io wrote: For 1, yes, when there is a transient leader change, it's guaranteed that a prefix of the messages in a request will be committed. However, it seems that the client needs to know what subset of messages are committed in order to resume the sending. Then the question is how. As Flavio indicated, for the use cases that you listed, it would be useful to figure out the exact logic by using this feature. For example, in the partition K/V store example, when we fail over to a new writer to the commit log, the zombie writer can publish new messages to the log after the new writer takes over, but before it publishes any message. We probably need to outline how this case can be handled properly.
Thanks, Jun On Mon, Jul 20, 2015 at 12:05 PM, Ben Kirwin b...@kirw.in wrote: Hi Jun, Thanks for the close reading! Responses inline. Thanks for the write-up. The single producer use case you mentioned makes sense. It would be useful to include that in the KIP wiki. Great -- I'll make sure that the wiki is clear about this. 1. What happens when the leader of the partition changes in the middle of a produce request? In this case, the producer client is not sure whether the request succeeds or not. If there is only a single message in the request, the producer can just resend the request. If it sees an OffsetMismatch error, it knows that the previous send actually succeeded and can proceed with the next write. This is nice since it not only allows the producer to proceed during transient failures in the broker, it also avoids duplicates during producer resend. One caveat is when there are multiple messages in the same partition in a produce request. The issue is that in our current replication protocol, it's possible for some, but not all messages in the request to be committed. This makes resend a bit harder to deal with since on receiving an OffsetMismatch error, it's not clear which messages have been committed. One possibility is to expect that compression is enabled, in which case multiple messages are compressed into a single message. I was thinking that another possibility is for the broker to return the current high watermark when sending an OffsetMismatch error. Based on this info, the producer can resend the subset of messages that have not been committed. However, this may not work in a compacted topic since there can be holes in the offset. This is a excellent question. It's my understanding that at least a *prefix* of messages will be committed (right?) -- which seems to be enough for many cases. I'll try and come up with a more concrete answer here. 2. Is this feature only intended to be used with ack = all? 
The client doesn't get the offset with ack = 0. With ack = 1, it's possible for a previously acked message to be lost during leader transition, which will make the client logic more complicated. It's true that acks = 0 doesn't seem to be particularly useful; in all the cases I've come across, the client eventually wants to know about the mismatch error. However, it seems like there are some cases where acks = 1 would be fine -- eg. in a bulk load of a fixed dataset, losing messages during a leader transition just means you need to rewind / restart the load, which is not especially catastrophic. For many other interesting cases, acks = all is probably preferable. 3. How does the producer client know the offset to send the first message? Do we need to expose an API in producer to get the current high watermark? You're right, it might be irritating to have to go through the consumer API just for this. There are some
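Jiangjie's enriched-metadata scheme from the top of this message, in miniature: the producer fills in relative offsets within a batch, the broker assigns only the base, and each message's final offset is base + relative. A toy sketch with invented names, included only to make the arithmetic concrete.

```python
# Attach relative and absolute offsets to a producer batch, per the
# "message offset = batch base offset + relative offset" scheme.
def assign_offsets(batch_base, messages):
    return [
        {"value": m, "relative": i, "offset": batch_base + i}
        for i, m in enumerate(messages)
    ]

batch = assign_offsets(50, ["A", "B", "C"])
```

Under this scheme a conditional publish would naturally condition on the base offset only, which is exactly the interaction with KIP-27 that Jiangjie raises.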
Re: [DISCUSS] KIP-27 - Conditional Publish
For 1, yes, when there is a transient leader change, it's guaranteed that a prefix of the messages in a request will be committed. However, it seems that the client needs to know what subset of messages are committed in order to resume the sending. Then the question is how. As Flavio indicated, for the use cases that you listed, it would be useful to figure out the exact logic by using this feature. For example, in the partition K/V store example, when we fail over to a new writer to the commit log, the zombie writer can publish new messages to the log after the new writer takes over, but before it publishes any message. We probably need to outline how this case can be handled properly. Thanks, Jun On Mon, Jul 20, 2015 at 12:05 PM, Ben Kirwin b...@kirw.in wrote: Hi Jun, Thanks for the close reading! Responses inline. Thanks for the write-up. The single producer use case you mentioned makes sense. It would be useful to include that in the KIP wiki. Great -- I'll make sure that the wiki is clear about this. 1. What happens when the leader of the partition changes in the middle of a produce request? In this case, the producer client is not sure whether the request succeeds or not. If there is only a single message in the request, the producer can just resend the request. If it sees an OffsetMismatch error, it knows that the previous send actually succeeded and can proceed with the next write. This is nice since it not only allows the producer to proceed during transient failures in the broker, it also avoids duplicates during producer resend. One caveat is when there are multiple messages in the same partition in a produce request. The issue is that in our current replication protocol, it's possible for some, but not all messages in the request to be committed. This makes resend a bit harder to deal with since on receiving an OffsetMismatch error, it's not clear which messages have been committed. 
One possibility is to expect that compression is enabled, in which case multiple messages are compressed into a single message. I was thinking that another possibility is for the broker to return the current high watermark when sending an OffsetMismatch error. Based on this info, the producer can resend the subset of messages that have not been committed. However, this may not work in a compacted topic since there can be holes in the offset.

This is an excellent question. It's my understanding that at least a *prefix* of messages will be committed (right?) -- which seems to be enough for many cases. I'll try to come up with a more concrete answer here.

2. Is this feature only intended to be used with ack = all? The client doesn't get the offset with ack = 0. With ack = 1, it's possible for a previously acked message to be lost during leader transition, which will make the client logic more complicated.

It's true that acks = 0 doesn't seem to be particularly useful; in all the cases I've come across, the client eventually wants to know about the mismatch error. However, it seems like there are some cases where acks = 1 would be fine -- e.g. in a bulk load of a fixed dataset, losing messages during a leader transition just means you need to rewind / restart the load, which is not especially catastrophic. For many other interesting cases, acks = all is probably preferable.

3. How does the producer client know the offset to send the first message? Do we need to expose an API in producer to get the current high watermark?

You're right, it might be irritating to have to go through the consumer API just for this. There are some cases where the offsets are already available -- like the commit-log-for-KV-store example -- but in general, being able to get the offsets from the producer interface does sound convenient.

We plan to have a KIP discussion meeting tomorrow at 11am PST. Perhaps you can describe this KIP a bit then? Sure, happy to join.
Thanks, Jun

On Sat, Jul 18, 2015 at 10:37 AM, Ben Kirwin b...@kirw.in wrote: Just wanted to flag a little discussion that happened on the ticket: https://issues.apache.org/jira/browse/KAFKA-2260?focusedCommentId=14632259&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14632259

In particular, Yasuhiro Matsuda proposed an interesting variant on this that performs the offset check on the message key (instead of just the partition), with bounded space requirements, at the cost of potentially some spurious failures. (i.e. the produce request may fail even if that particular key hasn't been updated recently.) This addresses a couple of the drawbacks of the per-key approach mentioned at the bottom of the KIP.

On Fri, Jul 17, 2015 at 6:47 PM, Ben Kirwin b...@kirw.in wrote: Hi all, So, perhaps it's worth adding a couple specific examples of where this feature is useful, to make this a bit more concrete:
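Yasuhiro's bounded-space variant mentioned above can be approximated with a fixed-size table of last-write offsets indexed by key hash: a publish conditioned on "key unchanged since offset X" passes if the key's hash slot hasn't moved past X, so a collision with a *different* key produces the spurious failures Ben describes. A toy model only -- the actual proposed data structure lives in the KAFKA-2260 discussion, and these names are invented.

```python
NUM_SLOTS = 8  # bounded space, independent of the number of keys

class PerKeyConditionalLog:
    def __init__(self):
        self.next_offset = 0
        self.slot_last_write = [-1] * NUM_SLOTS  # last offset per hash slot

    def publish(self, key, last_seen_offset):
        """Append iff no key sharing this slot moved past last_seen_offset.

        Returns the assigned offset, or None on a (possibly spurious)
        conditional failure."""
        slot = hash(key) % NUM_SLOTS
        if self.slot_last_write[slot] > last_seen_offset:
            return None  # some key in this slot was updated since X
        self.slot_last_write[slot] = self.next_offset
        self.next_offset += 1
        return self.next_offset - 1

log = PerKeyConditionalLog()
first = log.publish("a", -1)    # fresh key: accepted at offset 0
stale = log.publish("a", -1)    # condition now stale: rejected
fresh = log.publish("a", 0)     # re-read, condition updated: accepted
```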
Re: [DISCUSS] KIP-27 - Conditional Publish
In KV store usage, all instances are writers, aren't they? There is no leader or master, so there is no failover. The offset-based CAS ensures an update is based on the latest value and doesn't care who is writing the new value. I think the idea of the offset-based CAS is great, and it works very well with Event Sourcing. It may be a bit weak for ensuring a single writer, though.

On Tue, Jul 21, 2015 at 8:45 AM, Jun Rao j...@confluent.io wrote: For 1, yes, when there is a transient leader change, it's guaranteed that a prefix of the messages in a request will be committed. However, it seems that the client needs to know what subset of messages are committed in order to resume the sending. Then the question is how. As Flavio indicated, for the use cases that you listed, it would be useful to figure out the exact logic by using this feature. For example, in the partition K/V store example, when we fail over to a new writer to the commit log, the zombie writer can publish new messages to the log after the new writer takes over, but before it publishes any message. We probably need to outline how this case can be handled properly. Thanks, Jun

On Mon, Jul 20, 2015 at 12:05 PM, Ben Kirwin b...@kirw.in wrote: Hi Jun, Thanks for the close reading! Responses inline. Thanks for the write-up. The single producer use case you mentioned makes sense. It would be useful to include that in the KIP wiki. Great -- I'll make sure that the wiki is clear about this. 1. What happens when the leader of the partition changes in the middle of a produce request? In this case, the producer client is not sure whether the request succeeds or not. If there is only a single message in the request, the producer can just resend the request. If it sees an OffsetMismatch error, it knows that the previous send actually succeeded and can proceed with the next write.
This is nice since it not only allows the producer to proceed during transient failures in the broker, it also avoids duplicates during producer resend. One caveat is when there are multiple messages in the same partition in a produce request. The issue is that in our current replication protocol, it's possible for some, but not all messages in the request to be committed. This makes resend a bit harder to deal with since on receiving an OffsetMismatch error, it's not clear which messages have been committed. One possibility is to expect that compression is enabled, in which case multiple messages are compressed into a single message. I was thinking that another possibility is for the broker to return the current high watermark when sending an OffsetMismatch error. Based on this info, the producer can resend the subset of messages that have not been committed. However, this may not work in a compacted topic since there can be holes in the offset. This is a excellent question. It's my understanding that at least a *prefix* of messages will be committed (right?) -- which seems to be enough for many cases. I'll try and come up with a more concrete answer here. 2. Is this feature only intended to be used with ack = all? The client doesn't get the offset with ack = 0. With ack = 1, it's possible for a previously acked message to be lost during leader transition, which will make the client logic more complicated. It's true that acks = 0 doesn't seem to be particularly useful; in all the cases I've come across, the client eventually wants to know about the mismatch error. However, it seems like there are some cases where acks = 1 would be fine -- eg. in a bulk load of a fixed dataset, losing messages during a leader transition just means you need to rewind / restart the load, which is not especially catastrophic. For many other interesting cases, acks = all is probably preferable. 3. How does the producer client know the offset to send the first message? 
Do we need to expose an API in producer to get the current high watermark? You're right, it might be irritating to have to go through the consumer API just for this. There are some cases where the offsets are already available -- like the commit-log-for-KV-store example -- but in general, being able to get the offsets from the producer interface does sound convenient. We plan to have a KIP discussion meeting tomorrow at 11am PST. Perhaps you can describe this KIP a bit then? Sure, happy to join. Thanks, Jun On Sat, Jul 18, 2015 at 10:37 AM, Ben Kirwin b...@kirw.in wrote: Just wanted to flag a little discussion that happened on the ticket: https://issues.apache.org/jira/browse/KAFKA-2260?focusedCommentId=14632259page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14632259 In particular, Yasuhiro Matsuda proposed an interesting variant on this that performs
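Yasuhiro's "offset-based CAS" framing from the start of this message can be modeled as an optimistic read-modify-write loop against the log: read the current state and offset, compute an update, and publish it conditioned on the offset you read, retrying if someone else got there first. An in-memory toy with invented names, not a Kafka API.

```python
# Toy log with the KIP-27-style check: an append only lands if the
# expected offset equals the current end of the log.
class ConditionalLog:
    def __init__(self):
        self.entries = []

    def publish(self, message, expected_offset):
        if expected_offset != len(self.entries):
            return False  # OffsetMismatch: someone else wrote first
        self.entries.append(message)
        return True

def cas_update(log, compute):
    """Optimistic read-modify-write: retry until our write wins."""
    while True:
        offset = len(log.entries)        # read the current position...
        snapshot = list(log.entries)     # ...and the current state
        if log.publish(compute(snapshot), expected_offset=offset):
            return offset

log = ConditionalLog()
cas_update(log, lambda state: {"count": len(state) + 1})
cas_update(log, lambda state: {"count": len(state) + 1})
```

As Yasuhiro says, this gives every writer a consistent update path without any notion of a leader -- which is also why, on its own, it doesn't single out one writer.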
Re: [DISCUSS] KIP-27 - Conditional Publish
Hi Jun, Thanks for the close reading! Responses inline.

Thanks for the write-up. The single producer use case you mentioned makes sense. It would be useful to include that in the KIP wiki.

Great -- I'll make sure that the wiki is clear about this.

1. What happens when the leader of the partition changes in the middle of a produce request? In this case, the producer client is not sure whether the request succeeds or not. If there is only a single message in the request, the producer can just resend the request. If it sees an OffsetMismatch error, it knows that the previous send actually succeeded and can proceed with the next write. This is nice since it not only allows the producer to proceed during transient failures in the broker, it also avoids duplicates during producer resend. One caveat is when there are multiple messages in the same partition in a produce request. The issue is that in our current replication protocol, it's possible for some, but not all messages in the request to be committed. This makes resend a bit harder to deal with since on receiving an OffsetMismatch error, it's not clear which messages have been committed. One possibility is to expect that compression is enabled, in which case multiple messages are compressed into a single message. I was thinking that another possibility is for the broker to return the current high watermark when sending an OffsetMismatch error. Based on this info, the producer can resend the subset of messages that have not been committed. However, this may not work in a compacted topic since there can be holes in the offset.

This is an excellent question. It's my understanding that at least a *prefix* of messages will be committed (right?) -- which seems to be enough for many cases. I'll try to come up with a more concrete answer here.

2. Is this feature only intended to be used with ack = all? The client doesn't get the offset with ack = 0.
With ack = 1, it's possible for a previously acked message to be lost during leader transition, which will make the client logic more complicated.

It's true that acks = 0 doesn't seem to be particularly useful; in all the cases I've come across, the client eventually wants to know about the mismatch error. However, it seems like there are some cases where acks = 1 would be fine -- e.g. in a bulk load of a fixed dataset, losing messages during a leader transition just means you need to rewind / restart the load, which is not especially catastrophic. For many other interesting cases, acks = all is probably preferable.

3. How does the producer client know the offset to send the first message? Do we need to expose an API in producer to get the current high watermark?

You're right, it might be irritating to have to go through the consumer API just for this. There are some cases where the offsets are already available -- like the commit-log-for-KV-store example -- but in general, being able to get the offsets from the producer interface does sound convenient.

We plan to have a KIP discussion meeting tomorrow at 11am PST. Perhaps you can describe this KIP a bit then? Sure, happy to join.

Thanks, Jun

On Sat, Jul 18, 2015 at 10:37 AM, Ben Kirwin b...@kirw.in wrote: Just wanted to flag a little discussion that happened on the ticket: https://issues.apache.org/jira/browse/KAFKA-2260?focusedCommentId=14632259&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14632259 In particular, Yasuhiro Matsuda proposed an interesting variant on this that performs the offset check on the message key (instead of just the partition), with bounded space requirements, at the cost of potentially some spurious failures. (i.e. the produce request may fail even if that particular key hasn't been updated recently.) This addresses a couple of the drawbacks of the per-key approach mentioned at the bottom of the KIP.
On Fri, Jul 17, 2015 at 6:47 PM, Ben Kirwin b...@kirw.in wrote: Hi all, So, perhaps it's worth adding a couple specific examples of where this feature is useful, to make this a bit more concrete:

- Suppose I'm using Kafka as a commit log for a partitioned KV store, like Samza or Pistachio (?) do. We bootstrap the process state by reading from that partition, and log all state updates to that partition when we're running. Now imagine that one of my processes locks up -- GC or similar -- and the system transitions that partition over to another node. When the GC is finished, the old 'owner' of that partition might still be trying to write to the commit log at the same time as the new one is. A process might detect this by noticing that the offset of the published message is bigger than it thought the upcoming offset was, which implies someone else has been writing to the log... but by then it's too late, and the commit log is already corrupt. With a 'conditional produce', one of those processes will have its publish request
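The failover race Ben describes -- a GC-paused "zombie" owner and the new owner both appending to the commit log -- is exactly what the conditional produce is meant to prevent: each writer publishes at the offset it last observed, and only one can win. A minimal in-memory model (not the Kafka producer API):

```python
# KIP-27-style conditional append: succeed only if the log is exactly
# `expected_offset` entries long.
def conditional_append(log, message, expected_offset):
    if expected_offset != len(log):
        return False  # fenced: the log moved past what this writer saw
    log.append(message)
    return True

commit_log = ["state-v1"]  # both writers have read up to offset 1

new_owner_ok = conditional_append(commit_log, "state-v2", 1)
zombie_ok = conditional_append(commit_log, "state-v2-stale", 1)  # refused
```

The zombie's rejected request is the signal that it has lost ownership; the log itself never sees the interleaved write.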
Re: [DISCUSS] KIP-27 - Conditional Publish
The per-key generalization is useful. As Jiangjie mentioned in KAFKA-2260, one thing that we need to sort out is what happens if a produce request has messages with different keys and some of the messages have expected offsets while some others don't. Currently, the produce response has an error code per partition, not per message. One way is to just define the semantics as: the produce request will only go through if all keys in the request pass the offset test.

Thanks, Jun

On Sat, Jul 18, 2015 at 10:37 AM, Ben Kirwin b...@kirw.in wrote: Just wanted to flag a little discussion that happened on the ticket: https://issues.apache.org/jira/browse/KAFKA-2260?focusedCommentId=14632259&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14632259 In particular, Yasuhiro Matsuda proposed an interesting variant on this that performs the offset check on the message key (instead of just the partition), with bounded space requirements, at the cost of potentially some spurious failures. (i.e. the produce request may fail even if that particular key hasn't been updated recently.) This addresses a couple of the drawbacks of the per-key approach mentioned at the bottom of the KIP.

On Fri, Jul 17, 2015 at 6:47 PM, Ben Kirwin b...@kirw.in wrote: Hi all, So, perhaps it's worth adding a couple specific examples of where this feature is useful, to make this a bit more concrete: - Suppose I'm using Kafka as a commit log for a partitioned KV store, like Samza or Pistachio (?) do. We bootstrap the process state by reading from that partition, and log all state updates to that partition when we're running. Now imagine that one of my processes locks up -- GC or similar -- and the system transitions that partition over to another node. When the GC is finished, the old 'owner' of that partition might still be trying to write to the commit log at the same time as the new one is.
A process might detect this by noticing that the offset of the published message is bigger than it thought the upcoming offset was, which implies someone else has been writing to the log... but by then it's too late, and the commit log is already corrupt. With a 'conditional produce', one of those processes will have its publish request refused -- so we've avoided corrupting the state.

- Envision some copycat-like system, where we have some sharded postgres setup and we're tailing each shard into its own partition. Normally, it's fairly easy to avoid duplicates here: we can track which offset in the WAL corresponds to which offset in Kafka, and we know how many messages we've written to Kafka already, so the state is very simple. However, it is possible that for a moment -- due to rebalancing or operator error or some other thing -- two different nodes are tailing the same postgres shard at once! Normally this would introduce duplicate messages, but by specifying the expected offset, we can avoid this.

So perhaps it's better to say that this is useful when a single producer is *expected*, but multiple producers are *possible*? (In the same way that the high-level consumer normally has 1 consumer in a group reading from a partition, but there are small windows where more than one might be reading at the same time.) This is also the spirit of the 'runtime cost' comment -- in the common case, where there is little to no contention, there's no performance overhead either. I mentioned this a little in the Motivation section -- maybe I should flesh that out a little bit? For me, the motivation to work this up was that I kept running into cases, like the above, where the existing API was almost-but-not-quite enough to give the guarantees I was looking for -- and the extension needed to handle those cases was pretty small and natural-feeling.

On Fri, Jul 17, 2015 at 4:49 PM, Ashish Singh asi...@cloudera.com wrote: Good concept. I have a question though.
Say there are two producers, A and B. Both producers are producing to the same partition.

- A sends a message with expected offset x1
- Broker accepts it and sends an ack
- B sends a message with expected offset x1
- Broker rejects it and sends a nack
- B sends the message again with expected offset x1+1
- Broker accepts it and sends an ack

I guess this is what this KIP suggests, right? If yes, then how does this ensure that the same message will not be written twice when two producers are producing to the same partition? A producer, on receiving a nack, will try again with the next offset and will keep doing so till the message is accepted. Am I missing something?

Also, you have mentioned in the KIP that it imposes little to no runtime cost in memory or time; I think that is not true for time. With this approach, producers' performance will reduce proportionally to the number of producers writing to the same partition. Please correct me if I am missing something.

On Fri, Jul
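Ashish's scenario can be traced through a toy conditional log. It shows exactly the behavior he questions: B's first attempt at x1 is rejected, and if B *blindly* retries at x1+1 the message lands anyway -- so the offset check alone doesn't deduplicate across producers; the producer is expected to re-examine the log state before retrying. Illustrative model only, not the Kafka API.

```python
# Toy conditional publish: accept only if the log is exactly
# `expected_offset` entries long.
def conditional_publish(log, message, expected_offset):
    if expected_offset != len(log):
        return False  # nack: someone else already took this offset
    log.append(message)
    return True

log = []
x1 = 0
a_ok = conditional_publish(log, "from-A", x1)         # A wins offset x1
b_first = conditional_publish(log, "from-B", x1)      # B is nacked
b_retry = conditional_publish(log, "from-B", x1 + 1)  # blind retry lands
```

The nack is a *signal*, not a retry hint: a producer that re-reads the log after a mismatch can decide whether its message is still appropriate (or already present) before choosing a new expected offset.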
Re: [DISCUSS] KIP-27 - Conditional Publish
Hi, Ben, Thanks for the write-up. The single producer use case you mentioned makes sense. It would be useful to include that in the KIP wiki. A couple of questions on the design details.

1. What happens when the leader of the partition changes in the middle of a produce request? In this case, the producer client is not sure whether the request succeeds or not. If there is only a single message in the request, the producer can just resend the request. If it sees an OffsetMismatch error, it knows that the previous send actually succeeded and can proceed with the next write. This is nice since it not only allows the producer to proceed during transient failures in the broker, but also avoids duplicates during producer resend. One caveat is when there are multiple messages in the same partition in a produce request. The issue is that in our current replication protocol, it's possible for some, but not all, messages in the request to be committed. This makes resend a bit harder to deal with since, on receiving an OffsetMismatch error, it's not clear which messages have been committed. One possibility is to expect that compression is enabled, in which case multiple messages are compressed into a single message. I was thinking that another possibility is for the broker to return the current high watermark when sending an OffsetMismatch error. Based on this info, the producer can resend the subset of messages that have not been committed. However, this may not work in a compacted topic since there can be holes in the offset sequence.

2. Is this feature only intended to be used with ack = all? The client doesn't get the offset with ack = 0. With ack = 1, it's possible for a previously acked message to be lost during a leader transition, which will make the client logic more complicated.

3. How does the producer client know the offset to use for the first message? Do we need to expose an API in the producer to get the current high watermark?
We plan to have a KIP discussion meeting tomorrow at 11am PST. Perhaps you can describe this KIP a bit then? Thanks, Jun On Sat, Jul 18, 2015 at 10:37 AM, Ben Kirwin b...@kirw.in wrote: Just wanted to flag a little discussion that happened on the ticket: https://issues.apache.org/jira/browse/KAFKA-2260?focusedCommentId=14632259&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14632259 In particular, Yasuhiro Matsuda proposed an interesting variant on this that performs the offset check on the message key (instead of just the partition), with bounded space requirements, at the cost of potentially some spurious failures. (i.e. the produce request may fail even if that particular key hasn't been updated recently.) This addresses a couple of the drawbacks of the per-key approach mentioned at the bottom of the KIP. On Fri, Jul 17, 2015 at 6:47 PM, Ben Kirwin b...@kirw.in wrote: Hi all, So, perhaps it's worth adding a couple specific examples of where this feature is useful, to make this a bit more concrete: - Suppose I'm using Kafka as a commit log for a partitioned KV store, like Samza or Pistachio (?) do. We bootstrap the process state by reading from that partition, and log all state updates to that partition when we're running. Now imagine that one of my processes locks up -- GC or similar -- and the system transitions that partition over to another node. When the GC is finished, the old 'owner' of that partition might still be trying to write to the commit log at the same time as the new one is. A process might detect this by noticing that the offset of the published message is bigger than it thought the upcoming offset was, which implies someone else has been writing to the log... but by then it's too late, and the commit log is already corrupt. With a 'conditional produce', one of those processes will have its publish request refused -- so we've avoided corrupting the state. 
- Envision some copycat-like system, where we have some sharded postgres setup and we're tailing each shard into its own partition. Normally, it's fairly easy to avoid duplicates here: we can track which offset in the WAL corresponds to which offset in Kafka, and we know how many messages we've written to Kafka already, so the state is very simple. However, it is possible that for a moment -- due to rebalancing or operator error or some other thing -- two different nodes are tailing the same postgres shard at once! Normally this would introduce duplicate messages, but by specifying the expected offset, we can avoid this. So perhaps it's better to say that this is useful when a single producer is *expected*, but multiple producers are *possible*? (In the same way that the high-level consumer normally has 1 consumer in a group reading from a partition, but there are small windows where more than one might be reading at the same time.) This is also the spirit of the 'runtime cost' comment -- in the common case,
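The check at the core of the proposal is easiest to see in miniature: the broker compares the request's expected offset against the partition's log-end offset and appends only on a match. A minimal sketch in Python (purely illustrative -- `PartitionLog` and the returned mismatch hint are invented names, not the actual broker code; the hint follows Jun's suggestion of returning the current offset on an OffsetMismatch):

```python
class PartitionLog:
    """Toy model of a single topic partition with a conditional append."""

    def __init__(self):
        self.messages = []

    @property
    def log_end_offset(self):
        # Offset the *next* message would be written at.
        return len(self.messages)

    def conditional_append(self, message, expected_offset):
        """Append only if expected_offset matches the log-end offset.

        Returns (ok, offset): on success, the offset the message was
        written at; on mismatch, the current log-end offset so the
        producer can reconcile its view of the log.
        """
        if expected_offset != self.log_end_offset:
            return False, self.log_end_offset  # OffsetMismatch
        self.messages.append(message)
        return True, self.log_end_offset - 1

log = PartitionLog()
ok_a, off_a = log.conditional_append("A", expected_offset=0)  # accepted at 0
ok_b, off_b = log.conditional_append("B", expected_offset=0)  # stale: rejected
```

In the GC-pause scenario above, the old and new owners both attempt the second append with the same expected offset; only one of them can win the compare-and-set.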
Re: [DISCUSS] KIP-27 - Conditional Publish
I'm with you on the races that could happen in the scenarios you describe, but I'm still not convinced that conditionally updating is the best call. Instead of conditionally updating, the broker could fence off the old owner to avoid spurious writes, and that's valid for all attempts. The advantage of fencing is that the broker does not accept requests from the others at all, while the conditional update is a bit fragile for protecting streams of publishes. -Flavio On Fri, Jul 17, 2015 at 11:47 PM, Ben Kirwin b...@kirw.in wrote: Hi all, So, perhaps it's worth adding a couple specific examples of where this feature is useful, to make this a bit more concrete: - Suppose I'm using Kafka as a commit log for a partitioned KV store, like Samza or Pistachio (?) do. We bootstrap the process state by reading from that partition, and log all state updates to that partition when we're running. Now imagine that one of my processes locks up -- GC or similar -- and the system transitions that partition over to another node. When the GC is finished, the old 'owner' of that partition might still be trying to write to the commit log at the same time as the new one is. A process might detect this by noticing that the offset of the published message is bigger than it thought the upcoming offset was, which implies someone else has been writing to the log... but by then it's too late, and the commit log is already corrupt. With a 'conditional produce', one of those processes will have its publish request refused -- so we've avoided corrupting the state. - Envision some copycat-like system, where we have some sharded postgres setup and we're tailing each shard into its own partition. Normally, it's fairly easy to avoid duplicates here: we can track which offset in the WAL corresponds to which offset in Kafka, and we know how many messages we've written to Kafka already, so the state is very simple. 
However, it is possible that for a moment -- due to rebalancing or operator error or some other thing -- two different nodes are tailing the same postgres shard at once! Normally this would introduce duplicate messages, but by specifying the expected offset, we can avoid this. So perhaps it's better to say that this is useful when a single producer is *expected*, but multiple producers are *possible*? (In the same way that the high-level consumer normally has 1 consumer in a group reading from a partition, but there are small windows where more than one might be reading at the same time.) This is also the spirit of the 'runtime cost' comment -- in the common case, where there is little to no contention, there's no performance overhead either. I mentioned this a little in the Motivation section -- maybe I should flesh that out a little bit? For me, the motivation to work this up was that I kept running into cases, like the above, where the existing API was almost-but-not-quite enough to give the guarantees I was looking for -- and the extension needed to handle those cases too was pretty small and natural-feeling. On Fri, Jul 17, 2015 at 4:49 PM, Ashish Singh asi...@cloudera.com wrote: Good concept. I have a question though. Say there are two producers A and B. Both producers are producing to same partition. - A sends a message with expected offset, x1 - Broker accepts it and sends an Ack - B sends a message with expected offset, x1 - Broker rejects it, sends nack - B sends message again with expected offset, x1+1 - Broker accepts it and sends Ack I guess this is what this KIP suggests, right? If yes, then how does this ensure that same message will not be written twice when two producers are producing to same partition? Producer on receiving a nack will try again with next offset and will keep doing so till the message is accepted. Am I missing something? 
Also, you have mentioned on KIP, it imposes little to no runtime cost in memory or time, I think that is not true for time. With this approach producers' performance will reduce proportionally to number of producers writing to same partition. Please correct me if I am missing out something. On Fri, Jul 17, 2015 at 11:32 AM, Mayuresh Gharat gharatmayures...@gmail.com wrote: If we have 2 producers producing to a partition, they can be out of order, then how does one producer know what offset to expect as it does not interact with other producer? Can you give an example flow that explains how it works with single producer and with multiple producers? Thanks, Mayuresh On Fri, Jul 17, 2015 at 10:28 AM, Flavio Junqueira fpjunque...@yahoo.com.invalid wrote: I like this feature, it reminds me of conditional updates in zookeeper. I'm not sure if it'd be best to have some mechanism for fencing rather than a conditional write like you're proposing. The reason I'm saying this is that the conditional write applies to requests individually, while it sounds
Re: [DISCUSS] KIP-27 - Conditional Publish
It would be worth fleshing out the use cases a bit more and thinking through the overlap with the other proposals for transactions and idempotence (since likely we will end up with both). The advantage of this proposal is that it is really simple. If we go through use cases:

1. Stream processing: I suspect in this case data is partitioned over multiple partitions/topics by multiple writers, so it needs a more general atomicity across partitions.

2. Copycat: This is the case where you're publishing data from an external system. For some external systems I think this mechanism could provide an exactly-once publication mechanism; however, there are some details about retries to think through.

3. Key-value store/event sourcing: This is the case where you are building a log-centric key-value store or an event-sourced application. I think this could potentially use this feature, but it needs thinking through. One subtlety to think through is the relationship with request pipelining and retries.

-Jay On Mon, Jul 20, 2015 at 12:05 PM, Ben Kirwin b...@kirw.in wrote: Hi Jun, Thanks for the close reading! Responses inline. Thanks for the write-up. The single producer use case you mentioned makes sense. It would be useful to include that in the KIP wiki. Great -- I'll make sure that the wiki is clear about this. 1. What happens when the leader of the partition changes in the middle of a produce request? In this case, the producer client is not sure whether the request succeeds or not. If there is only a single message in the request, the producer can just resend the request. If it sees an OffsetMismatch error, it knows that the previous send actually succeeded and can proceed with the next write. This is nice since it not only allows the producer to proceed during transient failures in the broker, but also avoids duplicates during producer resend. One caveat is when there are multiple messages in the same partition in a produce request. 
The issue is that in our current replication protocol, it's possible for some, but not all messages in the request to be committed. This makes resend a bit harder to deal with since on receiving an OffsetMismatch error, it's not clear which messages have been committed. One possibility is to expect that compression is enabled, in which case multiple messages are compressed into a single message. I was thinking that another possibility is for the broker to return the current high watermark when sending an OffsetMismatch error. Based on this info, the producer can resend the subset of messages that have not been committed. However, this may not work in a compacted topic since there can be holes in the offset. This is an excellent question. It's my understanding that at least a *prefix* of messages will be committed (right?) -- which seems to be enough for many cases. I'll try and come up with a more concrete answer here. 2. Is this feature only intended to be used with ack = all? The client doesn't get the offset with ack = 0. With ack = 1, it's possible for a previously acked message to be lost during leader transition, which will make the client logic more complicated. It's true that acks = 0 doesn't seem to be particularly useful; in all the cases I've come across, the client eventually wants to know about the mismatch error. However, it seems like there are some cases where acks = 1 would be fine -- e.g. in a bulk load of a fixed dataset, losing messages during a leader transition just means you need to rewind / restart the load, which is not especially catastrophic. For many other interesting cases, acks = all is probably preferable. 3. How does the producer client know the offset to send the first message? Do we need to expose an API in producer to get the current high watermark? You're right, it might be irritating to have to go through the consumer API just for this. 
There are some cases where the offsets are already available -- like the commit-log-for-KV-store example -- but in general, being able to get the offsets from the producer interface does sound convenient. We plan to have a KIP discussion meeting tomorrow at 11am PST. Perhaps you can describe this KIP a bit then? Sure, happy to join. Thanks, Jun On Sat, Jul 18, 2015 at 10:37 AM, Ben Kirwin b...@kirw.in wrote: Just wanted to flag a little discussion that happened on the ticket: https://issues.apache.org/jira/browse/KAFKA-2260?focusedCommentId=14632259&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14632259 In particular, Yasuhiro Matsuda proposed an interesting variant on this that performs the offset check on the message key (instead of just the partition), with bounded space requirements, at the cost of potentially some spurious failures. (i.e. the produce request may fail even if that particular key hasn't been updated recently.) This addresses a
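The resend strategy discussed above -- relying on the prefix-commit property and a high watermark returned alongside the OffsetMismatch error -- can be sketched as follows. This is a sketch under those two assumptions, not the real producer API; `resend_suffix` is an invented helper:

```python
def resend_suffix(batch, base_offset, high_watermark):
    """Given a batch sent with expected offset `base_offset`, return the
    messages not yet committed according to the broker's high watermark.

    Assumes at least a *prefix* of the batch was committed (no holes),
    which is the property Ben asks about above.
    """
    committed = max(0, high_watermark - base_offset)
    return batch[committed:]

# A producer sent ["A", "B", "C"] with expected offset 0, but only "A" was
# committed before the leader changed. The retry fails with OffsetMismatch
# and a high watermark of 1, so the producer resends just the suffix.
assert resend_suffix(["A", "B", "C"], base_offset=0, high_watermark=1) == ["B", "C"]
```

Note this reconciliation breaks down exactly where Jun points out: in a compacted topic, offsets can have holes, so `high_watermark - base_offset` no longer counts committed messages.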
Re: [DISCUSS] KIP-27 - Conditional Publish
Up to Ben to clarify, but I'd think that in this case, it is up to the logic of B to decide what to do. B knows that the offset isn't what it expects, so it can react accordingly. If it chooses to try again, then it should not violate any application invariant. -Flavio On Fri, Jul 17, 2015 at 9:49 PM, Ashish Singh asi...@cloudera.com wrote: Good concept. I have a question though. Say there are two producers A and B. Both producers are producing to same partition. - A sends a message with expected offset, x1 - Broker accepts it and sends an Ack - B sends a message with expected offset, x1 - Broker rejects it, sends nack - B sends message again with expected offset, x1+1 - Broker accepts it and sends Ack I guess this is what this KIP suggests, right? If yes, then how does this ensure that same message will not be written twice when two producers are producing to same partition? Producer on receiving a nack will try again with next offset and will keep doing so till the message is accepted. Am I missing something? Also, you have mentioned on KIP, it imposes little to no runtime cost in memory or time, I think that is not true for time. With this approach producers' performance will reduce proportionally to number of producers writing to same partition. Please correct me if I am missing out something. On Fri, Jul 17, 2015 at 11:32 AM, Mayuresh Gharat gharatmayures...@gmail.com wrote: If we have 2 producers producing to a partition, they can be out of order, then how does one producer know what offset to expect as it does not interact with other producer? Can you give an example flow that explains how it works with single producer and with multiple producers? Thanks, Mayuresh On Fri, Jul 17, 2015 at 10:28 AM, Flavio Junqueira fpjunque...@yahoo.com.invalid wrote: I like this feature, it reminds me of conditional updates in zookeeper. I'm not sure if it'd be best to have some mechanism for fencing rather than a conditional write like you're proposing. 
The reason I'm saying this is that the conditional write applies to requests individually, while it sounds like you want to make sure that there is a single client writing over multiple requests. -Flavio On 17 Jul 2015, at 07:30, Ben Kirwin b...@kirw.in wrote: Hi there, I just added a KIP for a 'conditional publish' operation: a simple CAS-like mechanism for the Kafka producer. The wiki page is here: https://cwiki.apache.org/confluence/display/KAFKA/KIP-27+-+Conditional+Publish And there's some previous discussion on the ticket and the users list: https://issues.apache.org/jira/browse/KAFKA-2260 https://mail-archives.apache.org/mod_mbox/kafka-users/201506.mbox/%3CCAAeOB6ccyAA13YNPqVQv2o-mT5r=c9v7a+55sf2wp93qg7+...@mail.gmail.com%3E As always, comments and suggestions are very welcome. Thanks, Ben -- -Regards, Mayuresh R. Gharat (862) 250-7125 -- Regards, Ashish
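The fencing alternative raised in the thread can be contrasted with the per-message offset check: a fencing scheme rejects *every* request from a stale writer, protecting a whole stream of publishes. A purely hypothetical sketch (epoch-based fencing is not part of this KIP; the names are invented for illustration):

```python
class FencedLog:
    """Toy log where each writer carries an epoch; the broker remembers the
    highest epoch seen and rejects anything older, regardless of offset."""

    def __init__(self):
        self.messages = []
        self.highest_epoch = -1

    def append(self, message, epoch):
        if epoch < self.highest_epoch:
            return False  # fenced: an older writer, rejected outright
        self.highest_epoch = epoch
        self.messages.append(message)
        return True

log = FencedLog()
assert log.append("old-1", epoch=1)      # original owner writes freely
assert log.append("new-1", epoch=2)      # failover: new owner bumps the epoch
assert not log.append("old-2", epoch=1)  # old owner is now fenced off
```

Unlike the conditional produce, the old owner's rejection here doesn't depend on it racing a particular offset: once the epoch moves forward, all of its subsequent requests fail.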
Re: [DISCUSS] KIP-27 - Conditional Publish
Yes, sorry, I think this is right -- it's pretty application-specific. One thing to note: in a large subset of cases (i.e. bulk load, copycat-type, mirrormaker) the correct response is not to resend the message at all; if there's already a message at that offset, it's because another instance of the same process already sent the exact same message. On Mon, Jul 20, 2015 at 4:38 PM, Flavio P JUNQUEIRA f...@apache.org wrote: Up to Ben to clarify, but I'd think that in this case, it is up to the logic of B to decide what to do. B knows that the offset isn't what it expects, so it can react accordingly. If it chooses to try again, then it should not violate any application invariant. -Flavio On Fri, Jul 17, 2015 at 9:49 PM, Ashish Singh asi...@cloudera.com wrote: Good concept. I have a question though. Say there are two producers A and B. Both producers are producing to same partition. - A sends a message with expected offset, x1 - Broker accepts it and sends an Ack - B sends a message with expected offset, x1 - Broker rejects it, sends nack - B sends message again with expected offset, x1+1 - Broker accepts it and sends Ack I guess this is what this KIP suggests, right? If yes, then how does this ensure that same message will not be written twice when two producers are producing to same partition? Producer on receiving a nack will try again with next offset and will keep doing so till the message is accepted. Am I missing something? Also, you have mentioned on KIP, it imposes little to no runtime cost in memory or time, I think that is not true for time. With this approach producers' performance will reduce proportionally to number of producers writing to same partition. Please correct me if I am missing out something. 
On Fri, Jul 17, 2015 at 11:32 AM, Mayuresh Gharat gharatmayures...@gmail.com wrote: If we have 2 producers producing to a partition, they can be out of order, then how does one producer know what offset to expect as it does not interact with other producer? Can you give an example flow that explains how it works with single producer and with multiple producers? Thanks, Mayuresh On Fri, Jul 17, 2015 at 10:28 AM, Flavio Junqueira fpjunque...@yahoo.com.invalid wrote: I like this feature, it reminds me of conditional updates in zookeeper. I'm not sure if it'd be best to have some mechanism for fencing rather than a conditional write like you're proposing. The reason I'm saying this is that the conditional write applies to requests individually, while it sounds like you want to make sure that there is a single client writing over multiple requests. -Flavio On 17 Jul 2015, at 07:30, Ben Kirwin b...@kirw.in wrote: Hi there, I just added a KIP for a 'conditional publish' operation: a simple CAS-like mechanism for the Kafka producer. The wiki page is here: https://cwiki.apache.org/confluence/display/KAFKA/KIP-27+-+Conditional+Publish And there's some previous discussion on the ticket and the users list: https://issues.apache.org/jira/browse/KAFKA-2260 https://mail-archives.apache.org/mod_mbox/kafka-users/201506.mbox/%3CCAAeOB6ccyAA13YNPqVQv2o-mT5r=c9v7a+55sf2wp93qg7+...@mail.gmail.com%3E As always, comments and suggestions are very welcome. Thanks, Ben -- -Regards, Mayuresh R. Gharat (862) 250-7125 -- Regards, Ashish
Re: [DISCUSS] KIP-27 - Conditional Publish
Just wanted to flag a little discussion that happened on the ticket: https://issues.apache.org/jira/browse/KAFKA-2260?focusedCommentId=14632259&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14632259 In particular, Yasuhiro Matsuda proposed an interesting variant on this that performs the offset check on the message key (instead of just the partition), with bounded space requirements, at the cost of potentially some spurious failures. (i.e. the produce request may fail even if that particular key hasn't been updated recently.) This addresses a couple of the drawbacks of the per-key approach mentioned at the bottom of the KIP. On Fri, Jul 17, 2015 at 6:47 PM, Ben Kirwin b...@kirw.in wrote: Hi all, So, perhaps it's worth adding a couple specific examples of where this feature is useful, to make this a bit more concrete: - Suppose I'm using Kafka as a commit log for a partitioned KV store, like Samza or Pistachio (?) do. We bootstrap the process state by reading from that partition, and log all state updates to that partition when we're running. Now imagine that one of my processes locks up -- GC or similar -- and the system transitions that partition over to another node. When the GC is finished, the old 'owner' of that partition might still be trying to write to the commit log at the same time as the new one is. A process might detect this by noticing that the offset of the published message is bigger than it thought the upcoming offset was, which implies someone else has been writing to the log... but by then it's too late, and the commit log is already corrupt. With a 'conditional produce', one of those processes will have its publish request refused -- so we've avoided corrupting the state. - Envision some copycat-like system, where we have some sharded postgres setup and we're tailing each shard into its own partition. 
Normally, it's fairly easy to avoid duplicates here: we can track which offset in the WAL corresponds to which offset in Kafka, and we know how many messages we've written to Kafka already, so the state is very simple. However, it is possible that for a moment -- due to rebalancing or operator error or some other thing -- two different nodes are tailing the same postgres shard at once! Normally this would introduce duplicate messages, but by specifying the expected offset, we can avoid this. So perhaps it's better to say that this is useful when a single producer is *expected*, but multiple producers are *possible*? (In the same way that the high-level consumer normally has 1 consumer in a group reading from a partition, but there are small windows where more than one might be reading at the same time.) This is also the spirit of the 'runtime cost' comment -- in the common case, where there is little to no contention, there's no performance overhead either. I mentioned this a little in the Motivation section -- maybe I should flesh that out a little bit? For me, the motivation to work this up was that I kept running into cases, like the above, where the existing API was almost-but-not-quite enough to give the guarantees I was looking for -- and the extension needed to handle those cases too was pretty small and natural-feeling. On Fri, Jul 17, 2015 at 4:49 PM, Ashish Singh asi...@cloudera.com wrote: Good concept. I have a question though. Say there are two producers A and B. Both producers are producing to same partition. - A sends a message with expected offset, x1 - Broker accepts it and sends an Ack - B sends a message with expected offset, x1 - Broker rejects it, sends nack - B sends message again with expected offset, x1+1 - Broker accepts it and sends Ack I guess this is what this KIP suggests, right? If yes, then how does this ensure that same message will not be written twice when two producers are producing to same partition? 
Producer on receiving a nack will try again with next offset and will keep doing so till the message is accepted. Am I missing something? Also, you have mentioned on KIP, it imposes little to no runtime cost in memory or time, I think that is not true for time. With this approach producers' performance will reduce proportionally to number of producers writing to same partition. Please correct me if I am missing out something. On Fri, Jul 17, 2015 at 11:32 AM, Mayuresh Gharat gharatmayures...@gmail.com wrote: If we have 2 producers producing to a partition, they can be out of order, then how does one producer know what offset to expect as it does not interact with other producer? Can you give an example flow that explains how it works with single producer and with multiple producers? Thanks, Mayuresh On Fri, Jul 17, 2015 at 10:28 AM, Flavio Junqueira fpjunque...@yahoo.com.invalid wrote: I like this feature, it reminds me of conditional updates in zookeeper. I'm not sure if it'd be best to have some mechanism for fencing rather than a
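The copycat-style example boils down to this: two tailers of the same shard publish the same record with the same expected offset, and exactly one append succeeds, so no duplicate lands in the log. A small illustrative sketch (`conditional_produce` is an invented stand-in for the proposed check, not a real connector or client API):

```python
def conditional_produce(log, message, expected_offset):
    """Append `message` only if the log-end offset equals `expected_offset`."""
    if expected_offset != len(log):
        return False  # someone else already wrote at this offset
    log.append(message)
    return True

log = []
record = "wal-entry-42"  # both tailers read the same WAL record

first = conditional_produce(log, record, expected_offset=0)   # succeeds
second = conditional_produce(log, record, expected_offset=0)  # rejected

assert log == ["wal-entry-42"]  # exactly one copy, no duplicate
```

Here the losing tailer's correct response is simply to drop the record: the rejection tells it the exact same message is already in the log, which is Ben's point about the mirror-type cases.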
[DISCUSS] KIP-27 - Conditional Publish
Hi there, I just added a KIP for a 'conditional publish' operation: a simple CAS-like mechanism for the Kafka producer. The wiki page is here: https://cwiki.apache.org/confluence/display/KAFKA/KIP-27+-+Conditional+Publish And there's some previous discussion on the ticket and the users list: https://issues.apache.org/jira/browse/KAFKA-2260 https://mail-archives.apache.org/mod_mbox/kafka-users/201506.mbox/%3CCAAeOB6ccyAA13YNPqVQv2o-mT5r=c9v7a+55sf2wp93qg7+...@mail.gmail.com%3E As always, comments and suggestions are very welcome. Thanks, Ben
Re: [DISCUSS] KIP-27 - Conditional Publish
I like this feature, it reminds me of conditional updates in zookeeper. I'm not sure if it'd be best to have some mechanism for fencing rather than a conditional write like you're proposing. The reason I'm saying this is that the conditional write applies to requests individually, while it sounds like you want to make sure that there is a single client writing over multiple requests. -Flavio On 17 Jul 2015, at 07:30, Ben Kirwin b...@kirw.in wrote: Hi there, I just added a KIP for a 'conditional publish' operation: a simple CAS-like mechanism for the Kafka producer. The wiki page is here: https://cwiki.apache.org/confluence/display/KAFKA/KIP-27+-+Conditional+Publish And there's some previous discussion on the ticket and the users list: https://issues.apache.org/jira/browse/KAFKA-2260 https://mail-archives.apache.org/mod_mbox/kafka-users/201506.mbox/%3CCAAeOB6ccyAA13YNPqVQv2o-mT5r=c9v7a+55sf2wp93qg7+...@mail.gmail.com%3E As always, comments and suggestions are very welcome. Thanks, Ben
Re: [DISCUSS] KIP-27 - Conditional Publish
If we have two producers producing to a partition, their writes can arrive out of order; how does one producer know what offset to expect, given that it does not interact with the other producer? Can you give an example flow that explains how this works with a single producer and with multiple producers? Thanks, Mayuresh On Fri, Jul 17, 2015 at 10:28 AM, Flavio Junqueira fpjunque...@yahoo.com.invalid wrote: I like this feature, it reminds me of conditional updates in zookeeper. I'm not sure if it'd be best to have some mechanism for fencing rather than a conditional write like you're proposing. The reason I'm saying this is that the conditional write applies to requests individually, while it sounds like you want to make sure that there is a single client writing so over multiple requests. -Flavio On 17 Jul 2015, at 07:30, Ben Kirwin b...@kirw.in wrote: Hi there, I just added a KIP for a 'conditional publish' operation: a simple CAS-like mechanism for the Kafka producer. The wiki page is here: https://cwiki.apache.org/confluence/display/KAFKA/KIP-27+-+Conditional+Publish And there's some previous discussion on the ticket and the users list: https://issues.apache.org/jira/browse/KAFKA-2260 https://mail-archives.apache.org/mod_mbox/kafka-users/201506.mbox/%3CCAAeOB6ccyAA13YNPqVQv2o-mT5r=c9v7a+55sf2wp93qg7+...@mail.gmail.com%3E As always, comments and suggestions are very welcome. Thanks, Ben -- -Regards, Mayuresh R. Gharat (862) 250-7125
Re: [DISCUSS] KIP-27 - Conditional Publish
Good concept. I have a question though. Say there are two producers, A and B, both producing to the same partition.

- A sends a message with expected offset x1
- Broker accepts it and sends an ack
- B sends a message with expected offset x1
- Broker rejects it and sends a nack
- B sends the message again with expected offset x1+1
- Broker accepts it and sends an ack

I guess this is what this KIP suggests, right? If yes, then how does this ensure that the same message will not be written twice when two producers are producing to the same partition? A producer, on receiving a nack, will try again with the next offset and will keep doing so till the message is accepted. Am I missing something? Also, you have mentioned on the KIP that it imposes little to no runtime cost in memory or time; I think that is not true for time. With this approach, producers' performance will degrade in proportion to the number of producers writing to the same partition. Please correct me if I am missing something. On Fri, Jul 17, 2015 at 11:32 AM, Mayuresh Gharat gharatmayures...@gmail.com wrote: If we have 2 producers producing to a partition, they can be out of order, then how does one producer know what offset to expect as it does not interact with other producer? Can you give an example flow that explains how it works with single producer and with multiple producers? Thanks, Mayuresh On Fri, Jul 17, 2015 at 10:28 AM, Flavio Junqueira fpjunque...@yahoo.com.invalid wrote: I like this feature, it reminds me of conditional updates in zookeeper. I'm not sure if it'd be best to have some mechanism for fencing rather than a conditional write like you're proposing. The reason I'm saying this is that the conditional write applies to requests individually, while it sounds like you want to make sure that there is a single client writing so over multiple requests. 
-Flavio On 17 Jul 2015, at 07:30, Ben Kirwin b...@kirw.in wrote: Hi there, I just added a KIP for a 'conditional publish' operation: a simple CAS-like mechanism for the Kafka producer. The wiki page is here: https://cwiki.apache.org/confluence/display/KAFKA/KIP-27+-+Conditional+Publish And there's some previous discussion on the ticket and the users list: https://issues.apache.org/jira/browse/KAFKA-2260 https://mail-archives.apache.org/mod_mbox/kafka-users/201506.mbox/%3CCAAeOB6ccyAA13YNPqVQv2o-mT5r=c9v7a+55sf2wp93qg7+...@mail.gmail.com%3E As always, comments and suggestions are very welcome. Thanks, Ben -- -Regards, Mayuresh R. Gharat (862) 250-7125 -- Regards, Ashish
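[Editor's note: the exchange Ashish walks through can be replayed with a small model. This is a hypothetical sketch, not the real producer/broker protocol: the broker accepts a message only when the producer's expected offset equals the current log end offset.]

```python
def conditional_publish(log, msg, expected_offset):
    """Return True (ack) if expected_offset matches the log end, else False (nack)."""
    if expected_offset == len(log):
        log.append(msg)
        return True
    return False

log = []
# A sends with expected offset 0 -> ack
assert conditional_publish(log, ("A", "m1"), 0)
# B sends with expected offset 0 -> nack, since A got there first
assert not conditional_publish(log, ("B", "m2"), 0)
# If B blindly retries with the next offset, the broker accepts it...
assert conditional_publish(log, ("B", "m2"), 1)
# ...and the two producers have interleaved, which is Ashish's point:
# the primitive only helps if the producer treats a nack as a signal to
# re-check its state, not as a cue to bump the offset and resend.
assert log == [("A", "m1"), ("B", "m2")]
```

So the check prevents *unintentional* interleaving, but a producer whose retry policy is "increment and resend" opts back into it; the delivery guarantee comes from the retry policy layered on top of the primitive.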
Re: [DISCUSS] KIP-27 - Conditional Publish
Hi all, So, perhaps it's worth adding a couple of specific examples of where this feature is useful, to make this a bit more concrete:

- Suppose I'm using Kafka as a commit log for a partitioned KV store, like Samza or Pistachio (?) do. We bootstrap the process state by reading from that partition, and log all state updates to that partition when we're running. Now imagine that one of my processes locks up -- GC or similar -- and the system transitions that partition over to another node. When the GC is finished, the old 'owner' of that partition might still be trying to write to the commit log at the same time as the new one is. A process might detect this by noticing that the offset of the published message is bigger than it thought the upcoming offset was, which implies someone else has been writing to the log... but by then it's too late, and the commit log is already corrupt. With a 'conditional produce', one of those processes will have its publish request refused -- so we've avoided corrupting the state.

- Envision some copycat-like system, where we have some sharded postgres setup and we're tailing each shard into its own partition. Normally, it's fairly easy to avoid duplicates here: we can track which offset in the WAL corresponds to which offset in Kafka, and we know how many messages we've written to Kafka already, so the state is very simple. However, it is possible that for a moment -- due to rebalancing or operator error or some other thing -- two different nodes are tailing the same postgres shard at once! Normally this would introduce duplicate messages, but by specifying the expected offset, we can avoid this.

So perhaps it's better to say that this is useful when a single producer is *expected*, but multiple producers are *possible*? (In the same way that the high-level consumer normally has one consumer in a group reading from a partition, but there are small windows where more than one might be reading at the same time.) 
This is also the spirit of the 'runtime cost' comment -- in the common case, where there is little to no contention, there's no performance overhead either. I mentioned this a little in the Motivation section -- maybe I should flesh that out a little bit? For me, the motivation to work this up was that I kept running into cases, like the above, where the existing API was almost-but-not-quite enough to give the guarantees I was looking for -- and the extension needed to handle those cases too was pretty small and natural-feeling. On Fri, Jul 17, 2015 at 4:49 PM, Ashish Singh asi...@cloudera.com wrote: Good concept. I have a question though. Say there are two producers A and B. Both producers are producing to same partition. - A sends a message with expected offset, x1 - Broker accepts is and sends an Ack - B sends a message with expected offset, x1 - Broker rejects it, sends nack - B sends message again with expected offset, x1+1 - Broker accepts it and sends Ack I guess this is what this KIP suggests, right? If yes, then how does this ensure that same message will not be written twice when two producers are producing to same partition? Producer on receiving a nack will try again with next offset and will keep doing so till the message is accepted. Am I missing something? Also, you have mentioned on KIP, it imposes little to no runtime cost in memory or time, I think that is not true for time. With this approach producers' performance will reduce proportionally to number of producers writing to same partition. Please correct me if I am missing out something. On Fri, Jul 17, 2015 at 11:32 AM, Mayuresh Gharat gharatmayures...@gmail.com wrote: If we have 2 producers producing to a partition, they can be out of order, then how does one producer know what offset to expect as it does not interact with other producer? Can you give an example flow that explains how it works with single producer and with multiple producers? 
Thanks, Mayuresh On Fri, Jul 17, 2015 at 10:28 AM, Flavio Junqueira fpjunque...@yahoo.com.invalid wrote: I like this feature, it reminds me of conditional updates in zookeeper. I'm not sure if it'd be best to have some mechanism for fencing rather than a conditional write like you're proposing. The reason I'm saying this is that the conditional write applies to requests individually, while it sounds like you want to make sure that there is a single client writing so over multiple requests. -Flavio On 17 Jul 2015, at 07:30, Ben Kirwin b...@kirw.in wrote: Hi there, I just added a KIP for a 'conditional publish' operation: a simple CAS-like mechanism for the Kafka producer. The wiki page is here: https://cwiki.apache.org/confluence/display/KAFKA/KIP-27+-+Conditional+Publish And there's some previous discussion on the ticket and the users list: https://issues.apache.org/jira/browse/KAFKA-2260
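[Editor's note: Ben's KV-store commit-log example can be sketched as follows. This is a hypothetical model, not Samza or Pistachio code: each owner bootstraps its expected offset by reading the log, and after failover the old owner's stale expected offset causes its publish to be refused rather than corrupting the log.]

```python
class CommitLog:
    """Toy commit-log partition with a conditional publish (hypothetical)."""

    def __init__(self):
        self.entries = []

    def publish(self, entry, expected_offset):
        if expected_offset != len(self.entries):
            return False  # refused: someone else has written in the meantime
        self.entries.append(entry)
        return True

class Owner:
    """A process that believes it owns the partition's commit log."""

    def __init__(self, log):
        self.log = log
        self.next_offset = len(log.entries)  # bootstrap by reading the log

    def append(self, entry):
        ok = self.log.publish(entry, self.next_offset)
        if ok:
            self.next_offset += 1
        return ok

log = CommitLog()
old = Owner(log)
assert old.append("put k=1")      # old owner writes normally
# Old owner stalls (GC pause); the partition fails over to a new owner,
# which bootstraps from the log and continues writing.
new = Owner(log)
assert new.append("put k=2")
# Old owner wakes up and writes with its now-stale expected offset:
assert not old.append("put k=3")  # refused -- the log stays uncorrupted
assert log.entries == ["put k=1", "put k=2"]
```

The refusal is exactly the "too late" detection moved before the damage: instead of noticing a foreign offset after the fact, the stale writer's request never lands.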