[jira] [Commented] (KAFKA-2260) Allow specifying expected offset on produce
[ https://issues.apache.org/jira/browse/KAFKA-2260?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15786719#comment-15786719 ] Ewen Cheslack-Postava commented on KAFKA-2260: -- [~wwarshaw] That sounds right -- the epoch for the PID would ensure a single writer and then the actual offset wouldn't matter. KIP-98 hasn't been voted on yet, so it's be difficult to give a timeline now, but it seems unlikely to happen before the June release timeframe. > Allow specifying expected offset on produce > --- > > Key: KAFKA-2260 > URL: https://issues.apache.org/jira/browse/KAFKA-2260 > Project: Kafka > Issue Type: Improvement >Reporter: Ben Kirwin >Assignee: Ewen Cheslack-Postava >Priority: Minor > Attachments: KAFKA-2260.patch, expected-offsets.patch > > > I'd like to propose a change that adds a simple CAS-like mechanism to the > Kafka producer. This update has a small footprint, but enables a bunch of > interesting uses in stream processing or as a commit log for process state. > h4. Proposed Change > In short: > - Allow the user to attach a specific offset to each message produced. > - The server assigns offsets to messages in the usual way. However, if the > expected offset doesn't match the actual offset, the server should fail the > produce request instead of completing the write. > This is a form of optimistic concurrency control, like the ubiquitous > check-and-set -- but instead of checking the current value of some state, it > checks the current offset of the log. > h4. Motivation > Much like check-and-set, this feature is only useful when there's very low > contention. Happily, when Kafka is used as a commit log or as a > stream-processing transport, it's common to have just one producer (or a > small number) for a given partition -- and in many of these cases, predicting > offsets turns out to be quite useful. > - We get the same benefits as the 'idempotent producer' proposal: a producer > can retry a write indefinitely and be sure that at most one of those attempts > will succeed; and if two producers accidentally write to the end of the > partition at once, we can be certain that at least one of them will fail. > - It's possible to 'bulk load' Kafka this way -- you can write a list of n > messages consecutively to a partition, even if the list is much larger than > the buffer size or the producer has to be restarted. > - If a process is using Kafka as a commit log -- reading from a partition to > bootstrap, then writing any updates to that same partition -- it can be sure > that it's seen all of the messages in that partition at the moment it does > its first (successful) write. > There's a bunch of other similar use-cases here, but they all have roughly > the same flavour. > h4. Implementation > The major advantage of this proposal over other suggested transaction / > idempotency mechanisms is its minimality: it gives the 'obvious' meaning to a > currently-unused field, adds no new APIs, and requires very little new code > or additional work from the server. > - Produced messages already carry an offset field, which is currently ignored > by the server. This field could be used for the 'expected offset', with a > sigil value for the current behaviour. (-1 is a natural choice, since it's > already used to mean 'next available offset'.) > - We'd need a new error and error code for a 'CAS failure'. > - The server assigns offsets to produced messages in > {{ByteBufferMessageSet.validateMessagesAndAssignOffsets}}. After this > changed, this method would assign offsets in the same way -- but if they > don't match the offset in the message, we'd return an error instead of > completing the write. > - To avoid breaking existing clients, this behaviour would need to live > behind some config flag. (Possibly global, but probably more useful > per-topic?) > I understand all this is unsolicited and possibly strange: happy to answer > questions, and if this seems interesting, I'd be glad to flesh this out into > a full KIP or patch. (And apologies if this is the wrong venue for this sort > of thing!) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-2260) Allow specifying expected offset on produce
[ https://issues.apache.org/jira/browse/KAFKA-2260?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15780697#comment-15780697 ] Bill Warshaw commented on KAFKA-2260: - [~ijuma] it seems like we would be able to satisfy this use case with KIP-98 by specifying a global {{PID}} for {{Producer}} instances in a distributed application. An application would have to use a {{Producer}} with this specific {{PID}} to publish any messages which needed a sequential guarantee. Does that make sense? Is there a timeline for KIP-98? > Allow specifying expected offset on produce > --- > > Key: KAFKA-2260 > URL: https://issues.apache.org/jira/browse/KAFKA-2260 > Project: Kafka > Issue Type: Improvement >Reporter: Ben Kirwin >Assignee: Ewen Cheslack-Postava >Priority: Minor > Attachments: KAFKA-2260.patch, expected-offsets.patch > > > I'd like to propose a change that adds a simple CAS-like mechanism to the > Kafka producer. This update has a small footprint, but enables a bunch of > interesting uses in stream processing or as a commit log for process state. > h4. Proposed Change > In short: > - Allow the user to attach a specific offset to each message produced. > - The server assigns offsets to messages in the usual way. However, if the > expected offset doesn't match the actual offset, the server should fail the > produce request instead of completing the write. > This is a form of optimistic concurrency control, like the ubiquitous > check-and-set -- but instead of checking the current value of some state, it > checks the current offset of the log. > h4. Motivation > Much like check-and-set, this feature is only useful when there's very low > contention. Happily, when Kafka is used as a commit log or as a > stream-processing transport, it's common to have just one producer (or a > small number) for a given partition -- and in many of these cases, predicting > offsets turns out to be quite useful. > - We get the same benefits as the 'idempotent producer' proposal: a producer > can retry a write indefinitely and be sure that at most one of those attempts > will succeed; and if two producers accidentally write to the end of the > partition at once, we can be certain that at least one of them will fail. > - It's possible to 'bulk load' Kafka this way -- you can write a list of n > messages consecutively to a partition, even if the list is much larger than > the buffer size or the producer has to be restarted. > - If a process is using Kafka as a commit log -- reading from a partition to > bootstrap, then writing any updates to that same partition -- it can be sure > that it's seen all of the messages in that partition at the moment it does > its first (successful) write. > There's a bunch of other similar use-cases here, but they all have roughly > the same flavour. > h4. Implementation > The major advantage of this proposal over other suggested transaction / > idempotency mechanisms is its minimality: it gives the 'obvious' meaning to a > currently-unused field, adds no new APIs, and requires very little new code > or additional work from the server. > - Produced messages already carry an offset field, which is currently ignored > by the server. This field could be used for the 'expected offset', with a > sigil value for the current behaviour. (-1 is a natural choice, since it's > already used to mean 'next available offset'.) > - We'd need a new error and error code for a 'CAS failure'. > - The server assigns offsets to produced messages in > {{ByteBufferMessageSet.validateMessagesAndAssignOffsets}}. After this > changed, this method would assign offsets in the same way -- but if they > don't match the offset in the message, we'd return an error instead of > completing the write. > - To avoid breaking existing clients, this behaviour would need to live > behind some config flag. (Possibly global, but probably more useful > per-topic?) > I understand all this is unsolicited and possibly strange: happy to answer > questions, and if this seems interesting, I'd be glad to flesh this out into > a full KIP or patch. (And apologies if this is the wrong venue for this sort > of thing!) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-2260) Allow specifying expected offset on produce
[ https://issues.apache.org/jira/browse/KAFKA-2260?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15768183#comment-15768183 ] Ismael Juma commented on KAFKA-2260: The following KIP may be of interest: https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging > Allow specifying expected offset on produce > --- > > Key: KAFKA-2260 > URL: https://issues.apache.org/jira/browse/KAFKA-2260 > Project: Kafka > Issue Type: Improvement >Reporter: Ben Kirwin >Assignee: Ewen Cheslack-Postava >Priority: Minor > Attachments: KAFKA-2260.patch, expected-offsets.patch > > > I'd like to propose a change that adds a simple CAS-like mechanism to the > Kafka producer. This update has a small footprint, but enables a bunch of > interesting uses in stream processing or as a commit log for process state. > h4. Proposed Change > In short: > - Allow the user to attach a specific offset to each message produced. > - The server assigns offsets to messages in the usual way. However, if the > expected offset doesn't match the actual offset, the server should fail the > produce request instead of completing the write. > This is a form of optimistic concurrency control, like the ubiquitous > check-and-set -- but instead of checking the current value of some state, it > checks the current offset of the log. > h4. Motivation > Much like check-and-set, this feature is only useful when there's very low > contention. Happily, when Kafka is used as a commit log or as a > stream-processing transport, it's common to have just one producer (or a > small number) for a given partition -- and in many of these cases, predicting > offsets turns out to be quite useful. > - We get the same benefits as the 'idempotent producer' proposal: a producer > can retry a write indefinitely and be sure that at most one of those attempts > will succeed; and if two producers accidentally write to the end of the > partition at once, we can be certain that at least one of them will fail. > - It's possible to 'bulk load' Kafka this way -- you can write a list of n > messages consecutively to a partition, even if the list is much larger than > the buffer size or the producer has to be restarted. > - If a process is using Kafka as a commit log -- reading from a partition to > bootstrap, then writing any updates to that same partition -- it can be sure > that it's seen all of the messages in that partition at the moment it does > its first (successful) write. > There's a bunch of other similar use-cases here, but they all have roughly > the same flavour. > h4. Implementation > The major advantage of this proposal over other suggested transaction / > idempotency mechanisms is its minimality: it gives the 'obvious' meaning to a > currently-unused field, adds no new APIs, and requires very little new code > or additional work from the server. > - Produced messages already carry an offset field, which is currently ignored > by the server. This field could be used for the 'expected offset', with a > sigil value for the current behaviour. (-1 is a natural choice, since it's > already used to mean 'next available offset'.) > - We'd need a new error and error code for a 'CAS failure'. > - The server assigns offsets to produced messages in > {{ByteBufferMessageSet.validateMessagesAndAssignOffsets}}. After this > changed, this method would assign offsets in the same way -- but if they > don't match the offset in the message, we'd return an error instead of > completing the write. > - To avoid breaking existing clients, this behaviour would need to live > behind some config flag. (Possibly global, but probably more useful > per-topic?) > I understand all this is unsolicited and possibly strange: happy to answer > questions, and if this seems interesting, I'd be glad to flesh this out into > a full KIP or patch. (And apologies if this is the wrong venue for this sort > of thing!) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-2260) Allow specifying expected offset on produce
[ https://issues.apache.org/jira/browse/KAFKA-2260?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15768180#comment-15768180 ] Enrico Olivelli commented on KAFKA-2260: I am interested in this new feature as it will enable kafka to be used as transaction log for replicated state machines where it is important that only one node can actually modify the state of the system, that it to write to the log > Allow specifying expected offset on produce > --- > > Key: KAFKA-2260 > URL: https://issues.apache.org/jira/browse/KAFKA-2260 > Project: Kafka > Issue Type: Improvement >Reporter: Ben Kirwin >Assignee: Ewen Cheslack-Postava >Priority: Minor > Attachments: KAFKA-2260.patch, expected-offsets.patch > > > I'd like to propose a change that adds a simple CAS-like mechanism to the > Kafka producer. This update has a small footprint, but enables a bunch of > interesting uses in stream processing or as a commit log for process state. > h4. Proposed Change > In short: > - Allow the user to attach a specific offset to each message produced. > - The server assigns offsets to messages in the usual way. However, if the > expected offset doesn't match the actual offset, the server should fail the > produce request instead of completing the write. > This is a form of optimistic concurrency control, like the ubiquitous > check-and-set -- but instead of checking the current value of some state, it > checks the current offset of the log. > h4. Motivation > Much like check-and-set, this feature is only useful when there's very low > contention. Happily, when Kafka is used as a commit log or as a > stream-processing transport, it's common to have just one producer (or a > small number) for a given partition -- and in many of these cases, predicting > offsets turns out to be quite useful. > - We get the same benefits as the 'idempotent producer' proposal: a producer > can retry a write indefinitely and be sure that at most one of those attempts > will succeed; and if two producers accidentally write to the end of the > partition at once, we can be certain that at least one of them will fail. > - It's possible to 'bulk load' Kafka this way -- you can write a list of n > messages consecutively to a partition, even if the list is much larger than > the buffer size or the producer has to be restarted. > - If a process is using Kafka as a commit log -- reading from a partition to > bootstrap, then writing any updates to that same partition -- it can be sure > that it's seen all of the messages in that partition at the moment it does > its first (successful) write. > There's a bunch of other similar use-cases here, but they all have roughly > the same flavour. > h4. Implementation > The major advantage of this proposal over other suggested transaction / > idempotency mechanisms is its minimality: it gives the 'obvious' meaning to a > currently-unused field, adds no new APIs, and requires very little new code > or additional work from the server. > - Produced messages already carry an offset field, which is currently ignored > by the server. This field could be used for the 'expected offset', with a > sigil value for the current behaviour. (-1 is a natural choice, since it's > already used to mean 'next available offset'.) > - We'd need a new error and error code for a 'CAS failure'. > - The server assigns offsets to produced messages in > {{ByteBufferMessageSet.validateMessagesAndAssignOffsets}}. After this > changed, this method would assign offsets in the same way -- but if they > don't match the offset in the message, we'd return an error instead of > completing the write. > - To avoid breaking existing clients, this behaviour would need to live > behind some config flag. (Possibly global, but probably more useful > per-topic?) > I understand all this is unsolicited and possibly strange: happy to answer > questions, and if this seems interesting, I'd be glad to flesh this out into > a full KIP or patch. (And apologies if this is the wrong venue for this sort > of thing!) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-2260) Allow specifying expected offset on produce
[ https://issues.apache.org/jira/browse/KAFKA-2260?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15768086#comment-15768086 ] Bill Warshaw commented on KAFKA-2260: - I'd like to revive the discussion on this KIP. I've worked on multiple projects where this functionality would have saved us a lot of effort and complexity. I have a working proof-of-concept branch locally. I had to switch approaches from the attached patch, because of changes to how Kafka uses the offset field in KIP-31 (https://cwiki.apache.org/confluence/display/KAFKA/KIP-31+-+Move+to+relative+offsets+in+compressed+message+sets). I ended up adding a field to the {{TOPIC_PRODUCE_DATA}} API; the changes are fairly minor, and are mostly just updating method signatures. I'd be interested in contributing a patch upstream if there's still a desire for it. > Allow specifying expected offset on produce > --- > > Key: KAFKA-2260 > URL: https://issues.apache.org/jira/browse/KAFKA-2260 > Project: Kafka > Issue Type: Improvement >Reporter: Ben Kirwin >Assignee: Ewen Cheslack-Postava >Priority: Minor > Attachments: KAFKA-2260.patch, expected-offsets.patch > > > I'd like to propose a change that adds a simple CAS-like mechanism to the > Kafka producer. This update has a small footprint, but enables a bunch of > interesting uses in stream processing or as a commit log for process state. > h4. Proposed Change > In short: > - Allow the user to attach a specific offset to each message produced. > - The server assigns offsets to messages in the usual way. However, if the > expected offset doesn't match the actual offset, the server should fail the > produce request instead of completing the write. > This is a form of optimistic concurrency control, like the ubiquitous > check-and-set -- but instead of checking the current value of some state, it > checks the current offset of the log. > h4. Motivation > Much like check-and-set, this feature is only useful when there's very low > contention. Happily, when Kafka is used as a commit log or as a > stream-processing transport, it's common to have just one producer (or a > small number) for a given partition -- and in many of these cases, predicting > offsets turns out to be quite useful. > - We get the same benefits as the 'idempotent producer' proposal: a producer > can retry a write indefinitely and be sure that at most one of those attempts > will succeed; and if two producers accidentally write to the end of the > partition at once, we can be certain that at least one of them will fail. > - It's possible to 'bulk load' Kafka this way -- you can write a list of n > messages consecutively to a partition, even if the list is much larger than > the buffer size or the producer has to be restarted. > - If a process is using Kafka as a commit log -- reading from a partition to > bootstrap, then writing any updates to that same partition -- it can be sure > that it's seen all of the messages in that partition at the moment it does > its first (successful) write. > There's a bunch of other similar use-cases here, but they all have roughly > the same flavour. > h4. Implementation > The major advantage of this proposal over other suggested transaction / > idempotency mechanisms is its minimality: it gives the 'obvious' meaning to a > currently-unused field, adds no new APIs, and requires very little new code > or additional work from the server. > - Produced messages already carry an offset field, which is currently ignored > by the server. This field could be used for the 'expected offset', with a > sigil value for the current behaviour. (-1 is a natural choice, since it's > already used to mean 'next available offset'.) > - We'd need a new error and error code for a 'CAS failure'. > - The server assigns offsets to produced messages in > {{ByteBufferMessageSet.validateMessagesAndAssignOffsets}}. After this > changed, this method would assign offsets in the same way -- but if they > don't match the offset in the message, we'd return an error instead of > completing the write. > - To avoid breaking existing clients, this behaviour would need to live > behind some config flag. (Possibly global, but probably more useful > per-topic?) > I understand all this is unsolicited and possibly strange: happy to answer > questions, and if this seems interesting, I'd be glad to flesh this out into > a full KIP or patch. (And apologies if this is the wrong venue for this sort > of thing!) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-2260) Allow specifying expected offset on produce
[ https://issues.apache.org/jira/browse/KAFKA-2260?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15313258#comment-15313258 ] Ben Kirwin commented on KAFKA-2260: --- Hi! I haven't had time to push on this at all in the last few months, but I'm still interested as well. My understanding was that the core team was very focussed on streams / the other features in the 0.10 release, but would be interested in looking at coordination proposals after that. (This / idempotent producer / etc.) Does anyone think working this up into a github PR seems useful, or is there another next best step? The existing KIP is already fairly well fleshed-out, I think: https://cwiki.apache.org/confluence/display/KAFKA/KIP-27+-+Conditional+Publish > Allow specifying expected offset on produce > --- > > Key: KAFKA-2260 > URL: https://issues.apache.org/jira/browse/KAFKA-2260 > Project: Kafka > Issue Type: Improvement >Reporter: Ben Kirwin >Assignee: Ewen Cheslack-Postava >Priority: Minor > Attachments: KAFKA-2260.patch, expected-offsets.patch > > > I'd like to propose a change that adds a simple CAS-like mechanism to the > Kafka producer. This update has a small footprint, but enables a bunch of > interesting uses in stream processing or as a commit log for process state. > h4. Proposed Change > In short: > - Allow the user to attach a specific offset to each message produced. > - The server assigns offsets to messages in the usual way. However, if the > expected offset doesn't match the actual offset, the server should fail the > produce request instead of completing the write. > This is a form of optimistic concurrency control, like the ubiquitous > check-and-set -- but instead of checking the current value of some state, it > checks the current offset of the log. > h4. Motivation > Much like check-and-set, this feature is only useful when there's very low > contention. Happily, when Kafka is used as a commit log or as a > stream-processing transport, it's common to have just one producer (or a > small number) for a given partition -- and in many of these cases, predicting > offsets turns out to be quite useful. > - We get the same benefits as the 'idempotent producer' proposal: a producer > can retry a write indefinitely and be sure that at most one of those attempts > will succeed; and if two producers accidentally write to the end of the > partition at once, we can be certain that at least one of them will fail. > - It's possible to 'bulk load' Kafka this way -- you can write a list of n > messages consecutively to a partition, even if the list is much larger than > the buffer size or the producer has to be restarted. > - If a process is using Kafka as a commit log -- reading from a partition to > bootstrap, then writing any updates to that same partition -- it can be sure > that it's seen all of the messages in that partition at the moment it does > its first (successful) write. > There's a bunch of other similar use-cases here, but they all have roughly > the same flavour. > h4. Implementation > The major advantage of this proposal over other suggested transaction / > idempotency mechanisms is its minimality: it gives the 'obvious' meaning to a > currently-unused field, adds no new APIs, and requires very little new code > or additional work from the server. > - Produced messages already carry an offset field, which is currently ignored > by the server. This field could be used for the 'expected offset', with a > sigil value for the current behaviour. (-1 is a natural choice, since it's > already used to mean 'next available offset'.) > - We'd need a new error and error code for a 'CAS failure'. > - The server assigns offsets to produced messages in > {{ByteBufferMessageSet.validateMessagesAndAssignOffsets}}. After this > changed, this method would assign offsets in the same way -- but if they > don't match the offset in the message, we'd return an error instead of > completing the write. > - To avoid breaking existing clients, this behaviour would need to live > behind some config flag. (Possibly global, but probably more useful > per-topic?) > I understand all this is unsolicited and possibly strange: happy to answer > questions, and if this seems interesting, I'd be glad to flesh this out into > a full KIP or patch. (And apologies if this is the wrong venue for this sort > of thing!) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-2260) Allow specifying expected offset on produce
[ https://issues.apache.org/jira/browse/KAFKA-2260?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14643065#comment-14643065 ] Mayuresh Gharat commented on KAFKA-2260: I think, when 2 producers are trying to produce concurrently, the broker log will be appended in a specific order, which means the corresponding offsets in the respective sub-partitions will also be incremented in that order and the client should be able to figure out from ProduceResponse what is the next offset it needs to send the data to. This is what I understand from the above discussion. Allow specifying expected offset on produce --- Key: KAFKA-2260 URL: https://issues.apache.org/jira/browse/KAFKA-2260 Project: Kafka Issue Type: Improvement Reporter: Ben Kirwin Assignee: Ewen Cheslack-Postava Priority: Minor Attachments: expected-offsets.patch I'd like to propose a change that adds a simple CAS-like mechanism to the Kafka producer. This update has a small footprint, but enables a bunch of interesting uses in stream processing or as a commit log for process state. h4. Proposed Change In short: - Allow the user to attach a specific offset to each message produced. - The server assigns offsets to messages in the usual way. However, if the expected offset doesn't match the actual offset, the server should fail the produce request instead of completing the write. This is a form of optimistic concurrency control, like the ubiquitous check-and-set -- but instead of checking the current value of some state, it checks the current offset of the log. h4. Motivation Much like check-and-set, this feature is only useful when there's very low contention. Happily, when Kafka is used as a commit log or as a stream-processing transport, it's common to have just one producer (or a small number) for a given partition -- and in many of these cases, predicting offsets turns out to be quite useful. - We get the same benefits as the 'idempotent producer' proposal: a producer can retry a write indefinitely and be sure that at most one of those attempts will succeed; and if two producers accidentally write to the end of the partition at once, we can be certain that at least one of them will fail. - It's possible to 'bulk load' Kafka this way -- you can write a list of n messages consecutively to a partition, even if the list is much larger than the buffer size or the producer has to be restarted. - If a process is using Kafka as a commit log -- reading from a partition to bootstrap, then writing any updates to that same partition -- it can be sure that it's seen all of the messages in that partition at the moment it does its first (successful) write. There's a bunch of other similar use-cases here, but they all have roughly the same flavour. h4. Implementation The major advantage of this proposal over other suggested transaction / idempotency mechanisms is its minimality: it gives the 'obvious' meaning to a currently-unused field, adds no new APIs, and requires very little new code or additional work from the server. - Produced messages already carry an offset field, which is currently ignored by the server. This field could be used for the 'expected offset', with a sigil value for the current behaviour. (-1 is a natural choice, since it's already used to mean 'next available offset'.) - We'd need a new error and error code for a 'CAS failure'. - The server assigns offsets to produced messages in {{ByteBufferMessageSet.validateMessagesAndAssignOffsets}}. After this changed, this method would assign offsets in the same way -- but if they don't match the offset in the message, we'd return an error instead of completing the write. - To avoid breaking existing clients, this behaviour would need to live behind some config flag. (Possibly global, but probably more useful per-topic?) I understand all this is unsolicited and possibly strange: happy to answer questions, and if this seems interesting, I'd be glad to flesh this out into a full KIP or patch. (And apologies if this is the wrong venue for this sort of thing!) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-2260) Allow specifying expected offset on produce
[ https://issues.apache.org/jira/browse/KAFKA-2260?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14635281#comment-14635281 ] Daniel Schierbeck commented on KAFKA-2260: -- Where is the KIP being discussed? I couldn't find any mention of this in the dev list archive. Allow specifying expected offset on produce --- Key: KAFKA-2260 URL: https://issues.apache.org/jira/browse/KAFKA-2260 Project: Kafka Issue Type: Improvement Reporter: Ben Kirwin Assignee: Ewen Cheslack-Postava Priority: Minor Attachments: expected-offsets.patch I'd like to propose a change that adds a simple CAS-like mechanism to the Kafka producer. This update has a small footprint, but enables a bunch of interesting uses in stream processing or as a commit log for process state. h4. Proposed Change In short: - Allow the user to attach a specific offset to each message produced. - The server assigns offsets to messages in the usual way. However, if the expected offset doesn't match the actual offset, the server should fail the produce request instead of completing the write. This is a form of optimistic concurrency control, like the ubiquitous check-and-set -- but instead of checking the current value of some state, it checks the current offset of the log. h4. Motivation Much like check-and-set, this feature is only useful when there's very low contention. Happily, when Kafka is used as a commit log or as a stream-processing transport, it's common to have just one producer (or a small number) for a given partition -- and in many of these cases, predicting offsets turns out to be quite useful. - We get the same benefits as the 'idempotent producer' proposal: a producer can retry a write indefinitely and be sure that at most one of those attempts will succeed; and if two producers accidentally write to the end of the partition at once, we can be certain that at least one of them will fail. - It's possible to 'bulk load' Kafka this way -- you can write a list of n messages consecutively to a partition, even if the list is much larger than the buffer size or the producer has to be restarted. - If a process is using Kafka as a commit log -- reading from a partition to bootstrap, then writing any updates to that same partition -- it can be sure that it's seen all of the messages in that partition at the moment it does its first (successful) write. There's a bunch of other similar use-cases here, but they all have roughly the same flavour. h4. Implementation The major advantage of this proposal over other suggested transaction / idempotency mechanisms is its minimality: it gives the 'obvious' meaning to a currently-unused field, adds no new APIs, and requires very little new code or additional work from the server. - Produced messages already carry an offset field, which is currently ignored by the server. This field could be used for the 'expected offset', with a sigil value for the current behaviour. (-1 is a natural choice, since it's already used to mean 'next available offset'.) - We'd need a new error and error code for a 'CAS failure'. - The server assigns offsets to produced messages in {{ByteBufferMessageSet.validateMessagesAndAssignOffsets}}. After this changed, this method would assign offsets in the same way -- but if they don't match the offset in the message, we'd return an error instead of completing the write. - To avoid breaking existing clients, this behaviour would need to live behind some config flag. (Possibly global, but probably more useful per-topic?) I understand all this is unsolicited and possibly strange: happy to answer questions, and if this seems interesting, I'd be glad to flesh this out into a full KIP or patch. (And apologies if this is the wrong venue for this sort of thing!) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-2260) Allow specifying expected offset on produce
[ https://issues.apache.org/jira/browse/KAFKA-2260?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14635289#comment-14635289 ] Ismael Juma commented on KAFKA-2260: [~dasch] The KIP is: https://cwiki.apache.org/confluence/display/KAFKA/KIP-27+-+Conditional+Publish Allow specifying expected offset on produce --- Key: KAFKA-2260 URL: https://issues.apache.org/jira/browse/KAFKA-2260 Project: Kafka Issue Type: Improvement Reporter: Ben Kirwin Assignee: Ewen Cheslack-Postava Priority: Minor Attachments: expected-offsets.patch I'd like to propose a change that adds a simple CAS-like mechanism to the Kafka producer. This update has a small footprint, but enables a bunch of interesting uses in stream processing or as a commit log for process state. h4. Proposed Change In short: - Allow the user to attach a specific offset to each message produced. - The server assigns offsets to messages in the usual way. However, if the expected offset doesn't match the actual offset, the server should fail the produce request instead of completing the write. This is a form of optimistic concurrency control, like the ubiquitous check-and-set -- but instead of checking the current value of some state, it checks the current offset of the log. h4. Motivation Much like check-and-set, this feature is only useful when there's very low contention. Happily, when Kafka is used as a commit log or as a stream-processing transport, it's common to have just one producer (or a small number) for a given partition -- and in many of these cases, predicting offsets turns out to be quite useful. - We get the same benefits as the 'idempotent producer' proposal: a producer can retry a write indefinitely and be sure that at most one of those attempts will succeed; and if two producers accidentally write to the end of the partition at once, we can be certain that at least one of them will fail. - It's possible to 'bulk load' Kafka this way -- you can write a list of n messages consecutively to a partition, even if the list is much larger than the buffer size or the producer has to be restarted. - If a process is using Kafka as a commit log -- reading from a partition to bootstrap, then writing any updates to that same partition -- it can be sure that it's seen all of the messages in that partition at the moment it does its first (successful) write. There's a bunch of other similar use-cases here, but they all have roughly the same flavour. h4. Implementation The major advantage of this proposal over other suggested transaction / idempotency mechanisms is its minimality: it gives the 'obvious' meaning to a currently-unused field, adds no new APIs, and requires very little new code or additional work from the server. - Produced messages already carry an offset field, which is currently ignored by the server. This field could be used for the 'expected offset', with a sigil value for the current behaviour. (-1 is a natural choice, since it's already used to mean 'next available offset'.) - We'd need a new error and error code for a 'CAS failure'. - The server assigns offsets to produced messages in {{ByteBufferMessageSet.validateMessagesAndAssignOffsets}}. After this changed, this method would assign offsets in the same way -- but if they don't match the offset in the message, we'd return an error instead of completing the write. - To avoid breaking existing clients, this behaviour would need to live behind some config flag. (Possibly global, but probably more useful per-topic?) I understand all this is unsolicited and possibly strange: happy to answer questions, and if this seems interesting, I'd be glad to flesh this out into a full KIP or patch. (And apologies if this is the wrong venue for this sort of thing!) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-2260) Allow specifying expected offset on produce
[ https://issues.apache.org/jira/browse/KAFKA-2260?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14633721#comment-14633721 ] Jay Kreps commented on KAFKA-2260: -- Yes, exactly. Allow specifying expected offset on produce --- Key: KAFKA-2260 URL: https://issues.apache.org/jira/browse/KAFKA-2260 Project: Kafka Issue Type: Improvement Reporter: Ben Kirwin Assignee: Ewen Cheslack-Postava Priority: Minor Attachments: expected-offsets.patch I'd like to propose a change that adds a simple CAS-like mechanism to the Kafka producer. This update has a small footprint, but enables a bunch of interesting uses in stream processing or as a commit log for process state. h4. Proposed Change In short: - Allow the user to attach a specific offset to each message produced. - The server assigns offsets to messages in the usual way. However, if the expected offset doesn't match the actual offset, the server should fail the produce request instead of completing the write. This is a form of optimistic concurrency control, like the ubiquitous check-and-set -- but instead of checking the current value of some state, it checks the current offset of the log. h4. Motivation Much like check-and-set, this feature is only useful when there's very low contention. Happily, when Kafka is used as a commit log or as a stream-processing transport, it's common to have just one producer (or a small number) for a given partition -- and in many of these cases, predicting offsets turns out to be quite useful. - We get the same benefits as the 'idempotent producer' proposal: a producer can retry a write indefinitely and be sure that at most one of those attempts will succeed; and if two producers accidentally write to the end of the partition at once, we can be certain that at least one of them will fail. - It's possible to 'bulk load' Kafka this way -- you can write a list of n messages consecutively to a partition, even if the list is much larger than the buffer size or the producer has to be restarted. - If a process is using Kafka as a commit log -- reading from a partition to bootstrap, then writing any updates to that same partition -- it can be sure that it's seen all of the messages in that partition at the moment it does its first (successful) write. There's a bunch of other similar use-cases here, but they all have roughly the same flavour. h4. Implementation The major advantage of this proposal over other suggested transaction / idempotency mechanisms is its minimality: it gives the 'obvious' meaning to a currently-unused field, adds no new APIs, and requires very little new code or additional work from the server. - Produced messages already carry an offset field, which is currently ignored by the server. This field could be used for the 'expected offset', with a sigil value for the current behaviour. (-1 is a natural choice, since it's already used to mean 'next available offset'.) - We'd need a new error and error code for a 'CAS failure'. - The server assigns offsets to produced messages in {{ByteBufferMessageSet.validateMessagesAndAssignOffsets}}. After this changed, this method would assign offsets in the same way -- but if they don't match the offset in the message, we'd return an error instead of completing the write. - To avoid breaking existing clients, this behaviour would need to live behind some config flag. (Possibly global, but probably more useful per-topic?) I understand all this is unsolicited and possibly strange: happy to answer questions, and if this seems interesting, I'd be glad to flesh this out into a full KIP or patch. (And apologies if this is the wrong venue for this sort of thing!) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-2260) Allow specifying expected offset on produce
[ https://issues.apache.org/jira/browse/KAFKA-2260?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14634117#comment-14634117 ] Flavio Junqueira commented on KAFKA-2260: - I like the use of an array to increase the degree of concurrency. This is actually a common trick in concurrent data structures, so suitable here. But, in this case, unless I'm missing the point, isn't it the case that you can't guarantee that two publishers end up succeeding when publishing concurrently, which is one of the use cases that [~bkirwi] says he is trying to avoid? Could you guys clarify this, please? Allow specifying expected offset on produce --- Key: KAFKA-2260 URL: https://issues.apache.org/jira/browse/KAFKA-2260 Project: Kafka Issue Type: Improvement Reporter: Ben Kirwin Assignee: Ewen Cheslack-Postava Priority: Minor Attachments: expected-offsets.patch I'd like to propose a change that adds a simple CAS-like mechanism to the Kafka producer. This update has a small footprint, but enables a bunch of interesting uses in stream processing or as a commit log for process state. h4. Proposed Change In short: - Allow the user to attach a specific offset to each message produced. - The server assigns offsets to messages in the usual way. However, if the expected offset doesn't match the actual offset, the server should fail the produce request instead of completing the write. This is a form of optimistic concurrency control, like the ubiquitous check-and-set -- but instead of checking the current value of some state, it checks the current offset of the log. h4. Motivation Much like check-and-set, this feature is only useful when there's very low contention. Happily, when Kafka is used as a commit log or as a stream-processing transport, it's common to have just one producer (or a small number) for a given partition -- and in many of these cases, predicting offsets turns out to be quite useful. - We get the same benefits as the 'idempotent producer' proposal: a producer can retry a write indefinitely and be sure that at most one of those attempts will succeed; and if two producers accidentally write to the end of the partition at once, we can be certain that at least one of them will fail. - It's possible to 'bulk load' Kafka this way -- you can write a list of n messages consecutively to a partition, even if the list is much larger than the buffer size or the producer has to be restarted. - If a process is using Kafka as a commit log -- reading from a partition to bootstrap, then writing any updates to that same partition -- it can be sure that it's seen all of the messages in that partition at the moment it does its first (successful) write. There's a bunch of other similar use-cases here, but they all have roughly the same flavour. h4. Implementation The major advantage of this proposal over other suggested transaction / idempotency mechanisms is its minimality: it gives the 'obvious' meaning to a currently-unused field, adds no new APIs, and requires very little new code or additional work from the server. - Produced messages already carry an offset field, which is currently ignored by the server. This field could be used for the 'expected offset', with a sigil value for the current behaviour. (-1 is a natural choice, since it's already used to mean 'next available offset'.) - We'd need a new error and error code for a 'CAS failure'. - The server assigns offsets to produced messages in {{ByteBufferMessageSet.validateMessagesAndAssignOffsets}}. After this changed, this method would assign offsets in the same way -- but if they don't match the offset in the message, we'd return an error instead of completing the write. - To avoid breaking existing clients, this behaviour would need to live behind some config flag. (Possibly global, but probably more useful per-topic?) I understand all this is unsolicited and possibly strange: happy to answer questions, and if this seems interesting, I'd be glad to flesh this out into a full KIP or patch. (And apologies if this is the wrong venue for this sort of thing!) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-2260) Allow specifying expected offset on produce
[ https://issues.apache.org/jira/browse/KAFKA-2260?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14632537#comment-14632537 ] Ben Kirwin commented on KAFKA-2260: --- Ah, clever! Thanks for sharing this -- I've linked to it on the discussion thread. {quote} When changing the number of sub-partitions, the broker doesn't have to recompute sub-partition high water marks. It can initialize all array elements with the partition's high water mark. {quote} It seems we could also do this initialization every time the log is opened -- and avoid any persistent storage for the 'sub-partition' offsets at all. This would remove another major drawback of the per-key approach. Allow specifying expected offset on produce --- Key: KAFKA-2260 URL: https://issues.apache.org/jira/browse/KAFKA-2260 Project: Kafka Issue Type: Improvement Reporter: Ben Kirwin Assignee: Ewen Cheslack-Postava Priority: Minor Attachments: expected-offsets.patch I'd like to propose a change that adds a simple CAS-like mechanism to the Kafka producer. This update has a small footprint, but enables a bunch of interesting uses in stream processing or as a commit log for process state. h4. Proposed Change In short: - Allow the user to attach a specific offset to each message produced. - The server assigns offsets to messages in the usual way. However, if the expected offset doesn't match the actual offset, the server should fail the produce request instead of completing the write. This is a form of optimistic concurrency control, like the ubiquitous check-and-set -- but instead of checking the current value of some state, it checks the current offset of the log. h4. Motivation Much like check-and-set, this feature is only useful when there's very low contention. Happily, when Kafka is used as a commit log or as a stream-processing transport, it's common to have just one producer (or a small number) for a given partition -- and in many of these cases, predicting offsets turns out to be quite useful. - We get the same benefits as the 'idempotent producer' proposal: a producer can retry a write indefinitely and be sure that at most one of those attempts will succeed; and if two producers accidentally write to the end of the partition at once, we can be certain that at least one of them will fail. - It's possible to 'bulk load' Kafka this way -- you can write a list of n messages consecutively to a partition, even if the list is much larger than the buffer size or the producer has to be restarted. - If a process is using Kafka as a commit log -- reading from a partition to bootstrap, then writing any updates to that same partition -- it can be sure that it's seen all of the messages in that partition at the moment it does its first (successful) write. There's a bunch of other similar use-cases here, but they all have roughly the same flavour. h4. Implementation The major advantage of this proposal over other suggested transaction / idempotency mechanisms is its minimality: it gives the 'obvious' meaning to a currently-unused field, adds no new APIs, and requires very little new code or additional work from the server. - Produced messages already carry an offset field, which is currently ignored by the server. This field could be used for the 'expected offset', with a sigil value for the current behaviour. (-1 is a natural choice, since it's already used to mean 'next available offset'.) - We'd need a new error and error code for a 'CAS failure'. - The server assigns offsets to produced messages in {{ByteBufferMessageSet.validateMessagesAndAssignOffsets}}. After this changed, this method would assign offsets in the same way -- but if they don't match the offset in the message, we'd return an error instead of completing the write. - To avoid breaking existing clients, this behaviour would need to live behind some config flag. (Possibly global, but probably more useful per-topic?) I understand all this is unsolicited and possibly strange: happy to answer questions, and if this seems interesting, I'd be glad to flesh this out into a full KIP or patch. (And apologies if this is the wrong venue for this sort of thing!) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-2260) Allow specifying expected offset on produce
[ https://issues.apache.org/jira/browse/KAFKA-2260?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14630870#comment-14630870 ] Ben Kirwin commented on KAFKA-2260: --- Opened a KIP for this here: https://cwiki.apache.org/confluence/display/KAFKA/KIP-27+-+Conditional+Publish [~becket_qin]: Thanks! To #1 and #3 -- this feature is aimed at the special (but fairly common) case where we only expect one producer for a single partition at a given time. More complicated situations definitely require more elaborate coordination -- and while it's possible that you might implement more elaborate coordination mechanisms on top of this, I've left that out of scope for now. To #2 -- there are definitely cases where key-based CAS feels handier, but the converse is also true, and tracking offsets for each key requires some auxiliary data with some commensurate overhead. I discussed this a little bit in the KIP -- though the point that 'partial failure' becomes more likely is not something I'd considered, and also a very good point. Allow specifying expected offset on produce --- Key: KAFKA-2260 URL: https://issues.apache.org/jira/browse/KAFKA-2260 Project: Kafka Issue Type: Improvement Reporter: Ben Kirwin Assignee: Ewen Cheslack-Postava Priority: Minor Attachments: expected-offsets.patch I'd like to propose a change that adds a simple CAS-like mechanism to the Kafka producer. This update has a small footprint, but enables a bunch of interesting uses in stream processing or as a commit log for process state. h4. Proposed Change In short: - Allow the user to attach a specific offset to each message produced. - The server assigns offsets to messages in the usual way. However, if the expected offset doesn't match the actual offset, the server should fail the produce request instead of completing the write. This is a form of optimistic concurrency control, like the ubiquitous check-and-set -- but instead of checking the current value of some state, it checks the current offset of the log. h4. Motivation Much like check-and-set, this feature is only useful when there's very low contention. Happily, when Kafka is used as a commit log or as a stream-processing transport, it's common to have just one producer (or a small number) for a given partition -- and in many of these cases, predicting offsets turns out to be quite useful. - We get the same benefits as the 'idempotent producer' proposal: a producer can retry a write indefinitely and be sure that at most one of those attempts will succeed; and if two producers accidentally write to the end of the partition at once, we can be certain that at least one of them will fail. - It's possible to 'bulk load' Kafka this way -- you can write a list of n messages consecutively to a partition, even if the list is much larger than the buffer size or the producer has to be restarted. - If a process is using Kafka as a commit log -- reading from a partition to bootstrap, then writing any updates to that same partition -- it can be sure that it's seen all of the messages in that partition at the moment it does its first (successful) write. There's a bunch of other similar use-cases here, but they all have roughly the same flavour. h4. Implementation The major advantage of this proposal over other suggested transaction / idempotency mechanisms is its minimality: it gives the 'obvious' meaning to a currently-unused field, adds no new APIs, and requires very little new code or additional work from the server. - Produced messages already carry an offset field, which is currently ignored by the server. This field could be used for the 'expected offset', with a sigil value for the current behaviour. (-1 is a natural choice, since it's already used to mean 'next available offset'.) - We'd need a new error and error code for a 'CAS failure'. - The server assigns offsets to produced messages in {{ByteBufferMessageSet.validateMessagesAndAssignOffsets}}. After this changed, this method would assign offsets in the same way -- but if they don't match the offset in the message, we'd return an error instead of completing the write. - To avoid breaking existing clients, this behaviour would need to live behind some config flag. (Possibly global, but probably more useful per-topic?) I understand all this is unsolicited and possibly strange: happy to answer questions, and if this seems interesting, I'd be glad to flesh this out into a full KIP or patch. (And apologies if this is the wrong venue for this sort of thing!) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-2260) Allow specifying expected offset on produce
[ https://issues.apache.org/jira/browse/KAFKA-2260?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14631502#comment-14631502 ] Jiangjie Qin commented on KAFKA-2260: - [~bkirwi] The question 1 I had is actually for single producer case. What I'm wondering is that how this CAS guarantee exact once delivery? Allow specifying expected offset on produce --- Key: KAFKA-2260 URL: https://issues.apache.org/jira/browse/KAFKA-2260 Project: Kafka Issue Type: Improvement Reporter: Ben Kirwin Assignee: Ewen Cheslack-Postava Priority: Minor Attachments: expected-offsets.patch I'd like to propose a change that adds a simple CAS-like mechanism to the Kafka producer. This update has a small footprint, but enables a bunch of interesting uses in stream processing or as a commit log for process state. h4. Proposed Change In short: - Allow the user to attach a specific offset to each message produced. - The server assigns offsets to messages in the usual way. However, if the expected offset doesn't match the actual offset, the server should fail the produce request instead of completing the write. This is a form of optimistic concurrency control, like the ubiquitous check-and-set -- but instead of checking the current value of some state, it checks the current offset of the log. h4. Motivation Much like check-and-set, this feature is only useful when there's very low contention. Happily, when Kafka is used as a commit log or as a stream-processing transport, it's common to have just one producer (or a small number) for a given partition -- and in many of these cases, predicting offsets turns out to be quite useful. - We get the same benefits as the 'idempotent producer' proposal: a producer can retry a write indefinitely and be sure that at most one of those attempts will succeed; and if two producers accidentally write to the end of the partition at once, we can be certain that at least one of them will fail. - It's possible to 'bulk load' Kafka this way -- you can write a list of n messages consecutively to a partition, even if the list is much larger than the buffer size or the producer has to be restarted. - If a process is using Kafka as a commit log -- reading from a partition to bootstrap, then writing any updates to that same partition -- it can be sure that it's seen all of the messages in that partition at the moment it does its first (successful) write. There's a bunch of other similar use-cases here, but they all have roughly the same flavour. h4. Implementation The major advantage of this proposal over other suggested transaction / idempotency mechanisms is its minimality: it gives the 'obvious' meaning to a currently-unused field, adds no new APIs, and requires very little new code or additional work from the server. - Produced messages already carry an offset field, which is currently ignored by the server. This field could be used for the 'expected offset', with a sigil value for the current behaviour. (-1 is a natural choice, since it's already used to mean 'next available offset'.) - We'd need a new error and error code for a 'CAS failure'. - The server assigns offsets to produced messages in {{ByteBufferMessageSet.validateMessagesAndAssignOffsets}}. After this changed, this method would assign offsets in the same way -- but if they don't match the offset in the message, we'd return an error instead of completing the write. - To avoid breaking existing clients, this behaviour would need to live behind some config flag. (Possibly global, but probably more useful per-topic?) I understand all this is unsolicited and possibly strange: happy to answer questions, and if this seems interesting, I'd be glad to flesh this out into a full KIP or patch. (And apologies if this is the wrong venue for this sort of thing!) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-2260) Allow specifying expected offset on produce
[ https://issues.apache.org/jira/browse/KAFKA-2260?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14631512#comment-14631512 ] Jay Kreps commented on KAFKA-2260: -- [~yasuhiro.matsuda] had a good variation on this feature that would be worth sharing. It hashes the keys over an array which let's you avoid all writes colliding if there are multiple writers but does not require actually storing an offset per key which I think would be prohibitive. Allow specifying expected offset on produce --- Key: KAFKA-2260 URL: https://issues.apache.org/jira/browse/KAFKA-2260 Project: Kafka Issue Type: Improvement Reporter: Ben Kirwin Assignee: Ewen Cheslack-Postava Priority: Minor Attachments: expected-offsets.patch I'd like to propose a change that adds a simple CAS-like mechanism to the Kafka producer. This update has a small footprint, but enables a bunch of interesting uses in stream processing or as a commit log for process state. h4. Proposed Change In short: - Allow the user to attach a specific offset to each message produced. - The server assigns offsets to messages in the usual way. However, if the expected offset doesn't match the actual offset, the server should fail the produce request instead of completing the write. This is a form of optimistic concurrency control, like the ubiquitous check-and-set -- but instead of checking the current value of some state, it checks the current offset of the log. h4. Motivation Much like check-and-set, this feature is only useful when there's very low contention. Happily, when Kafka is used as a commit log or as a stream-processing transport, it's common to have just one producer (or a small number) for a given partition -- and in many of these cases, predicting offsets turns out to be quite useful. - We get the same benefits as the 'idempotent producer' proposal: a producer can retry a write indefinitely and be sure that at most one of those attempts will succeed; and if two producers accidentally write to the end of the partition at once, we can be certain that at least one of them will fail. - It's possible to 'bulk load' Kafka this way -- you can write a list of n messages consecutively to a partition, even if the list is much larger than the buffer size or the producer has to be restarted. - If a process is using Kafka as a commit log -- reading from a partition to bootstrap, then writing any updates to that same partition -- it can be sure that it's seen all of the messages in that partition at the moment it does its first (successful) write. There's a bunch of other similar use-cases here, but they all have roughly the same flavour. h4. Implementation The major advantage of this proposal over other suggested transaction / idempotency mechanisms is its minimality: it gives the 'obvious' meaning to a currently-unused field, adds no new APIs, and requires very little new code or additional work from the server. - Produced messages already carry an offset field, which is currently ignored by the server. This field could be used for the 'expected offset', with a sigil value for the current behaviour. (-1 is a natural choice, since it's already used to mean 'next available offset'.) - We'd need a new error and error code for a 'CAS failure'. - The server assigns offsets to produced messages in {{ByteBufferMessageSet.validateMessagesAndAssignOffsets}}. After this changed, this method would assign offsets in the same way -- but if they don't match the offset in the message, we'd return an error instead of completing the write. - To avoid breaking existing clients, this behaviour would need to live behind some config flag. (Possibly global, but probably more useful per-topic?) I understand all this is unsolicited and possibly strange: happy to answer questions, and if this seems interesting, I'd be glad to flesh this out into a full KIP or patch. (And apologies if this is the wrong venue for this sort of thing!) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-2260) Allow specifying expected offset on produce
[ https://issues.apache.org/jira/browse/KAFKA-2260?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14632225#comment-14632225 ] Ewen Cheslack-Postava commented on KAFKA-2260: -- [~bkirwi] That's how I interpreted it initially as well, but with the right scheme it actually just works if you use the high watermark for the entire partition. I'll leave it to [~ymatsuda] to give the complete explanation of his very nice extension of this idea. I'd suggest we should shift this discussion to the KIP mailing list thread. That has better visibility than a JIRA thread, so we'll probably get a more thorough and diverse discussion there. There are already some questions from others being addressed in that thread. Allow specifying expected offset on produce --- Key: KAFKA-2260 URL: https://issues.apache.org/jira/browse/KAFKA-2260 Project: Kafka Issue Type: Improvement Reporter: Ben Kirwin Assignee: Ewen Cheslack-Postava Priority: Minor Attachments: expected-offsets.patch I'd like to propose a change that adds a simple CAS-like mechanism to the Kafka producer. This update has a small footprint, but enables a bunch of interesting uses in stream processing or as a commit log for process state. h4. Proposed Change In short: - Allow the user to attach a specific offset to each message produced. - The server assigns offsets to messages in the usual way. However, if the expected offset doesn't match the actual offset, the server should fail the produce request instead of completing the write. This is a form of optimistic concurrency control, like the ubiquitous check-and-set -- but instead of checking the current value of some state, it checks the current offset of the log. h4. Motivation Much like check-and-set, this feature is only useful when there's very low contention. Happily, when Kafka is used as a commit log or as a stream-processing transport, it's common to have just one producer (or a small number) for a given partition -- and in many of these cases, predicting offsets turns out to be quite useful. - We get the same benefits as the 'idempotent producer' proposal: a producer can retry a write indefinitely and be sure that at most one of those attempts will succeed; and if two producers accidentally write to the end of the partition at once, we can be certain that at least one of them will fail. - It's possible to 'bulk load' Kafka this way -- you can write a list of n messages consecutively to a partition, even if the list is much larger than the buffer size or the producer has to be restarted. - If a process is using Kafka as a commit log -- reading from a partition to bootstrap, then writing any updates to that same partition -- it can be sure that it's seen all of the messages in that partition at the moment it does its first (successful) write. There's a bunch of other similar use-cases here, but they all have roughly the same flavour. h4. Implementation The major advantage of this proposal over other suggested transaction / idempotency mechanisms is its minimality: it gives the 'obvious' meaning to a currently-unused field, adds no new APIs, and requires very little new code or additional work from the server. - Produced messages already carry an offset field, which is currently ignored by the server. This field could be used for the 'expected offset', with a sigil value for the current behaviour. (-1 is a natural choice, since it's already used to mean 'next available offset'.) - We'd need a new error and error code for a 'CAS failure'. - The server assigns offsets to produced messages in {{ByteBufferMessageSet.validateMessagesAndAssignOffsets}}. After this changed, this method would assign offsets in the same way -- but if they don't match the offset in the message, we'd return an error instead of completing the write. - To avoid breaking existing clients, this behaviour would need to live behind some config flag. (Possibly global, but probably more useful per-topic?) I understand all this is unsolicited and possibly strange: happy to answer questions, and if this seems interesting, I'd be glad to flesh this out into a full KIP or patch. (And apologies if this is the wrong venue for this sort of thing!) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-2260) Allow specifying expected offset on produce
[ https://issues.apache.org/jira/browse/KAFKA-2260?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14632259#comment-14632259 ] Yasuhiro Matsuda commented on KAFKA-2260: - Here is the outline of the variant Jay mentioned. - A broker holds a fixed size array of offsets for each partition. Array indexes are hash of keys. In a sense, an array element works as a sub-partition. Sub-partitions do not hold data (messages). All they have are the high water marks. - The broker maintains high water marks for each sub-partition. A sub-partition high water mark is updated when a message whose key belongs to the sub-partition is appended to the log. - An application maintains the high water mark of each partition (not sub-partition!) as it consumes messages. It doesn't need to know anything about sub-partitions in a broker. A produce request is processed as follows. 1. The producer sends the known high water mark of the partition with a message. 2. The broker compares the high water mark in the produce request and the high water mark of the sub-partition corresponding the message key. 3. If the former is greater than the latter, the broker accepts the produce request. (Note that this is not equality test!) 4. Otherwise, the broker rejects the request. A nice thing about this is that it is easy to increase the concurrency without re-partitioning, and its overhead is predictable. When changing the number of sub-partitions, the broker doesn't have to recompute sub-partition high water marks. It can initialize all array elements with the partition's high water mark. Allow specifying expected offset on produce --- Key: KAFKA-2260 URL: https://issues.apache.org/jira/browse/KAFKA-2260 Project: Kafka Issue Type: Improvement Reporter: Ben Kirwin Assignee: Ewen Cheslack-Postava Priority: Minor Attachments: expected-offsets.patch I'd like to propose a change that adds a simple CAS-like mechanism to the Kafka producer. This update has a small footprint, but enables a bunch of interesting uses in stream processing or as a commit log for process state. h4. Proposed Change In short: - Allow the user to attach a specific offset to each message produced. - The server assigns offsets to messages in the usual way. However, if the expected offset doesn't match the actual offset, the server should fail the produce request instead of completing the write. This is a form of optimistic concurrency control, like the ubiquitous check-and-set -- but instead of checking the current value of some state, it checks the current offset of the log. h4. Motivation Much like check-and-set, this feature is only useful when there's very low contention. Happily, when Kafka is used as a commit log or as a stream-processing transport, it's common to have just one producer (or a small number) for a given partition -- and in many of these cases, predicting offsets turns out to be quite useful. - We get the same benefits as the 'idempotent producer' proposal: a producer can retry a write indefinitely and be sure that at most one of those attempts will succeed; and if two producers accidentally write to the end of the partition at once, we can be certain that at least one of them will fail. - It's possible to 'bulk load' Kafka this way -- you can write a list of n messages consecutively to a partition, even if the list is much larger than the buffer size or the producer has to be restarted. - If a process is using Kafka as a commit log -- reading from a partition to bootstrap, then writing any updates to that same partition -- it can be sure that it's seen all of the messages in that partition at the moment it does its first (successful) write. There's a bunch of other similar use-cases here, but they all have roughly the same flavour. h4. Implementation The major advantage of this proposal over other suggested transaction / idempotency mechanisms is its minimality: it gives the 'obvious' meaning to a currently-unused field, adds no new APIs, and requires very little new code or additional work from the server. - Produced messages already carry an offset field, which is currently ignored by the server. This field could be used for the 'expected offset', with a sigil value for the current behaviour. (-1 is a natural choice, since it's already used to mean 'next available offset'.) - We'd need a new error and error code for a 'CAS failure'. - The server assigns offsets to produced messages in {{ByteBufferMessageSet.validateMessagesAndAssignOffsets}}. After this changed, this method would assign offsets in the same way -- but if they don't match the offset in the message, we'd return an error instead of completing the write. - To avoid
[jira] [Commented] (KAFKA-2260) Allow specifying expected offset on produce
[ https://issues.apache.org/jira/browse/KAFKA-2260?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14632120#comment-14632120 ] Ben Kirwin commented on KAFKA-2260: --- Interesting -- the idea is that we track the max offset per *hash* of the key, instead of the key itself? I guess that if you use an array of length 1, this reduces to the current proposal. :) It would be interesting to calculate how frequently different keys would conflict, given a good hash function. It seems like, for this to work, you'd need to add an additional method to the API to get the current offset for the hash of a given key? Allow specifying expected offset on produce --- Key: KAFKA-2260 URL: https://issues.apache.org/jira/browse/KAFKA-2260 Project: Kafka Issue Type: Improvement Reporter: Ben Kirwin Assignee: Ewen Cheslack-Postava Priority: Minor Attachments: expected-offsets.patch I'd like to propose a change that adds a simple CAS-like mechanism to the Kafka producer. This update has a small footprint, but enables a bunch of interesting uses in stream processing or as a commit log for process state. h4. Proposed Change In short: - Allow the user to attach a specific offset to each message produced. - The server assigns offsets to messages in the usual way. However, if the expected offset doesn't match the actual offset, the server should fail the produce request instead of completing the write. This is a form of optimistic concurrency control, like the ubiquitous check-and-set -- but instead of checking the current value of some state, it checks the current offset of the log. h4. Motivation Much like check-and-set, this feature is only useful when there's very low contention. Happily, when Kafka is used as a commit log or as a stream-processing transport, it's common to have just one producer (or a small number) for a given partition -- and in many of these cases, predicting offsets turns out to be quite useful. - We get the same benefits as the 'idempotent producer' proposal: a producer can retry a write indefinitely and be sure that at most one of those attempts will succeed; and if two producers accidentally write to the end of the partition at once, we can be certain that at least one of them will fail. - It's possible to 'bulk load' Kafka this way -- you can write a list of n messages consecutively to a partition, even if the list is much larger than the buffer size or the producer has to be restarted. - If a process is using Kafka as a commit log -- reading from a partition to bootstrap, then writing any updates to that same partition -- it can be sure that it's seen all of the messages in that partition at the moment it does its first (successful) write. There's a bunch of other similar use-cases here, but they all have roughly the same flavour. h4. Implementation The major advantage of this proposal over other suggested transaction / idempotency mechanisms is its minimality: it gives the 'obvious' meaning to a currently-unused field, adds no new APIs, and requires very little new code or additional work from the server. - Produced messages already carry an offset field, which is currently ignored by the server. This field could be used for the 'expected offset', with a sigil value for the current behaviour. (-1 is a natural choice, since it's already used to mean 'next available offset'.) - We'd need a new error and error code for a 'CAS failure'. - The server assigns offsets to produced messages in {{ByteBufferMessageSet.validateMessagesAndAssignOffsets}}. After this changed, this method would assign offsets in the same way -- but if they don't match the offset in the message, we'd return an error instead of completing the write. - To avoid breaking existing clients, this behaviour would need to live behind some config flag. (Possibly global, but probably more useful per-topic?) I understand all this is unsolicited and possibly strange: happy to answer questions, and if this seems interesting, I'd be glad to flesh this out into a full KIP or patch. (And apologies if this is the wrong venue for this sort of thing!) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-2260) Allow specifying expected offset on produce
[ https://issues.apache.org/jira/browse/KAFKA-2260?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14632113#comment-14632113 ] Ben Kirwin commented on KAFKA-2260: --- Ah, sorry! Let me try again. Suppose you try and send a batch of messages to a partition, but get some network error -- it's possible that they were published successfully, or that they were lost before they made it to the broker. With the CAS, the simple thing to do is to resend the same batch with the same expected offsets. If the messages were published correctly last time, you'll get a check mismatch error; and if they weren't, they'll be appended correctly. If the series of messages that the producer wants to send is fixed, the same mechanism would work even through producer restarts. If the set of messages isn't fixed -- the producer might have a completely different set of messages to send after restarting -- than what it means to be exactly-once becomes a lot more domain-dependent; you might want to write exactly one group of messages for each input message, or rpc request, or five-minute interval -- but that requires coordination between a bunch of different moving parts, and I don't think there's one coordination mechanism that handles all cases. (This 'expected offset' thing is enough for some, but certainly not all of them...) Allow specifying expected offset on produce --- Key: KAFKA-2260 URL: https://issues.apache.org/jira/browse/KAFKA-2260 Project: Kafka Issue Type: Improvement Reporter: Ben Kirwin Assignee: Ewen Cheslack-Postava Priority: Minor Attachments: expected-offsets.patch I'd like to propose a change that adds a simple CAS-like mechanism to the Kafka producer. This update has a small footprint, but enables a bunch of interesting uses in stream processing or as a commit log for process state. h4. Proposed Change In short: - Allow the user to attach a specific offset to each message produced. - The server assigns offsets to messages in the usual way. However, if the expected offset doesn't match the actual offset, the server should fail the produce request instead of completing the write. This is a form of optimistic concurrency control, like the ubiquitous check-and-set -- but instead of checking the current value of some state, it checks the current offset of the log. h4. Motivation Much like check-and-set, this feature is only useful when there's very low contention. Happily, when Kafka is used as a commit log or as a stream-processing transport, it's common to have just one producer (or a small number) for a given partition -- and in many of these cases, predicting offsets turns out to be quite useful. - We get the same benefits as the 'idempotent producer' proposal: a producer can retry a write indefinitely and be sure that at most one of those attempts will succeed; and if two producers accidentally write to the end of the partition at once, we can be certain that at least one of them will fail. - It's possible to 'bulk load' Kafka this way -- you can write a list of n messages consecutively to a partition, even if the list is much larger than the buffer size or the producer has to be restarted. - If a process is using Kafka as a commit log -- reading from a partition to bootstrap, then writing any updates to that same partition -- it can be sure that it's seen all of the messages in that partition at the moment it does its first (successful) write. There's a bunch of other similar use-cases here, but they all have roughly the same flavour. h4. Implementation The major advantage of this proposal over other suggested transaction / idempotency mechanisms is its minimality: it gives the 'obvious' meaning to a currently-unused field, adds no new APIs, and requires very little new code or additional work from the server. - Produced messages already carry an offset field, which is currently ignored by the server. This field could be used for the 'expected offset', with a sigil value for the current behaviour. (-1 is a natural choice, since it's already used to mean 'next available offset'.) - We'd need a new error and error code for a 'CAS failure'. - The server assigns offsets to produced messages in {{ByteBufferMessageSet.validateMessagesAndAssignOffsets}}. After this changed, this method would assign offsets in the same way -- but if they don't match the offset in the message, we'd return an error instead of completing the write. - To avoid breaking existing clients, this behaviour would need to live behind some config flag. (Possibly global, but probably more useful per-topic?) I understand all this is unsolicited and possibly strange: happy to answer questions, and if this seems interesting, I'd
[jira] [Commented] (KAFKA-2260) Allow specifying expected offset on produce
[ https://issues.apache.org/jira/browse/KAFKA-2260?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14630806#comment-14630806 ] Jiangjie Qin commented on KAFKA-2260: - This is a very interesting proposal and would be useful in many cases. I have some questions below: 1. Is this CAS supposed to completely solve the idempotent issue? If so, what would happen if a broker takes in a producer request, appends it to the log but then producer dies before it receives ProducerResponse? 2. IIRC, in mailing list the scenario was trying to solve double booking of one ticket. In this case, only offset-based CAS might not work, right? Maybe adding key-based CAS would help a bit, but there might be some potential issue because messages are sent in a batch, some key might be conflict and some might not. 3. Is it the case that this CAS mainly focus on a multiple producer but very low traffic scenario? Because even if there are only two producers, as long as they are consistently producing data, I imagine the reject rate would be very high. Allow specifying expected offset on produce --- Key: KAFKA-2260 URL: https://issues.apache.org/jira/browse/KAFKA-2260 Project: Kafka Issue Type: Improvement Reporter: Ben Kirwin Assignee: Ewen Cheslack-Postava Priority: Minor Attachments: expected-offsets.patch I'd like to propose a change that adds a simple CAS-like mechanism to the Kafka producer. This update has a small footprint, but enables a bunch of interesting uses in stream processing or as a commit log for process state. h4. Proposed Change In short: - Allow the user to attach a specific offset to each message produced. - The server assigns offsets to messages in the usual way. However, if the expected offset doesn't match the actual offset, the server should fail the produce request instead of completing the write. This is a form of optimistic concurrency control, like the ubiquitous check-and-set -- but instead of checking the current value of some state, it checks the current offset of the log. h4. Motivation Much like check-and-set, this feature is only useful when there's very low contention. Happily, when Kafka is used as a commit log or as a stream-processing transport, it's common to have just one producer (or a small number) for a given partition -- and in many of these cases, predicting offsets turns out to be quite useful. - We get the same benefits as the 'idempotent producer' proposal: a producer can retry a write indefinitely and be sure that at most one of those attempts will succeed; and if two producers accidentally write to the end of the partition at once, we can be certain that at least one of them will fail. - It's possible to 'bulk load' Kafka this way -- you can write a list of n messages consecutively to a partition, even if the list is much larger than the buffer size or the producer has to be restarted. - If a process is using Kafka as a commit log -- reading from a partition to bootstrap, then writing any updates to that same partition -- it can be sure that it's seen all of the messages in that partition at the moment it does its first (successful) write. There's a bunch of other similar use-cases here, but they all have roughly the same flavour. h4. Implementation The major advantage of this proposal over other suggested transaction / idempotency mechanisms is its minimality: it gives the 'obvious' meaning to a currently-unused field, adds no new APIs, and requires very little new code or additional work from the server. - Produced messages already carry an offset field, which is currently ignored by the server. This field could be used for the 'expected offset', with a sigil value for the current behaviour. (-1 is a natural choice, since it's already used to mean 'next available offset'.) - We'd need a new error and error code for a 'CAS failure'. - The server assigns offsets to produced messages in {{ByteBufferMessageSet.validateMessagesAndAssignOffsets}}. After this changed, this method would assign offsets in the same way -- but if they don't match the offset in the message, we'd return an error instead of completing the write. - To avoid breaking existing clients, this behaviour would need to live behind some config flag. (Possibly global, but probably more useful per-topic?) I understand all this is unsolicited and possibly strange: happy to answer questions, and if this seems interesting, I'd be glad to flesh this out into a full KIP or patch. (And apologies if this is the wrong venue for this sort of thing!) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-2260) Allow specifying expected offset on produce
[ https://issues.apache.org/jira/browse/KAFKA-2260?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14630622#comment-14630622 ] Jay Kreps commented on KAFKA-2260: -- I like this idea a lot and the change is quite simple and natural. Allow specifying expected offset on produce --- Key: KAFKA-2260 URL: https://issues.apache.org/jira/browse/KAFKA-2260 Project: Kafka Issue Type: Improvement Reporter: Ben Kirwin Assignee: Ewen Cheslack-Postava Priority: Minor Attachments: expected-offsets.patch I'd like to propose a change that adds a simple CAS-like mechanism to the Kafka producer. This update has a small footprint, but enables a bunch of interesting uses in stream processing or as a commit log for process state. h4. Proposed Change In short: - Allow the user to attach a specific offset to each message produced. - The server assigns offsets to messages in the usual way. However, if the expected offset doesn't match the actual offset, the server should fail the produce request instead of completing the write. This is a form of optimistic concurrency control, like the ubiquitous check-and-set -- but instead of checking the current value of some state, it checks the current offset of the log. h4. Motivation Much like check-and-set, this feature is only useful when there's very low contention. Happily, when Kafka is used as a commit log or as a stream-processing transport, it's common to have just one producer (or a small number) for a given partition -- and in many of these cases, predicting offsets turns out to be quite useful. - We get the same benefits as the 'idempotent producer' proposal: a producer can retry a write indefinitely and be sure that at most one of those attempts will succeed; and if two producers accidentally write to the end of the partition at once, we can be certain that at least one of them will fail. - It's possible to 'bulk load' Kafka this way -- you can write a list of n messages consecutively to a partition, even if the list is much larger than the buffer size or the producer has to be restarted. - If a process is using Kafka as a commit log -- reading from a partition to bootstrap, then writing any updates to that same partition -- it can be sure that it's seen all of the messages in that partition at the moment it does its first (successful) write. There's a bunch of other similar use-cases here, but they all have roughly the same flavour. h4. Implementation The major advantage of this proposal over other suggested transaction / idempotency mechanisms is its minimality: it gives the 'obvious' meaning to a currently-unused field, adds no new APIs, and requires very little new code or additional work from the server. - Produced messages already carry an offset field, which is currently ignored by the server. This field could be used for the 'expected offset', with a sigil value for the current behaviour. (-1 is a natural choice, since it's already used to mean 'next available offset'.) - We'd need a new error and error code for a 'CAS failure'. - The server assigns offsets to produced messages in {{ByteBufferMessageSet.validateMessagesAndAssignOffsets}}. After this changed, this method would assign offsets in the same way -- but if they don't match the offset in the message, we'd return an error instead of completing the write. - To avoid breaking existing clients, this behaviour would need to live behind some config flag. (Possibly global, but probably more useful per-topic?) I understand all this is unsolicited and possibly strange: happy to answer questions, and if this seems interesting, I'd be glad to flesh this out into a full KIP or patch. (And apologies if this is the wrong venue for this sort of thing!) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-2260) Allow specifying expected offset on produce
[ https://issues.apache.org/jira/browse/KAFKA-2260?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14627018#comment-14627018 ] Ewen Cheslack-Postava commented on KAFKA-2260: -- Hey [~bkirwi], this looks interesting. Since it's a pretty important user-facing change, it'd be good to convert this into a KIP, send it to the mailing list, and get broader feedback on it. It looks like you've already got a lot of what you'd need here. The wip patch is also nice since it'll help people understand the impact. One thing you might want to add is a bit of discussion of other alternatives since it'll come up in the discussion anyway. Allow specifying expected offset on produce --- Key: KAFKA-2260 URL: https://issues.apache.org/jira/browse/KAFKA-2260 Project: Kafka Issue Type: Improvement Reporter: Ben Kirwin Assignee: Ewen Cheslack-Postava Priority: Minor Attachments: expected-offsets.patch I'd like to propose a change that adds a simple CAS-like mechanism to the Kafka producer. This update has a small footprint, but enables a bunch of interesting uses in stream processing or as a commit log for process state. h4. Proposed Change In short: - Allow the user to attach a specific offset to each message produced. - The server assigns offsets to messages in the usual way. However, if the expected offset doesn't match the actual offset, the server should fail the produce request instead of completing the write. This is a form of optimistic concurrency control, like the ubiquitous check-and-set -- but instead of checking the current value of some state, it checks the current offset of the log. h4. Motivation Much like check-and-set, this feature is only useful when there's very low contention. Happily, when Kafka is used as a commit log or as a stream-processing transport, it's common to have just one producer (or a small number) for a given partition -- and in many of these cases, predicting offsets turns out to be quite useful. - We get the same benefits as the 'idempotent producer' proposal: a producer can retry a write indefinitely and be sure that at most one of those attempts will succeed; and if two producers accidentally write to the end of the partition at once, we can be certain that at least one of them will fail. - It's possible to 'bulk load' Kafka this way -- you can write a list of n messages consecutively to a partition, even if the list is much larger than the buffer size or the producer has to be restarted. - If a process is using Kafka as a commit log -- reading from a partition to bootstrap, then writing any updates to that same partition -- it can be sure that it's seen all of the messages in that partition at the moment it does its first (successful) write. There's a bunch of other similar use-cases here, but they all have roughly the same flavour. h4. Implementation The major advantage of this proposal over other suggested transaction / idempotency mechanisms is its minimality: it gives the 'obvious' meaning to a currently-unused field, adds no new APIs, and requires very little new code or additional work from the server. - Produced messages already carry an offset field, which is currently ignored by the server. This field could be used for the 'expected offset', with a sigil value for the current behaviour. (-1 is a natural choice, since it's already used to mean 'next available offset'.) - We'd need a new error and error code for a 'CAS failure'. - The server assigns offsets to produced messages in {{ByteBufferMessageSet.validateMessagesAndAssignOffsets}}. After this changed, this method would assign offsets in the same way -- but if they don't match the offset in the message, we'd return an error instead of completing the write. - To avoid breaking existing clients, this behaviour would need to live behind some config flag. (Possibly global, but probably more useful per-topic?) I understand all this is unsolicited and possibly strange: happy to answer questions, and if this seems interesting, I'd be glad to flesh this out into a full KIP or patch. (And apologies if this is the wrong venue for this sort of thing!) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-2260) Allow specifying expected offset on produce
[ https://issues.apache.org/jira/browse/KAFKA-2260?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14627460#comment-14627460 ] Ben Kirwin commented on KAFKA-2260: --- Hi [~ewencp] -- thanks for the interest! I'd be glad to work this up into a KIP, but it looks like I don't have the permissions to create a wiki page... could you set that up? Allow specifying expected offset on produce --- Key: KAFKA-2260 URL: https://issues.apache.org/jira/browse/KAFKA-2260 Project: Kafka Issue Type: Improvement Reporter: Ben Kirwin Assignee: Ewen Cheslack-Postava Priority: Minor Attachments: expected-offsets.patch I'd like to propose a change that adds a simple CAS-like mechanism to the Kafka producer. This update has a small footprint, but enables a bunch of interesting uses in stream processing or as a commit log for process state. h4. Proposed Change In short: - Allow the user to attach a specific offset to each message produced. - The server assigns offsets to messages in the usual way. However, if the expected offset doesn't match the actual offset, the server should fail the produce request instead of completing the write. This is a form of optimistic concurrency control, like the ubiquitous check-and-set -- but instead of checking the current value of some state, it checks the current offset of the log. h4. Motivation Much like check-and-set, this feature is only useful when there's very low contention. Happily, when Kafka is used as a commit log or as a stream-processing transport, it's common to have just one producer (or a small number) for a given partition -- and in many of these cases, predicting offsets turns out to be quite useful. - We get the same benefits as the 'idempotent producer' proposal: a producer can retry a write indefinitely and be sure that at most one of those attempts will succeed; and if two producers accidentally write to the end of the partition at once, we can be certain that at least one of them will fail. - It's possible to 'bulk load' Kafka this way -- you can write a list of n messages consecutively to a partition, even if the list is much larger than the buffer size or the producer has to be restarted. - If a process is using Kafka as a commit log -- reading from a partition to bootstrap, then writing any updates to that same partition -- it can be sure that it's seen all of the messages in that partition at the moment it does its first (successful) write. There's a bunch of other similar use-cases here, but they all have roughly the same flavour. h4. Implementation The major advantage of this proposal over other suggested transaction / idempotency mechanisms is its minimality: it gives the 'obvious' meaning to a currently-unused field, adds no new APIs, and requires very little new code or additional work from the server. - Produced messages already carry an offset field, which is currently ignored by the server. This field could be used for the 'expected offset', with a sigil value for the current behaviour. (-1 is a natural choice, since it's already used to mean 'next available offset'.) - We'd need a new error and error code for a 'CAS failure'. - The server assigns offsets to produced messages in {{ByteBufferMessageSet.validateMessagesAndAssignOffsets}}. After this changed, this method would assign offsets in the same way -- but if they don't match the offset in the message, we'd return an error instead of completing the write. - To avoid breaking existing clients, this behaviour would need to live behind some config flag. (Possibly global, but probably more useful per-topic?) I understand all this is unsolicited and possibly strange: happy to answer questions, and if this seems interesting, I'd be glad to flesh this out into a full KIP or patch. (And apologies if this is the wrong venue for this sort of thing!) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-2260) Allow specifying expected offset on produce
[ https://issues.apache.org/jira/browse/KAFKA-2260?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14627484#comment-14627484 ] Ben Kirwin commented on KAFKA-2260: --- Will do; thanks! Allow specifying expected offset on produce --- Key: KAFKA-2260 URL: https://issues.apache.org/jira/browse/KAFKA-2260 Project: Kafka Issue Type: Improvement Reporter: Ben Kirwin Assignee: Ewen Cheslack-Postava Priority: Minor Attachments: expected-offsets.patch I'd like to propose a change that adds a simple CAS-like mechanism to the Kafka producer. This update has a small footprint, but enables a bunch of interesting uses in stream processing or as a commit log for process state. h4. Proposed Change In short: - Allow the user to attach a specific offset to each message produced. - The server assigns offsets to messages in the usual way. However, if the expected offset doesn't match the actual offset, the server should fail the produce request instead of completing the write. This is a form of optimistic concurrency control, like the ubiquitous check-and-set -- but instead of checking the current value of some state, it checks the current offset of the log. h4. Motivation Much like check-and-set, this feature is only useful when there's very low contention. Happily, when Kafka is used as a commit log or as a stream-processing transport, it's common to have just one producer (or a small number) for a given partition -- and in many of these cases, predicting offsets turns out to be quite useful. - We get the same benefits as the 'idempotent producer' proposal: a producer can retry a write indefinitely and be sure that at most one of those attempts will succeed; and if two producers accidentally write to the end of the partition at once, we can be certain that at least one of them will fail. - It's possible to 'bulk load' Kafka this way -- you can write a list of n messages consecutively to a partition, even if the list is much larger than the buffer size or the producer has to be restarted. - If a process is using Kafka as a commit log -- reading from a partition to bootstrap, then writing any updates to that same partition -- it can be sure that it's seen all of the messages in that partition at the moment it does its first (successful) write. There's a bunch of other similar use-cases here, but they all have roughly the same flavour. h4. Implementation The major advantage of this proposal over other suggested transaction / idempotency mechanisms is its minimality: it gives the 'obvious' meaning to a currently-unused field, adds no new APIs, and requires very little new code or additional work from the server. - Produced messages already carry an offset field, which is currently ignored by the server. This field could be used for the 'expected offset', with a sigil value for the current behaviour. (-1 is a natural choice, since it's already used to mean 'next available offset'.) - We'd need a new error and error code for a 'CAS failure'. - The server assigns offsets to produced messages in {{ByteBufferMessageSet.validateMessagesAndAssignOffsets}}. After this changed, this method would assign offsets in the same way -- but if they don't match the offset in the message, we'd return an error instead of completing the write. - To avoid breaking existing clients, this behaviour would need to live behind some config flag. (Possibly global, but probably more useful per-topic?) I understand all this is unsolicited and possibly strange: happy to answer questions, and if this seems interesting, I'd be glad to flesh this out into a full KIP or patch. (And apologies if this is the wrong venue for this sort of thing!) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-2260) Allow specifying expected offset on produce
[ https://issues.apache.org/jira/browse/KAFKA-2260?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14627482#comment-14627482 ] Ewen Cheslack-Postava commented on KAFKA-2260: -- [~bkirwi] Unfortunately I don't have that permission. If you ping the dev list, someone should be able to give you access. Allow specifying expected offset on produce --- Key: KAFKA-2260 URL: https://issues.apache.org/jira/browse/KAFKA-2260 Project: Kafka Issue Type: Improvement Reporter: Ben Kirwin Assignee: Ewen Cheslack-Postava Priority: Minor Attachments: expected-offsets.patch I'd like to propose a change that adds a simple CAS-like mechanism to the Kafka producer. This update has a small footprint, but enables a bunch of interesting uses in stream processing or as a commit log for process state. h4. Proposed Change In short: - Allow the user to attach a specific offset to each message produced. - The server assigns offsets to messages in the usual way. However, if the expected offset doesn't match the actual offset, the server should fail the produce request instead of completing the write. This is a form of optimistic concurrency control, like the ubiquitous check-and-set -- but instead of checking the current value of some state, it checks the current offset of the log. h4. Motivation Much like check-and-set, this feature is only useful when there's very low contention. Happily, when Kafka is used as a commit log or as a stream-processing transport, it's common to have just one producer (or a small number) for a given partition -- and in many of these cases, predicting offsets turns out to be quite useful. - We get the same benefits as the 'idempotent producer' proposal: a producer can retry a write indefinitely and be sure that at most one of those attempts will succeed; and if two producers accidentally write to the end of the partition at once, we can be certain that at least one of them will fail. - It's possible to 'bulk load' Kafka this way -- you can write a list of n messages consecutively to a partition, even if the list is much larger than the buffer size or the producer has to be restarted. - If a process is using Kafka as a commit log -- reading from a partition to bootstrap, then writing any updates to that same partition -- it can be sure that it's seen all of the messages in that partition at the moment it does its first (successful) write. There's a bunch of other similar use-cases here, but they all have roughly the same flavour. h4. Implementation The major advantage of this proposal over other suggested transaction / idempotency mechanisms is its minimality: it gives the 'obvious' meaning to a currently-unused field, adds no new APIs, and requires very little new code or additional work from the server. - Produced messages already carry an offset field, which is currently ignored by the server. This field could be used for the 'expected offset', with a sigil value for the current behaviour. (-1 is a natural choice, since it's already used to mean 'next available offset'.) - We'd need a new error and error code for a 'CAS failure'. - The server assigns offsets to produced messages in {{ByteBufferMessageSet.validateMessagesAndAssignOffsets}}. After this changed, this method would assign offsets in the same way -- but if they don't match the offset in the message, we'd return an error instead of completing the write. - To avoid breaking existing clients, this behaviour would need to live behind some config flag. (Possibly global, but probably more useful per-topic?) I understand all this is unsolicited and possibly strange: happy to answer questions, and if this seems interesting, I'd be glad to flesh this out into a full KIP or patch. (And apologies if this is the wrong venue for this sort of thing!) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-2260) Allow specifying expected offset on produce
[ https://issues.apache.org/jira/browse/KAFKA-2260?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14585601#comment-14585601 ] Daniel Schierbeck commented on KAFKA-2260: -- On the mailing list we discussed a way to decrease contention due to these sort of writes by making the write conditional match the last offset of the message's _key_ rather than the entire partition. This way, a write will only be rejected if another producer has written a message with the same key. This will also allow the producer to get better feedback when using Kafka as a storage backend for Event Source style events, where a rejected write may require re-evaluating the original command with the updated context, e.g. change ticket title may be invalid now that the event ticket closed has been written. Maybe the user should be notified synchronously and allowed to take action. The minimum requirement for implementing per-key conditional writes is that the broker must maintain an in-memory table mapping message keys to their highest offsets. The table can be saved to disk from time to time in order to cut down the time needed to rebuild it when recovering from a crash. Allow specifying expected offset on produce --- Key: KAFKA-2260 URL: https://issues.apache.org/jira/browse/KAFKA-2260 Project: Kafka Issue Type: Improvement Reporter: Ben Kirwin Priority: Minor I'd like to propose a change that adds a simple CAS-like mechanism to the Kafka producer. This update has a small footprint, but enables a bunch of interesting uses in stream processing or as a commit log for process state. h4. Proposed Change In short: - Allow the user to attach a specific offset to each message produced. - The server assigns offsets to messages in the usual way. However, if the expected offset doesn't match the actual offset, the server should fail the produce request instead of completing the write. This is a form of optimistic concurrency control, like the ubiquitous check-and-set -- but instead of checking the current value of some state, it checks the current offset of the log. h4. Motivation Much like check-and-set, this feature is only useful when there's very low contention. Happily, when Kafka is used as a commit log or as a stream-processing transport, it's common to have just one producer (or a small number) for a given partition -- and in many of these cases, predicting offsets turns out to be quite useful. - We get the same benefits as the 'idempotent producer' proposal: a producer can retry a write indefinitely and be sure that at most one of those attempts will succeed; and if two producers accidentally write to the end of the partition at once, we can be certain that at least one of them will fail. - It's possible to 'bulk load' Kafka this way -- you can write a list of n messages consecutively to a partition, even if the list is much larger than the buffer size or the producer has to be restarted. - If a process is using Kafka as a commit log -- reading from a partition to bootstrap, then writing any updates to that same partition -- it can be sure that it's seen all of the messages in that partition at the moment it does its first (successful) write. There's a bunch of other similar use-cases here, but they all have roughly the same flavour. h4. Implementation The major advantage of this proposal over other suggested transaction / idempotency mechanisms is its minimality: it gives the 'obvious' meaning to a currently-unused field, adds no new APIs, and requires very little new code or additional work from the server. - Produced messages already carry an offset field, which is currently ignored by the server. This field could be used for the 'expected offset', with a sigil value for the current behaviour. (-1 is a natural choice, since it's already used to mean 'next available offset'.) - We'd need a new error and error code for a 'CAS failure'. - The server assigns offsets to produced messages in {{ByteBufferMessageSet.validateMessagesAndAssignOffsets}}. After this changed, this method would assign offsets in the same way -- but if they don't match the offset in the message, we'd return an error instead of completing the write. - To avoid breaking existing clients, this behaviour would need to live behind some config flag. (Possibly global, but probably more useful per-topic?) I understand all this is unsolicited and possibly strange: happy to answer questions, and if this seems interesting, I'd be glad to flesh this out into a full KIP or patch. (And apologies if this is the wrong venue for this sort of thing!) -- This message was sent by Atlassian JIRA (v6.3.4#6332)