[jira] [Created] (KAFKA-5617) Update to the list of third-party clients

2017-07-20 Thread Daniel Schierbeck (JIRA)
Daniel Schierbeck created KAFKA-5617:


 Summary: Update to the list of third-party clients
 Key: KAFKA-5617
 URL: https://issues.apache.org/jira/browse/KAFKA-5617
 Project: Kafka
  Issue Type: Wish
Reporter: Daniel Schierbeck


I'd like to have the list of Ruby client libraries updated to reflect the 
current state of affairs.

* ruby-kafka is no longer compatible with Kafka 0.8, but is compatible with 
0.9+. It should probably be moved to the top, since it's the only actively 
maintained low-level library – all other libraries are either unmaintained or 
are opinionated frameworks built on top of ruby-kafka.
* I'd like to add Racecar (https://github.com/zendesk/racecar), a simple 
opinionated framework built on top of ruby-kafka. It's an extraction from a 
production Zendesk code base, and so is already pretty battle-tested.

I'm the author of both ruby-kafka and Racecar.





Re: [DISCUSS] KIP-98: Exactly Once Delivery and Transactional Messaging

2016-12-04 Thread Daniel Schierbeck
Hi Apurva,

Thanks for taking the time to reply. You make excellent points, and it
sounds like the right tradeoff. It would be great if the coordinator code
could be shared with the consumer API; I hope that's actually the case.

Daniel Schierbeck

On Thu, 1 Dec 2016 at 18.34 Apurva Mehta <apu...@confluent.io> wrote:

> Hi Daniel,
>
> That is a very good point. You are correct in saying that one does not need
> a transaction coordinator to get idempotent semantics.
>
> There are, however, three reasons why we chose this route:
>
> 1. The request to find a transaction coordinator is exactly the same as
> the request consumers use to find the group coordinator. So if clients
> already implement the new consumer, you should already have the code you
> need to find the transaction coordinator. I would even go so far as to say
> that the majority of the coordinator discovery code can be effectively shared
> between producers and consumers. Jason should correct me on this, however,
> since he is most familiar with that bit.
> 2. With this route, the broker side changes are simpler. In particular,
> we have to implement the InitPIDRequest only in the coordinator.
> 3. By always having a transaction coordinator, we can enable
> applications to use transactions even if they don't specify the AppId. The
> only thing you lose is transaction recovery across sessions.
>
> Needless to say, we did debate this point extensively. What swung our
> decision ultimately was the following observation: if the user does not
> provide a transaction.app.id, the client can generate a UUID and use that
> as the appId for the rest of the session. This means that there are no
> branches in the client and server code, and it is overall simpler to maintain.
> All the producer APIs are also available to the user and it would be more
> intuitive.
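
For client implementors, the fallback described above is tiny. A minimal sketch (the configuration name transaction.app.id follows the KIP discussion; nothing here is an actual client API):

    import java.util.UUID

    object AppIdFallback {
      // If the user supplied no transaction.app.id, generate one that lives for
      // the duration of this producer session. Transaction recovery across
      // sessions is lost, but all producer APIs keep working, as noted above.
      def resolveAppId(configured: Option[String]): String =
        configured.getOrElse(UUID.randomUUID().toString)
    }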
>
> It also means that clients cannot choose idempotence without transactions,
> and hence it does place a greater burden on implementors of kafka clients.
> But the cost should be minimal given point 1 above, and was deemed worth
> it.
>
> Thanks once more for your thoughtful comments. It would be great for other
> client implementors to chime in on this.
>
> Regards,
> Apurva
>
>
>
> On Thu, Dec 1, 2016 at 3:16 AM, Daniel Schierbeck
> <da...@zendesk.com.invalid
> > wrote:
>
> > Hi there,
> >
> > I'm the author of ruby-kafka, and as such am slightly biased towards
> > simplicity of implementation :-)
> >
> > I like the proposal, and would love to use idempotent producer semantics
> in
> > our projects at Zendesk, but I'm a bit worried about the complexity that
> > would go into the clients; specifically: it sounds to me that in order to
> > get idempotent producer semantics, I'd have to implement the transaction
> > coordinator discovery. I may be wrong, but it would seem that it's not
> > strictly necessary if you're not using transactions – we could just use
> the
> > topic partition's leader as the coordinator, avoiding the extra
> discovery.
> > In my experience, most bugs are related to figuring out which broker is
> the
> > leader of which partition/group/whatever, so minimizing the number of
> > moving parts would be beneficial to me. I'd also like to point out that I
> > would be reluctant to implement the transaction API in the near future,
> but
> > would love to implement the idempotency API soon. The former seems only
> > relevant to real stream processing frameworks, which is probably not the
> > best use case for ruby-kafka.
> >
> > Cheers,
> > Daniel Schierbeck
> >
> > On Thu, Dec 1, 2016 at 9:54 AM Jason Gustafson <ja...@confluent.io>
> wrote:
> >
> > > Hey Neha,
> > >
> > > Thanks for the thoughtful questions. I'll try to address the first
> > question
> > > since Apurva addressed the second. Since most readers are probably
> > getting
> > > up to speed with this large proposal, let me first take a step back and
> > > explain why we need the AppID at all. As Confluent tradition demands, I
> > > present you a big wall of text:
> > >
> > > Clearly "exactly once" delivery requires resilience to client failures.
> > > When a client crashes or turns into a zombie, another client must
> > > eventually be started to resume the work. There are two problems: 1) we
> > > need to ensure that the old process is actually dead or at least that
> it
> > > cannot write any more data, and 2) we need to be able to pick up
> wherever
> > > the last process left off. To do either of these, we need some kind of
> > > identifier to tie the two instances together.

Re: [DISCUSS] KIP-82 - Add Record Headers

2016-12-02 Thread Daniel Schierbeck
I don't have a lot of feedback on this, but at Zendesk we could definitely
use a standardized header system. Using ints as keys sounds tedious, but if
that's a necessary tradeoff I'd be okay with it.
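
For illustration only, int-keyed headers along the lines discussed in this thread might look like the sketch below (a rough rendering of the idea, not KIP-82's actual wire format or API):

    object HeaderSketch {
      // Headers as (int key -> bytes) pairs attached to a record. The int keys
      // are what would require a registry -- the "tedious" bookkeeping above.
      final case class Header(key: Int, value: Array[Byte])
      final case class RecordWithHeaders(key: Array[Byte],
                                         value: Array[Byte],
                                         headers: Seq[Header])

      // Hypothetical example: an audit header registered under key 1001.
      val AuditHeaderKey = 1001
      val example = RecordWithHeaders(
        key = "some-key".getBytes("UTF-8"),
        value = "some-value".getBytes("UTF-8"),
        headers = Seq(Header(AuditHeaderKey, "producer=svc-a".getBytes("UTF-8"))))
    }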

On Fri, Dec 2, 2016 at 5:44 AM Todd Palino  wrote:

> Come on, I’ve done at least 2 talks on this one :)
>
> Producing counts to a topic is part of it, but that’s only part. Say you
> count 100 messages in topic A. When you mirror topic A to another
> cluster, you have 99 messages. Where was your problem? Or worse, you have
> 100 messages, but one producer duplicated messages and another one lost
> messages. You need details about where the message came from in order to
> pinpoint problems when they happen. Source producer info, where it was
> produced into your infrastructure, and when it was produced. This requires
> you to add the information to the message.
>
> And yes, you still need to maintain your clients. So maybe my original
> example was not the best. My thoughts on not wanting to be responsible for
> message formats stand, because that’s very much separate from the client.
> As you know, we have our own internal client library that can insert the
> right headers, and right now inserts the right audit information into the
> message fields (if they exist, and assuming the message is Avro encoded).
> What if someone wants to use JSON instead for a good reason? What if user X
> wants to encrypt messages, but user Y does not? Maintaining the client
> library is still much easier than maintaining the message formats.
>
>
> -Todd
>
>
>
> On Thu, Dec 1, 2016 at 6:21 PM, Gwen Shapira  wrote:
>
> > Based on your last sentence, consider me convinced :)
> >
> > I get why headers are critical for Mirroring (you need tags to prevent
> > loops and sometimes to route messages to the correct destination).
> > But why do you need headers to audit? We are auditing by producing
> > counts to a side topic (and I was under the impression you do the
> > same), so we never need to modify the message.
> >
> > Another thing - after we added headers, wouldn't you be in the
> > business of making sure everyone uses them properly? Making sure
> > everyone includes the right headers you need, not using the header
> > names you intend to use, etc. I don't think the "policing" business
> > will ever go away.
> >
> > On Thu, Dec 1, 2016 at 5:25 PM, Todd Palino  wrote:
> > > Got it. As an ops guy, I'm not very happy with the workaround. Avro
> means
> > > that I have to be concerned with the format of the messages in order to
> > run
> > > the infrastructure (audit, mirroring, etc.). That means that I have to
> > > handle the schemas, and I have to enforce rules about good formats.
> This
> > is
> > > not something I want to be in the business of, because I should be able
> > to
> > > run a service infrastructure without needing to be in the weeds of
> > dealing
> > > with customer data formats.
> > >
> > > Trust me, a sizable portion of my support time is spent dealing with
> > schema
> > > issues. I really would like to get away from that. Maybe I'd have more
> > time
> > > for other hobbies. Like writing. ;)
> > >
> > > -Todd
> > >
> > > On Thu, Dec 1, 2016 at 4:04 PM Gwen Shapira  wrote:
> > >
> > >> I'm pretty satisfied with the current workarounds (Avro container
> > >> format), so I'm not too excited about the extra work required to do
> > >> headers in Kafka. I absolutely don't mind it if you do it...
> > >> I think the Apache convention for "good idea, but not willing to put
> > >> any work toward it" is +0.5? anyway, that's what I was trying to
> > >> convey :)
> > >>
> > >> On Thu, Dec 1, 2016 at 3:05 PM, Todd Palino 
> wrote:
> > >> > Well I guess my question for you, then, is what is holding you back
> > from
> > >> > full support for headers? What’s the bit that you’re missing that
> has
> > you
> > >> > under a full +1?
> > >> >
> > >> > -Todd
> > >> >
> > >> >
> > >> > On Thu, Dec 1, 2016 at 1:59 PM, Gwen Shapira 
> > wrote:
> > >> >
> > >> >> I know why people who support headers support them, and I've seen
> > what
> > >> >> the discussion is like.
> > >> >>
> > >> >> This is why I'm asking people who are against headers (especially
> > >> >> committers) what will make them change their mind - so we can get
> > this
> > >> >> part over one way or another.
> > >> >>
> > >> >> If I sound frustrated it is not at Radai, Jun or you (Todd)... I am
> > >> >> just looking for something concrete we can do to move the
> discussion
> > >> >> along to the yummy design details (which is the argument I really
> am
> > >> >> looking forward to).
> > >> >>
> > >> >> On Thu, Dec 1, 2016 at 1:53 PM, Todd Palino 
> > wrote:
> > >> >> > So, Gwen, to your question (even though I’m not a committer)...
> > >> >> >
> > >> >> > I have always been a strong supporter of introducing the concept
> > of headers.

Re: [DISCUSS] KIP-98: Exactly Once Delivery and Transactional Messaging

2016-12-01 Thread Daniel Schierbeck
Hi there,

I'm the author of ruby-kafka, and as such am slightly biased towards
simplicity of implementation :-)

I like the proposal, and would love to use idempotent producer semantics in
our projects at Zendesk, but I'm a bit worried about the complexity that
would go into the clients; specifically: it sounds to me that in order to
get idempotent producer semantics, I'd have to implement the transaction
coordinator discovery. I may be wrong, but it would seem that it's not
strictly necessary if you're not using transactions – we could just use the
topic partition's leader as the coordinator, avoiding the extra discovery.
In my experience, most bugs are related to figuring out which broker is the
leader of which partition/group/whatever, so minimizing the number of
moving parts would be beneficial to me. I'd also like to point out that I
would be reluctant to implement the transaction API in the near future, but
would love to implement the idempotency API soon. The former seems only
relevant to real stream processing frameworks, which is probably not the
best use case for ruby-kafka.

Cheers,
Daniel Schierbeck

On Thu, Dec 1, 2016 at 9:54 AM Jason Gustafson <ja...@confluent.io> wrote:

> Hey Neha,
>
> Thanks for the thoughtful questions. I'll try to address the first question
> since Apurva addressed the second. Since most readers are probably getting
> up to speed with this large proposal, let me first take a step back and
> explain why we need the AppID at all. As Confluent tradition demands, I
> present you a big wall of text:
>
> Clearly "exactly once" delivery requires resilience to client failures.
> When a client crashes or turns into a zombie, another client must
> eventually be started to resume the work. There are two problems: 1) we
> need to ensure that the old process is actually dead or at least that it
> cannot write any more data, and 2) we need to be able to pick up wherever
> the last process left off. To do either of these, we need some kind of
> identifier to tie the two instances together.
>
> There are only two choices for where this ID comes from: either the user
> gives it to us or the server generates it. In the latter case, the user is
> responsible for fetching it from the client and persisting it somewhere for
> use after failure. We ultimately felt that the most flexible option is to
> have the user give it to us. In many applications, there is already a
> natural identifier which is already used to divide the workload. For
> example, in Kafka Streams and Kafka Connect, we have a taskId. For
> applications where there is no natural ID, the user can generate a UUID and
> persist it locally, which is as good as having the server generate it.
>
> So the AppID is used to provide continuity between the instances of a
> producer which are handling a certain workload. One of the early design
> decisions we made in this work was to make the delivery guarantees we
> provide agnostic of the workload that the producer is assigned. The
> producer is not in the business of trying to divide up the work among all
> its peers who are participating in the same duty (unlike the consumer, we
> don't know anything about where the data comes from). This has huge
> implications for "exactly-once" delivery because it puts the burden on the
> user to divide the total workload among producer instances and to assign
> AppIDs accordingly.
>
> I've been using the term "workload" loosely, but we usually imagine
> something like Kafka Connect's notion of a "source partition." A source
> partition could be a topic partition if the source is Kafka, or it could be
> a database table, a log file, or whatever makes sense for the source of the
> data. The point is that it's an independent source of data which can be
> assigned to a producer instance.
>
> If the same source partition is always assigned to the producer with the
> same AppID, then Kafka transactions will give you "exactly once"
> delivery without much additional work. On initialization, the producer will
> ensure that 1) any previous producers using that AppID are "fenced" off,
> and 2) that any transaction which had been started by a previous producer
> with that AppID has either completed or aborted.
>
> Based on this, it should be clear that the ideal is to divide the workload
> so that you have a one-to-one mapping from the source partition to the
> AppID. If the source of the data is Kafka, then the source partition is
> just a topic partition, and the AppID can be generated from the name of the
> topic and the partition number.
>
> To finally get back to your auto-scaling question, let's assume for a
> moment the ideal mapping of source partition to AppID. The main question is
> whether

Re: [DISCUSS] KIP-27 - Conditional Publish

2015-07-28 Thread Daniel Schierbeck
Jiangjie: I think giving users the possibility of defining a custom policy
for handling rejections is a good idea. For instance, this will allow Kafka
to act as an event store in an Event Sourcing application. If the event(s)
are rejected by the store, the original command may need to be re-validated
against the new state.

On Tue, Jul 28, 2015 at 1:27 AM Jiangjie Qin j...@linkedin.com.invalid
wrote:

 @Ewen, good point about batching. Yes, it would be tricky if we want to do
 a per-key conditional produce. My understanding is that the prerequisite of
 this KIP is:
 1. Single producer for each partition.
 2. Acks=-1, max.in.flight.request.per.connection=1, retries=SOME_BIG_NUMBER
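
In Java-client terms, prerequisite 2 corresponds roughly to the producer settings sketched below (the config keys are the standard Java producer names; treat the values as illustrative):

    import java.util.Properties

    object ConditionalPublishPrereqs {
      val props = new Properties()
      props.put("acks", "all")                                 // i.e. acks=-1
      props.put("max.in.flight.requests.per.connection", "1")  // keep ordering across retries
      props.put("retries", Int.MaxValue.toString)              // "SOME_BIG_NUMBER"
    }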

 The major problem it tries to solve is exactly-once produce, i.e. eliminating
 duplicates from the producer side. In that case, a batch will be considered
 atomic. The only way a batch could get rejected should be that it has already
 been appended. So the producer should just move on.

 It looks to me like even a transient multiple-producer scenario will cause
 issues, because the user needs to think about what to do if a request gets
 rejected, and the answer varies for different use cases.

 Thanks,

 Jiangjie (Becket) Qin

 On Sun, Jul 26, 2015 at 11:54 AM, Ben Kirwin b...@kirw.in wrote:

  So I had another look at the 'Idempotent Producer' proposal this
  afternoon, and made a few notes on how I think they compare; if I've
  made any mistakes, I'd be delighted if someone with more context on
  the idempotent producer design would correct me.
 
  As a first intuition, you can think of the 'conditional publish'
  proposal as the special case of the 'idempotent producer' idea, where
  there's just a single producer per-partition. The key observation here
  is: if there's only one producer, you can conflate the 'sequence
  number' and the expected offset. The conditional publish proposal uses
  existing Kafka offset APIs for roughly the same things as the
  idempotent producer proposal uses sequence numbers for -- eg. instead
  of having a lease PID API that returns the current sequence number,
  we can use the existing 'offset API' to retrieve the upcoming offset.
 
  Both proposals attempt to deal with the situation where there are
  transiently multiple publishers for the same partition (and PID). The
  idempotent producer setup tracks a generation id for each pid, and
  discards any writes with a generation id smaller than the latest
  value. Conditional publish is 'first write wins' -- and instead of
  dropping duplicates on the server, it returns an error to the client.
  The duplicate-handling behaviour (dropping vs. erroring) has some
  interesting consequences:
 
  - If all producers are producing the same stream of messages, silently
  dropping duplicates on the server is more convenient. (Suppose we have
  a batch of messages 0-9, and the high-water mark on the server is 7.
  Idempotent producer, as I read it, would append 7-9 to the partition
  and return success; meanwhile, conditional publish would fail the
  entire batch.)
 
  - If producers might be writing different streams of messages, the
  proposed behaviour of the idempotent producer is probably worse --
  since it can silently interleave messages from two different
  producers. This can be a problem for some commit-log style use-cases,
  since it can transform a valid series of operations into an invalid
  one.
 
  - Given the error-on-duplicate behaviour, it's possible to implement
  deduplication on the client. (Sketch: if a publish returns an error
  for some partition, fetch the upcoming offset / sequence number for
  that partition, and discard all messages with a smaller offset on the
  client before republishing.)
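
A rough rendering of that client-side dedup sketch in code (the publish and offset-fetch hooks are assumptions for illustration, not an actual client API):

    object ClientSideDedup {
      final case class Message(expectedOffset: Long, payload: Array[Byte])

      // publish returns true on success, false on an expected-offset mismatch;
      // fetchUpcomingOffset asks the broker for the partition's next offset.
      def republish(batch: Seq[Message],
                    publish: Seq[Message] => Boolean,
                    fetchUpcomingOffset: () => Long): Unit = {
        if (!publish(batch)) {
          val upcoming = fetchUpcomingOffset()
          // Anything below the upcoming offset is already in the log: drop it.
          val remaining = batch.filter(_.expectedOffset >= upcoming)
          if (remaining.nonEmpty) publish(remaining)
        }
      }
    }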
 
  I think this makes the erroring behaviour more general, though
  deduplicating saves a roundtrip or two at conflict time.
 
  I'm less clear about the behaviour of the generation id, or what
  happens when (say) two producers with the same generation id are spun
  up at the same time. I'd be interested in hearing other folks'
  comments on this.
 
  Ewen: I'm not sure I understand the questions well enough to answer
  properly, but some quick notes:
  - I don't think it makes sense to assign an expected offset without
  already having assigned a partition. If the producer code is doing the
  partition assignment, it should probably do the offset assignment
  too... or we could just let application code handle both.
  - I'm not aware of any case where reassigning offsets to messages
  automatically after an offset mismatch makes sense: in the cases we've
  discussed, it seems like either it's safe to drop duplicates, or we
  want to handle the error at the application level.
 
  I'm going to try and come with an idempotent-producer-type example
  that works with the draft patch in the next few days, so hopefully
  we'll have something more concrete to discuss. Otherwise -- if you
  have a clear idea of how eg. sequence number assignment would work in
  the 

[jira] [Commented] (KAFKA-2260) Allow specifying expected offset on produce

2015-07-21 Thread Daniel Schierbeck (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2260?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14635281#comment-14635281
 ] 

Daniel Schierbeck commented on KAFKA-2260:
--

Where is the KIP being discussed? I couldn't find any mention of this in the 
dev list archive.

 Allow specifying expected offset on produce
 ---

 Key: KAFKA-2260
 URL: https://issues.apache.org/jira/browse/KAFKA-2260
 Project: Kafka
  Issue Type: Improvement
Reporter: Ben Kirwin
Assignee: Ewen Cheslack-Postava
Priority: Minor
 Attachments: expected-offsets.patch


 I'd like to propose a change that adds a simple CAS-like mechanism to the 
 Kafka producer. This update has a small footprint, but enables a bunch of 
 interesting uses in stream processing or as a commit log for process state.
 h4. Proposed Change
 In short:
 - Allow the user to attach a specific offset to each message produced.
 - The server assigns offsets to messages in the usual way. However, if the 
 expected offset doesn't match the actual offset, the server should fail the 
 produce request instead of completing the write.
 This is a form of optimistic concurrency control, like the ubiquitous 
 check-and-set -- but instead of checking the current value of some state, it 
 checks the current offset of the log.
 h4. Motivation
 Much like check-and-set, this feature is only useful when there's very low 
 contention. Happily, when Kafka is used as a commit log or as a 
 stream-processing transport, it's common to have just one producer (or a 
 small number) for a given partition -- and in many of these cases, predicting 
 offsets turns out to be quite useful.
 - We get the same benefits as the 'idempotent producer' proposal: a producer 
 can retry a write indefinitely and be sure that at most one of those attempts 
 will succeed; and if two producers accidentally write to the end of the 
 partition at once, we can be certain that at least one of them will fail.
 - It's possible to 'bulk load' Kafka this way -- you can write a list of n 
 messages consecutively to a partition, even if the list is much larger than 
 the buffer size or the producer has to be restarted.
 - If a process is using Kafka as a commit log -- reading from a partition to 
 bootstrap, then writing any updates to that same partition -- it can be sure 
 that it's seen all of the messages in that partition at the moment it does 
 its first (successful) write.
 There's a bunch of other similar use-cases here, but they all have roughly 
 the same flavour.
 h4. Implementation
 The major advantage of this proposal over other suggested transaction / 
 idempotency mechanisms is its minimality: it gives the 'obvious' meaning to a 
 currently-unused field, adds no new APIs, and requires very little new code 
 or additional work from the server.
 - Produced messages already carry an offset field, which is currently ignored 
 by the server. This field could be used for the 'expected offset', with a 
 sigil value for the current behaviour. (-1 is a natural choice, since it's 
 already used to mean 'next available offset'.)
 - We'd need a new error and error code for a 'CAS failure'.
 - The server assigns offsets to produced messages in 
 {{ByteBufferMessageSet.validateMessagesAndAssignOffsets}}. After this 
 change, this method would assign offsets in the same way -- but if they 
 don't match the offset in the message, we'd return an error instead of 
 completing the write.
 - To avoid breaking existing clients, this behaviour would need to live 
 behind some config flag. (Possibly global, but probably more useful 
 per-topic?)
 I understand all this is unsolicited and possibly strange: happy to answer 
 questions, and if this seems interesting, I'd be glad to flesh this out into 
 a full KIP or patch. (And apologies if this is the wrong venue for this sort 
 of thing!)





[jira] [Commented] (KAFKA-2260) Allow specifying expected offset on produce

2015-06-15 Thread Daniel Schierbeck (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2260?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14585601#comment-14585601
 ] 

Daniel Schierbeck commented on KAFKA-2260:
--

On the mailing list we discussed a way to decrease contention due to this sort 
of write by making the write condition match the last offset of the message's 
_key_ rather than that of the entire partition. This way, a write will only be 
rejected if another producer has written a message with the same key. This 
will also allow the producer to get better feedback when using Kafka as a 
storage backend for Event Sourcing style events, where a rejected write may 
require re-evaluating the original command against the updated context, e.g. a 
"change ticket title" command may be invalid now that a "ticket closed" event 
has been written. Maybe the user should be notified synchronously and allowed 
to take action.

The minimum requirement for implementing per-key conditional writes is that the 
broker must maintain an in-memory table mapping message keys to their highest 
offsets. The table can be saved to disk from time to time in order to cut down 
the time needed to rebuild it when recovering from a crash.
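
A minimal sketch of that table and check (illustrative Scala only; none of these names exist in the Kafka codebase, and a real implementation would live in the partition's append path):

    import scala.collection.mutable

    object PerKeyConditionalWrite {
      // In-memory table: message key -> offset of the latest message with that key.
      // Snapshotted to disk from time to time so it can be rebuilt quickly after a crash.
      private val lastOffsetByKey = mutable.Map.empty[String, Long]

      // Append a message whose producer expects `expectedLastOffset` to be the current
      // last offset for `key` (None if the key is expected to be unseen). Returns the
      // assigned offset, or None to signal a conditional-write failure.
      def tryAppend(key: String, expectedLastOffset: Option[Long], nextOffset: Long): Option[Long] =
        if (lastOffsetByKey.get(key) == expectedLastOffset) {
          lastOffsetByKey.update(key, nextOffset)
          Some(nextOffset)
        } else None
    }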

 Allow specifying expected offset on produce
 ---

 Key: KAFKA-2260
 URL: https://issues.apache.org/jira/browse/KAFKA-2260
 Project: Kafka
  Issue Type: Improvement
Reporter: Ben Kirwin
Priority: Minor

 I'd like to propose a change that adds a simple CAS-like mechanism to the 
 Kafka producer. This update has a small footprint, but enables a bunch of 
 interesting uses in stream processing or as a commit log for process state.
 h4. Proposed Change
 In short:
 - Allow the user to attach a specific offset to each message produced.
 - The server assigns offsets to messages in the usual way. However, if the 
 expected offset doesn't match the actual offset, the server should fail the 
 produce request instead of completing the write.
 This is a form of optimistic concurrency control, like the ubiquitous 
 check-and-set -- but instead of checking the current value of some state, it 
 checks the current offset of the log.
 h4. Motivation
 Much like check-and-set, this feature is only useful when there's very low 
 contention. Happily, when Kafka is used as a commit log or as a 
 stream-processing transport, it's common to have just one producer (or a 
 small number) for a given partition -- and in many of these cases, predicting 
 offsets turns out to be quite useful.
 - We get the same benefits as the 'idempotent producer' proposal: a producer 
 can retry a write indefinitely and be sure that at most one of those attempts 
 will succeed; and if two producers accidentally write to the end of the 
 partition at once, we can be certain that at least one of them will fail.
 - It's possible to 'bulk load' Kafka this way -- you can write a list of n 
 messages consecutively to a partition, even if the list is much larger than 
 the buffer size or the producer has to be restarted.
 - If a process is using Kafka as a commit log -- reading from a partition to 
 bootstrap, then writing any updates to that same partition -- it can be sure 
 that it's seen all of the messages in that partition at the moment it does 
 its first (successful) write.
 There's a bunch of other similar use-cases here, but they all have roughly 
 the same flavour.
 h4. Implementation
 The major advantage of this proposal over other suggested transaction / 
 idempotency mechanisms is its minimality: it gives the 'obvious' meaning to a 
 currently-unused field, adds no new APIs, and requires very little new code 
 or additional work from the server.
 - Produced messages already carry an offset field, which is currently ignored 
 by the server. This field could be used for the 'expected offset', with a 
 sigil value for the current behaviour. (-1 is a natural choice, since it's 
 already used to mean 'next available offset'.)
 - We'd need a new error and error code for a 'CAS failure'.
 - The server assigns offsets to produced messages in 
 {{ByteBufferMessageSet.validateMessagesAndAssignOffsets}}. After this 
 change, this method would assign offsets in the same way -- but if they 
 don't match the offset in the message, we'd return an error instead of 
 completing the write.
 - To avoid breaking existing clients, this behaviour would need to live 
 behind some config flag. (Possibly global, but probably more useful 
 per-topic?)
 I understand all this is unsolicited and possibly strange: happy to answer 
 questions, and if this seems interesting, I'd be glad to flesh this out into 
 a full KIP or patch. (And apologies if this is the wrong venue for this sort 
 of thing!)





Re: Optimistic locking

2015-01-05 Thread Daniel Schierbeck
Regarding the use case – some events may only be valid given a specific
state of the world, e.g. a NewComment event and a PostClosedForComments
event would be valid only in that order. If two producer processes (e.g.
two HTTP application processes) try to write an event each, you may get
integrity issues depending on the order. In this case, knowing that you
*must* build upon that latest event for a key will provide application
authors a way to ensure integrity.

I'm not quite sure I understand the challenge in the implementation – how
is key-based message expiration implemented, if not with some mapping
of message keys to the last written offset? How else would you know whether
the message you're examining is the last or not?

On Mon Jan 05 2015 at 1:41:39 PM Colin Clark co...@clark.ws wrote:

 Hi,

 Couple of comments on this.

 What you're proposing is difficult to do at scale and would require some
 type of Paxos-style algorithm for the "update only if different" semantics -
 it would be easier in that case to just go ahead and do the update.

 Also, it seems like a conflation of concerns - in an event sourcing model,
 we save the immutable event, and represent current state in another,
 separate data structure.  Perhaps Cassandra would work well here - that
 data model might provide what you're looking for out of the box.

 Just as I don't recommend people use data stores as queuing mechanisms, I
 also recommend not using a queuing mechanism as a primary datastore - mixed
 semantics.

 --
 *Colin*
 +1 612 859-6129


 On Mon, Jan 5, 2015 at 4:47 AM, Daniel Schierbeck 
 daniel.schierb...@gmail.com wrote:

  I'm trying to design a system that uses Kafka as its primary data store
 by
  persisting immutable events into a topic and keeping a secondary index in
  another data store. The secondary index would store the entities. Each
  event would pertain to some entity, e.g. a user, and those entities are
  stored in an easily queriable way.
 
  Kafka seems well suited for this, but there's one thing I'm having
 problems
  with. I cannot guarantee that only one process writes events about an
  entity, which makes the design vulnerable to integrity issues.
 
  For example, say that a user can have multiple email addresses assigned,
  and the EmailAddressRemoved event is published when the user removes one.
  There's an integrity constraint, though: every user MUST have at least
 one
  email address. As far as I can see, there's no way to stop two separate
  processes from looking up a user entity, seeing that there are two email
  addresses assigned, and each publishing an event. The end result would
  violate the constraint.
 
  If I'm wrong in saying that this isn't possible I'd love some feedback!
 
  My current thinking is that Kafka could relatively easily support this
 kind
  of application with a small additional API. Kafka already has the
 abstract
  notion of entities through its key-based retention policy. If the produce
  API was modified in order to allow an integer OffsetConstraint, the
  following algorithm could determine whether the request should proceed:
 
  1. For every key seen, keep track of the offset of the latest message
  referencing the key.
  2. When an OffsetConstraint is specified in the produce API call, compare
  that value with the latest offset for the message key.
  2.1. If they're identical, allow the operation to continue.
  2.2. If they're not identical, fail with some OptimisticLockingFailure.
 
  Would such a feature be completely out of scope for Kafka?
 
  Best regards,
  Daniel Schierbeck
 



Re: Optimistic locking

2015-01-05 Thread Daniel Schierbeck
Reading your reply again, I'd like to address this:

 Also, it seems like a conflation of concerns - in an event sourcing
model, we save the immutable event, and represent current state in
another, separate
data structure.

That's the idea – events are stored in Kafka and processed sequentially by
a single process per partition. Those processes update a secondary store
(e.g. MySQL) with the current state. However, when determining whether an
event is valid, the current state must be taken into account. E.g. you
can only withdraw money from an open account, so the event sequence
AccountClosed -> MoneyWithdrawn is invalid. The only way I can think of to
ensure this is the case is to have optimistic locking. Each account would
have a unique key, and in order to write an event to Kafka the secondary
store *must* be up-to-date with the previous events for that key. If not,
it should wait a bit and try again, re-validating the input against the new
state of the world. Is that a completely hopeless idea?
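
A minimal sketch of that loop, assuming a hypothetical conditional produce call and a secondary store that tracks the last offset it has applied per key (all the hooks below are assumptions for illustration):

    object ValidateAndProduce {
      def writeEvent(key: String,
                     command: String,
                     lastAppliedOffset: String => Long,                  // secondary store: last applied offset for key
                     validate: (String, String) => Option[Array[Byte]],  // command -> event, given current state
                     conditionalProduce: (String, Array[Byte], Long) => Boolean,  // fails on offset mismatch
                     maxAttempts: Int = 5): Boolean = {
        var attempt = 0
        while (attempt < maxAttempts) {
          val expected = lastAppliedOffset(key)
          validate(key, command) match {
            case None => return false           // command is no longer valid for the current state
            case Some(event) =>
              if (conditionalProduce(key, event, expected)) return true
              Thread.sleep(50)                  // someone else wrote first: wait, re-validate, retry
          }
          attempt += 1
        }
        false
      }
    }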

On Mon Jan 05 2015 at 2:18:49 PM Daniel Schierbeck 
daniel.schierb...@gmail.com wrote:

 Regarding the use case – some events may only be valid given a specific
 state of the world, e.g. a NewComment event and a PostClosedForComments
 event would be valid only in that order. If two producer processes (e.g.
 two HTTP application processes) tries to write an event each, you may get
 integrity issues depending on the order. In this case, knowing that you
 *must* build upon that latest event for a key will provide application
 authors a way to ensure integrity.

 I'm not quite sure I understand the challenge in the implementation – how
 is key-based message expiration implemented, if not with some mapping
 of message keys to the last written offset? How else would you know whether
 the message you're examining is the last or not?

 On Mon Jan 05 2015 at 1:41:39 PM Colin Clark co...@clark.ws wrote:

 Hi,

 Couple of comments on this.

 What you're proposing is difficult to do at scale and would require some
 type of Paxos-style algorithm for the "update only if different" semantics -
 it would be easier in that case to just go ahead and do the update.

 Also, it seems like a conflation of concerns - in an event sourcing model,
 we save the immutable event, and represent current state in another,
 separate data structure.  Perhaps Cassandra would work well here - that
 data model might provide what you're looking for out of the box.

 Just as I don't recommend people use data stores as queuing mechanisms, I
 also recommend not using a queuing mechanism as a primary datastore -
 mixed
 semantics.

 --
 *Colin*
 +1 612 859-6129


 On Mon, Jan 5, 2015 at 4:47 AM, Daniel Schierbeck 
 daniel.schierb...@gmail.com wrote:

  I'm trying to design a system that uses Kafka as its primary data store
 by
  persisting immutable events into a topic and keeping a secondary index
 in
  another data store. The secondary index would store the entities. Each
  event would pertain to some entity, e.g. a user, and those entities
 are
  stored in an easily queriable way.
 
  Kafka seems well suited for this, but there's one thing I'm having
 problems
  with. I cannot guarantee that only one process writes events about an
  entity, which makes the design vulnerable to integrity issues.
 
  For example, say that a user can have multiple email addresses assigned,
  and the EmailAddressRemoved event is published when the user removes
 one.
  There's an integrity constraint, though: every user MUST have at least
 one
  email address. As far as I can see, there's no way to stop two separate
  processes from looking up a user entity, seeing that there are two email
   addresses assigned, and each publishing an event. The end result would
   violate the constraint.
 
  If I'm wrong in saying that this isn't possible I'd love some feedback!
 
  My current thinking is that Kafka could relatively easily support this
 kind
  of application with a small additional API. Kafka already has the
 abstract
  notion of entities through its key-based retention policy. If the
 produce
  API was modified in order to allow an integer OffsetConstraint, the
  following algorithm could determine whether the request should proceed:
 
  1. For every key seen, keep track of the offset of the latest message
  referencing the key.
   2. When an OffsetConstraint is specified in the produce API call, compare
  that value with the latest offset for the message key.
  2.1. If they're identical, allow the operation to continue.
  2.2. If they're not identical, fail with some OptimisticLockingFailure.
 
  Would such a feature be completely out of scope for Kafka?
 
  Best regards,
  Daniel Schierbeck
 




Re: Optimistic locking

2015-01-05 Thread Daniel Schierbeck
Reading the source code, it seems that kafka.log.LogCleaner builds a
kafka.log.OffsetMap only when compacting logs, not ahead of time, so the
information is not available at write time :-/

If I'm not mistaken, this also means that the cleaner needs _two_ passes to
clean a log segment, one to build up the key-last_offset map and one to
actually remove expired messages. Having the map be available up front
could make that be a single-pass algorithm. Especially if the segments are
too large to keep in memory this should be considerable faster, at the cost
of having to maintain the mapping for each write.

I'd be willing to do the coding if you think it's a good idea.
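
For reference, the single-pass cleaning this would enable is essentially a filter over the segment, given a map maintained at write time (a sketch only, not the actual kafka.log.LogCleaner logic):

    object SinglePassClean {
      final case class Record(offset: Long, key: String, value: Array[Byte])

      // With a key -> latest-offset map already available, compaction keeps only
      // each key's most recent record in a single scan of the segment.
      def clean(segment: Iterator[Record], lastOffsetByKey: Map[String, Long]): Iterator[Record] =
        segment.filter(r => lastOffsetByKey.get(r.key).contains(r.offset))
    }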

On Mon Jan 05 2015 at 2:24:00 PM Daniel Schierbeck 
daniel.schierb...@gmail.com wrote:

 Reading your reply again, I'd like to address this:

  Also, it seems like a conflation of concerns - in an event sourcing
 model, we save the immutable event, and represent current state in
 another, separate data structure.

 That's the idea – events are stored in Kafka and processed sequentially by
 a single process per partition. Those processes update a secondary store
 (e.g. MySQL) with the current state. However, when determining whether an
 event is valid, the current state must be taken into account. E.g. you
 can only withdraw money from an open account, so the event sequence
  AccountClosed -> MoneyWithdrawn is invalid. The only way I can think of to
 ensure this is the case is to have optimistic locking. Each account would
 have a unique key, and in order to write an event to Kafka the secondary
 store *must* be up-to-date with the previous events for that key. If not,
 it should wait a bit and try again, re-validating the input against the new
 state of the world. Is that a completely hopeless idea?

 On Mon Jan 05 2015 at 2:18:49 PM Daniel Schierbeck 
 daniel.schierb...@gmail.com wrote:

 Regarding the use case – some events may only be valid given a specific
 state of the world, e.g. a NewComment event and a PostClosedForComments
 event would be valid only in that order. If two producer processes (e.g.
 two HTTP application processes) tries to write an event each, you may get
 integrity issues depending on the order. In this case, knowing that you
 *must* build upon that latest event for a key will provide application
 authors a way to ensure integrity.

 I'm not quite sure I understand the challenge in the implementation – how
  is key-based message expiration implemented, if not with some mapping
 of message keys to the last written offset? How else would you know whether
 the message you're examining is the last or not?

 On Mon Jan 05 2015 at 1:41:39 PM Colin Clark co...@clark.ws wrote:

 Hi,

 Couple of comments on this.

 What you're proposing is difficult to do at scale and would require some
 type of Paxos-style algorithm for the "update only if different" semantics -
 it would be easier in that case to just go ahead and do the update.

 Also, it seems like a conflation of concerns - in an event sourcing
 model,
 we save the immutable event, and represent current state in another,
 separate data structure.  Perhaps Cassandra would work well here - that
 data model might provide what you're looking for out of the box.

 Just as I don't recommend people use data stores as queuing mechanisms, I
 also recommend not using a queuing mechanism as a primary datastore -
 mixed
 semantics.

 --
 *Colin*
 +1 612 859-6129


 On Mon, Jan 5, 2015 at 4:47 AM, Daniel Schierbeck 
 daniel.schierb...@gmail.com wrote:

  I'm trying to design a system that uses Kafka as its primary data
 store by
  persisting immutable events into a topic and keeping a secondary index
 in
  another data store. The secondary index would store the entities.
 Each
  event would pertain to some entity, e.g. a user, and those entities
 are
  stored in an easily queriable way.
 
  Kafka seems well suited for this, but there's one thing I'm having
 problems
  with. I cannot guarantee that only one process writes events about an
  entity, which makes the design vulnerable to integrity issues.
 
  For example, say that a user can have multiple email addresses
 assigned,
  and the EmailAddressRemoved event is published when the user removes
 one.
  There's an integrity constraint, though: every user MUST have at least
 one
  email address. As far as I can see, there's no way to stop two separate
  processes from looking up a user entity, seeing that there are two
 email
   addresses assigned, and each publishing an event. The end result would
   violate the constraint.
 
  If I'm wrong in saying that this isn't possible I'd love some feedback!
 
  My current thinking is that Kafka could relatively easily support this
 kind
  of application with a small additional API. Kafka already has the
 abstract
  notion of entities through its key-based retention policy. If the
 produce
  API was modified in order to allow an integer OffsetConstraint, the
  following algorithm could determine whether

[jira] [Created] (KAFKA-1827) Optimistic Locking when Producing Messages

2014-12-22 Thread Daniel Schierbeck (JIRA)
Daniel Schierbeck created KAFKA-1827:


 Summary: Optimistic Locking when Producing Messages
 Key: KAFKA-1827
 URL: https://issues.apache.org/jira/browse/KAFKA-1827
 Project: Kafka
  Issue Type: Improvement
Reporter: Daniel Schierbeck


(I wasn't able to post to the ML, so I'm adding an issue instead. I hope that's 
okay.)

I'm trying to design a system that uses Kafka as its primary data store by 
persisting immutable events into a topic and keeping a secondary index in 
another data store. The secondary index would store the entities. Each event 
would pertain to some entity, e.g. a user, and those entities are stored in 
an easily queriable way.

Kafka seems well suited for this, but there's one thing I'm having problems 
with. I cannot guarantee that only one process writes events about an entity, 
which makes the design vulnerable to integrity issues.

For example, say that a user can have multiple email addresses assigned, and 
the EmailAddressRemoved event is published when the user removes one. There's 
an integrity constraint, though: every user MUST have at least one email 
address. As far as I can see, there's no way to stop two separate processes 
from looking up a user entity, seeing that there are two email addresses 
assigned, and each publishing an event. The end result would violate the constraint.

If I'm wrong in saying that this isn't possible I'd love some feedback!

My current thinking is that Kafka could relatively easily support this kind of 
application with a small additional API. Kafka already has the abstract notion 
of entities through its key-based retention policy. If the produce API was 
modified in order to allow an integer OffsetConstraint, the following algorithm 
could determine whether the request should proceed:

1. For every key seen, keep track of the offset of the latest message 
referencing the key.
2. When an OffsetConstraint is specified in the produce API call, compare that 
value with the latest offset for the message key.
2.1. If they're identical, allow the operation to continue.
2.2. If they're not identical, fail with some OptimisticLockingFailure.

Would such a feature be completely out of scope for Kafka?
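
From the producer's side, the proposed addition would amount to something like the call below (a sketch of the proposal's shape only; no such parameter or error exists in any Kafka client):

    object OptimisticProduceExample {
      sealed trait ProduceResult
      case object Ok extends ProduceResult
      case object OptimisticLockingFailure extends ProduceResult

      // Hypothetical produce call carrying the OffsetConstraint from step 2 above:
      // the offset of the latest message this producer has seen for the key.
      // The broker-side comparison (steps 2.1/2.2) is stubbed out here.
      def produce(topic: String, key: String, value: Array[Byte],
                  offsetConstraint: Long): ProduceResult = Ok

      val result = produce("users", "user-42", "EmailAddressRemoved".getBytes("UTF-8"),
                           offsetConstraint = 17L)
    }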





Optimistic locking

2014-12-17 Thread Daniel Schierbeck
I'm trying to design a system that uses Kafka as its primary data store by
persisting immutable events into a topic and keeping a secondary index in
another data store. The secondary index would store the entities. Each
event would pertain to some entity, e.g. a user, and those entities are
stored in an easily queriable way.

Kafka seems well suited for this, but there's one thing I'm having problems
with. I cannot guarantee that only one process writes events about an
entity, which makes the design vulnerable to integrity issues.

For example, say that a user can have multiple email addresses assigned,
and the EmailAddressRemoved event is published when the user removes one.
There's an integrity constraint, though: every user MUST have at least one
email address. As far as I can see, there's no way to stop two separate
processes from looking up a user entity, seeing that there are two email
addresses assigned, and each publishing an event. The end result would violate
the constraint.

If I'm wrong in saying that this isn't possible I'd love some feedback!

My current thinking is that Kafka could relatively easily support this kind
of application with a small additional API. Kafka already has the abstract
notion of entities through its key-based retention policy. If the produce
API was modified in order to allow an integer OffsetConstraint, the
following algorithm could determine whether the request should proceed:

1. For every key seen, keep track of the offset of the latest message
referencing the key.
2. When an OffsetConstraint is specified in the produce API call, compare
that value with the latest offset for the message key.
2.1. If they're identical, allow the operation to continue.
2.2. If they're not identical, fail with some OptimisticLockingFailure.

Would such a feature be completely out of scope for Kafka?

Best regards,
Daniel Schierbeck


Optimistic locking

2014-12-16 Thread Daniel Schierbeck
I'm trying to design a system that uses Kafka as its primary data store by
persisting immutable events into a topic and keeping a secondary index in
another data store. The secondary index would store the entities. Each
event would pertain to some entity, e.g. a user, and those entities are
stored in an easily queriable way.

Kafka seems well suited for this, but there's one thing I'm having problems
with. I cannot guarantee that only one process writes events about an
entity, which makes the design vulnerable to integrity issues.

For example, say that a user can have multiple email addresses assigned,
and the EmailAddressRemoved event is published when the user removes one.
There's an integrity constraint, though: every user MUST have at least one
email address. As far as I can see, there's no way to stop two separate
processes from looking up a user entity, seeing that there are two email
addresses assigned, and each publishing an event. The end result would violate
the constraint.

If I'm wrong in saying that this isn't possible I'd love some feedback!

My current thinking is that Kafka could relatively easily support this kind
of application with a small additional API. Kafka already has the abstract
notion of entities through its key-based retention policy. If the produce
API was modified in order to allow an integer OffsetConstraint, the
following algorithm could determine whether the request should proceed:

1. For every key seen, keep track of the offset of the latest message
referencing the key.
2. When an OffsetConstraint is specified in the produce API call, compare
that value with the latest offset for the message key.
2.1. If they're identical, allow the operation to continue.
2.2. If they're not identical, fail with some OptimisticLockingFailure.

Would such a feature be completely out of scope for Kafka?

Best regards,
Daniel Schierbeck