Re: [DISCUSS] KIP-98: Exactly Once Delivery and Transactional Messaging

2016-12-13 Thread Ben Kirwin
Hi Apurva,

Thanks for the detailed answers... and sorry for the late reply!

It does sound like, if the input-partitions-to-app-id mapping never
changes, the existing fencing mechanisms should prevent duplicates. Great!
I'm a bit concerned the proposed API will be delicate to program against
successfully -- even in the simple case, we need to create a new producer
instance per input partition, and anything fancier is going to need its own
implementation of the Streams/Samza-style 'task' idea -- but that may be
fine for this sort of advanced feature.

For the second question, I notice that Jason also elaborated on this
downthread:

> We also looked at removing the producer ID.
> This was discussed somewhere above, but basically the idea is to store the
> AppID in the message set header directly and avoid the mapping to producer
> ID altogether. As long as batching isn't too bad, the impact on total size
> may not be too bad, but we were ultimately more comfortable with a fixed
> size ID.

...which suggests that the distinction is useful for performance, but not
necessary for correctness, which makes good sense to me. (Would a 128-bid
ID be a reasonable compromise? That's enough room for a UUID, or a
reasonable hash of an arbitrary string, and has only a marginal increase on
the message size.)

On Tue, Dec 6, 2016 at 11:44 PM, Apurva Mehta <apu...@confluent.io> wrote:

> Hi Ben,
>
> Now, on to your first question of how deal with consumer rebalances. The
> short answer is that the application needs to ensure that the the
> assignment of input partitions to appId is consistent across rebalances.
>
> For Kafka streams, they already ensure that the mapping of input partitions
> to task Id is invariant across rebalances by implementing a custom sticky
> assignor. Other non-streams apps can trivially have one producer per input
> partition and have the appId be the same as the partition number to achieve
> the same effect.
>
> With this precondition in place, we can maintain transactions across
> rebalances.
>
> Hope this answers your question.
>
> Thanks,
> Apurva
>
> On Tue, Dec 6, 2016 at 3:22 PM, Ben Kirwin <b...@kirw.in> wrote:
>
> > Thanks for this! I'm looking forward to going through the full proposal
> in
> > detail soon; a few early questions:
> >
> > First: what happens when a consumer rebalances in the middle of a
> > transaction? The full documentation suggests that such a transaction
> ought
> > to be rejected:
> >
> > > [...] if a rebalance has happened and this consumer
> > > instance becomes a zombie, even if this offset message is appended in
> the
> > > offset topic, the transaction will be rejected later on when it tries
> to
> > > commit the transaction via the EndTxnRequest.
> >
> > ...but it's unclear to me how we ensure that a transaction can't complete
> > if a rebalance has happened. (It's quite possible I'm missing something
> > obvious!)
> >
> > As a concrete example: suppose a process with PID 1 adds offsets for some
> > partition to a transaction; a consumer rebalance happens that assigns the
> > partition to a process with PID 2, which adds some offsets to its current
> > transaction; both processes try and commit. Allowing both commits would
> > cause the messages to be processed twice -- how is that avoided?
> >
> > Second: App IDs normally map to a single PID. It seems like one could do
> > away with the PID concept entirely, and just use App IDs in most places
> > that require a PID. This feels like it would be significantly simpler,
> > though it does increase the message size. Are there other reasons why the
> > App ID / PID split is necessary?
> >
> > On Wed, Nov 30, 2016 at 2:19 PM, Guozhang Wang <wangg...@gmail.com>
> wrote:
> >
> > > Hi all,
> > >
> > > I have just created KIP-98 to enhance Kafka with exactly once delivery
> > > semantics:
> > >
> > > *https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > > 98+-+Exactly+Once+Delivery+and+Transactional+Messaging
> > > <https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > > 98+-+Exactly+Once+Delivery+and+Transactional+Messaging>*
> > >
> > > This KIP adds a transactional messaging mechanism along with an
> > idempotent
> > > producer implementation to make sure that 1) duplicated messages sent
> > from
> > > the same identified producer can be detected on the broker side, and
> 2) a
> > > group of messages sent within a transaction will atomically be either
> > > reflected and fetchable to consumers or not as a whole.
> > >
> > > The above wiki page provides a high-level view of the proposed changes
> as
> > > well as summarized guarantees. Initial draft of the detailed
> > implementation
> > > design is described in this Google doc:
> > >
> > > https://docs.google.com/document/d/11Jqy_
> GjUGtdXJK94XGsEIK7CP1SnQGdp2eF
> > > 0wSw9ra8
> > >
> > >
> > > We would love to hear your comments and suggestions.
> > >
> > > Thanks,
> > >
> > > -- Guozhang
> > >
> >
>


Re: [DISCUSS] KIP-98: Exactly Once Delivery and Transactional Messaging

2016-12-06 Thread Ben Kirwin
Thanks for this! I'm looking forward to going through the full proposal in
detail soon; a few early questions:

First: what happens when a consumer rebalances in the middle of a
transaction? The full documentation suggests that such a transaction ought
to be rejected:

> [...] if a rebalance has happened and this consumer
> instance becomes a zombie, even if this offset message is appended in the
> offset topic, the transaction will be rejected later on when it tries to
> commit the transaction via the EndTxnRequest.

...but it's unclear to me how we ensure that a transaction can't complete
if a rebalance has happened. (It's quite possible I'm missing something
obvious!)

As a concrete example: suppose a process with PID 1 adds offsets for some
partition to a transaction; a consumer rebalance happens that assigns the
partition to a process with PID 2, which adds some offsets to its current
transaction; both processes try and commit. Allowing both commits would
cause the messages to be processed twice -- how is that avoided?

Second: App IDs normally map to a single PID. It seems like one could do
away with the PID concept entirely, and just use App IDs in most places
that require a PID. This feels like it would be significantly simpler,
though it does increase the message size. Are there other reasons why the
App ID / PID split is necessary?

On Wed, Nov 30, 2016 at 2:19 PM, Guozhang Wang  wrote:

> Hi all,
>
> I have just created KIP-98 to enhance Kafka with exactly once delivery
> semantics:
>
> *https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> 98+-+Exactly+Once+Delivery+and+Transactional+Messaging
>  98+-+Exactly+Once+Delivery+and+Transactional+Messaging>*
>
> This KIP adds a transactional messaging mechanism along with an idempotent
> producer implementation to make sure that 1) duplicated messages sent from
> the same identified producer can be detected on the broker side, and 2) a
> group of messages sent within a transaction will atomically be either
> reflected and fetchable to consumers or not as a whole.
>
> The above wiki page provides a high-level view of the proposed changes as
> well as summarized guarantees. Initial draft of the detailed implementation
> design is described in this Google doc:
>
> https://docs.google.com/document/d/11Jqy_GjUGtdXJK94XGsEIK7CP1SnQGdp2eF
> 0wSw9ra8
>
>
> We would love to hear your comments and suggestions.
>
> Thanks,
>
> -- Guozhang
>


[jira] [Commented] (KAFKA-3335) Kafka Connect hangs in shutdown hook

2016-06-02 Thread Ben Kirwin (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3335?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15313267#comment-15313267
 ] 

Ben Kirwin commented on KAFKA-3335:
---

Excellent -- thanks for following up on this!

> Kafka Connect hangs in shutdown hook
> 
>
> Key: KAFKA-3335
> URL: https://issues.apache.org/jira/browse/KAFKA-3335
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 0.9.0.1
>    Reporter: Ben Kirwin
> Fix For: 0.10.0.0
>
>
> The `Connect` class can run into issues during start, such as:
> {noformat}
> Exception in thread "main" org.apache.kafka.connect.errors.ConnectException: 
> Could not look up partition metadata for offset backing store topic in 
> allotted period. This could indicate a connectivity issue, unavailable topic 
> partitions, or if this is your first use of the topic it may have taken too 
> long to create.
> at 
> org.apache.kafka.connect.util.KafkaBasedLog.start(KafkaBasedLog.java:130)
> at 
> org.apache.kafka.connect.storage.KafkaOffsetBackingStore.start(KafkaOffsetBackingStore.java:85)
> at org.apache.kafka.connect.runtime.Worker.start(Worker.java:108)
> at org.apache.kafka.connect.runtime.Connect.start(Connect.java:56)
> at 
> org.apache.kafka.connect.cli.ConnectDistributed.main(ConnectDistributed.java:62)
> {noformat}
> This exception halts the startup process. It also triggers the shutdown 
> hook... which blocks waiting for the service to start up before calling stop. 
> This causes the process to hang forever.
> There's a few things that could be done here, but it would be nice to bound 
> the amount of time the process spends trying to exit gracefully.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2260) Allow specifying expected offset on produce

2016-06-02 Thread Ben Kirwin (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2260?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15313258#comment-15313258
 ] 

Ben Kirwin commented on KAFKA-2260:
---

Hi! I haven't had time to push on this at all in the last few months, but I'm 
still interested as well.

My understanding was that the core team was very focussed on streams / the 
other features in the 0.10 release, but would be interested in looking at 
coordination proposals after that. (This / idempotent producer / etc.) Does 
anyone think working this up into a github PR seems useful, or is there another 
next best step? The existing KIP is already fairly well fleshed-out, I think: 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-27+-+Conditional+Publish

> Allow specifying expected offset on produce
> ---
>
> Key: KAFKA-2260
> URL: https://issues.apache.org/jira/browse/KAFKA-2260
> Project: Kafka
>  Issue Type: Improvement
>    Reporter: Ben Kirwin
>Assignee: Ewen Cheslack-Postava
>Priority: Minor
> Attachments: KAFKA-2260.patch, expected-offsets.patch
>
>
> I'd like to propose a change that adds a simple CAS-like mechanism to the 
> Kafka producer. This update has a small footprint, but enables a bunch of 
> interesting uses in stream processing or as a commit log for process state.
> h4. Proposed Change
> In short:
> - Allow the user to attach a specific offset to each message produced.
> - The server assigns offsets to messages in the usual way. However, if the 
> expected offset doesn't match the actual offset, the server should fail the 
> produce request instead of completing the write.
> This is a form of optimistic concurrency control, like the ubiquitous 
> check-and-set -- but instead of checking the current value of some state, it 
> checks the current offset of the log.
> h4. Motivation
> Much like check-and-set, this feature is only useful when there's very low 
> contention. Happily, when Kafka is used as a commit log or as a 
> stream-processing transport, it's common to have just one producer (or a 
> small number) for a given partition -- and in many of these cases, predicting 
> offsets turns out to be quite useful.
> - We get the same benefits as the 'idempotent producer' proposal: a producer 
> can retry a write indefinitely and be sure that at most one of those attempts 
> will succeed; and if two producers accidentally write to the end of the 
> partition at once, we can be certain that at least one of them will fail.
> - It's possible to 'bulk load' Kafka this way -- you can write a list of n 
> messages consecutively to a partition, even if the list is much larger than 
> the buffer size or the producer has to be restarted.
> - If a process is using Kafka as a commit log -- reading from a partition to 
> bootstrap, then writing any updates to that same partition -- it can be sure 
> that it's seen all of the messages in that partition at the moment it does 
> its first (successful) write.
> There's a bunch of other similar use-cases here, but they all have roughly 
> the same flavour.
> h4. Implementation
> The major advantage of this proposal over other suggested transaction / 
> idempotency mechanisms is its minimality: it gives the 'obvious' meaning to a 
> currently-unused field, adds no new APIs, and requires very little new code 
> or additional work from the server.
> - Produced messages already carry an offset field, which is currently ignored 
> by the server. This field could be used for the 'expected offset', with a 
> sigil value for the current behaviour. (-1 is a natural choice, since it's 
> already used to mean 'next available offset'.)
> - We'd need a new error and error code for a 'CAS failure'.
> - The server assigns offsets to produced messages in 
> {{ByteBufferMessageSet.validateMessagesAndAssignOffsets}}. After this 
> changed, this method would assign offsets in the same way -- but if they 
> don't match the offset in the message, we'd return an error instead of 
> completing the write.
> - To avoid breaking existing clients, this behaviour would need to live 
> behind some config flag. (Possibly global, but probably more useful 
> per-topic?)
> I understand all this is unsolicited and possibly strange: happy to answer 
> questions, and if this seems interesting, I'd be glad to flesh this out into 
> a full KIP or patch. (And apologies if this is the wrong venue for this sort 
> of thing!)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-3335) Kafka Connect hangs in shutdown hook

2016-03-04 Thread Ben Kirwin (JIRA)
Ben Kirwin created KAFKA-3335:
-

 Summary: Kafka Connect hangs in shutdown hook
 Key: KAFKA-3335
 URL: https://issues.apache.org/jira/browse/KAFKA-3335
 Project: Kafka
  Issue Type: Bug
Reporter: Ben Kirwin


The `Connect` class can run into issues during start, such as:

{noformat}
Exception in thread "main" org.apache.kafka.connect.errors.ConnectException: 
Could not look up partition metadata for offset backing store topic in allotted 
period. This could indicate a connectivity issue, unavailable topic partitions, 
or if this is your first use of the topic it may have taken too long to create.
at 
org.apache.kafka.connect.util.KafkaBasedLog.start(KafkaBasedLog.java:130)
at 
org.apache.kafka.connect.storage.KafkaOffsetBackingStore.start(KafkaOffsetBackingStore.java:85)
at org.apache.kafka.connect.runtime.Worker.start(Worker.java:108)
at org.apache.kafka.connect.runtime.Connect.start(Connect.java:56)
at 
org.apache.kafka.connect.cli.ConnectDistributed.main(ConnectDistributed.java:62)
{noformat}

This exception halts the startup process. It also triggers the shutdown hook... 
which blocks waiting for the service to start up before calling stop. This 
causes the process to hang forever.

There's a few things that could be done here, but it would be nice to bound the 
amount of time the process spends trying to exit gracefully.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [DISCUSS] KIP-27 - Conditional Publish

2015-08-11 Thread Ben Kirwin
.
 
  Thanks,
 
  Jiangjie (Becket) Qin
 
  On Sun, Jul 26, 2015 at 11:54 AM, Ben Kirwin b...@kirw.in wrote:
 
  So I had another look at the 'Idempotent Producer' proposal this
  afternoon, and made a few notes on how I think they compare; if
  I've
  made any mistakes, I'd be delighted if someone with more context on
  the idempotent producer design would correct me.
 
  As a first intuition, you can think of the 'conditional publish'
  proposal as the special case of the 'idempotent producer' idea,
  where
  there's just a single producer per-partition. The key observation
  here
  is: if there's only one producer, you can conflate the 'sequence
  number' and the expected offset. The conditional publish proposal
  uses
  existing Kafka offset APIs for roughly the same things as the
  idempotent producer proposal uses sequence numbers for -- eg.
  instead
  of having a lease PID API that returns the current sequence
  number,
  we can use the existing 'offset API' to retrieve the upcoming
  offset.
 
  Both proposals attempt to deal with the situation where there are
  transiently multiple publishers for the same partition (and PID).
  The
  idempotent producer setup tracks a generation id for each pid, and
  discards any writes with a generation id smaller than the latest
  value. Conditional publish is 'first write wins' -- and instead of
  dropping duplicates on the server, it returns an error to the
  client.
  The duplicate-handling behaviour (dropping vs. erroring) has some
  interesting consequences:
 
  - If all producers are producing the same stream of messages,
  silently
  dropping duplicates on the server is more convenient. (Suppose we
  have
  a batch of messages 0-9, and the high-water mark on the server is
  7.
  Idempotent producer, as I read it, would append 7-9 to the
  partition
  and return success; meanwhile, conditional publish would fail the
  entire batch.)
 
  - If producers might be writing different streams of messages, the
  proposed behaviour of the idempotent producer is probably worse --
  since it can silently interleave messages from two different
  producers. This can be a problem for some commit-log style
  use-cases,
  since it can transform a valid series of operations into an invalid
  one.
 
  - Given the error-on-duplicate behaviour, it's possible to
  implement
  deduplication on the client. (Sketch: if a publish returns an error
  for some partition, fetch the upcoming offset / sequence number for
  that partition, and discard all messages with a smaller offset on
  the
  client before republishing.)
 
  I think this makes the erroring behaviour more general, though
  deduplicating saves a roundtrip or two at conflict time.
 
  I'm less clear about the behaviour of the generation id, or what
  happens when (say) two producers with the same generation id are
  spun
  up at the same time. I'd be interested in hearing other folks'
  comments on this.
 
  Ewen: I'm not sure I understand the questions well enough to answer
  properly, but some quick notes:
  - I don't think it makes sense to assign an expected offset without
  already having assigned a partition. If the producer code is doing
  the
  partition assignment, it should probably do the offset assignment
  too... or we could just let application code handle both.
  - I'm not aware of any case where reassigning offsets to messages
  automatically after an offset mismatch makes sense: in the cases
  we've
  discussed, it seems like either it's safe to drop duplicates, or we
  want to handle the error at the application level.
 
  I'm going to try and come with an idempotent-producer-type example
  that works with the draft patch in the next few days, so hopefully
  we'll have something more concrete to discuss. Otherwise -- if you
  have a clear idea of how eg. sequence number assignment would work
  in
  the idempotent-producer proposal, we could probably translate that
  over to get the equivalent for the conditional publish API.
 
 
 
  On Fri, Jul 24, 2015 at 2:16 AM, Ewen Cheslack-Postava
  e...@confluent.io wrote:
  @Becket - for compressed batches, I think this just works out
  given
  the
  KIP
  as described. Without the change you're referring to, it still
  only
  makes
  sense to batch messages with this KIP if all the expected offsets
  are
  sequential (else some messages are guaranteed to fail). I think
  that
  probably just works out, but raises an issue I brought up on the
  KIP
  call.
 
  Batching can be a bit weird with this proposal. If you try to
  write
  key A
  and key B, the second operation is dependent on the first. Which
  means
  to
  make an effective client for this, we need to keep track of
  per-partition
  offsets so we can set expected offsets properly. For example, if
  A
  was
  expected to publish at offset 10, then if B was published to the
  same
  partition, we need to make sure it's marked as expected offset 11
  (assuming
  no subpartition high water marks). We

Re: [DISCUSS] KIP-27 - Conditional Publish

2015-08-04 Thread Ben Kirwin
This is a great summary of the commit log options. Some additional comments:

1. One way to handle a transient failure is to treat it the same way
as a conditional publish failure: recompute the post-value before
retrying. (I believe this is enough to make this approach work
correctly.)

2. You can think of 2 as an 'optimization' of 1: the election
mechanism is there to ensure that the conditional publish failures
happen very rarely. When there are no conflicts, the conditional
publish is essentially free.

I guess I think of the zombie master problem like this: given some
window of time where two nodes both think they are the master,
conditional publish is enough to ensure that only one of the two will
successfully publish. However, it's not enough to ensure that the
'new' master is the successful one. This might cause the leadership
transition to happen a bit later than it would otherwise, but it
doesn't seem to actually impact correctness.

On Tue, Aug 4, 2015 at 12:55 AM, Jun Rao j...@confluent.io wrote:
 A couple of thoughts on the commit log use case. Suppose that we want to
 maintain multiple replicas of a K/V store backed by a shared Kafka
 topic/partition as a commit log. There are two possible ways to use Kafka
 as a commit log.

 1. The first approach allows multiple producers to publish to Kafka. Each
 replica of the data store keeps reading from a Kafka topic/partition to
 refresh the replica's view. Every time a replica gets an update to a key
 from a client, it combines the update and the current value of the key in
 its view and generates a post-value. It then does a conditional publish to
 Kafka with the post-value. The update is successful if the conditional
 publish succeeds. Otherwise, the replica has to recompute the post-value
 (potentially after the replica's view is refreshed) and retry the
 conditional publish. A potential issue with this approach is when there is
 a transient failure during publishing to Kafka (e.g., the leader of the
 partition changes). When this happens, the conditional publish will get an
 error. The replica doesn't know whether the publish actually succeeded or
 not. If we just blindly retry, it may not give the correct behavior (e.g.,
 we could be applying +1 twice). So, not sure if conditional publish itself
 is enough for this approach.

 2. The second approach allows only a single producer to publish to Kafka.
 We somehow elect one of the replicas to be the master that handles all
 updates. Normally, we don't need conditional publish since there is a
 single producer. Conditional publish can potentially be used to deal with
 duplicates. If the master encounters the same transient failure as the
 above, it can get the latest offset from the Kafka topic/partition to see
 if the publish actually succeeded or not since it's the only producer. A
 potential issue here is to handle the zombie master problem: if the master
 has a soft failure and another master is elected, we need to prevent the
 old master from publishing new data to Kafka. So, for this approach to work
 properly, we need some kind of support of single writer in addition to
 conditional publish.


 Jiangjie,

 The issue with partial commit is the following. Say we have a batch of 10
 uncompressed messages sent to the leader. The followers only fetched the
 first 5 messages and then the leader dies. In this case, we only committed
 5 out of the 10 messages.

 Thanks,

 Jun


 On Tue, Jul 28, 2015 at 1:16 AM, Daniel Schierbeck 
 daniel.schierb...@gmail.com wrote:

 Jiangjie: I think giving users the possibility of defining a custom policy
 for handling rejections is a good idea. For instance, this will allow Kafka
 to act as an event store in an Event Sourcing application. If the event(s)
 are rejected by the store, the original command may need to be re-validated
 against the new state.

 On Tue, Jul 28, 2015 at 1:27 AM Jiangjie Qin j...@linkedin.com.invalid
 wrote:

  @Ewen, good point about batching. Yes, it would be tricky if we want to
 do
  a per-key conditional produce. My understanding is that the prerequisite
 of
  this KIP is:
  1. Single producer for each partition.
  2. Acks=-1, max.in.flight.request.per.connection=1,
 retries=SOME_BIG_NUMBER
 
  The major problem it tries to solve is exact once produce, i.e. solve the
  duplicates from producer side. In that case, a batch will be considered
 as
  atomic. The only possibility of a batch got rejected should be it is
  already appended. So the producer should just move on.
 
  It looks to me even a transient multiple producer scenario will cause
 issue
  because user need to think about what should do if a request got rejected
  and the answer varies for different use cases.
 
  Thanks,
 
  Jiangjie (Becket) Qin
 
  On Sun, Jul 26, 2015 at 11:54 AM, Ben Kirwin b...@kirw.in wrote:
 
   So I had another look at the 'Idempotent Producer' proposal this
   afternoon, and made a few notes on how I think they compare; if I've
   made any

Re: [DISCUSS] KIP-27 - Conditional Publish

2015-08-04 Thread Ben Kirwin
Jiangjie -- it seems to me that, while there are cases where you want
conservative producer settings like you suggest, there are others
enabled by this KIP where pipelining and retries are not an issue.

As a toy example, I've adapted the producer performance test to behave
as an idempotent producer here:

https://github.com/bkirwi/kafka/blob/conditional-publish/tools/src/main/java/org/apache/kafka/clients/tools/IdempotentProducerPerformance.java

This appends the numbers 1-N in-order to some partition. If you've set
up the server to do the offset checks, it's possible to run multiple
instances of this producer without reordering messages or adding
duplicates. For this kind of application, pipelining turns out to be
safe... it might result in more failed messages if there's a conflict,
but it won't hurt correctness.

On Mon, Jul 27, 2015 at 7:26 PM, Jiangjie Qin j...@linkedin.com.invalid wrote:
 @Ewen, good point about batching. Yes, it would be tricky if we want to do
 a per-key conditional produce. My understanding is that the prerequisite of
 this KIP is:
 1. Single producer for each partition.
 2. Acks=-1, max.in.flight.request.per.connection=1, retries=SOME_BIG_NUMBER

 The major problem it tries to solve is exact once produce, i.e. solve the
 duplicates from producer side. In that case, a batch will be considered as
 atomic. The only possibility of a batch got rejected should be it is
 already appended. So the producer should just move on.

 It looks to me even a transient multiple producer scenario will cause issue
 because user need to think about what should do if a request got rejected
 and the answer varies for different use cases.

 Thanks,

 Jiangjie (Becket) Qin

 On Sun, Jul 26, 2015 at 11:54 AM, Ben Kirwin b...@kirw.in wrote:

 So I had another look at the 'Idempotent Producer' proposal this
 afternoon, and made a few notes on how I think they compare; if I've
 made any mistakes, I'd be delighted if someone with more context on
 the idempotent producer design would correct me.

 As a first intuition, you can think of the 'conditional publish'
 proposal as the special case of the 'idempotent producer' idea, where
 there's just a single producer per-partition. The key observation here
 is: if there's only one producer, you can conflate the 'sequence
 number' and the expected offset. The conditional publish proposal uses
 existing Kafka offset APIs for roughly the same things as the
 idempotent producer proposal uses sequence numbers for -- eg. instead
 of having a lease PID API that returns the current sequence number,
 we can use the existing 'offset API' to retrieve the upcoming offset.

 Both proposals attempt to deal with the situation where there are
 transiently multiple publishers for the same partition (and PID). The
 idempotent producer setup tracks a generation id for each pid, and
 discards any writes with a generation id smaller than the latest
 value. Conditional publish is 'first write wins' -- and instead of
 dropping duplicates on the server, it returns an error to the client.
 The duplicate-handling behaviour (dropping vs. erroring) has some
 interesting consequences:

 - If all producers are producing the same stream of messages, silently
 dropping duplicates on the server is more convenient. (Suppose we have
 a batch of messages 0-9, and the high-water mark on the server is 7.
 Idempotent producer, as I read it, would append 7-9 to the partition
 and return success; meanwhile, conditional publish would fail the
 entire batch.)

 - If producers might be writing different streams of messages, the
 proposed behaviour of the idempotent producer is probably worse --
 since it can silently interleave messages from two different
 producers. This can be a problem for some commit-log style use-cases,
 since it can transform a valid series of operations into an invalid
 one.

 - Given the error-on-duplicate behaviour, it's possible to implement
 deduplication on the client. (Sketch: if a publish returns an error
 for some partition, fetch the upcoming offset / sequence number for
 that partition, and discard all messages with a smaller offset on the
 client before republishing.)

 I think this makes the erroring behaviour more general, though
 deduplicating saves a roundtrip or two at conflict time.

 I'm less clear about the behaviour of the generation id, or what
 happens when (say) two producers with the same generation id are spun
 up at the same time. I'd be interested in hearing other folks'
 comments on this.

 Ewen: I'm not sure I understand the questions well enough to answer
 properly, but some quick notes:
 - I don't think it makes sense to assign an expected offset without
 already having assigned a partition. If the producer code is doing the
 partition assignment, it should probably do the offset assignment
 too... or we could just let application code handle both.
 - I'm not aware of any case where reassigning offsets to messages
 automatically after an offset

Re: [DISCUSS] KIP-27 - Conditional Publish

2015-07-26 Thread Ben Kirwin
 to B? Are there retries that somehow update the expected offset, or do we
 just give up since we know it's always going to fail with the expected
 offset that was automatically assigned to it?

 One way to handle this is to use Yasuhiro's idea of increasing the
 granularity of high watermarks using subpartitions. But I guess my question
 is: if one producer client is writing many keys, and some of those keys are
 produced to the same partition, and those messages are batched, what
 happens? Do we end up with lots of failed messages? Or do we have
 complicated logic in the producer to figure out what the right expected
 offset for each message is? Or do they all share the same base expected
 offset as in the compressed case, in which case they all share the same
 fate and subpartitioning doesn't help? Or is there a simpler solution I'm
 just not seeing? Maybe this just disables batching entirely and throughput
 isn't an issue in these cases?

 Sorry, I know that's probably not entirely clear, but that's because I'm
 very uncertain of how batching works with this KIP.


 On how this relates to other proposals: I think it might also be helpful to
 get an overview of all the proposals for relevant modifications to
 producers/produce requests since many of these proposals are possibly
 alternatives (though some may not be mutually exclusive). Many people don't
 have all the context from the past couple of years of the project. Are
 there any other relevant wikis or docs besides the following?

 https://cwiki.apache.org/confluence/display/KAFKA/Idempotent+Producer
 https://cwiki.apache.org/confluence/display/KAFKA/Transactional+Messaging+in+Kafka
 https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Enriched+Message+Metadata

 -Ewen

 On Wed, Jul 22, 2015 at 11:18 AM, Gwen Shapira gshap...@cloudera.com
 wrote:

 Tangent: I think we should complete the move of Produce / Fetch RPC to
 the client libraries before we add more revisions to this protocol.

 On Wed, Jul 22, 2015 at 11:02 AM, Jiangjie Qin
 j...@linkedin.com.invalid wrote:
  I missed yesterday's KIP hangout. I'm currently working on another KIP
 for
  enriched metadata of messages. Guozhang has already created a wiki page
  before (
 
 https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Enriched+Message+Metadata
 ).
  We plan to fill the relative offset to the offset field in the batch sent
  by producer to avoid broker side re-compression. The message offset would
  become batch base offset + relative offset. I guess maybe the expected
  offset in KIP-27 can be only set for base offset? Would that affect
 certain
  use cases?
 
  For Jun's comments, I am not sure I completely get it. I think the
 producer
  only sends one batch per partition in a request. So either that batch is
  appended or not. Why a batch would be partially committed?
 
  Thanks,
 
  Jiangjie (Becket) Qin
 
  On Tue, Jul 21, 2015 at 10:42 AM, Ben Kirwin b...@kirw.in wrote:
 
  That's a fair point. I've added some imagined job logic to the KIP, so
  we can make sure the proposal stays in sync with the usages we're
  discussing. (The logic is just a quick sketch for now -- I expect I'll
  need to elaborate it as we get into more detail, or to address other
  concerns...)
 
  On Tue, Jul 21, 2015 at 11:45 AM, Jun Rao j...@confluent.io wrote:
   For 1, yes, when there is a transient leader change, it's guaranteed
  that a
   prefix of the messages in a request will be committed. However, it
 seems
   that the client needs to know what subset of messages are committed in
   order to resume the sending. Then the question is how.
  
   As Flavio indicated, for the use cases that you listed, it would be
  useful
   to figure out the exact logic by using this feature. For example, in
 the
   partition K/V store example, when we fail over to a new writer to the
   commit log, the zombie writer can publish new messages to the log
 after
  the
   new writer takes over, but before it publishes any message. We
 probably
   need to outline how this case can be handled properly.
  
   Thanks,
  
   Jun
  
   On Mon, Jul 20, 2015 at 12:05 PM, Ben Kirwin b...@kirw.in wrote:
  
   Hi Jun,
  
   Thanks for the close reading! Responses inline.
  
Thanks for the write-up. The single producer use case you mentioned
  makes
sense. It would be useful to include that in the KIP wiki.
  
   Great -- I'll make sure that the wiki is clear about this.
  
1. What happens when the leader of the partition changes in the
 middle
   of a
produce request? In this case, the producer client is not sure
 whether
   the
request succeeds or not. If there is only a single message in the
   request,
the producer can just resend the request. If it sees an
 OffsetMismatch
error, it knows that the previous send actually succeeded and can
  proceed
with the next write. This is nice since it not only allows the
  producer
   to
proceed during transient failures in the broker, it also avoids

Re: [DISCUSS] KIP-27 - Conditional Publish

2015-07-20 Thread Ben Kirwin
Hi Jun,

Thanks for the close reading! Responses inline.

 Thanks for the write-up. The single producer use case you mentioned makes
 sense. It would be useful to include that in the KIP wiki.

Great -- I'll make sure that the wiki is clear about this.

 1. What happens when the leader of the partition changes in the middle of a
 produce request? In this case, the producer client is not sure whether the
 request succeeds or not. If there is only a single message in the request,
 the producer can just resend the request. If it sees an OffsetMismatch
 error, it knows that the previous send actually succeeded and can proceed
 with the next write. This is nice since it not only allows the producer to
 proceed during transient failures in the broker, it also avoids duplicates
 during producer resend. One caveat is when there are multiple messages in
 the same partition in a produce request. The issue is that in our current
 replication protocol, it's possible for some, but not all messages in the
 request to be committed. This makes resend a bit harder to deal with since
 on receiving an OffsetMismatch error, it's not clear which messages have
 been committed. One possibility is to expect that compression is enabled,
 in which case multiple messages are compressed into a single message. I was
 thinking that another possibility is for the broker to return the current
 high watermark when sending an OffsetMismatch error. Based on this info,
 the producer can resend the subset of messages that have not been
 committed. However, this may not work in a compacted topic since there can
 be holes in the offset.

This is a excellent question. It's my understanding that at least a
*prefix* of messages will be committed (right?) -- which seems to be
enough for many cases. I'll try and come up with a more concrete
answer here.

 2. Is this feature only intended to be used with ack = all? The client
 doesn't get the offset with ack = 0. With ack = 1, it's possible for a
 previously acked message to be lost during leader transition, which will
 make the client logic more complicated.

It's true that acks = 0 doesn't seem to be particularly useful; in all
the cases I've come across, the client eventually wants to know about
the mismatch error. However, it seems like there are some cases where
acks = 1 would be fine -- eg. in a bulk load of a fixed dataset,
losing messages during a leader transition just means you need to
rewind / restart the load, which is not especially catastrophic. For
many other interesting cases, acks = all is probably preferable.

 3. How does the producer client know the offset to send the first message?
 Do we need to expose an API in producer to get the current high watermark?

You're right, it might be irritating to have to go through the
consumer API just for this. There are some cases where the offsets are
already available -- like the commit-log-for-KV-store example -- but
in general, being able to get the offsets from the producer interface
does sound convenient.

 We plan to have a KIP discussion meeting tomorrow at 11am PST. Perhaps you
 can describe this KIP a bit then?

Sure, happy to join.

 Thanks,

 Jun



 On Sat, Jul 18, 2015 at 10:37 AM, Ben Kirwin b...@kirw.in wrote:

 Just wanted to flag a little discussion that happened on the ticket:

 https://issues.apache.org/jira/browse/KAFKA-2260?focusedCommentId=14632259page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14632259

 In particular, Yasuhiro Matsuda proposed an interesting variant on
 this that performs the offset check on the message key (instead of
 just the partition), with bounded space requirements, at the cost of
 potentially some spurious failures. (ie. the produce request may fail
 even if that particular key hasn't been updated recently.) This
 addresses a couple of the drawbacks of the per-key approach mentioned
 at the bottom of the KIP.

 On Fri, Jul 17, 2015 at 6:47 PM, Ben Kirwin b...@kirw.in wrote:
  Hi all,
 
  So, perhaps it's worth adding a couple specific examples of where this
  feature is useful, to make this a bit more concrete:
 
  - Suppose I'm using Kafka as a commit log for a partitioned KV store,
  like Samza or Pistachio (?) do. We bootstrap the process state by
  reading from that partition, and log all state updates to that
  partition when we're running. Now imagine that one of my processes
  locks up -- GC or similar -- and the system transitions that partition
  over to another node. When the GC is finished, the old 'owner' of that
  partition might still be trying to write to the commit log at the same
  as the new one is. A process might detect this by noticing that the
  offset of the published message is bigger than it thought the upcoming
  offset was, which implies someone else has been writing to the log...
  but by then it's too late, and the commit log is already corrupt. With
  a 'conditional produce', one of those processes will have it's publish
  request

Re: [DISCUSS] KIP-27 - Conditional Publish

2015-07-20 Thread Ben Kirwin
Yes, sorry, I think this is right -- it's pretty application-specific.
One thing to note: in a large subset of cases (ie. bulk load,
copycat-type, mirrormaker) the correct response is not to resend the
message at all; if there's already a message at that offset, it's
because another instance of the same process already sent the exact
same message.

On Mon, Jul 20, 2015 at 4:38 PM, Flavio P JUNQUEIRA f...@apache.org wrote:
 Up to Ben to clarify, but I'd think that in this case, it is up to the
 logic of B to decide what to do. B knows that the offset isn't what it
 expects, so it can react accordingly. If it chooses to try again, then it
 should not violate any application invariant.

 -Flavio

 On Fri, Jul 17, 2015 at 9:49 PM, Ashish Singh asi...@cloudera.com wrote:

 Good concept. I have a question though.

 Say there are two producers A and B. Both producers are producing to same
 partition.
 - A sends a message with expected offset, x1
 - Broker accepts is and sends an Ack
 - B sends a message with expected offset, x1
 - Broker rejects it, sends nack
 - B sends message again with expected offset, x1+1
 - Broker accepts it and sends Ack
 I guess this is what this KIP suggests, right? If yes, then how does this
 ensure that same message will not be written twice when two producers are
 producing to same partition? Producer on receiving a nack will try again
 with next offset and will keep doing so till the message is accepted. Am I
 missing something?

 Also, you have mentioned on KIP, it imposes little to no runtime cost in
 memory or time, I think that is not true for time. With this approach
 producers' performance will reduce proportionally to number of producers
 writing to same partition. Please correct me if I am missing out something.


 On Fri, Jul 17, 2015 at 11:32 AM, Mayuresh Gharat 
 gharatmayures...@gmail.com wrote:

  If we have 2 producers producing to a partition, they can be out of
 order,
  then how does one producer know what offset to expect as it does not
  interact with other producer?
 
  Can you give an example flow that explains how it works with single
  producer and with multiple producers?
 
 
  Thanks,
 
  Mayuresh
 
  On Fri, Jul 17, 2015 at 10:28 AM, Flavio Junqueira 
  fpjunque...@yahoo.com.invalid wrote:
 
   I like this feature, it reminds me of conditional updates in zookeeper.
   I'm not sure if it'd be best to have some mechanism for fencing rather
  than
   a conditional write like you're proposing. The reason I'm saying this
 is
   that the conditional write applies to requests individually, while it
   sounds like you want to make sure that there is a single client writing
  so
   over multiple requests.
  
   -Flavio
  
On 17 Jul 2015, at 07:30, Ben Kirwin b...@kirw.in wrote:
   
Hi there,
   
I just added a KIP for a 'conditional publish' operation: a simple
CAS-like mechanism for the Kafka producer. The wiki page is here:
   
   
  
 
 https://cwiki.apache.org/confluence/display/KAFKA/KIP-27+-+Conditional+Publish
   
And there's some previous discussion on the ticket and the users
 list:
   
https://issues.apache.org/jira/browse/KAFKA-2260
   
   
  
 
 https://mail-archives.apache.org/mod_mbox/kafka-users/201506.mbox/%3CCAAeOB6ccyAA13YNPqVQv2o-mT5r=c9v7a+55sf2wp93qg7+...@mail.gmail.com%3E
   
As always, comments and suggestions are very welcome.
   
Thanks,
Ben
  
  
 
 
  --
  -Regards,
  Mayuresh R. Gharat
  (862) 250-7125
 



 --

 Regards,
 Ashish



[jira] [Commented] (KAFKA-2260) Allow specifying expected offset on produce

2015-07-18 Thread Ben Kirwin (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2260?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14632537#comment-14632537
 ] 

Ben Kirwin commented on KAFKA-2260:
---

Ah, clever! Thanks for sharing this -- I've linked to it on the discussion 
thread.

{quote}
When changing the number of sub-partitions, the broker doesn't have to 
recompute sub-partition high water marks. It can initialize all array elements 
with the partition's high water mark.
{quote}

It seems we could also do this initialization every time the log is opened -- 
and avoid any persistent storage for the 'sub-partition' offsets at all. This 
would remove another major drawback of the per-key approach.

 Allow specifying expected offset on produce
 ---

 Key: KAFKA-2260
 URL: https://issues.apache.org/jira/browse/KAFKA-2260
 Project: Kafka
  Issue Type: Improvement
Reporter: Ben Kirwin
Assignee: Ewen Cheslack-Postava
Priority: Minor
 Attachments: expected-offsets.patch


 I'd like to propose a change that adds a simple CAS-like mechanism to the 
 Kafka producer. This update has a small footprint, but enables a bunch of 
 interesting uses in stream processing or as a commit log for process state.
 h4. Proposed Change
 In short:
 - Allow the user to attach a specific offset to each message produced.
 - The server assigns offsets to messages in the usual way. However, if the 
 expected offset doesn't match the actual offset, the server should fail the 
 produce request instead of completing the write.
 This is a form of optimistic concurrency control, like the ubiquitous 
 check-and-set -- but instead of checking the current value of some state, it 
 checks the current offset of the log.
 h4. Motivation
 Much like check-and-set, this feature is only useful when there's very low 
 contention. Happily, when Kafka is used as a commit log or as a 
 stream-processing transport, it's common to have just one producer (or a 
 small number) for a given partition -- and in many of these cases, predicting 
 offsets turns out to be quite useful.
 - We get the same benefits as the 'idempotent producer' proposal: a producer 
 can retry a write indefinitely and be sure that at most one of those attempts 
 will succeed; and if two producers accidentally write to the end of the 
 partition at once, we can be certain that at least one of them will fail.
 - It's possible to 'bulk load' Kafka this way -- you can write a list of n 
 messages consecutively to a partition, even if the list is much larger than 
 the buffer size or the producer has to be restarted.
 - If a process is using Kafka as a commit log -- reading from a partition to 
 bootstrap, then writing any updates to that same partition -- it can be sure 
 that it's seen all of the messages in that partition at the moment it does 
 its first (successful) write.
 There's a bunch of other similar use-cases here, but they all have roughly 
 the same flavour.
 h4. Implementation
 The major advantage of this proposal over other suggested transaction / 
 idempotency mechanisms is its minimality: it gives the 'obvious' meaning to a 
 currently-unused field, adds no new APIs, and requires very little new code 
 or additional work from the server.
 - Produced messages already carry an offset field, which is currently ignored 
 by the server. This field could be used for the 'expected offset', with a 
 sigil value for the current behaviour. (-1 is a natural choice, since it's 
 already used to mean 'next available offset'.)
 - We'd need a new error and error code for a 'CAS failure'.
 - The server assigns offsets to produced messages in 
 {{ByteBufferMessageSet.validateMessagesAndAssignOffsets}}. After this 
 changed, this method would assign offsets in the same way -- but if they 
 don't match the offset in the message, we'd return an error instead of 
 completing the write.
 - To avoid breaking existing clients, this behaviour would need to live 
 behind some config flag. (Possibly global, but probably more useful 
 per-topic?)
 I understand all this is unsolicited and possibly strange: happy to answer 
 questions, and if this seems interesting, I'd be glad to flesh this out into 
 a full KIP or patch. (And apologies if this is the wrong venue for this sort 
 of thing!)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [DISCUSS] KIP-27 - Conditional Publish

2015-07-18 Thread Ben Kirwin
Just wanted to flag a little discussion that happened on the ticket:
https://issues.apache.org/jira/browse/KAFKA-2260?focusedCommentId=14632259page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14632259

In particular, Yasuhiro Matsuda proposed an interesting variant on
this that performs the offset check on the message key (instead of
just the partition), with bounded space requirements, at the cost of
potentially some spurious failures. (ie. the produce request may fail
even if that particular key hasn't been updated recently.) This
addresses a couple of the drawbacks of the per-key approach mentioned
at the bottom of the KIP.

On Fri, Jul 17, 2015 at 6:47 PM, Ben Kirwin b...@kirw.in wrote:
 Hi all,

 So, perhaps it's worth adding a couple specific examples of where this
 feature is useful, to make this a bit more concrete:

 - Suppose I'm using Kafka as a commit log for a partitioned KV store,
 like Samza or Pistachio (?) do. We bootstrap the process state by
 reading from that partition, and log all state updates to that
 partition when we're running. Now imagine that one of my processes
 locks up -- GC or similar -- and the system transitions that partition
 over to another node. When the GC is finished, the old 'owner' of that
 partition might still be trying to write to the commit log at the same
 as the new one is. A process might detect this by noticing that the
 offset of the published message is bigger than it thought the upcoming
 offset was, which implies someone else has been writing to the log...
 but by then it's too late, and the commit log is already corrupt. With
 a 'conditional produce', one of those processes will have it's publish
 request refused -- so we've avoided corrupting the state.

 - Envision some copycat-like system, where we have some sharded
 postgres setup and we're tailing each shard into its own partition.
 Normally, it's fairly easy to avoid duplicates here: we can track
 which offset in the WAL corresponds to which offset in Kafka, and we
 know how many messages we've written to Kafka already, so the state is
 very simple. However, it is possible that for a moment -- due to
 rebalancing or operator error or some other thing -- two different
 nodes are tailing the same postgres shard at once! Normally this would
 introduce duplicate messages, but by specifying the expected offset,
 we can avoid this.

 So perhaps it's better to say that this is useful when a single
 producer is *expected*, but multiple producers are *possible*? (In the
 same way that the high-level consumer normally has 1 consumer in a
 group reading from a partition, but there are small windows where more
 than one might be reading at the same time.) This is also the spirit
 of the 'runtime cost' comment -- in the common case, where there is
 little to no contention, there's no performance overhead either. I
 mentioned this a little in the Motivation section -- maybe I should
 flesh that out a little bit?

 For me, the motivation to work this up was that I kept running into
 cases, like the above, where the existing API was almost-but-not-quite
 enough to give the guarantees I was looking for -- and the extension
 needed to handle those cases too was pretty small and natural-feeling.

 On Fri, Jul 17, 2015 at 4:49 PM, Ashish Singh asi...@cloudera.com wrote:
 Good concept. I have a question though.

 Say there are two producers A and B. Both producers are producing to same
 partition.
 - A sends a message with expected offset, x1
 - Broker accepts is and sends an Ack
 - B sends a message with expected offset, x1
 - Broker rejects it, sends nack
 - B sends message again with expected offset, x1+1
 - Broker accepts it and sends Ack
 I guess this is what this KIP suggests, right? If yes, then how does this
 ensure that same message will not be written twice when two producers are
 producing to same partition? Producer on receiving a nack will try again
 with next offset and will keep doing so till the message is accepted. Am I
 missing something?

 Also, you have mentioned on KIP, it imposes little to no runtime cost in
 memory or time, I think that is not true for time. With this approach
 producers' performance will reduce proportionally to number of producers
 writing to same partition. Please correct me if I am missing out something.


 On Fri, Jul 17, 2015 at 11:32 AM, Mayuresh Gharat 
 gharatmayures...@gmail.com wrote:

 If we have 2 producers producing to a partition, they can be out of order,
 then how does one producer know what offset to expect as it does not
 interact with other producer?

 Can you give an example flow that explains how it works with single
 producer and with multiple producers?


 Thanks,

 Mayuresh

 On Fri, Jul 17, 2015 at 10:28 AM, Flavio Junqueira 
 fpjunque...@yahoo.com.invalid wrote:

  I like this feature, it reminds me of conditional updates in zookeeper.
  I'm not sure if it'd be best to have some mechanism for fencing rather
 than

[DISCUSS] KIP-27 - Conditional Publish

2015-07-17 Thread Ben Kirwin
Hi there,

I just added a KIP for a 'conditional publish' operation: a simple
CAS-like mechanism for the Kafka producer. The wiki page is here:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-27+-+Conditional+Publish

And there's some previous discussion on the ticket and the users list:

https://issues.apache.org/jira/browse/KAFKA-2260

https://mail-archives.apache.org/mod_mbox/kafka-users/201506.mbox/%3CCAAeOB6ccyAA13YNPqVQv2o-mT5r=c9v7a+55sf2wp93qg7+...@mail.gmail.com%3E

As always, comments and suggestions are very welcome.

Thanks,
Ben


[jira] [Commented] (KAFKA-2260) Allow specifying expected offset on produce

2015-07-17 Thread Ben Kirwin (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2260?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14630870#comment-14630870
 ] 

Ben Kirwin commented on KAFKA-2260:
---

Opened a KIP for this here: 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-27+-+Conditional+Publish

[~becket_qin]: Thanks! To #1 and #3 -- this feature is aimed at the special 
(but fairly common) case where we only expect one producer for a single 
partition at a given time. More complicated situations definitely require more 
elaborate coordination -- and while it's possible that you might implement more 
elaborate coordination mechanisms on top of this, I've left that out of scope 
for now. To #2 -- there are definitely cases where key-based CAS feels handier, 
but the converse is also true, and tracking offsets for each key requires some 
auxiliary data with some commensurate overhead. I discussed this a little bit 
in the KIP -- though the point that 'partial failure' becomes more likely is 
not something I'd considered, and also a very good point.


 Allow specifying expected offset on produce
 ---

 Key: KAFKA-2260
 URL: https://issues.apache.org/jira/browse/KAFKA-2260
 Project: Kafka
  Issue Type: Improvement
Reporter: Ben Kirwin
Assignee: Ewen Cheslack-Postava
Priority: Minor
 Attachments: expected-offsets.patch


 I'd like to propose a change that adds a simple CAS-like mechanism to the 
 Kafka producer. This update has a small footprint, but enables a bunch of 
 interesting uses in stream processing or as a commit log for process state.
 h4. Proposed Change
 In short:
 - Allow the user to attach a specific offset to each message produced.
 - The server assigns offsets to messages in the usual way. However, if the 
 expected offset doesn't match the actual offset, the server should fail the 
 produce request instead of completing the write.
 This is a form of optimistic concurrency control, like the ubiquitous 
 check-and-set -- but instead of checking the current value of some state, it 
 checks the current offset of the log.
 h4. Motivation
 Much like check-and-set, this feature is only useful when there's very low 
 contention. Happily, when Kafka is used as a commit log or as a 
 stream-processing transport, it's common to have just one producer (or a 
 small number) for a given partition -- and in many of these cases, predicting 
 offsets turns out to be quite useful.
 - We get the same benefits as the 'idempotent producer' proposal: a producer 
 can retry a write indefinitely and be sure that at most one of those attempts 
 will succeed; and if two producers accidentally write to the end of the 
 partition at once, we can be certain that at least one of them will fail.
 - It's possible to 'bulk load' Kafka this way -- you can write a list of n 
 messages consecutively to a partition, even if the list is much larger than 
 the buffer size or the producer has to be restarted.
 - If a process is using Kafka as a commit log -- reading from a partition to 
 bootstrap, then writing any updates to that same partition -- it can be sure 
 that it's seen all of the messages in that partition at the moment it does 
 its first (successful) write.
 There's a bunch of other similar use-cases here, but they all have roughly 
 the same flavour.
 h4. Implementation
 The major advantage of this proposal over other suggested transaction / 
 idempotency mechanisms is its minimality: it gives the 'obvious' meaning to a 
 currently-unused field, adds no new APIs, and requires very little new code 
 or additional work from the server.
 - Produced messages already carry an offset field, which is currently ignored 
 by the server. This field could be used for the 'expected offset', with a 
 sigil value for the current behaviour. (-1 is a natural choice, since it's 
 already used to mean 'next available offset'.)
 - We'd need a new error and error code for a 'CAS failure'.
 - The server assigns offsets to produced messages in 
 {{ByteBufferMessageSet.validateMessagesAndAssignOffsets}}. After this 
 changed, this method would assign offsets in the same way -- but if they 
 don't match the offset in the message, we'd return an error instead of 
 completing the write.
 - To avoid breaking existing clients, this behaviour would need to live 
 behind some config flag. (Possibly global, but probably more useful 
 per-topic?)
 I understand all this is unsolicited and possibly strange: happy to answer 
 questions, and if this seems interesting, I'd be glad to flesh this out into 
 a full KIP or patch. (And apologies if this is the wrong venue for this sort 
 of thing!)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2260) Allow specifying expected offset on produce

2015-07-17 Thread Ben Kirwin (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2260?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14632120#comment-14632120
 ] 

Ben Kirwin commented on KAFKA-2260:
---

Interesting -- the idea is that we track the max offset per *hash* of the key, 
instead of the key itself? I guess that if you use an array of length 1, this 
reduces to the current proposal. :) It would be interesting to calculate how 
frequently different keys would conflict, given a good hash function.

It seems like, for this to work, you'd need to add an additional method to the 
API to get the current offset for the hash of a given key?

 Allow specifying expected offset on produce
 ---

 Key: KAFKA-2260
 URL: https://issues.apache.org/jira/browse/KAFKA-2260
 Project: Kafka
  Issue Type: Improvement
Reporter: Ben Kirwin
Assignee: Ewen Cheslack-Postava
Priority: Minor
 Attachments: expected-offsets.patch


 I'd like to propose a change that adds a simple CAS-like mechanism to the 
 Kafka producer. This update has a small footprint, but enables a bunch of 
 interesting uses in stream processing or as a commit log for process state.
 h4. Proposed Change
 In short:
 - Allow the user to attach a specific offset to each message produced.
 - The server assigns offsets to messages in the usual way. However, if the 
 expected offset doesn't match the actual offset, the server should fail the 
 produce request instead of completing the write.
 This is a form of optimistic concurrency control, like the ubiquitous 
 check-and-set -- but instead of checking the current value of some state, it 
 checks the current offset of the log.
 h4. Motivation
 Much like check-and-set, this feature is only useful when there's very low 
 contention. Happily, when Kafka is used as a commit log or as a 
 stream-processing transport, it's common to have just one producer (or a 
 small number) for a given partition -- and in many of these cases, predicting 
 offsets turns out to be quite useful.
 - We get the same benefits as the 'idempotent producer' proposal: a producer 
 can retry a write indefinitely and be sure that at most one of those attempts 
 will succeed; and if two producers accidentally write to the end of the 
 partition at once, we can be certain that at least one of them will fail.
 - It's possible to 'bulk load' Kafka this way -- you can write a list of n 
 messages consecutively to a partition, even if the list is much larger than 
 the buffer size or the producer has to be restarted.
 - If a process is using Kafka as a commit log -- reading from a partition to 
 bootstrap, then writing any updates to that same partition -- it can be sure 
 that it's seen all of the messages in that partition at the moment it does 
 its first (successful) write.
 There's a bunch of other similar use-cases here, but they all have roughly 
 the same flavour.
 h4. Implementation
 The major advantage of this proposal over other suggested transaction / 
 idempotency mechanisms is its minimality: it gives the 'obvious' meaning to a 
 currently-unused field, adds no new APIs, and requires very little new code 
 or additional work from the server.
 - Produced messages already carry an offset field, which is currently ignored 
 by the server. This field could be used for the 'expected offset', with a 
 sigil value for the current behaviour. (-1 is a natural choice, since it's 
 already used to mean 'next available offset'.)
 - We'd need a new error and error code for a 'CAS failure'.
 - The server assigns offsets to produced messages in 
 {{ByteBufferMessageSet.validateMessagesAndAssignOffsets}}. After this 
 changed, this method would assign offsets in the same way -- but if they 
 don't match the offset in the message, we'd return an error instead of 
 completing the write.
 - To avoid breaking existing clients, this behaviour would need to live 
 behind some config flag. (Possibly global, but probably more useful 
 per-topic?)
 I understand all this is unsolicited and possibly strange: happy to answer 
 questions, and if this seems interesting, I'd be glad to flesh this out into 
 a full KIP or patch. (And apologies if this is the wrong venue for this sort 
 of thing!)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2260) Allow specifying expected offset on produce

2015-07-17 Thread Ben Kirwin (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2260?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14632113#comment-14632113
 ] 

Ben Kirwin commented on KAFKA-2260:
---

Ah, sorry! Let me try again.

Suppose you try and send a batch of messages to a partition, but get some 
network error -- it's possible that they were published successfully, or that 
they were lost before they made it to the broker. With the CAS, the simple 
thing to do is to resend the same batch with the same expected offsets. If the 
messages were published correctly last time, you'll get a check mismatch error; 
and if they weren't, they'll be appended correctly.

If the series of messages that the producer wants to send is fixed, the same 
mechanism would work even through producer restarts. If the set of messages 
isn't fixed -- the producer might have a completely different set of messages 
to send after restarting -- than what it means to be exactly-once becomes a lot 
more domain-dependent; you might want to write exactly one group of messages 
for each input message, or rpc request, or five-minute interval -- but that 
requires coordination between a bunch of different moving parts, and I don't 
think there's one coordination mechanism that handles all cases. (This 
'expected offset' thing is enough for some, but certainly not all of them...)

 Allow specifying expected offset on produce
 ---

 Key: KAFKA-2260
 URL: https://issues.apache.org/jira/browse/KAFKA-2260
 Project: Kafka
  Issue Type: Improvement
Reporter: Ben Kirwin
Assignee: Ewen Cheslack-Postava
Priority: Minor
 Attachments: expected-offsets.patch


 I'd like to propose a change that adds a simple CAS-like mechanism to the 
 Kafka producer. This update has a small footprint, but enables a bunch of 
 interesting uses in stream processing or as a commit log for process state.
 h4. Proposed Change
 In short:
 - Allow the user to attach a specific offset to each message produced.
 - The server assigns offsets to messages in the usual way. However, if the 
 expected offset doesn't match the actual offset, the server should fail the 
 produce request instead of completing the write.
 This is a form of optimistic concurrency control, like the ubiquitous 
 check-and-set -- but instead of checking the current value of some state, it 
 checks the current offset of the log.
 h4. Motivation
 Much like check-and-set, this feature is only useful when there's very low 
 contention. Happily, when Kafka is used as a commit log or as a 
 stream-processing transport, it's common to have just one producer (or a 
 small number) for a given partition -- and in many of these cases, predicting 
 offsets turns out to be quite useful.
 - We get the same benefits as the 'idempotent producer' proposal: a producer 
 can retry a write indefinitely and be sure that at most one of those attempts 
 will succeed; and if two producers accidentally write to the end of the 
 partition at once, we can be certain that at least one of them will fail.
 - It's possible to 'bulk load' Kafka this way -- you can write a list of n 
 messages consecutively to a partition, even if the list is much larger than 
 the buffer size or the producer has to be restarted.
 - If a process is using Kafka as a commit log -- reading from a partition to 
 bootstrap, then writing any updates to that same partition -- it can be sure 
 that it's seen all of the messages in that partition at the moment it does 
 its first (successful) write.
 There's a bunch of other similar use-cases here, but they all have roughly 
 the same flavour.
 h4. Implementation
 The major advantage of this proposal over other suggested transaction / 
 idempotency mechanisms is its minimality: it gives the 'obvious' meaning to a 
 currently-unused field, adds no new APIs, and requires very little new code 
 or additional work from the server.
 - Produced messages already carry an offset field, which is currently ignored 
 by the server. This field could be used for the 'expected offset', with a 
 sigil value for the current behaviour. (-1 is a natural choice, since it's 
 already used to mean 'next available offset'.)
 - We'd need a new error and error code for a 'CAS failure'.
 - The server assigns offsets to produced messages in 
 {{ByteBufferMessageSet.validateMessagesAndAssignOffsets}}. After this 
 changed, this method would assign offsets in the same way -- but if they 
 don't match the offset in the message, we'd return an error instead of 
 completing the write.
 - To avoid breaking existing clients, this behaviour would need to live 
 behind some config flag. (Possibly global, but probably more useful 
 per-topic?)
 I understand all this is unsolicited and possibly strange: happy to answer 
 questions, and if this seems interesting, I'd

Re: [DISCUSS] KIP-27 - Conditional Publish

2015-07-17 Thread Ben Kirwin
Hi all,

So, perhaps it's worth adding a couple specific examples of where this
feature is useful, to make this a bit more concrete:

- Suppose I'm using Kafka as a commit log for a partitioned KV store,
like Samza or Pistachio (?) do. We bootstrap the process state by
reading from that partition, and log all state updates to that
partition when we're running. Now imagine that one of my processes
locks up -- GC or similar -- and the system transitions that partition
over to another node. When the GC is finished, the old 'owner' of that
partition might still be trying to write to the commit log at the same
as the new one is. A process might detect this by noticing that the
offset of the published message is bigger than it thought the upcoming
offset was, which implies someone else has been writing to the log...
but by then it's too late, and the commit log is already corrupt. With
a 'conditional produce', one of those processes will have it's publish
request refused -- so we've avoided corrupting the state.

- Envision some copycat-like system, where we have some sharded
postgres setup and we're tailing each shard into its own partition.
Normally, it's fairly easy to avoid duplicates here: we can track
which offset in the WAL corresponds to which offset in Kafka, and we
know how many messages we've written to Kafka already, so the state is
very simple. However, it is possible that for a moment -- due to
rebalancing or operator error or some other thing -- two different
nodes are tailing the same postgres shard at once! Normally this would
introduce duplicate messages, but by specifying the expected offset,
we can avoid this.

So perhaps it's better to say that this is useful when a single
producer is *expected*, but multiple producers are *possible*? (In the
same way that the high-level consumer normally has 1 consumer in a
group reading from a partition, but there are small windows where more
than one might be reading at the same time.) This is also the spirit
of the 'runtime cost' comment -- in the common case, where there is
little to no contention, there's no performance overhead either. I
mentioned this a little in the Motivation section -- maybe I should
flesh that out a little bit?

For me, the motivation to work this up was that I kept running into
cases, like the above, where the existing API was almost-but-not-quite
enough to give the guarantees I was looking for -- and the extension
needed to handle those cases too was pretty small and natural-feeling.

On Fri, Jul 17, 2015 at 4:49 PM, Ashish Singh asi...@cloudera.com wrote:
 Good concept. I have a question though.

 Say there are two producers A and B. Both producers are producing to same
 partition.
 - A sends a message with expected offset, x1
 - Broker accepts is and sends an Ack
 - B sends a message with expected offset, x1
 - Broker rejects it, sends nack
 - B sends message again with expected offset, x1+1
 - Broker accepts it and sends Ack
 I guess this is what this KIP suggests, right? If yes, then how does this
 ensure that same message will not be written twice when two producers are
 producing to same partition? Producer on receiving a nack will try again
 with next offset and will keep doing so till the message is accepted. Am I
 missing something?

 Also, you have mentioned on KIP, it imposes little to no runtime cost in
 memory or time, I think that is not true for time. With this approach
 producers' performance will reduce proportionally to number of producers
 writing to same partition. Please correct me if I am missing out something.


 On Fri, Jul 17, 2015 at 11:32 AM, Mayuresh Gharat 
 gharatmayures...@gmail.com wrote:

 If we have 2 producers producing to a partition, they can be out of order,
 then how does one producer know what offset to expect as it does not
 interact with other producer?

 Can you give an example flow that explains how it works with single
 producer and with multiple producers?


 Thanks,

 Mayuresh

 On Fri, Jul 17, 2015 at 10:28 AM, Flavio Junqueira 
 fpjunque...@yahoo.com.invalid wrote:

  I like this feature, it reminds me of conditional updates in zookeeper.
  I'm not sure if it'd be best to have some mechanism for fencing rather
 than
  a conditional write like you're proposing. The reason I'm saying this is
  that the conditional write applies to requests individually, while it
  sounds like you want to make sure that there is a single client writing
 so
  over multiple requests.
 
  -Flavio
 
   On 17 Jul 2015, at 07:30, Ben Kirwin b...@kirw.in wrote:
  
   Hi there,
  
   I just added a KIP for a 'conditional publish' operation: a simple
   CAS-like mechanism for the Kafka producer. The wiki page is here:
  
  
 
 https://cwiki.apache.org/confluence/display/KAFKA/KIP-27+-+Conditional+Publish
  
   And there's some previous discussion on the ticket and the users list:
  
   https://issues.apache.org/jira/browse/KAFKA-2260
  
  
 
 https://mail-archives.apache.org/mod_mbox/kafka-users/201506.mbox

Confluence Wiki Permissions

2015-07-15 Thread Ben Kirwin
Hi there,

I'm interested in posting a KIP (for this ticket:
https://issues.apache.org/jira/browse/KAFKA-2260) but I don't seem to
have the right permissions to create the wiki page.

Can anyone here set that up? (Username is `bkirwi`.)


Re: Confluence Wiki Permissions

2015-07-15 Thread Ben Kirwin
Great -- thank you!

On Wed, Jul 15, 2015 at 9:13 PM, Jun Rao j...@confluent.io wrote:
 Done.

 Thanks,

 Jun

 On Wed, Jul 15, 2015 at 5:22 PM, Ben Kirwin b...@kirw.in wrote:

 Hi there,

 I'm interested in posting a KIP (for this ticket:
 https://issues.apache.org/jira/browse/KAFKA-2260) but I don't seem to
 have the right permissions to create the wiki page.

 Can anyone here set that up? (Username is `bkirwi`.)



[jira] [Commented] (KAFKA-2260) Allow specifying expected offset on produce

2015-07-14 Thread Ben Kirwin (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2260?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14627460#comment-14627460
 ] 

Ben Kirwin commented on KAFKA-2260:
---

Hi [~ewencp] -- thanks for the interest! I'd be glad to work this up into a 
KIP, but it looks like I don't have the permissions to create a wiki page... 
could you set that up?

 Allow specifying expected offset on produce
 ---

 Key: KAFKA-2260
 URL: https://issues.apache.org/jira/browse/KAFKA-2260
 Project: Kafka
  Issue Type: Improvement
Reporter: Ben Kirwin
Assignee: Ewen Cheslack-Postava
Priority: Minor
 Attachments: expected-offsets.patch


 I'd like to propose a change that adds a simple CAS-like mechanism to the 
 Kafka producer. This update has a small footprint, but enables a bunch of 
 interesting uses in stream processing or as a commit log for process state.
 h4. Proposed Change
 In short:
 - Allow the user to attach a specific offset to each message produced.
 - The server assigns offsets to messages in the usual way. However, if the 
 expected offset doesn't match the actual offset, the server should fail the 
 produce request instead of completing the write.
 This is a form of optimistic concurrency control, like the ubiquitous 
 check-and-set -- but instead of checking the current value of some state, it 
 checks the current offset of the log.
 h4. Motivation
 Much like check-and-set, this feature is only useful when there's very low 
 contention. Happily, when Kafka is used as a commit log or as a 
 stream-processing transport, it's common to have just one producer (or a 
 small number) for a given partition -- and in many of these cases, predicting 
 offsets turns out to be quite useful.
 - We get the same benefits as the 'idempotent producer' proposal: a producer 
 can retry a write indefinitely and be sure that at most one of those attempts 
 will succeed; and if two producers accidentally write to the end of the 
 partition at once, we can be certain that at least one of them will fail.
 - It's possible to 'bulk load' Kafka this way -- you can write a list of n 
 messages consecutively to a partition, even if the list is much larger than 
 the buffer size or the producer has to be restarted.
 - If a process is using Kafka as a commit log -- reading from a partition to 
 bootstrap, then writing any updates to that same partition -- it can be sure 
 that it's seen all of the messages in that partition at the moment it does 
 its first (successful) write.
 There's a bunch of other similar use-cases here, but they all have roughly 
 the same flavour.
 h4. Implementation
 The major advantage of this proposal over other suggested transaction / 
 idempotency mechanisms is its minimality: it gives the 'obvious' meaning to a 
 currently-unused field, adds no new APIs, and requires very little new code 
 or additional work from the server.
 - Produced messages already carry an offset field, which is currently ignored 
 by the server. This field could be used for the 'expected offset', with a 
 sigil value for the current behaviour. (-1 is a natural choice, since it's 
 already used to mean 'next available offset'.)
 - We'd need a new error and error code for a 'CAS failure'.
 - The server assigns offsets to produced messages in 
 {{ByteBufferMessageSet.validateMessagesAndAssignOffsets}}. After this 
 changed, this method would assign offsets in the same way -- but if they 
 don't match the offset in the message, we'd return an error instead of 
 completing the write.
 - To avoid breaking existing clients, this behaviour would need to live 
 behind some config flag. (Possibly global, but probably more useful 
 per-topic?)
 I understand all this is unsolicited and possibly strange: happy to answer 
 questions, and if this seems interesting, I'd be glad to flesh this out into 
 a full KIP or patch. (And apologies if this is the wrong venue for this sort 
 of thing!)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2260) Allow specifying expected offset on produce

2015-07-14 Thread Ben Kirwin (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2260?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14627484#comment-14627484
 ] 

Ben Kirwin commented on KAFKA-2260:
---

Will do; thanks!

 Allow specifying expected offset on produce
 ---

 Key: KAFKA-2260
 URL: https://issues.apache.org/jira/browse/KAFKA-2260
 Project: Kafka
  Issue Type: Improvement
Reporter: Ben Kirwin
Assignee: Ewen Cheslack-Postava
Priority: Minor
 Attachments: expected-offsets.patch


 I'd like to propose a change that adds a simple CAS-like mechanism to the 
 Kafka producer. This update has a small footprint, but enables a bunch of 
 interesting uses in stream processing or as a commit log for process state.
 h4. Proposed Change
 In short:
 - Allow the user to attach a specific offset to each message produced.
 - The server assigns offsets to messages in the usual way. However, if the 
 expected offset doesn't match the actual offset, the server should fail the 
 produce request instead of completing the write.
 This is a form of optimistic concurrency control, like the ubiquitous 
 check-and-set -- but instead of checking the current value of some state, it 
 checks the current offset of the log.
 h4. Motivation
 Much like check-and-set, this feature is only useful when there's very low 
 contention. Happily, when Kafka is used as a commit log or as a 
 stream-processing transport, it's common to have just one producer (or a 
 small number) for a given partition -- and in many of these cases, predicting 
 offsets turns out to be quite useful.
 - We get the same benefits as the 'idempotent producer' proposal: a producer 
 can retry a write indefinitely and be sure that at most one of those attempts 
 will succeed; and if two producers accidentally write to the end of the 
 partition at once, we can be certain that at least one of them will fail.
 - It's possible to 'bulk load' Kafka this way -- you can write a list of n 
 messages consecutively to a partition, even if the list is much larger than 
 the buffer size or the producer has to be restarted.
 - If a process is using Kafka as a commit log -- reading from a partition to 
 bootstrap, then writing any updates to that same partition -- it can be sure 
 that it's seen all of the messages in that partition at the moment it does 
 its first (successful) write.
 There's a bunch of other similar use-cases here, but they all have roughly 
 the same flavour.
 h4. Implementation
 The major advantage of this proposal over other suggested transaction / 
 idempotency mechanisms is its minimality: it gives the 'obvious' meaning to a 
 currently-unused field, adds no new APIs, and requires very little new code 
 or additional work from the server.
 - Produced messages already carry an offset field, which is currently ignored 
 by the server. This field could be used for the 'expected offset', with a 
 sigil value for the current behaviour. (-1 is a natural choice, since it's 
 already used to mean 'next available offset'.)
 - We'd need a new error and error code for a 'CAS failure'.
 - The server assigns offsets to produced messages in 
 {{ByteBufferMessageSet.validateMessagesAndAssignOffsets}}. After this 
 changed, this method would assign offsets in the same way -- but if they 
 don't match the offset in the message, we'd return an error instead of 
 completing the write.
 - To avoid breaking existing clients, this behaviour would need to live 
 behind some config flag. (Possibly global, but probably more useful 
 per-topic?)
 I understand all this is unsolicited and possibly strange: happy to answer 
 questions, and if this seems interesting, I'd be glad to flesh this out into 
 a full KIP or patch. (And apologies if this is the wrong venue for this sort 
 of thing!)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-2260) Allow specifying expected offset on produce

2015-07-13 Thread Ben Kirwin (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2260?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ben Kirwin updated KAFKA-2260:
--
Status: Patch Available  (was: Open)

Worked up a draft of this over the weekend, implementing the 
cas-on-partition-offset feature outlined in the original post. This is enough 
to support many cases of 'idempotent producer', along with a bunch of other fun 
stuff.

I'm attaching the diff here -- if folks are interested in moving this forward, 
I'll post it to reviewboard as well?

 Allow specifying expected offset on produce
 ---

 Key: KAFKA-2260
 URL: https://issues.apache.org/jira/browse/KAFKA-2260
 Project: Kafka
  Issue Type: Improvement
Reporter: Ben Kirwin
Priority: Minor

 I'd like to propose a change that adds a simple CAS-like mechanism to the 
 Kafka producer. This update has a small footprint, but enables a bunch of 
 interesting uses in stream processing or as a commit log for process state.
 h4. Proposed Change
 In short:
 - Allow the user to attach a specific offset to each message produced.
 - The server assigns offsets to messages in the usual way. However, if the 
 expected offset doesn't match the actual offset, the server should fail the 
 produce request instead of completing the write.
 This is a form of optimistic concurrency control, like the ubiquitous 
 check-and-set -- but instead of checking the current value of some state, it 
 checks the current offset of the log.
 h4. Motivation
 Much like check-and-set, this feature is only useful when there's very low 
 contention. Happily, when Kafka is used as a commit log or as a 
 stream-processing transport, it's common to have just one producer (or a 
 small number) for a given partition -- and in many of these cases, predicting 
 offsets turns out to be quite useful.
 - We get the same benefits as the 'idempotent producer' proposal: a producer 
 can retry a write indefinitely and be sure that at most one of those attempts 
 will succeed; and if two producers accidentally write to the end of the 
 partition at once, we can be certain that at least one of them will fail.
 - It's possible to 'bulk load' Kafka this way -- you can write a list of n 
 messages consecutively to a partition, even if the list is much larger than 
 the buffer size or the producer has to be restarted.
 - If a process is using Kafka as a commit log -- reading from a partition to 
 bootstrap, then writing any updates to that same partition -- it can be sure 
 that it's seen all of the messages in that partition at the moment it does 
 its first (successful) write.
 There's a bunch of other similar use-cases here, but they all have roughly 
 the same flavour.
 h4. Implementation
 The major advantage of this proposal over other suggested transaction / 
 idempotency mechanisms is its minimality: it gives the 'obvious' meaning to a 
 currently-unused field, adds no new APIs, and requires very little new code 
 or additional work from the server.
 - Produced messages already carry an offset field, which is currently ignored 
 by the server. This field could be used for the 'expected offset', with a 
 sigil value for the current behaviour. (-1 is a natural choice, since it's 
 already used to mean 'next available offset'.)
 - We'd need a new error and error code for a 'CAS failure'.
 - The server assigns offsets to produced messages in 
 {{ByteBufferMessageSet.validateMessagesAndAssignOffsets}}. After this 
 changed, this method would assign offsets in the same way -- but if they 
 don't match the offset in the message, we'd return an error instead of 
 completing the write.
 - To avoid breaking existing clients, this behaviour would need to live 
 behind some config flag. (Possibly global, but probably more useful 
 per-topic?)
 I understand all this is unsolicited and possibly strange: happy to answer 
 questions, and if this seems interesting, I'd be glad to flesh this out into 
 a full KIP or patch. (And apologies if this is the wrong venue for this sort 
 of thing!)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-2260) Allow specifying expected offset on produce

2015-07-13 Thread Ben Kirwin (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2260?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ben Kirwin updated KAFKA-2260:
--
Attachment: expected-offsets.patch

 Allow specifying expected offset on produce
 ---

 Key: KAFKA-2260
 URL: https://issues.apache.org/jira/browse/KAFKA-2260
 Project: Kafka
  Issue Type: Improvement
Reporter: Ben Kirwin
Priority: Minor
 Attachments: expected-offsets.patch


 I'd like to propose a change that adds a simple CAS-like mechanism to the 
 Kafka producer. This update has a small footprint, but enables a bunch of 
 interesting uses in stream processing or as a commit log for process state.
 h4. Proposed Change
 In short:
 - Allow the user to attach a specific offset to each message produced.
 - The server assigns offsets to messages in the usual way. However, if the 
 expected offset doesn't match the actual offset, the server should fail the 
 produce request instead of completing the write.
 This is a form of optimistic concurrency control, like the ubiquitous 
 check-and-set -- but instead of checking the current value of some state, it 
 checks the current offset of the log.
 h4. Motivation
 Much like check-and-set, this feature is only useful when there's very low 
 contention. Happily, when Kafka is used as a commit log or as a 
 stream-processing transport, it's common to have just one producer (or a 
 small number) for a given partition -- and in many of these cases, predicting 
 offsets turns out to be quite useful.
 - We get the same benefits as the 'idempotent producer' proposal: a producer 
 can retry a write indefinitely and be sure that at most one of those attempts 
 will succeed; and if two producers accidentally write to the end of the 
 partition at once, we can be certain that at least one of them will fail.
 - It's possible to 'bulk load' Kafka this way -- you can write a list of n 
 messages consecutively to a partition, even if the list is much larger than 
 the buffer size or the producer has to be restarted.
 - If a process is using Kafka as a commit log -- reading from a partition to 
 bootstrap, then writing any updates to that same partition -- it can be sure 
 that it's seen all of the messages in that partition at the moment it does 
 its first (successful) write.
 There's a bunch of other similar use-cases here, but they all have roughly 
 the same flavour.
 h4. Implementation
 The major advantage of this proposal over other suggested transaction / 
 idempotency mechanisms is its minimality: it gives the 'obvious' meaning to a 
 currently-unused field, adds no new APIs, and requires very little new code 
 or additional work from the server.
 - Produced messages already carry an offset field, which is currently ignored 
 by the server. This field could be used for the 'expected offset', with a 
 sigil value for the current behaviour. (-1 is a natural choice, since it's 
 already used to mean 'next available offset'.)
 - We'd need a new error and error code for a 'CAS failure'.
 - The server assigns offsets to produced messages in 
 {{ByteBufferMessageSet.validateMessagesAndAssignOffsets}}. After this 
 changed, this method would assign offsets in the same way -- but if they 
 don't match the offset in the message, we'd return an error instead of 
 completing the write.
 - To avoid breaking existing clients, this behaviour would need to live 
 behind some config flag. (Possibly global, but probably more useful 
 per-topic?)
 I understand all this is unsolicited and possibly strange: happy to answer 
 questions, and if this seems interesting, I'd be glad to flesh this out into 
 a full KIP or patch. (And apologies if this is the wrong venue for this sort 
 of thing!)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-2260) Allow specifying expected offset on produce

2015-06-08 Thread Ben Kirwin (JIRA)
Ben Kirwin created KAFKA-2260:
-

 Summary: Allow specifying expected offset on produce
 Key: KAFKA-2260
 URL: https://issues.apache.org/jira/browse/KAFKA-2260
 Project: Kafka
  Issue Type: Improvement
Reporter: Ben Kirwin
Priority: Minor


I'd like to propose a change that adds a simple CAS-like mechanism to the Kafka 
producer. This update has a small footprint, but enables a bunch of interesting 
uses in stream processing or as a commit log for process state.

h4. Proposed Change

In short:

- Allow the user to attach a specific offset to each message produced.

- The server assigns offsets to messages in the usual way. However, if the 
expected offset doesn't match the actual offset, the server should fail the 
produce request instead of completing the write.

This is a form of optimistic concurrency control, like the ubiquitous 
check-and-set -- but instead of checking the current value of some state, it 
checks the current offset of the log.

h4. Motivation

Much like check-and-set, this feature is only useful when there's very low 
contention. Happily, when Kafka is used as a commit log or as a 
stream-processing transport, it's common to have just one producer (or a small 
number) for a given partition -- and in many of these cases, predicting offsets 
turns out to be quite useful.

- We get the same benefits as the 'idempotent producer' proposal: a producer 
can retry a write indefinitely and be sure that at most one of those attempts 
will succeed; and if two producers accidentally write to the end of the 
partition at once, we can be certain that at least one of them will fail.

- It's possible to 'bulk load' Kafka this way -- you can write a list of n 
messages consecutively to a partition, even if the list is much larger than the 
buffer size or the producer has to be restarted.

- If a process is using Kafka as a commit log -- reading from a partition to 
bootstrap, then writing any updates to that same partition -- it can be sure 
that it's seen all of the messages in that partition at the moment it does its 
first (successful) write.

There's a bunch of other similar use-cases here, but they all have roughly the 
same flavour.

h4. Implementation

The major advantage of this proposal over other suggested transaction / 
idempotency mechanisms is its minimality: it gives the 'obvious' meaning to a 
currently-unused field, adds no new APIs, and requires very little new code or 
additional work from the server.

- Produced messages already carry an offset field, which is currently ignored 
by the server. This field could be used for the 'expected offset', with a sigil 
value for the current behaviour. (-1 is a natural choice, since it's already 
used to mean 'next available offset'.)

- We'd need a new error and error code for a 'CAS failure'.

- The server assigns offsets to produced messages in 
{{ByteBufferMessageSet.validateMessagesAndAssignOffsets}}. After this changed, 
this method would assign offsets in the same way -- but if they don't match the 
offset in the message, we'd return an error instead of completing the write.

- To avoid breaking existing clients, this behaviour would need to live behind 
some config flag. (Possibly global, but probably more useful per-topic?)

I understand all this is unsolicited and possibly strange: happy to answer 
questions, and if this seems interesting, I'd be glad to flesh this out into a 
full KIP or patch. (And apologies if this is the wrong venue for this sort of 
thing!)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)