[jira] [Created] (KAFKA-5617) Update to the list of third-party clients
Daniel Schierbeck created KAFKA-5617:

Summary: Update to the list of third-party clients
Key: KAFKA-5617
URL: https://issues.apache.org/jira/browse/KAFKA-5617
Project: Kafka
Issue Type: Wish
Reporter: Daniel Schierbeck

I'd like to have the list of Ruby client libraries updated to reflect the current state of affairs.

* ruby-kafka is no longer compatible with Kafka 0.8, but is compatible with 0.9+. It should probably be moved to the top, since it's the only actively maintained low-level library – all other libraries are either unmaintained or are opinionated frameworks built on top of ruby-kafka.
* I'd like to add Racecar (https://github.com/zendesk/racecar), a simple opinionated framework built on top of ruby-kafka. It's an extraction from a production Zendesk code base, and so is already pretty battle tested.

I'm the author of both ruby-kafka and Racecar.

-- This message was sent by Atlassian JIRA (v6.4.14#64029)
Re: [DISCUSS] KIP-98: Exactly Once Delivery and Transactional Messaging
Hi Apurva, Thanks for taking the time to reply. You make excellent points, and it sounds like the right tradeoff. It would be great if the coordinator code could be shared with the consumer API – I hope that's actually the case. Daniel Schierbeck On Thu, 1 Dec 2016 at 18.34 Apurva Mehta <apu...@confluent.io> wrote: > Hi Daniel, > > That is a very good point. You are correct in saying that one does not need > a transaction coordinator to get idempotent semantics. > > There are, however, three reasons why we chose this route: > > 1. The request to find a transaction coordinator is exactly the same as > the request consumers use to find the group coordinator. So if clients > already implement the new consumer, you should already have the code you > need to find the transaction coordinator. I would even go so far as to say > that the majority of the coordinator discovery code can be effectively shared > between producers and consumers. Jason should correct me on this, however, > since he is most familiar with that bit. > 2. With this route, the broker side changes are simpler. In particular, > we have to implement the InitPIDRequest only in the coordinator. > 3. By always having a transaction coordinator, we can enable > applications to use transactions even if they don't specify the AppId. The > only thing you lose is transaction recovery across sessions. > > Needless to say, we did debate this point extensively. What swung our > decision ultimately was the following observation: if the user does not > provide a transaction.app.id, the client can generate a UUID and use that > as the appId for the rest of the session. This means that there are no > branches in the client and server code, and is overall simpler to maintain. > All the producer APIs are also available to the user and it would be more > intuitive. > > It also means that clients cannot choose idempotence without transactions, > and hence it does place a greater burden on implementors of kafka clients. 
> But the cost should be minimal given point 1 above, and was deemed worth > it. > > Thanks once more for your thoughtful comments. It would be great for other > client implementors to chime in on this. > > Regards, > Apurva > > > > On Thu, Dec 1, 2016 at 3:16 AM, Daniel Schierbeck > <da...@zendesk.com.invalid > > wrote: > > > Hi there, > > > > I'm the author of ruby-kafka, and as such am slightly biased towards > > simplicity of implementation :-) > > > > I like the proposal, and would love to use idempotent producer semantics > in > > our projects at Zendesk, but I'm a bit worried about the complexity that > > would go into the clients; specifically: it sounds to me that in order to > > get idempotent producer semantics, I'd have to implement the transaction > > coordinator discovery. I may be wrong, but it would seem that it's not > > strictly necessary if you're not using transactions – we could just use > the > > topic partition's leader as the coordinator, avoiding the extra > discovery. > > In my experience, most bugs are related to figuring out which broker is > the > > leader of which partition/group/whatever, so minimizing the number of > > moving parts would be beneficial to me. I'd also like to point out that I > > would be reluctant to implement the transaction API in the near future, > but > > would love to implement the idempotency API soon. The former seems only > > relevant to real stream processing frameworks, which is probably not the > > best use case for ruby-kafka. > > > > Cheers, > > Daniel Schierbeck > > > > On Thu, Dec 1, 2016 at 9:54 AM Jason Gustafson <ja...@confluent.io> > wrote: > > > > > Hey Neha, > > > > > > Thanks for the thoughtful questions. I'll try to address the first > > question > > > since Apurva addressed the second. Since most readers are probably > > getting > > > up to speed with this large proposal, let me first take a step back and > > > explain why we need the AppID at all. 
As Confluent tradition demands, I > > > present you a big wall of text: > > > > > > Clearly "exactly once" delivery requires resilience to client failures. > > > When a client crashes or turns into a zombie, another client must > > > eventually be started to resume the work. There are two problems: 1) we > > > need to ensure that the old process is actually dead or at least that it > > > cannot write any more data, and 2) we need to be able to pick up wherever > > > the last process left off. To do either of these, we need some kind of > > > identifier to tie the two instances together.
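The UUID fallback Apurva describes above (generate an appId when transaction.app.id is absent, so client and server take identical code paths) can be pictured in a few lines. This is a sketch only; `resolve_app_id` is an illustrative name, not an API from any Kafka client:

```python
import uuid

def resolve_app_id(configured_app_id=None):
    # If the user supplied transaction.app.id, use it: transactions can then
    # be recovered across sessions. Otherwise generate a session-local UUID,
    # as described above -- the code paths stay identical, but transaction
    # recovery across sessions is lost.
    if configured_app_id is not None:
        return configured_app_id
    return str(uuid.uuid4())
```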
Re: [DISCUSS] KIP-82 - Add Record Headers
I don't have a lot of feedback on this, but at Zendesk we could definitely use a standardized header system. Using ints as keys sounds tedious, but if that's a necessary tradeoff I'd be okay with it. On Fri, Dec 2, 2016 at 5:44 AM Todd Palino wrote: > Come on, I’ve done at least 2 talks on this one :) > > Producing counts to a topic is part of it, but that’s only part. So you > count that you have 100 messages in topic A. When you mirror topic A to another > cluster, you have 99 messages. Where was your problem? Or worse, you have > 100 messages, but one producer duplicated messages and another one lost > messages. You need details about where the message came from in order to > pinpoint problems when they happen. Source producer info, where it was > produced into your infrastructure, and when it was produced. This requires > you to add the information to the message. > > And yes, you still need to maintain your clients. So maybe my original > example was not the best. My thoughts on not wanting to be responsible for > message formats stand, because that’s very much separate from the client. > As you know, we have our own internal client library that can insert the > right headers, and right now inserts the right audit information into the > message fields. If they exist, and assuming the message is Avro encoded. > What if someone wants to use JSON instead for a good reason? What if user X > wants to encrypt messages, but user Y does not? Maintaining the client > library is still much easier than maintaining the message formats. > > > -Todd > > > > On Thu, Dec 1, 2016 at 6:21 PM, Gwen Shapira wrote: > > > Based on your last sentence, consider me convinced :) > > > > I get why headers are critical for Mirroring (you need tags to prevent > > loops and sometimes to route messages to the correct destination). > > But why do you need headers to audit? 
We are auditing by producing > > counts to a side topic (and I was under the impression you do the > > same), so we never need to modify the message. > > > > Another thing - after we added headers, wouldn't you be in the > > business of making sure everyone uses them properly? Making sure > > everyone includes the right headers you need, not using the header > > names you intend to use, etc. I don't think the "policing" business > > will ever go away. > > > > On Thu, Dec 1, 2016 at 5:25 PM, Todd Palino wrote: > > > Got it. As an ops guy, I'm not very happy with the workaround. Avro > means > > > that I have to be concerned with the format of the messages in order to > > run > > > the infrastructure (audit, mirroring, etc.). That means that I have to > > > handle the schemas, and I have to enforce rules about good formats. > This > > is > > > not something I want to be in the business of, because I should be able > > to > > > run a service infrastructure without needing to be in the weeds of > > dealing > > > with customer data formats. > > > > > > Trust me, a sizable portion of my support time is spent dealing with > > schema > > > issues. I really would like to get away from that. Maybe I'd have more > > time > > > for other hobbies. Like writing. ;) > > > > > > -Todd > > > > > > On Thu, Dec 1, 2016 at 4:04 PM Gwen Shapira wrote: > > > > > >> I'm pretty satisfied with the current workarounds (Avro container > > >> format), so I'm not too excited about the extra work required to do > > >> headers in Kafka. I absolutely don't mind it if you do it... > > >> I think the Apache convention for "good idea, but not willing to put > > >> any work toward it" is +0.5? anyway, that's what I was trying to > > >> convey :) > > >> > > >> On Thu, Dec 1, 2016 at 3:05 PM, Todd Palino > wrote: > > >> > Well I guess my question for you, then, is what is holding you back > > from > > >> > full support for headers? 
What’s the bit that you’re missing that > has > > you > > >> > under a full +1? > > >> > > > >> > -Todd > > >> > > > >> > > > >> > On Thu, Dec 1, 2016 at 1:59 PM, Gwen Shapira > > wrote: > > >> > > > >> >> I know why people who support headers support them, and I've seen > > what > > >> >> the discussion is like. > > >> >> > > >> >> This is why I'm asking people who are against headers (especially > > >> >> committers) what will make them change their mind - so we can get > > this > > >> >> part over one way or another. > > >> >> > > >> >> If I sound frustrated it is not at Radai, Jun or you (Todd)... I am > > >> >> just looking for something concrete we can do to move the > discussion > > >> >> along to the yummy design details (which is the argument I really > am > > >> >> looking forward to). > > >> >> > > >> >> On Thu, Dec 1, 2016 at 1:53 PM, Todd Palino > > wrote: > > >> >> > So, Gwen, to your question (even though I’m not a committer)... > > >> >> > > > >> >> > I have always been a strong supporter of introducing the concept > > of
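The int-keyed header idea Daniel mentions at the top of this thread can be made concrete with a toy [key][length][value] encoding. This is purely an illustrative sketch, not the wire format proposed in KIP-82:

```python
import struct

def encode_headers(headers):
    # Pack a {int key -> bytes value} map as big-endian
    # [4-byte key][4-byte length][value] triples.
    out = b""
    for key, value in sorted(headers.items()):
        out += struct.pack(">ii", key, len(value)) + value
    return out

def decode_headers(buf):
    # Walk the buffer, reading one triple at a time.
    headers, pos = {}, 0
    while pos < len(buf):
        key, length = struct.unpack_from(">ii", buf, pos)
        pos += 8
        headers[key] = buf[pos:pos + length]
        pos += length
    return headers
```

A registry mapping int keys to meanings (audit origin, produce timestamp, routing tags, ...) would live outside the message itself, which is the tedium-versus-compactness tradeoff discussed above.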
Re: [DISCUSS] KIP-98: Exactly Once Delivery and Transactional Messaging
Hi there, I'm the author of ruby-kafka, and as such am slightly biased towards simplicity of implementation :-) I like the proposal, and would love to use idempotent producer semantics in our projects at Zendesk, but I'm a bit worried about the complexity that would go into the clients; specifically: it sounds to me that in order to get idempotent producer semantics, I'd have to implement the transaction coordinator discovery. I may be wrong, but it would seem that it's not strictly necessary if you're not using transactions – we could just use the topic partition's leader as the coordinator, avoiding the extra discovery. In my experience, most bugs are related to figuring out which broker is the leader of which partition/group/whatever, so minimizing the number of moving parts would be beneficial to me. I'd also like to point out that I would be reluctant to implement the transaction API in the near future, but would love to implement the idempotency API soon. The former seems only relevant to real stream processing frameworks, which is probably not the best use case for ruby-kafka. Cheers, Daniel Schierbeck On Thu, Dec 1, 2016 at 9:54 AM Jason Gustafson <ja...@confluent.io> wrote: > Hey Neha, > > Thanks for the thoughtful questions. I'll try to address the first question > since Apurva addressed the second. Since most readers are probably getting > up to speed with this large proposal, let me first take a step back and > explain why we need the AppID at all. As Confluent tradition demands, I > present you a big wall of text: > > Clearly "exactly once" delivery requires resilience to client failures. > When a client crashes or turns into a zombie, another client must > eventually be started to resume the work. There are two problems: 1) we > need to ensure that the old process is actually dead or at least that it > cannot write any more data, and 2) we need to be able to pick up wherever > the last process left off. 
To do either of these, we need some kind of > identifier to tie the two instances together. > > There are only two choices for where this ID comes from: either the user > gives it to us or the server generates it. In the latter case, the user is > responsible for fetching it from the client and persisting it somewhere for > use after failure. We ultimately felt that the most flexible option is to > have the user give it to us. In many applications, there is already a > natural identifier which is already used to divide the workload. For > example, in Kafka Streams and Kafka Connect, we have a taskId. For > applications where there is no natural ID, the user can generate a UUID and > persist it locally, which is as good as having the server generate it. > > So the AppID is used to provide continuity between the instances of a > producer which are handling a certain workload. One of the early design > decisions we made in this work was to make the delivery guarantees we > provide agnostic of the workload that the producer is assigned. The > producer is not in the business of trying to divide up the work among all > its peers who are participating in the same duty (unlike the consumer, we > don't know anything about where the data comes from). This has huge > implications for "exactly-once" delivery because it puts the burden on the > user to divide the total workload among producer instances and to assign > AppIDs accordingly. > > I've been using the term "workload" loosely, but we usually imagine > something like Kafka Connect's notion of a "source partition." A source > partition could be a topic partition if the source is Kafka, or it could be > a database table, a log file, or whatever makes sense for the source of the > data. The point is that it's an independent source of data which can be > assigned to a producer instance. 
> > If the same source partition is always assigned to the producer with the > same AppID, then Kafka transactions will give you "exactly once" > delivery without much additional work. On initialization, the producer will > ensure that 1) any previous producers using that AppID are "fenced" off, > and 2) that any transaction which had been started by a previous producer > with that AppID has either completed or aborted. > > Based on this, it should be clear that the ideal is to divide the workload > so that you have a one-to-one mapping from the source partition to the > AppID. If the source of the data is Kafka, then the source partition is > just a topic partition, and the AppID can be generated from the name of the > topic and the partition number. > > To finally get back to your auto-scaling question, let's assume for a > moment the ideal mapping of source partition to AppID. The main question is > whether
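The ideal one-to-one mapping Jason describes (source partition to AppID, with Kafka topic partitions as the example source) amounts to something like the following sketch; the function name is illustrative, not part of any proposed API:

```python
def app_id_for(topic, partition):
    # Derive the AppID deterministically from the source partition, as
    # suggested above: the same source partition always yields the same
    # AppID, so a restarted producer fences its predecessor and resumes
    # or aborts its in-flight transactions.
    return "%s-%d" % (topic, partition)
```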
Re: [DISCUSS] KIP-27 - Conditional Publish
Jiangjie: I think giving users the possibility of defining a custom policy for handling rejections is a good idea. For instance, this will allow Kafka to act as an event store in an Event Sourcing application. If the event(s) are rejected by the store, the original command may need to be re-validated against the new state. On Tue, Jul 28, 2015 at 1:27 AM Jiangjie Qin j...@linkedin.com.invalid wrote: @Ewen, good point about batching. Yes, it would be tricky if we want to do a per-key conditional produce. My understanding is that the prerequisite of this KIP is: 1. Single producer for each partition. 2. Acks=-1, max.in.flight.requests.per.connection=1, retries=SOME_BIG_NUMBER The major problem it tries to solve is exactly-once produce, i.e. solving duplicates on the producer side. In that case, a batch will be considered as atomic. The only way a batch can be rejected should be that it was already appended, so the producer should just move on. It looks to me like even a transient multiple-producer scenario will cause issues, because users need to think about what to do if a request gets rejected, and the answer varies for different use cases. Thanks, Jiangjie (Becket) Qin On Sun, Jul 26, 2015 at 11:54 AM, Ben Kirwin b...@kirw.in wrote: So I had another look at the 'Idempotent Producer' proposal this afternoon, and made a few notes on how I think they compare; if I've made any mistakes, I'd be delighted if someone with more context on the idempotent producer design would correct me. As a first intuition, you can think of the 'conditional publish' proposal as the special case of the 'idempotent producer' idea, where there's just a single producer per-partition. The key observation here is: if there's only one producer, you can conflate the 'sequence number' and the expected offset. The conditional publish proposal uses existing Kafka offset APIs for roughly the same things as the idempotent producer proposal uses sequence numbers for -- eg. 
instead of having a lease PID API that returns the current sequence number, we can use the existing 'offset API' to retrieve the upcoming offset. Both proposals attempt to deal with the situation where there are transiently multiple publishers for the same partition (and PID). The idempotent producer setup tracks a generation id for each pid, and discards any writes with a generation id smaller than the latest value. Conditional publish is 'first write wins' -- and instead of dropping duplicates on the server, it returns an error to the client. The duplicate-handling behaviour (dropping vs. erroring) has some interesting consequences: - If all producers are producing the same stream of messages, silently dropping duplicates on the server is more convenient. (Suppose we have a batch of messages 0-9, and the high-water mark on the server is 7. Idempotent producer, as I read it, would append 7-9 to the partition and return success; meanwhile, conditional publish would fail the entire batch.) - If producers might be writing different streams of messages, the proposed behaviour of the idempotent producer is probably worse -- since it can silently interleave messages from two different producers. This can be a problem for some commit-log style use-cases, since it can transform a valid series of operations into an invalid one. - Given the error-on-duplicate behaviour, it's possible to implement deduplication on the client. (Sketch: if a publish returns an error for some partition, fetch the upcoming offset / sequence number for that partition, and discard all messages with a smaller offset on the client before republishing.) I think this makes the erroring behaviour more general, though deduplicating saves a roundtrip or two at conflict time. I'm less clear about the behaviour of the generation id, or what happens when (say) two producers with the same generation id are spun up at the same time. I'd be interested in hearing other folks' comments on this. 
Ewen: I'm not sure I understand the questions well enough to answer properly, but some quick notes: - I don't think it makes sense to assign an expected offset without already having assigned a partition. If the producer code is doing the partition assignment, it should probably do the offset assignment too... or we could just let application code handle both. - I'm not aware of any case where reassigning offsets to messages automatically after an offset mismatch makes sense: in the cases we've discussed, it seems like either it's safe to drop duplicates, or we want to handle the error at the application level. I'm going to try and come up with an idempotent-producer-type example that works with the draft patch in the next few days, so hopefully we'll have something more concrete to discuss. Otherwise -- if you have a clear idea of how eg. sequence number assignment would work in the
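The client-side deduplication sketch from earlier in the thread (on a conflict, fetch the upcoming offset and discard the already-written prefix before republishing) can be written out as follows; names and types here are illustrative:

```python
def republish_after_conflict(batch, base_offset, upcoming_offset):
    # After a conditional publish fails, keep only the messages the server
    # has not yet written. batch[i] was intended for offset base_offset + i;
    # upcoming_offset is fetched from the server after the conflict.
    already_written = upcoming_offset - base_offset
    if already_written <= 0:
        return list(batch)  # nothing was written; resend the whole batch
    return batch[already_written:]
```

Using the example from the thread: for a batch intended for offsets 0-9 with a server high-water mark of 7, the client would resend only the last three messages. Erroring on the server and deduplicating on the client costs a round trip or two at conflict time, which is the tradeoff Ben notes.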
[jira] [Commented] (KAFKA-2260) Allow specifying expected offset on produce
[ https://issues.apache.org/jira/browse/KAFKA-2260?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14635281#comment-14635281 ]

Daniel Schierbeck commented on KAFKA-2260: Where is the KIP being discussed? I couldn't find any mention of this in the dev list archive.

Allow specifying expected offset on produce
---
Key: KAFKA-2260
URL: https://issues.apache.org/jira/browse/KAFKA-2260
Project: Kafka
Issue Type: Improvement
Reporter: Ben Kirwin
Assignee: Ewen Cheslack-Postava
Priority: Minor
Attachments: expected-offsets.patch

I'd like to propose a change that adds a simple CAS-like mechanism to the Kafka producer. This update has a small footprint, but enables a bunch of interesting uses in stream processing or as a commit log for process state.

h4. Proposed Change

In short:
- Allow the user to attach a specific offset to each message produced.
- The server assigns offsets to messages in the usual way. However, if the expected offset doesn't match the actual offset, the server should fail the produce request instead of completing the write.

This is a form of optimistic concurrency control, like the ubiquitous check-and-set -- but instead of checking the current value of some state, it checks the current offset of the log.

h4. Motivation

Much like check-and-set, this feature is only useful when there's very low contention. Happily, when Kafka is used as a commit log or as a stream-processing transport, it's common to have just one producer (or a small number) for a given partition -- and in many of these cases, predicting offsets turns out to be quite useful.

- We get the same benefits as the 'idempotent producer' proposal: a producer can retry a write indefinitely and be sure that at most one of those attempts will succeed; and if two producers accidentally write to the end of the partition at once, we can be certain that at least one of them will fail.
- It's possible to 'bulk load' Kafka this way -- you can write a list of n messages consecutively to a partition, even if the list is much larger than the buffer size or the producer has to be restarted.
- If a process is using Kafka as a commit log -- reading from a partition to bootstrap, then writing any updates to that same partition -- it can be sure that it's seen all of the messages in that partition at the moment it does its first (successful) write.

There's a bunch of other similar use-cases here, but they all have roughly the same flavour.

h4. Implementation

The major advantage of this proposal over other suggested transaction / idempotency mechanisms is its minimality: it gives the 'obvious' meaning to a currently-unused field, adds no new APIs, and requires very little new code or additional work from the server.

- Produced messages already carry an offset field, which is currently ignored by the server. This field could be used for the 'expected offset', with a sigil value for the current behaviour. (-1 is a natural choice, since it's already used to mean 'next available offset'.)
- We'd need a new error and error code for a 'CAS failure'.
- The server assigns offsets to produced messages in {{ByteBufferMessageSet.validateMessagesAndAssignOffsets}}. After this change, this method would assign offsets in the same way -- but if they don't match the offset in the message, we'd return an error instead of completing the write.
- To avoid breaking existing clients, this behaviour would need to live behind some config flag. (Possibly global, but probably more useful per-topic?)

I understand all this is unsolicited and possibly strange: happy to answer questions, and if this seems interesting, I'd be glad to flesh this out into a full KIP or patch. (And apologies if this is the wrong venue for this sort of thing!)
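The proposed server-side check is small enough to sketch end to end. Here the log is modelled as a plain Python list and -1 is the sigil for 'next available offset', as in the proposal; the class and function names are made up for illustration:

```python
class CasFailure(Exception):
    """Error returned to the client when the expected-offset check fails."""

def append(log, messages, expected_offset=-1):
    # -1 keeps today's behaviour: append at the next available offset.
    # Any other value must equal the next offset exactly, or the whole
    # produce request fails without writing anything.
    next_offset = len(log)
    if expected_offset != -1 and expected_offset != next_offset:
        raise CasFailure("expected offset %d, next offset is %d"
                         % (expected_offset, next_offset))
    log.extend(messages)
    return next_offset
```

Note that the check is per-request, so a retried batch either lands exactly once or fails cleanly, which is the 'at most one attempt succeeds' property claimed above.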
[jira] [Commented] (KAFKA-2260) Allow specifying expected offset on produce
[ https://issues.apache.org/jira/browse/KAFKA-2260?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14585601#comment-14585601 ]

Daniel Schierbeck commented on KAFKA-2260: On the mailing list we discussed a way to decrease contention due to this sort of write by making the write condition match the last offset of the message's _key_ rather than that of the entire partition. This way, a write will only be rejected if another producer has written a message with the same key. This will also allow the producer to get better feedback when using Kafka as a storage backend for Event Sourcing-style events, where a rejected write may require re-evaluating the original command with the updated context, e.g. a "change ticket title" command may be invalid now that a "ticket closed" event has been written. Maybe the user should be notified synchronously and allowed to take action.

The minimum requirement for implementing per-key conditional writes is that the broker must maintain an in-memory table mapping message keys to their highest offsets. The table can be saved to disk from time to time in order to cut down the time needed to rebuild it when recovering from a crash.

Allow specifying expected offset on produce
---
Key: KAFKA-2260
URL: https://issues.apache.org/jira/browse/KAFKA-2260
Project: Kafka
Issue Type: Improvement
Reporter: Ben Kirwin
Priority: Minor

I'd like to propose a change that adds a simple CAS-like mechanism to the Kafka producer. This update has a small footprint, but enables a bunch of interesting uses in stream processing or as a commit log for process state.

h4. Proposed Change

In short:
- Allow the user to attach a specific offset to each message produced.
- The server assigns offsets to messages in the usual way. However, if the expected offset doesn't match the actual offset, the server should fail the produce request instead of completing the write.
This is a form of optimistic concurrency control, like the ubiquitous check-and-set -- but instead of checking the current value of some state, it checks the current offset of the log.

h4. Motivation

Much like check-and-set, this feature is only useful when there's very low contention. Happily, when Kafka is used as a commit log or as a stream-processing transport, it's common to have just one producer (or a small number) for a given partition -- and in many of these cases, predicting offsets turns out to be quite useful.

- We get the same benefits as the 'idempotent producer' proposal: a producer can retry a write indefinitely and be sure that at most one of those attempts will succeed; and if two producers accidentally write to the end of the partition at once, we can be certain that at least one of them will fail.
- It's possible to 'bulk load' Kafka this way -- you can write a list of n messages consecutively to a partition, even if the list is much larger than the buffer size or the producer has to be restarted.
- If a process is using Kafka as a commit log -- reading from a partition to bootstrap, then writing any updates to that same partition -- it can be sure that it's seen all of the messages in that partition at the moment it does its first (successful) write.

There's a bunch of other similar use-cases here, but they all have roughly the same flavour.

h4. Implementation

The major advantage of this proposal over other suggested transaction / idempotency mechanisms is its minimality: it gives the 'obvious' meaning to a currently-unused field, adds no new APIs, and requires very little new code or additional work from the server.

- Produced messages already carry an offset field, which is currently ignored by the server. This field could be used for the 'expected offset', with a sigil value for the current behaviour. (-1 is a natural choice, since it's already used to mean 'next available offset'.)
- We'd need a new error and error code for a 'CAS failure'.
- The server assigns offsets to produced messages in {{ByteBufferMessageSet.validateMessagesAndAssignOffsets}}. After this change, this method would assign offsets in the same way -- but if they don't match the offset in the message, we'd return an error instead of completing the write.
- To avoid breaking existing clients, this behaviour would need to live behind some config flag. (Possibly global, but probably more useful per-topic?)

I understand all this is unsolicited and possibly strange: happy to answer questions, and if this seems interesting, I'd be glad to flesh this out into a full KIP or patch. (And apologies if this is the wrong venue for this sort of thing!)
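The in-memory key-to-offset table described in the comment above can be sketched as a small class. This is a toy model of the broker-side structure, not actual broker code; snapshotting plus replaying the log tail would rebuild it after a crash:

```python
class KeyOffsetIndex:
    # Map each message key to the highest offset at which it was written.
    def __init__(self):
        self.last_offset = {}

    def record(self, key, offset):
        self.last_offset[key] = offset

    @classmethod
    def rebuild(cls, entries):
        # entries: iterable of (offset, key) pairs replayed from the log,
        # e.g. a periodic snapshot followed by the tail of the partition.
        index = cls()
        for offset, key in entries:
            index.record(key, offset)
        return index
```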
Re: Optimistic locking
Regarding the use case – some events may only be valid given a specific state of the world, e.g. a NewComment event and a PostClosedForComments event would be valid only in that order. If two producer processes (e.g. two HTTP application processes) try to write an event each, you may get integrity issues depending on the order. In this case, knowing that you *must* build upon the latest event for a key will provide application authors a way to ensure integrity. I'm not quite sure I understand the challenge in the implementation – how is key-based message expiration implemented, if not with some mapping of message keys to the last written offset? How else would you know whether the message you're examining is the last or not? On Mon Jan 05 2015 at 1:41:39 PM Colin Clark co...@clark.ws wrote: Hi, Couple of comments on this. What you're proposing is difficult to do at scale and would require some type of Paxos style algorithm for the update only if different - it would be easier in that case to just go ahead and do the update. Also, it seems like a conflation of concerns - in an event sourcing model, we save the immutable event, and represent current state in another, separate data structure. Perhaps cassandra would work well here - that data model might provide what you're looking for out of the box. Just as I don't recommend people use data stores as queuing mechanisms, I also recommend not using a queuing mechanism as a primary datastore - mixed semantics. -- *Colin* +1 612 859-6129 On Mon, Jan 5, 2015 at 4:47 AM, Daniel Schierbeck daniel.schierb...@gmail.com wrote: I'm trying to design a system that uses Kafka as its primary data store by persisting immutable events into a topic and keeping a secondary index in another data store. The secondary index would store the entities. Each event would pertain to some entity, e.g. a user, and those entities are stored in an easily queryable way. 
Kafka seems well suited for this, but there's one thing I'm having problems with. I cannot guarantee that only one process writes events about an entity, which makes the design vulnerable to integrity issues. For example, say that a user can have multiple email addresses assigned, and the EmailAddressRemoved event is published when the user removes one. There's an integrity constraint, though: every user MUST have at least one email address. As far as I can see, there's no way to stop two separate processes from looking up a user entity, seeing that there are two email addresses assigned, and each publishing an event. The end result would violate the constraint. If I'm wrong in saying that this isn't possible I'd love some feedback! My current thinking is that Kafka could relatively easily support this kind of application with a small additional API. Kafka already has the abstract notion of entities through its key-based retention policy. If the produce API was modified in order to allow an integer OffsetConstraint, the following algorithm could determine whether the request should proceed:

1. For every key seen, keep track of the offset of the latest message referencing the key.
2. When an OffsetConstraint is specified in the produce API call, compare that value with the latest offset for the message key.
2.1. If they're identical, allow the operation to continue.
2.2. If they're not identical, fail with some OptimisticLockingFailure.

Would such a feature be completely out of scope for Kafka? Best regards, Daniel Schierbeck
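The two-step algorithm above can be transcribed almost directly. This is a toy in-process model, not broker code, and `produce` here stands in for the extended produce API:

```python
class OptimisticLockingFailure(Exception):
    pass

latest_offset_by_key = {}  # step 1: latest offset seen for every key

def produce(log, key, message, offset_constraint=None):
    # Step 2: when an OffsetConstraint is given, compare it with the
    # latest offset recorded for this key.
    if offset_constraint is not None and \
            latest_offset_by_key.get(key) != offset_constraint:
        raise OptimisticLockingFailure(key)  # step 2.2: offsets differ
    log.append((key, message))               # step 2.1: proceed
    offset = len(log) - 1
    latest_offset_by_key[key] = offset
    return offset
```

A producer that loses the race gets an OptimisticLockingFailure instead of silently appending a now-invalid event, which is exactly the integrity property the email-address example needs.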
Re: Optimistic locking
Reading your reply again, I'd like to address this:

Also, it seems like a conflation of concerns - in an event sourcing model, we save the immutable event, and represent current state in another, separate data structure.

That's the idea: events are stored in Kafka and processed sequentially by a single process per partition. Those processes update a secondary store (e.g. MySQL) with the current state. However, when determining whether an event is valid, the current state must be taken into account. E.g. you can only withdraw money from an open account, so the event sequence AccountClosed -> MoneyWithdrawn is invalid. The only way I can think of to ensure this is the case is to have optimistic locking. Each account would have a unique key, and in order to write an event to Kafka the secondary store *must* be up to date with the previous events for that key. If not, the writer should wait a bit and try again, re-validating the input against the new state of the world. Is that a completely hopeless idea?

On Mon Jan 05 2015 at 2:18:49 PM Daniel Schierbeck daniel.schierb...@gmail.com wrote:

Regarding the use case: some events may only be valid given a specific state of the world, e.g. a NewComment event and a PostClosedForComments event would be valid only in that order. If two producer processes (e.g. two HTTP application processes) each try to write an event, you may get integrity issues depending on the order. In this case, knowing that you *must* build upon the latest event for a key will give application authors a way to ensure integrity. I'm not quite sure I understand the challenge in the implementation: how is key-based message expiration implemented, if not with some mapping of message keys to the last written offset? How else would you know whether the message you're examining is the latest or not?
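The validate-then-retry loop described above can be sketched from the application side. This is a minimal illustration, not a real Kafka API: `FakeLog` stands in for a log supporting the proposed per-key offset constraint, `read_state` stands in for the secondary store (returning the current state for a key plus the offset that state reflects), and all names are hypothetical.

```python
# Sketch of the validate-then-retry loop: validate the event against the
# latest known state, attempt an offset-constrained write, and on conflict
# re-read the state and re-validate. All names here are illustrative.

class ConflictError(Exception):
    """Another writer appended an event for this key first."""

class FakeLog:
    def __init__(self):
        self.events = []          # append-only (key, event) pairs
        self.latest = {}          # key -> offset of latest event for key

    def produce(self, key, event, expected_offset):
        if self.latest.get(key) != expected_offset:
            raise ConflictError(key)
        self.latest[key] = len(self.events)
        self.events.append((key, event))
        return self.latest[key]

def append_event(log, read_state, key, event, is_valid, retries=5):
    for _ in range(retries):
        state, offset = read_state(key)      # re-read on every attempt
        if not is_valid(state, event):       # e.g. no withdrawal when closed
            raise ValueError(f"{event} is invalid in state {state}")
        try:
            return log.produce(key, event, expected_offset=offset)
        except ConflictError:
            continue                         # lost the race: re-validate
    raise RuntimeError("gave up after repeated conflicts")

log = FakeLog()
# Pretend the secondary store has caught up with the log for this key.
read_state = lambda key: ("open", log.latest.get(key))
offset = append_event(log, read_state, "acct-1", "MoneyWithdrawn",
                      lambda state, event: state == "open")
```

The point of the loop is that an event which was valid when first checked (MoneyWithdrawn against an open account) is never published on top of a state it wasn't validated against: a conflicting write forces re-validation instead.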
Re: Optimistic locking
Reading the source code, it seems that kafka.log.LogCleaner builds a kafka.log.OffsetMap only when compacting logs, not ahead of time, so the information is not available at write time :-/

If I'm not mistaken, this also means that the cleaner needs _two_ passes to clean a log segment: one to build up the key -> last-offset map and one to actually remove expired messages. Having the map available up front would make it a single-pass algorithm. Especially if the segments are too large to keep in memory, this should be considerably faster, at the cost of having to maintain the mapping on each write. I'd be willing to do the coding if you think it's a good idea.

On Mon Jan 05 2015 at 2:24:00 PM Daniel Schierbeck daniel.schierb...@gmail.com wrote:

Reading your reply again, I'd like to address this:

Also, it seems like a conflation of concerns - in an event sourcing model, we save the immutable event, and represent current state in another, separate data structure.

That's the idea: events are stored in Kafka and processed sequentially by a single process per partition. Those processes update a secondary store (e.g. MySQL) with the current state. However, when determining whether an event is valid, the current state must be taken into account. E.g. you can only withdraw money from an open account, so the event sequence AccountClosed -> MoneyWithdrawn is invalid. The only way I can think of to ensure this is the case is to have optimistic locking. Each account would have a unique key, and in order to write an event to Kafka the secondary store *must* be up to date with the previous events for that key. If not, the writer should wait a bit and try again, re-validating the input against the new state of the world. Is that a completely hopeless idea?
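The single-pass idea above can be shown with a toy example. This is not the actual kafka.log.LogCleaner logic, just an illustration of the claim: if a key -> last-offset map were already maintained at write time, the cleaning pass reduces to a single filter over the segment.

```python
# Toy illustration: with a key -> last-offset map maintained incrementally
# on each append, compaction keeps a record exactly when its offset equals
# the map entry for its key. Not actual LogCleaner code.

def compact(segment, latest_offset):
    """segment: list of (offset, key, value) records in offset order."""
    return [(offset, key, value) for offset, key, value in segment
            if latest_offset[key] == offset]

segment = [(0, "a", "v0"), (1, "b", "v1"), (2, "a", "v2")]
latest_offset = {"a": 2, "b": 1}   # updated incrementally at write time
compacted = compact(segment, latest_offset)
```

Without the up-front map, the cleaner must first scan the segment to build `latest_offset` and only then filter, which is the two-pass behavior described above.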
[jira] [Created] (KAFKA-1827) Optimistic Locking when Producing Messages
Daniel Schierbeck created KAFKA-1827:

Summary: Optimistic Locking when Producing Messages
Key: KAFKA-1827
URL: https://issues.apache.org/jira/browse/KAFKA-1827
Project: Kafka
Issue Type: Improvement
Reporter: Daniel Schierbeck

(I wasn't able to post to the ML, so I'm adding an issue instead. I hope that's okay.)

I'm trying to design a system that uses Kafka as its primary data store by persisting immutable events into a topic and keeping a secondary index in another data store. The secondary index would store the entities. Each event would pertain to some entity, e.g. a user, and those entities are stored in an easily queryable way.

Kafka seems well suited for this, but there's one thing I'm having problems with: I cannot guarantee that only one process writes events about an entity, which makes the design vulnerable to integrity issues. For example, say that a user can have multiple email addresses assigned, and the EmailAddressRemoved event is published when the user removes one. There's an integrity constraint, though: every user MUST have at least one email address. As far as I can see, there's no way to stop two separate processes from looking up a user entity, seeing that there are two email addresses assigned, and each publishing an event. The end result would violate the constraint. If I'm wrong about this, I'd love some feedback!

My current thinking is that Kafka could relatively easily support this kind of application with a small additional API. Kafka already has the abstract notion of entities through its key-based retention policy. If the produce API were modified to allow an integer OffsetConstraint, the following algorithm could determine whether the request should proceed:

1. For every key seen, keep track of the offset of the latest message referencing the key.
2. When an OffsetConstraint is specified in the produce API call, compare that value with the latest offset for the message key.
2.1. If they're identical, allow the operation to continue.
2.2. If they're not identical, fail with some OptimisticLockingFailure.

Would such a feature be completely out of scope for Kafka?

--
This message was sent by Atlassian JIRA (v6.3.4#6332)
Optimistic locking
I'm trying to design a system that uses Kafka as its primary data store by persisting immutable events into a topic and keeping a secondary index in another data store. The secondary index would store the entities. Each event would pertain to some entity, e.g. a user, and those entities are stored in an easily queryable way.

Kafka seems well suited for this, but there's one thing I'm having problems with: I cannot guarantee that only one process writes events about an entity, which makes the design vulnerable to integrity issues. For example, say that a user can have multiple email addresses assigned, and the EmailAddressRemoved event is published when the user removes one. There's an integrity constraint, though: every user MUST have at least one email address. As far as I can see, there's no way to stop two separate processes from looking up a user entity, seeing that there are two email addresses assigned, and each publishing an event. The end result would violate the constraint. If I'm wrong about this, I'd love some feedback!

My current thinking is that Kafka could relatively easily support this kind of application with a small additional API. Kafka already has the abstract notion of entities through its key-based retention policy. If the produce API were modified to allow an integer OffsetConstraint, the following algorithm could determine whether the request should proceed:

1. For every key seen, keep track of the offset of the latest message referencing the key.
2. When an OffsetConstraint is specified in the produce API call, compare that value with the latest offset for the message key.
2.1. If they're identical, allow the operation to continue.
2.2. If they're not identical, fail with some OptimisticLockingFailure.

Would such a feature be completely out of scope for Kafka?

Best regards,
Daniel Schierbeck
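The proposed check can be sketched in a few lines. This is plain Python modeling a single partition, not Kafka broker code; OffsetConstraint and OptimisticLockingFailure are the names suggested in the proposal above, while the class and parameter names are illustrative.

```python
# Illustrative sketch of the proposed per-key offset constraint: the log
# tracks the latest offset per key, and a produce request carrying a
# constraint is rejected when another writer has appended since.

class OptimisticLockingFailure(Exception):
    """The supplied OffsetConstraint no longer matches the latest offset."""

class PartitionLog:
    def __init__(self):
        self.messages = []        # append-only list of (key, value)
        self.latest = {}          # key -> offset of latest message for key

    def produce(self, key, value, offset_constraint=None):
        if offset_constraint is not None:
            current = self.latest.get(key)
            if current != offset_constraint:
                raise OptimisticLockingFailure(
                    f"latest offset for {key!r} is {current}, "
                    f"constraint was {offset_constraint}")
        offset = len(self.messages)
        self.messages.append((key, value))
        self.latest[key] = offset
        return offset

log = PartitionLog()
first = log.produce("user-1", "UserCreated")
# A writer that has seen the latest event for the key succeeds...
second = log.produce("user-1", "EmailAddressAdded", offset_constraint=first)
# ...while a stale writer that also read up to `first` is rejected.
try:
    log.produce("user-1", "EmailAddressRemoved", offset_constraint=first)
    rejected = False
except OptimisticLockingFailure:
    rejected = True
```

Omitting the constraint preserves today's unconditional produce behavior, so the check is opt-in per request.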