Re: [DISCUSS] KIP-27 - Conditional Publish

2015-08-11 Thread Ben Kirwin
This is a very nice summary of the consistency / correctness issues
possible with a commit log.

 (assuming it’s publishing asynchronously and in an open loop)

It's perhaps already clear to folks here, but -- if you *don't* do that,
and instead only send one batch of messages at a time and check the result,
you don't have the interleaving issue. (Of course, that means you give up
pipelining batches...)
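
For illustration, here is a minimal sketch of that closed-loop pattern with
the Java producer client -- the class and method are hypothetical, but
send() and Future.get() are the standard client calls:

  import java.util.ArrayList;
  import java.util.List;
  import java.util.concurrent.Future;
  import org.apache.kafka.clients.producer.KafkaProducer;
  import org.apache.kafka.clients.producer.ProducerRecord;
  import org.apache.kafka.clients.producer.RecordMetadata;

  public class ClosedLoopPublisher {
      // Send one batch and block until every record is acked before returning.
      // The caller only sends the next batch after this returns, so at most one
      // batch is ever in flight: no interleaving, at the cost of pipelining.
      static void sendBatchAndCheck(KafkaProducer<String, String> producer,
                                    List<ProducerRecord<String, String>> batch)
              throws Exception {
          List<Future<RecordMetadata>> acks = new ArrayList<>();
          for (ProducerRecord<String, String> record : batch)
              acks.add(producer.send(record));
          for (Future<RecordMetadata> ack : acks)
              ack.get();   // throws if the broker rejected this record
      }
  }
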
On Aug 10, 2015 2:46 PM, Flavio Junqueira f...@apache.org wrote:

 I've been trying to understand what is being proposed in this KIP and I've
 put down some notes with some feedback from Ben that I wanted to share for
 feedback. I'm not really following the flow of the thread, since I've read
 a few sources to get to this, and I apologize for that.

 Here is how I see it at a high level. There are really two problems being
 discussed in the context of this KIP:
 1. Single writer with failover
 2. Consistent logs

 Single writer with failover
 The idea is that at any time there must be at most one publisher active.
 To get high availability, we can’t rely on a single process to be such a
 publisher and consequently we need the failover part: if the current active
 publisher crashes, then another publisher takes over and becomes active.
 One important issue with scenarios like this is that during transitions
 from one active publisher to another, there can be races in which two
 publishers end up interleaving messages in a topic/partition/key.

 Why is this interleaving bad? This is really application specific, but one
 general way of seeing this is that only one process has the authoritative
 application state to generate messages to publish. Transitioning from one
 active publisher to another typically requires recovering state or
 performing some kind of coordination. If no such recovery is required, then
 we are essentially in the multi-writer space. The commit log use case is a
 general one mentioned in the KIP description.

 Consistent logs
 Consistent logs might not be the best term here, but I’m using it to
 describe the need to have the messages in a topic/partition/key
 consistently reflect the state of the application. For example, some
 applications might be OK with a published sequence:

 A B B C (e.g., value = 10)

 if the messages are idempotent operations, but others might
 really require:

 A B C (e.g., value += 10)

 if they aren’t idempotent operations. Order and gaps are also an issue, so
 some applications might be OK with:

 A C B (e.g., value += x)

 and skipping B altogether might be OK if B has no side-effects (e.g., the
 operation associated with B has failed).

 Putting things together
 The current KIP-27 proposal seems to do a good job of providing a
 consistent log in the absence of concurrency. It enables publishers to
 re-publish messages without duplication, which is one requirement for
 exactly-once semantics. Gaps need to be handled by the publisher. For
 example, if the publisher publishes A B C (assuming it’s publishing
 asynchronously and in an open loop), it could have A succeeding but not B
 and C. In this case, it needs to redo the publish of B and C. It could also
 have B failing and C succeeding, in which case the publisher repeats B and
 C.

 A really nice feature of the current proposal is that it is a simple
 primitive that enables the implementation of publishers with different
 delivery guarantees. It doesn’t seem to be well suited to the first problem
 of implementing a single writer with failover, however. It allows runs in
 which two producers interleave messages because the mechanism focuses on a
 single message. The single writer might not even care about duplicates and
 gaps depending on the application, but it might care that there aren’t two
 publishers interleaving messages in the Kafka log.

 A typical way of dealing with these cases is to use a token associated with
 a lease to fence off the other publishers. For example, to demote an active
 publisher, another publisher could invoke a demote call and have the ISR
 leader replace the token. The lease of the token could be done directly
 with ZooKeeper or via the ISR leader. The condition to publish a message or
 a batch could be a combination of token verification and offset check.

 -Flavio

  On 10 Aug 2015, at 00:15, Jun Rao j...@confluent.io wrote:
 
  Couple of other things.
 
   A. In the discussion, we talked about getting the latest high watermark
   from the broker. Currently, the high watermark in a partition can go back
   a bit for a short period of time during leader change. So, the high
   watermark returned in the getOffset api is not 100% accurate. There is a
   jira (KAFKA-2334) to track this issue.
 
   B. The proposal in the wiki is to put the expected offset in every
   message, even when the messages are compressed. With Jiangjie's proposal
   of relative offset, the expected offset probably can only be set at the
   shallow compressed message level. We will need to think this through.

Re: [DISCUSS] KIP-27 - Conditional Publish

2015-08-10 Thread Flavio Junqueira
I've been trying to understand what is being proposed in this KIP and I've put 
down some notes with some feedback from Ben that I wanted to share for 
feedback. I'm not really following the flow of the thread, since I've read a 
few sources to get to this, and I apologize for that.

Here is how I see it at a high level. There are really two problems being 
discussed in the context of this KIP:
1. Single writer with failover
2. Consistent logs

Single writer with failover
The idea is that at any time there must be at most one publisher active. To get 
high availability, we can’t rely on a single process to be such a publisher and 
consequently we need the failover part: if the current active publisher 
crashes, then another publisher takes over and becomes active. One important 
issue with scenarios like this is that during transitions from one active 
publisher to another, there can be races in which two publishers end up 
interleaving messages in a topic/partition/key.

Why is this interleaving bad? This is really application specific, but one 
general way of seeing this is that only one process has the authoritative 
application state to generate messages to publish. Transitioning from one active 
publisher to another typically requires recovering state or performing some 
kind of coordination. If no such recovery is required, then we are essentially 
in the multi-writer space. The commit log use case is a general one mentioned 
in the KIP description.

Consistent logs
Consistent logs might not be the best term here, but I’m using it to describe 
the need to have the messages in a topic/partition/key consistently 
reflect the state of the application. For example, some applications might 
be OK with a published sequence:

A B B C (e.g., value = 10) 

if the messages are idempotent operations, but others might really 
require:

A B C (e.g., value += 10)

if they aren’t idempotent operations. Order and gaps are also an issue, so some 
applications might be OK with:

A C B (e.g., value += x)

and skipping B altogether might be OK if B has no side-effects (e.g., the 
operation associated with B has failed).

Putting things together 
The current KIP-27 proposal seems to do a good job of providing a consistent 
log in the absence of concurrency. It enables publishers to re-publish messages 
without duplication, which is one requirement for exactly-once semantics. Gaps 
need to be handled by the publisher. For example, if the publisher publishes A 
B C (assuming it’s publishing asynchronously and in an open loop), it could 
have A succeeding but not B and C. In this case, it needs to redo the publish 
of B and C. It could also have B failing and C succeeding, in which case the 
publisher repeats B and C.
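
As a sketch of that recovery loop, shown closed-loop for simplicity:
conditionalSend and fetchUpcomingOffset below are hypothetical stand-ins for
the KIP-27 expected-offset check and the offset API, not existing producer
methods.

  // messages[0..n-1] is the sequence A B C ... to publish, in order.
  long expected = fetchUpcomingOffset();       // next offset in the partition
  int i = 0;
  while (i < messages.size()) {
      if (conditionalSend(messages.get(i), expected)) {
          i++;                                 // landed exactly at 'expected'
          expected++;
      } else {
          // Rejected (e.g., B failed): refresh the offset and redo the
          // publish from the first unacknowledged message onward. A real
          // implementation must also check whether the rejected message in
          // fact landed, to avoid re-appending it.
          expected = fetchUpcomingOffset();
      }
  }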

A really nice feature of the current proposal is that it is a simple primitive 
that enables the implementation of publishers with different delivery 
guarantees. It doesn’t seem to be well suited to the first problem of 
implementing a single writer with failover, however. It allows runs in which 
two producers interleave messages because the mechanism focuses on a single 
message. The single writer might not even care about duplicates and gaps 
depending on the application, but it might care that there aren’t two 
publishers interleaving messages in the Kafka log.

A typical way of dealing with these cases is to use a token associated with a 
lease to fence off the other publishers. For example, to demote an active 
publisher, another publisher could invoke a demote call and have the ISR leader 
replace the token. The lease of the token could be done directly with ZooKeeper 
or via the ISR leader. The condition to publish a message or a batch could be a 
combination of token verification and offset check.
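
To make that combined condition concrete, here is a sketch of the check the
ISR leader could apply. Nothing like a fencing token exists in Kafka today,
so every name in this fragment is hypothetical:

  // Hypothetical broker-side acceptance test for a fenced conditional publish.
  boolean accept(long requestToken, long requestExpectedOffset) {
      return requestToken == currentFencingToken        // demoted publishers fail here
          && requestExpectedOffset == log.endOffset();  // the KIP-27 offset condition
  }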

-Flavio

 On 10 Aug 2015, at 00:15, Jun Rao j...@confluent.io wrote:
 
 Couple of other things.
 
 A. In the discussion, we talked about getting the latest high
 watermark from the broker. Currently, the high watermark in a partition can
 go back a bit for a short period of time during leader change. So, the high
 watermark returned in the getOffset api is not 100% accurate. There is a
 jira (KAFKA-2334) to track this issue.
 
 B. The proposal in the wiki is to put the expected offset in every message,
 even when the messages are compressed. With Jiangjie's proposal of relative
 offset, the expected offset probably can only be set at the shallow
 compressed message level. We will need to think this through.
 
 Thanks,
 
 Jun
 
 
 
 On Tue, Aug 4, 2015 at 3:05 PM, Jiangjie Qin j...@linkedin.com.invalid
 wrote:
 
 Jun, I see. So this only applies to uncompressed messages. Maybe that is
 fine given that most users will probably turn on compression?
 I think the first approach is the more general one, but from the
 application's point of view it might be harder to implement. I am wondering
 whether it is easier for the application to simply have one producer per
 partition and hash messages to producers. In that case, we can use the
 second approach but still have multiple producers.

Re: [DISCUSS] KIP-27 - Conditional Publish

2015-08-09 Thread Jun Rao
Couple of other things.

A. In the discussion, we talked about getting the latest high
watermark from the broker. Currently, the high watermark in a partition can
go back a bit for a short period of time during leader change. So, the high
watermark returned in the getOffset api is not 100% accurate. There is a
jira (KAFKA-2334) to track this issue.

B. The proposal in the wiki is to put the expected offset in every message,
even when the messages are compressed. With Jiangjie's proposal of relative
offset, the expected offset probably can only be set at the shallow
compressed message level. We will need to think this through.

Thanks,

Jun



On Tue, Aug 4, 2015 at 3:05 PM, Jiangjie Qin j...@linkedin.com.invalid
wrote:

 Jun, I see. So this only applies to uncompressed messages. Maybe that is
 fine given that most users will probably turn on compression?
 I think the first approach is the more general one, but from the
 application's point of view it might be harder to implement. I am wondering
 whether it is easier for the application to simply have one producer per
 partition and hash messages to producers. In that case, we can use the
 second approach but still have multiple producers. The downside might be a
 potentially larger memory footprint? We might also need to think about
 fault tolerance a little more.

 Ben, I agree that when everything goes fine, having pipelining turned on is
 probably fine. But if we take leader migration, a broker going down, a
 message appended to the leader but not the followers, etc., into
 consideration, it is not clear to me how conditional publish will still
 provide its guarantee without enforcing those strict settings.

 Thanks,

 Jiangjie (Becket) Qin



 On Mon, Aug 3, 2015 at 9:55 PM, Jun Rao j...@confluent.io wrote:

  A couple of thoughts on the commit log use case. Suppose that we want to
  maintain multiple replicas of a K/V store backed by a shared Kafka
  topic/partition as a commit log. There are two possible ways to use Kafka
  as a commit log.
 
  1. The first approach allows multiple producers to publish to Kafka. Each
  replica of the data store keeps reading from a Kafka topic/partition to
  refresh the replica's view. Every time a replica gets an update to a key
  from a client, it combines the update and the current value of the key in
  its view and generates a post-value. It then does a conditional publish
 to
  Kafka with the post-value. The update is successful if the conditional
  publish succeeds. Otherwise, the replica has to recompute the post-value
  (potentially after the replica's view is refreshed) and retry the
  conditional publish. A potential issue with this approach is when there
 is
  a transient failure during publishing to Kafka (e.g., the leader of the
  partition changes). When this happens, the conditional publish will get
 an
  error. The replica doesn't know whether the publish actually succeeded or
  not. If we just blindly retry, it may not give the correct behavior
 (e.g.,
  we could be applying +1 twice). So, not sure if conditional publish
 itself
  is enough for this approach.
 
  2. The second approach allows only a single producer to publish to Kafka.
  We somehow elect one of the replicas to be the master that handles all
  updates. Normally, we don't need conditional publish since there is a
  single producer. Conditional publish can potentially be used to deal with
  duplicates. If the master encounters the same transient failure as the
  above, it can get the latest offset from the Kafka topic/partition to see
  if the publish actually succeeded or not since it's the only producer. A
  potential issue here is to handle the zombie master problem: if the
 master
  has a soft failure and another master is elected, we need to prevent the
  old master from publishing new data to Kafka. So, for this approach to
  work properly, we need some kind of single-writer support in addition to
  conditional publish.
 
 
  Jiangjie,
 
  The issue with partial commit is the following. Say we have a batch of 10
  uncompressed messages sent to the leader. The followers only fetched the
  first 5 messages and then the leader dies. In this case, we only
 committed
  5 out of the 10 messages.
 
  Thanks,
 
  Jun
 
 
  On Tue, Jul 28, 2015 at 1:16 AM, Daniel Schierbeck 
  daniel.schierb...@gmail.com wrote:
 
   Jiangjie: I think giving users the possibility of defining a custom
  policy
   for handling rejections is a good idea. For instance, this will allow
  Kafka
   to act as an event store in an Event Sourcing application. If the
  event(s)
   are rejected by the store, the original command may need to be
  re-validated
   against the new state.
  
   On Tue, Jul 28, 2015 at 1:27 AM Jiangjie Qin j...@linkedin.com.invalid
 
   wrote:
  
@Ewen, good point about batching. Yes, it would be tricky if we want to
do a per-key conditional produce. My understanding is that the
prerequisite of this KIP is:
1. Single producer for each partition.

Re: [DISCUSS] KIP-27 - Conditional Publish

2015-08-04 Thread Ben Kirwin
This is a great summary of the commit log options. Some additional comments:

1. One way to handle a transient failure is to treat it the same way
as a conditional publish failure: recompute the post-value before
retrying. (I believe this is enough to make this approach work
correctly; see the sketch after point 2.)

2. You can think of 2 as an 'optimization' of 1: the election
mechanism is there to ensure that the conditional publish failures
happen very rarely. When there are no conflicts, the conditional
publish is essentially free.
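
In code, point 1 might look like the sketch below. Every name in it is
illustrative, and updateAlreadyApplied is doing the real work: after a
timeout, the replica has to be able to recognize its own update in the log
(e.g., by an embedded update id) before recomputing, or it risks applying
the same update twice.

  while (true) {
      catchUpWithLog();                         // consume and apply to the local view
      if (updateAlreadyApplied(update)) break;  // our earlier write landed after all
      Value post = compute(update, localView);  // recompute the post-value
      try {
          conditionalPublish(key, post);        // at the current expected offset
          break;
      } catch (CasFailure | Timeout e) {
          // Treat both rejections and timeouts the same way:
          // loop around -- catch up, re-check, recompute, retry.
      }
  }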

I guess I think of the zombie master problem like this: given some
window of time where two nodes both think they are the master,
conditional publish is enough to ensure that only one of the two will
successfully publish. However, it's not enough to ensure that the
'new' master is the successful one. This might cause the leadership
transition to happen a bit later than it would otherwise, but it
doesn't seem to actually impact correctness.

On Tue, Aug 4, 2015 at 12:55 AM, Jun Rao j...@confluent.io wrote:
 A couple of thoughts on the commit log use case. Suppose that we want to
 maintain multiple replicas of a K/V store backed by a shared Kafka
 topic/partition as a commit log. There are two possible ways to use Kafka
 as a commit log.

 1. The first approach allows multiple producers to publish to Kafka. Each
 replica of the data store keeps reading from a Kafka topic/partition to
 refresh the replica's view. Every time a replica gets an update to a key
 from a client, it combines the update and the current value of the key in
 its view and generates a post-value. It then does a conditional publish to
 Kafka with the post-value. The update is successful if the conditional
 publish succeeds. Otherwise, the replica has to recompute the post-value
 (potentially after the replica's view is refreshed) and retry the
 conditional publish. A potential issue with this approach is when there is
 a transient failure during publishing to Kafka (e.g., the leader of the
 partition changes). When this happens, the conditional publish will get an
 error. The replica doesn't know whether the publish actually succeeded or
 not. If we just blindly retry, it may not give the correct behavior (e.g.,
 we could be applying +1 twice). So, not sure if conditional publish itself
 is enough for this approach.

 2. The second approach allows only a single producer to publish to Kafka.
 We somehow elect one of the replicas to be the master that handles all
 updates. Normally, we don't need conditional publish since there is a
 single producer. Conditional publish can potentially be used to deal with
 duplicates. If the master encounters the same transient failure as the
 above, it can get the latest offset from the Kafka topic/partition to see
 if the publish actually succeeded or not since it's the only producer. A
 potential issue here is to handle the zombie master problem: if the master
 has a soft failure and another master is elected, we need to prevent the
 old master from publishing new data to Kafka. So, for this approach to work
  properly, we need some kind of single-writer support in addition to
 conditional publish.


 Jiangjie,

 The issue with partial commit is the following. Say we have a batch of 10
 uncompressed messages sent to the leader. The followers only fetched the
 first 5 messages and then the leader dies. In this case, we only committed
 5 out of the 10 messages.

 Thanks,

 Jun


 On Tue, Jul 28, 2015 at 1:16 AM, Daniel Schierbeck 
 daniel.schierb...@gmail.com wrote:

 Jiangjie: I think giving users the possibility of defining a custom policy
 for handling rejections is a good idea. For instance, this will allow Kafka
 to act as an event store in an Event Sourcing application. If the event(s)
 are rejected by the store, the original command may need to be re-validated
 against the new state.

 On Tue, Jul 28, 2015 at 1:27 AM Jiangjie Qin j...@linkedin.com.invalid
 wrote:

  @Ewen, good point about batching. Yes, it would be tricky if we want to
  do a per-key conditional produce. My understanding is that the
  prerequisite of this KIP is:
  1. Single producer for each partition.
  2. Acks=-1, max.in.flight.requests.per.connection=1, retries=SOME_BIG_NUMBER
 
  The major problem it tries to solve is exactly-once produce, i.e., solving
  the duplicates on the producer side. In that case, a batch will be
  considered atomic. The only way a batch should get rejected is if it was
  already appended. So the producer should just move on.

  It looks to me that even a transient multiple-producer scenario will cause
  issues, because users need to think about what to do if a request gets
  rejected, and the answer varies for different use cases.
 
  Thanks,
 
  Jiangjie (Becket) Qin
 
  On Sun, Jul 26, 2015 at 11:54 AM, Ben Kirwin b...@kirw.in wrote:
 
   So I had another look at the 'Idempotent Producer' proposal this
   afternoon, and made a few notes on how I think they compare; if I've
    made any mistakes, I'd be delighted if someone with more context on
    the idempotent producer design would correct me.

Re: [DISCUSS] KIP-27 - Conditional Publish

2015-08-04 Thread Ben Kirwin
Jiangjie -- it seems to me that, while there are cases where you want
conservative producer settings like you suggest, there are others
enabled by this KIP where pipelining and retries are not an issue.

As a toy example, I've adapted the producer performance test to behave
as an idempotent producer here:

https://github.com/bkirwi/kafka/blob/conditional-publish/tools/src/main/java/org/apache/kafka/clients/tools/IdempotentProducerPerformance.java

This appends the numbers 1-N in-order to some partition. If you've set
up the server to do the offset checks, it's possible to run multiple
instances of this producer without reordering messages or adding
duplicates. For this kind of application, pipelining turns out to be
safe... it might result in more failed messages if there's a conflict,
but it won't hurt correctness.
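
Distilled, the idea is roughly the loop below, where conditionalSend stands
in for a send that carries its expected offset (the linked file is the real
version; this is just the shape of it):

  // Append the numbers 1..n so that the record for k sits exactly at offset k-1.
  for (long k = 1; k <= n; k++) {
      conditionalSend(partition, Long.toString(k), /* expected offset */ k - 1);
      // If another instance already appended k, this send fails the offset
      // check and we simply move on: the log still reads 1..n in order, with
      // no gaps and no duplicates. A conflict can fail sends, but it can
      // never reorder or duplicate them -- which is why pipelining is safe.
  }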

On Mon, Jul 27, 2015 at 7:26 PM, Jiangjie Qin j...@linkedin.com.invalid wrote:
 @Ewen, good point about batching. Yes, it would be tricky if we want to do
 a per-key conditional produce. My understanding is that the prerequisite of
 this KIP is:
 1. Single producer for each partition.
 2. Acks=-1, max.in.flight.requests.per.connection=1, retries=SOME_BIG_NUMBER

 The major problem it tries to solve is exactly-once produce, i.e., solving
 the duplicates on the producer side. In that case, a batch will be
 considered atomic. The only way a batch should get rejected is if it was
 already appended. So the producer should just move on.

 It looks to me that even a transient multiple-producer scenario will cause
 issues, because users need to think about what to do if a request gets
 rejected, and the answer varies for different use cases.

 Thanks,

 Jiangjie (Becket) Qin

 On Sun, Jul 26, 2015 at 11:54 AM, Ben Kirwin b...@kirw.in wrote:

 So I had another look at the 'Idempotent Producer' proposal this
 afternoon, and made a few notes on how I think they compare; if I've
 made any mistakes, I'd be delighted if someone with more context on
 the idempotent producer design would correct me.

 As a first intuition, you can think of the 'conditional publish'
 proposal as the special case of the 'idempotent producer' idea, where
 there's just a single producer per-partition. The key observation here
 is: if there's only one producer, you can conflate the 'sequence
 number' and the expected offset. The conditional publish proposal uses
 existing Kafka offset APIs for roughly the same things as the
 idempotent producer proposal uses sequence numbers for -- eg. instead
  of having a 'lease PID' API that returns the current sequence number,
 we can use the existing 'offset API' to retrieve the upcoming offset.

 Both proposals attempt to deal with the situation where there are
 transiently multiple publishers for the same partition (and PID). The
 idempotent producer setup tracks a generation id for each pid, and
 discards any writes with a generation id smaller than the latest
 value. Conditional publish is 'first write wins' -- and instead of
 dropping duplicates on the server, it returns an error to the client.
 The duplicate-handling behaviour (dropping vs. erroring) has some
 interesting consequences:

 - If all producers are producing the same stream of messages, silently
 dropping duplicates on the server is more convenient. (Suppose we have
 a batch of messages 0-9, and the high-water mark on the server is 7.
 Idempotent producer, as I read it, would append 7-9 to the partition
 and return success; meanwhile, conditional publish would fail the
 entire batch.)

 - If producers might be writing different streams of messages, the
 proposed behaviour of the idempotent producer is probably worse --
 since it can silently interleave messages from two different
 producers. This can be a problem for some commit-log style use-cases,
 since it can transform a valid series of operations into an invalid
 one.

 - Given the error-on-duplicate behaviour, it's possible to implement
 deduplication on the client. (Sketch: if a publish returns an error
 for some partition, fetch the upcoming offset / sequence number for
 that partition, and discard all messages with a smaller offset on the
 client before republishing.)

 I think this makes the erroring behaviour more general, though
 deduplicating saves a roundtrip or two at conflict time.

 I'm less clear about the behaviour of the generation id, or what
 happens when (say) two producers with the same generation id are spun
 up at the same time. I'd be interested in hearing other folks'
 comments on this.

 Ewen: I'm not sure I understand the questions well enough to answer
 properly, but some quick notes:
 - I don't think it makes sense to assign an expected offset without
 already having assigned a partition. If the producer code is doing the
 partition assignment, it should probably do the offset assignment
 too... or we could just let application code handle both.
 - I'm not aware of any case where reassigning offsets to messages
  automatically after an offset mismatch makes sense: in the cases we've
  discussed, it seems like either it's safe to drop duplicates, or we
  want to handle the error at the application level.

Re: [DISCUSS] KIP-27 - Conditional Publish

2015-08-04 Thread Jiangjie Qin
Jun, I see. So this only applies to uncompressed messages. Maybe that is
fine given that most users will probably turn on compression?
I think the first approach is the more general one, but from the
application's point of view it might be harder to implement. I am wondering
whether it is easier for the application to simply have one producer per
partition and hash messages to producers. In that case, we can use the
second approach but still have multiple producers. The downside might be a
potentially larger memory footprint? We might also need to think about
fault tolerance a little more.

Ben, I agree that when everything goes fine, having pipelining turned on is
probably fine. But if we take leader migration, a broker going down, a
message appended to the leader but not the followers, etc., into
consideration, it is not clear to me how conditional publish will still
provide its guarantee without enforcing those strict settings.

Thanks,

Jiangjie (Becket) Qin



On Mon, Aug 3, 2015 at 9:55 PM, Jun Rao j...@confluent.io wrote:

 A couple of thoughts on the commit log use case. Suppose that we want to
 maintain multiple replicas of a K/V store backed by a shared Kafka
 topic/partition as a commit log. There are two possible ways to use Kafka
 as a commit log.

 1. The first approach allows multiple producers to publish to Kafka. Each
 replica of the data store keeps reading from a Kafka topic/partition to
 refresh the replica's view. Every time a replica gets an update to a key
 from a client, it combines the update and the current value of the key in
 its view and generates a post-value. It then does a conditional publish to
 Kafka with the post-value. The update is successful if the conditional
 publish succeeds. Otherwise, the replica has to recompute the post-value
 (potentially after the replica's view is refreshed) and retry the
 conditional publish. A potential issue with this approach is when there is
 a transient failure during publishing to Kafka (e.g., the leader of the
 partition changes). When this happens, the conditional publish will get an
 error. The replica doesn't know whether the publish actually succeeded or
 not. If we just blindly retry, it may not give the correct behavior (e.g.,
 we could be applying +1 twice). So, not sure if conditional publish itself
 is enough for this approach.

 2. The second approach allows only a single producer to publish to Kafka.
 We somehow elect one of the replicas to be the master that handles all
 updates. Normally, we don't need conditional publish since there is a
 single producer. Conditional publish can potentially be used to deal with
 duplicates. If the master encounters the same transient failure as the
 above, it can get the latest offset from the Kafka topic/partition to see
 if the publish actually succeeded or not since it's the only producer. A
 potential issue here is to handle the zombie master problem: if the master
 has a soft failure and another master is elected, we need to prevent the
 old master from publishing new data to Kafka. So, for this approach to work
  properly, we need some kind of single-writer support in addition to
 conditional publish.


 Jiangjie,

 The issue with partial commit is the following. Say we have a batch of 10
 uncompressed messages sent to the leader. The followers only fetched the
 first 5 messages and then the leader dies. In this case, we only committed
 5 out of the 10 messages.

 Thanks,

 Jun


 On Tue, Jul 28, 2015 at 1:16 AM, Daniel Schierbeck 
 daniel.schierb...@gmail.com wrote:

  Jiangjie: I think giving users the possibility of defining a custom
 policy
  for handling rejections is a good idea. For instance, this will allow
 Kafka
  to act as an event store in an Event Sourcing application. If the
 event(s)
  are rejected by the store, the original command may need to be
 re-validated
  against the new state.
 
  On Tue, Jul 28, 2015 at 1:27 AM Jiangjie Qin j...@linkedin.com.invalid
  wrote:
 
   @Ewen, good point about batching. Yes, it would be tricky if we want to
   do a per-key conditional produce. My understanding is that the
   prerequisite of this KIP is:
   1. Single producer for each partition.
   2. Acks=-1, max.in.flight.requests.per.connection=1, retries=SOME_BIG_NUMBER
  
   The major problem it tries to solve is exactly-once produce, i.e.,
   solving the duplicates on the producer side. In that case, a batch will
   be considered atomic. The only way a batch should get rejected is if it
   was already appended. So the producer should just move on.

   It looks to me that even a transient multiple-producer scenario will
   cause issues, because users need to think about what to do if a request
   gets rejected, and the answer varies for different use cases.
  
   Thanks,
  
   Jiangjie (Becket) Qin
  
   On Sun, Jul 26, 2015 at 11:54 AM, Ben Kirwin b...@kirw.in wrote:
  
So I had another look at the 'Idempotent Producer' proposal this
 afternoon, and made a few notes on how I think they compare.

Re: [DISCUSS] KIP-27 - Conditional Publish

2015-08-04 Thread Jay Kreps
Hey Jun,

Yeah I think Ben is right, both these cases are covered. The algorithm is
something like

while(true) {
  v = get_local_val(key)
  v' = modify(v)
  try {
    log_to_kafka(v')
    put_local_val(key, v')
    break
  } catch(CasFailureException e) {
    warn("optimistic lock failure")
  }
}

What I have yet to see is a complete design that would make use of this. I
think we have two use cases this might (or might not) cover, a distributed
log-centric data system and an event-sourced app. Both are actually kind of
the same. I think what I haven't really seen is a working through of the
details.

I think it is important to think through this use case end-to-end, i.e., how
is concurrency handled? what about other errors? what about request
pipelining? how would queries work?

Basically someone should write out notes on how to implement a key-value
store with a Kafka commit log assuming the existence of this feature and
making use of something like RocksDB. I think that would uncover any
problems that remain.
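
As a starting point for such notes, one possible shape is sketched below,
with RocksDB as the local view. merge, conditionalPublish, and catchUpWithLog
are deliberately left abstract, because they are exactly where the open
questions (concurrency, pipelining, error handling, queries) live -- this is
the structure, not a worked design:

  import org.rocksdb.Options;
  import org.rocksdb.RocksDB;

  public abstract class KvOnCommitLog {
      private final RocksDB view;   // local materialized view of the log
      private long applied = 0L;    // next commit-log offset to apply

      KvOnCommitLog(String path) throws Exception {
          RocksDB.loadLibrary();
          view = RocksDB.open(new Options().setCreateIfMissing(true), path);
      }

      // Apply loop: every replica tails the shared partition and applies
      // each committed record to its view, in offset order.
      void apply(byte[] key, byte[] post, long offset) throws Exception {
          view.put(key, post);
          applied = offset + 1;
      }

      // Update path: compute the post-value from the local view, publish it
      // conditionally at the view's end of log; on rejection, catch up and
      // recompute -- the loop above, with the consuming part included.
      void update(byte[] key, byte[] delta) throws Exception {
          while (true) {
              byte[] post = merge(view.get(key), delta);
              if (conditionalPublish(key, post, applied)) return;
              catchUpWithLog();
          }
      }

      abstract byte[] merge(byte[] current, byte[] delta);
      abstract boolean conditionalPublish(byte[] key, byte[] post,
                                          long expectedOffset) throws Exception;
      abstract void catchUpWithLog() throws Exception;
  }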

-Jay

On Tue, Aug 4, 2015 at 12:46 AM, Ben Kirwin b...@kirw.in wrote:

 This is a great summary of the commit log options. Some additional
 comments:

 1. One way to handle a transient failure is to treat it the same way
 as a conditional publish failure: recompute the post-value before
 retrying. (I believe this is enough to make this approach work
 correctly.)

 2. You can think of 2 as an 'optimization' of 1: the election
 mechanism is there to ensure that the conditional publish failures
 happen very rarely. When there are no conflicts, the conditional
 publish is essentially free.

 I guess I think of the zombie master problem like this: given some
 window of time where two nodes both think they are the master,
 conditional publish is enough to ensure that only one of the two will
 successfully publish. However, it's not enough to ensure that the
 'new' master is the successful one. This might cause the leadership
 transition to happen a bit later than it would otherwise, but it
 doesn't seem to actually impact correctness.

 On Tue, Aug 4, 2015 at 12:55 AM, Jun Rao j...@confluent.io wrote:
  A couple of thoughts on the commit log use case. Suppose that we want to
  maintain multiple replicas of a K/V store backed by a shared Kafka
  topic/partition as a commit log. There are two possible ways to use Kafka
  as a commit log.
 
  1. The first approach allows multiple producers to publish to Kafka. Each
  replica of the data store keeps reading from a Kafka topic/partition to
  refresh the replica's view. Every time a replica gets an update to a key
  from a client, it combines the update and the current value of the key in
  its view and generates a post-value. It then does a conditional publish
 to
  Kafka with the post-value. The update is successful if the conditional
  publish succeeds. Otherwise, the replica has to recompute the post-value
  (potentially after the replica's view is refreshed) and retry the
  conditional publish. A potential issue with this approach is when there
 is
  a transient failure during publishing to Kafka (e.g., the leader of the
  partition changes). When this happens, the conditional publish will get
 an
  error. The replica doesn't know whether the publish actually succeeded or
  not. If we just blindly retry, it may not give the correct behavior
 (e.g.,
  we could be applying +1 twice). So, not sure if conditional publish
 itself
  is enough for this approach.
 
  2. The second approach allows only a single producer to publish to Kafka.
  We somehow elect one of the replicas to be the master that handles all
  updates. Normally, we don't need conditional publish since there is a
  single producer. Conditional publish can potentially be used to deal with
  duplicates. If the master encounters the same transient failure as the
  above, it can get the latest offset from the Kafka topic/partition to see
  if the publish actually succeeded or not since it's the only producer. A
  potential issue here is to handle the zombie master problem: if the
 master
  has a soft failure and another master is elected, we need to prevent the
   old master from publishing new data to Kafka. So, for this approach to
   work properly, we need some kind of single-writer support in addition to
   conditional publish.
 
 
  Jiangjie,
 
  The issue with partial commit is the following. Say we have a batch of 10
  uncompressed messages sent to the leader. The followers only fetched the
  first 5 messages and then the leader dies. In this case, we only
 committed
  5 out of the 10 messages.
 
  Thanks,
 
  Jun
 
 
  On Tue, Jul 28, 2015 at 1:16 AM, Daniel Schierbeck 
  daniel.schierb...@gmail.com wrote:
 
  Jiangjie: I think giving users the possibility of defining a custom
 policy
  for handling rejections is a good idea. For instance, this will allow
 Kafka
  to act as an event store in an Event Sourcing application. If the
 event(s)
   are rejected by the store, the original command may need to be
   re-validated against the new state.

Re: [DISCUSS] KIP-27 - Conditional Publish

2015-08-04 Thread Yasuhiro Matsuda
Jun,

You are right that Jay's example omits the consuming part, and the local
state should be maintained only by consumed messages so that all replicas
are eventually consistent. Assuming that, the beauty of conditional publish
is that it is safe for clients to do this independently. It guarantees that
a successful update is always based on the most recent value. I think the
fact that no master is needed is the greatest advantage.

I think there shouldn't be any automatic retry. Application logic should
handle the case where a conditional publish fails; what to do on failure is
very application-dependent. In Jay's example, it looks as though the code
keeps modifying and pushing until it succeeds, but I don't think that is
what Jay meant. modify(v) may throw an exception if the application decides
to give up on the update.

Regarding the transient failure, isn't it reasonable to assume an
application can tell success/failure from the data itself? Anyway, this KIP
is not trying to solve the transient-failure problem; that should be left
to the idempotent producer.



On Tue, Aug 4, 2015 at 9:42 AM, Jun Rao j...@confluent.io wrote:

 Jay,

 The code that you put there seems to be designed for the second case where
 there is a master. In the first case when there is no master, updates can
 happen from multiple replicas. Therefore, to maintain the local view, each
 replica can't just update the local view using only the updates sent to it
 directly. Instead, it has to read from the Kafka log to see updates from
 other replicas.

 Then the question is what happens when you get a
 LeaderNotAvailableException/IOException when calling log_to_kafka(v'). That
 call may or may not have succeeded. In the first case, it's hard for a
 replica to figure that out since there are other producers to the Kafka
 log. If you just retry, you may have modified the value incorrectly. For
 example, the conditional publish may have actually succeeded and the local
 view will at some point reflect that change. By retrying, we will be
 modifying the local view again (e.g., applying +1 twice to a value).

 I agree that it would be good to see a more detailed end-to-end design to
 determine how conditional publish can be used and how much of the problem
 it solves.

 Thanks,

 Jun

 On Tue, Aug 4, 2015 at 9:18 AM, Jay Kreps j...@confluent.io wrote:

  Hey Jun,
 
  Yeah I think Ben is right, both these cases are covered. The algorithm is
  something like
 
  while(true) {
    v = get_local_val(key)
    v' = modify(v)
    try {
      log_to_kafka(v')
      put_local_val(key, v')
      break
    } catch(CasFailureException e) {
      warn("optimistic lock failure")
    }
  }
 
  What I have yet to see is a complete design that would make use of this.
 I
  think we have two use cases this might (or might not) cover, a
 distributed
  log-centric data system and an event-sourced app. Both are actually kind
 of
  the same. I think what I haven't really seen is a working through of the
  details.
 
   I think it is important to think through this use case end-to-end, i.e.,
   how is concurrency handled? what about other errors? what about request
  pipelining? how would queries work?
 
  Basically someone should write out notes on how to implement a key-value
   store with a Kafka commit log assuming the existence of this feature and
  making use of something like RocksDB. I think that would uncover any
  problems that remain.
 
  -Jay
 
  On Tue, Aug 4, 2015 at 12:46 AM, Ben Kirwin b...@kirw.in wrote:
 
   This is a great summary of the commit log options. Some additional
   comments:
  
   1. One way to handle a transient failure is to treat it the same way
   as a conditional publish failure: recompute the post-value before
   retrying. (I believe this is enough to make this approach work
   correctly.)
  
   2. You can think of 2 as an 'optimization' of 1: the election
   mechanism is there to ensure that the conditional publish failures
   happen very rarely. When there are no conflicts, the conditional
   publish is essentially free.
  
   I guess I think of the zombie master problem like this: given some
   window of time where two nodes both think they are the master,
   conditional publish is enough to ensure that only one of the two will
   successfully publish. However, it's not enough to ensure that the
   'new' master is the successful one. This might cause the leadership
   transition to happen a bit later than it would otherwise, but it
   doesn't seem to actually impact correctness.
  
   On Tue, Aug 4, 2015 at 12:55 AM, Jun Rao j...@confluent.io wrote:
A couple of thoughts on the commit log use case. Suppose that we want
  to
maintain multiple replicas of a K/V store backed by a shared Kafka
topic/partition as a commit log. There are two possible ways to use
  Kafka
as a commit log.
   
1. The first approach allows multiple producers to publish to Kafka.
  Each
replica of the data store keeps reading from a Kafka topic/partition
 to refresh the replica's view.

Re: [DISCUSS] KIP-27 - Conditional Publish

2015-08-04 Thread Jun Rao
Jay,

The code that you put there seems to be designed for the second case where
there is a master. In the first case when there is no master, updates can
happen from multiple replicas. Therefore, to maintain the local view, each
replica can't just update the local view using only the updates sent to it
directly. Instead, it has to read from the Kafka log to see updates from
other replicas.

Then the question is what happens when you get a
LeaderNotAvailableException/IOException when calling log_to_kafka(v'). That
call may or may not have succeeded. In the first case, it's hard for a
replica to figure that out since there are other producers to the Kafka
log. If you just retry, you may have modified the value incorrectly. For
example, the conditional publish may have actually succeeded and the local
view will at some point reflect that change. By retrying, we will be
modifying the local view again (e.g., applying +1 twice to a value).

I agree that it would be good to see a more detailed end-to-end design to
determine how conditional publish can be used and how much of the problem
it solves.

Thanks,

Jun

On Tue, Aug 4, 2015 at 9:18 AM, Jay Kreps j...@confluent.io wrote:

 Hey Jun,

 Yeah I think Ben is right, both these cases are covered. The algorithm is
 something like

 while(true) {
   v = get_local_val(key)
   v' = modify(v)
   try {
     log_to_kafka(v')
     put_local_val(key, v')
     break
   } catch(CasFailureException e) {
     warn("optimistic lock failure")
   }
 }

 What I have yet to see is a complete design that would make use of this. I
 think we have two use cases this might (or might not) cover, a distributed
 log-centric data system and an event-sourced app. Both are actually kind of
 the same. I think what I haven't really seen is a working through of the
 details.

  I think it is important to think through this use case end-to-end, i.e., how
 is concurrency handled? what about other errors? what about request
 pipelining? how would queries work?

 Basically someone should write out notes on how to implement a key-value
  store with a Kafka commit log assuming the existence of this feature and
 making use of something like RocksDB. I think that would uncover any
 problems that remain.

 -Jay

 On Tue, Aug 4, 2015 at 12:46 AM, Ben Kirwin b...@kirw.in wrote:

  This is a great summary of the commit log options. Some additional
  comments:
 
  1. One way to handle a transient failure is to treat it the same way
  as a conditional publish failure: recompute the post-value before
  retrying. (I believe this is enough to make this approach work
  correctly.)
 
  2. You can think of 2 as an 'optimization' of 1: the election
  mechanism is there to ensure that the conditional publish failures
  happen very rarely. When there are no conflicts, the conditional
  publish is essentially free.
 
  I guess I think of the zombie master problem like this: given some
  window of time where two nodes both think they are the master,
  conditional publish is enough to ensure that only one of the two will
  successfully publish. However, it's not enough to ensure that the
  'new' master is the successful one. This might cause the leadership
  transition to happen a bit later than it would otherwise, but it
  doesn't seem to actually impact correctness.
 
  On Tue, Aug 4, 2015 at 12:55 AM, Jun Rao j...@confluent.io wrote:
   A couple of thoughts on the commit log use case. Suppose that we want
 to
   maintain multiple replicas of a K/V store backed by a shared Kafka
   topic/partition as a commit log. There are two possible ways to use
 Kafka
   as a commit log.
  
   1. The first approach allows multiple producers to publish to Kafka.
 Each
   replica of the data store keeps reading from a Kafka topic/partition to
   refresh the replica's view. Every time a replica gets an update to a
 key
   from a client, it combines the update and the current value of the key
 in
   its view and generates a post-value. It then does a conditional publish
  to
   Kafka with the post-value. The update is successful if the conditional
   publish succeeds. Otherwise, the replica has to recompute the
 post-value
   (potentially after the replica's view is refreshed) and retry the
   conditional publish. A potential issue with this approach is when there
  is
   a transient failure during publishing to Kafka (e.g., the leader of the
   partition changes). When this happens, the conditional publish will get
  an
   error. The replica doesn't know whether the publish actually succeeded
 or
   not. If we just blindly retry, it may not give the correct behavior
  (e.g.,
   we could be applying +1 twice). So, not sure if conditional publish
  itself
   is enough for this approach.
  
   2. The second approach allows only a single producer to publish to
 Kafka.
   We somehow elect one of the replicas to be the master that handles all
   updates. Normally, we don't need conditional publish since there is a
    single producer. Conditional publish can potentially be used to deal
    with duplicates.

Re: [DISCUSS] KIP-27 - Conditional Publish

2015-08-03 Thread Jun Rao
A couple of thoughts on the commit log use case. Suppose that we want to
maintain multiple replicas of a K/V store backed by a shared Kafka
topic/partition as a commit log. There are two possible ways to use Kafka
as a commit log.

1. The first approach allows multiple producers to publish to Kafka. Each
replica of the data store keeps reading from a Kafka topic/partition to
refresh the replica's view. Every time a replica gets an update to a key
from a client, it combines the update and the current value of the key in
its view and generates a post-value. It then does a conditional publish to
Kafka with the post-value. The update is successful if the conditional
publish succeeds. Otherwise, the replica has to recompute the post-value
(potentially after the replica's view is refreshed) and retry the
conditional publish. A potential issue with this approach is when there is
a transient failure during publishing to Kafka (e.g., the leader of the
partition changes). When this happens, the conditional publish will get an
error. The replica doesn't know whether the publish actually succeeded or
not. If we just blindly retry, it may not give the correct behavior (e.g.,
we could be applying +1 twice). So, not sure if conditional publish itself
is enough for this approach.

2. The second approach allows only a single producer to publish to Kafka.
We somehow elect one of the replicas to be the master that handles all
updates. Normally, we don't need conditional publish since there is a
single producer. Conditional publish can potentially be used to deal with
duplicates. If the master encounters the same transient failure as the
above, it can get the latest offset from the Kafka topic/partition to see
if the publish actually succeeded or not since it's the only producer. A
potential issue here is to handle the zombie master problem: if the master
has a soft failure and another master is elected, we need to prevent the
old master from publishing new data to Kafka. So, for this approach to work
properly, we need some kind of single-writer support in addition to
conditional publish.


Jiangjie,

The issue with partial commit is the following. Say we have a batch of 10
uncompressed messages sent to the leader. The followers only fetched the
first 5 messages and then the leader dies. In this case, we only committed
5 out of the 10 messages.

Thanks,

Jun


On Tue, Jul 28, 2015 at 1:16 AM, Daniel Schierbeck 
daniel.schierb...@gmail.com wrote:

 Jiangjie: I think giving users the possibility of defining a custom policy
 for handling rejections is a good idea. For instance, this will allow Kafka
 to act as an event store in an Event Sourcing application. If the event(s)
 are rejected by the store, the original command may need to be re-validated
 against the new state.

 On Tue, Jul 28, 2015 at 1:27 AM Jiangjie Qin j...@linkedin.com.invalid
 wrote:

  @Ewen, good point about batching. Yes, it would be tricky if we want to
  do a per-key conditional produce. My understanding is that the
  prerequisite of this KIP is:
  1. Single producer for each partition.
  2. Acks=-1, max.in.flight.requests.per.connection=1, retries=SOME_BIG_NUMBER
 
  The major problem it tries to solve is exactly-once produce, i.e., solving
  the duplicates on the producer side. In that case, a batch will be
  considered atomic. The only way a batch should get rejected is if it was
  already appended. So the producer should just move on.

  It looks to me that even a transient multiple-producer scenario will cause
  issues, because users need to think about what to do if a request gets
  rejected, and the answer varies for different use cases.
 
  Thanks,
 
  Jiangjie (Becket) Qin
 
  On Sun, Jul 26, 2015 at 11:54 AM, Ben Kirwin b...@kirw.in wrote:
 
   So I had another look at the 'Idempotent Producer' proposal this
   afternoon, and made a few notes on how I think they compare; if I've
   made any mistakes, I'd be delighted if someone with more context on
   the idempotent producer design would correct me.
  
   As a first intuition, you can think of the 'conditional publish'
   proposal as the special case of the 'idempotent producer' idea, where
   there's just a single producer per-partition. The key observation here
   is: if there's only one producer, you can conflate the 'sequence
   number' and the expected offset. The conditional publish proposal uses
   existing Kafka offset APIs for roughly the same things as the
   idempotent producer proposal uses sequence numbers for -- eg. instead
   of having a lease PID API that returns the current sequence number,
   we can use the existing 'offset API' to retrieve the upcoming offset.
  
   Both proposals attempt to deal with the situation where there are
   transiently multiple publishers for the same partition (and PID). The
   idempotent producer setup tracks a generation id for each pid, and
   discards any writes with a generation id smaller than the latest
   value. Conditional publish is 'first write wins' -- and instead of
   dropping duplicates on the server, it returns an error to the client.

Re: [DISCUSS] KIP-27 - Conditional Publish

2015-07-28 Thread Daniel Schierbeck
Jiangjie: I think giving users the possibility of defining a custom policy
for handling rejections is a good idea. For instance, this will allow Kafka
to act as an event store in an Event Sourcing application. If the event(s)
are rejected by the store, the original command may need to be re-validated
against the new state.

On Tue, Jul 28, 2015 at 1:27 AM Jiangjie Qin j...@linkedin.com.invalid
wrote:

 @Ewen, good point about batching. Yes, it would be tricky if we want to do
 a per-key conditional produce. My understanding is that the prerequisite of
 this KIP is:
 1. Single producer for each partition.
 2. Acks=-1, max.in.flight.requests.per.connection=1, retries=SOME_BIG_NUMBER

 The major problem it tries to solve is exactly-once produce, i.e., solving
 the duplicates on the producer side. In that case, a batch will be
 considered atomic. The only way a batch should get rejected is if it was
 already appended. So the producer should just move on.

 It looks to me that even a transient multiple-producer scenario will cause
 issues, because users need to think about what to do if a request gets
 rejected, and the answer varies for different use cases.

 Thanks,

 Jiangjie (Becket) Qin

 On Sun, Jul 26, 2015 at 11:54 AM, Ben Kirwin b...@kirw.in wrote:

  So I had another look at the 'Idempotent Producer' proposal this
  afternoon, and made a few notes on how I think they compare; if I've
  made any mistakes, I'd be delighted if someone with more context on
  the idempotent producer design would correct me.
 
  As a first intuition, you can think of the 'conditional publish'
  proposal as the special case of the 'idempotent producer' idea, where
  there's just a single producer per-partition. The key observation here
  is: if there's only one producer, you can conflate the 'sequence
  number' and the expected offset. The conditional publish proposal uses
  existing Kafka offset APIs for roughly the same things as the
  idempotent producer proposal uses sequence numbers for -- eg. instead
   of having a 'lease PID' API that returns the current sequence number,
  we can use the existing 'offset API' to retrieve the upcoming offset.
 
  Both proposals attempt to deal with the situation where there are
  transiently multiple publishers for the same partition (and PID). The
  idempotent producer setup tracks a generation id for each pid, and
  discards any writes with a generation id smaller than the latest
  value. Conditional publish is 'first write wins' -- and instead of
  dropping duplicates on the server, it returns an error to the client.
  The duplicate-handling behaviour (dropping vs. erroring) has some
  interesting consequences:
 
  - If all producers are producing the same stream of messages, silently
  dropping duplicates on the server is more convenient. (Suppose we have
  a batch of messages 0-9, and the high-water mark on the server is 7.
  Idempotent producer, as I read it, would append 7-9 to the partition
  and return success; meanwhile, conditional publish would fail the
  entire batch.)
 
  - If producers might be writing different streams of messages, the
  proposed behaviour of the idempotent producer is probably worse --
  since it can silently interleave messages from two different
  producers. This can be a problem for some commit-log style use-cases,
  since it can transform a valid series of operations into an invalid
  one.
 
  - Given the error-on-duplicate behaviour, it's possible to implement
  deduplication on the client. (Sketch: if a publish returns an error
  for some partition, fetch the upcoming offset / sequence number for
  that partition, and discard all messages with a smaller offset on the
  client before republishing.)
 
  I think this makes the erroring behaviour more general, though
  deduplicating saves a roundtrip or two at conflict time.
 
  I'm less clear about the behaviour of the generation id, or what
  happens when (say) two producers with the same generation id are spun
  up at the same time. I'd be interested in hearing other folks'
  comments on this.
 
  Ewen: I'm not sure I understand the questions well enough to answer
  properly, but some quick notes:
  - I don't think it makes sense to assign an expected offset without
  already having assigned a partition. If the producer code is doing the
  partition assignment, it should probably do the offset assignment
  too... or we could just let application code handle both.
  - I'm not aware of any case where reassigning offsets to messages
  automatically after an offset mismatch makes sense: in the cases we've
  discussed, it seems like either it's safe to drop duplicates, or we
  want to handle the error at the application level.
 
   I'm going to try and come up with an idempotent-producer-type example
   that works with the draft patch in the next few days, so hopefully
   we'll have something more concrete to discuss. Otherwise -- if you
   have a clear idea of how eg. sequence number assignment would work in
   the idempotent-producer proposal, we could probably translate that
   over to get the equivalent for the conditional publish API.

Re: [DISCUSS] KIP-27 - Conditional Publish

2015-07-27 Thread Jiangjie Qin
@Ewen, good point about batching. Yes, it would be tricky if we want to do
a per-key conditional produce. My understanding is that the prerequisite of
this KIP is:
1. Single producer for each partition.
2. Acks=-1, max.in.flight.requests.per.connection=1, retries=SOME_BIG_NUMBER
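
Spelled out with the Java producer, those settings look like this (the real
config key is max.in.flight.requests.per.connection; the broker address and
the byte-array serializers below are placeholders):

  import java.util.Properties;
  import org.apache.kafka.clients.producer.KafkaProducer;

  Properties props = new Properties();
  props.put("bootstrap.servers", "broker:9092");             // placeholder
  props.put("key.serializer",
      "org.apache.kafka.common.serialization.ByteArraySerializer");
  props.put("value.serializer",
      "org.apache.kafka.common.serialization.ByteArraySerializer");
  props.put("acks", "all");                                  // same as acks=-1
  props.put("max.in.flight.requests.per.connection", "1");   // no pipelining
  props.put("retries", Integer.toString(Integer.MAX_VALUE)); // "SOME_BIG_NUMBER"
  KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props);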

The major problem it tries to solve is exactly-once produce, i.e., solving
the duplicates on the producer side. In that case, a batch will be
considered atomic. The only way a batch should get rejected is if it was
already appended. So the producer should just move on.

It looks to me that even a transient multiple-producer scenario will cause
issues, because users need to think about what to do if a request gets
rejected, and the answer varies for different use cases.

Thanks,

Jiangjie (Becket) Qin

On Sun, Jul 26, 2015 at 11:54 AM, Ben Kirwin b...@kirw.in wrote:

 So I had another look at the 'Idempotent Producer' proposal this
 afternoon, and made a few notes on how I think they compare; if I've
 made any mistakes, I'd be delighted if someone with more context on
 the idempotent producer design would correct me.

 As a first intuition, you can think of the 'conditional publish'
 proposal as the special case of the 'idempotent producer' idea, where
 there's just a single producer per-partition. The key observation here
 is: if there's only one producer, you can conflate the 'sequence
 number' and the expected offset. The conditional publish proposal uses
 existing Kafka offset APIs for roughly the same things as the
 idempotent producer proposal uses sequence numbers for -- eg. instead
 of having a 'lease PID' API that returns the current sequence number,
 we can use the existing 'offset API' to retrieve the upcoming offset.

 Both proposals attempt to deal with the situation where there are
 transiently multiple publishers for the same partition (and PID). The
 idempotent producer setup tracks a generation id for each pid, and
 discards any writes with a generation id smaller than the latest
 value. Conditional publish is 'first write wins' -- and instead of
 dropping duplicates on the server, it returns an error to the client.
 The duplicate-handling behaviour (dropping vs. erroring) has some
 interesting consequences:

 - If all producers are producing the same stream of messages, silently
 dropping duplicates on the server is more convenient. (Suppose we have
 a batch of messages 0-9, and the high-water mark on the server is 7.
 Idempotent producer, as I read it, would append 7-9 to the partition
 and return success; meanwhile, conditional publish would fail the
 entire batch.)

 - If producers might be writing different streams of messages, the
 proposed behaviour of the idempotent producer is probably worse --
 since it can silently interleave messages from two different
 producers. This can be a problem for some commit-log style use-cases,
 since it can transform a valid series of operations into an invalid
 one.

 - Given the error-on-duplicate behaviour, it's possible to implement
 deduplication on the client. (Sketch: if a publish returns an error
 for some partition, fetch the upcoming offset / sequence number for
 that partition, and discard all messages with a smaller offset on the
 client before republishing.)

 I think this makes the erroring behaviour more general, though
 deduplicating saves a roundtrip or two at conflict time.

 I'm less clear about the behaviour of the generation id, or what
 happens when (say) two producers with the same generation id are spun
 up at the same time. I'd be interested in hearing other folks'
 comments on this.

 Ewen: I'm not sure I understand the questions well enough to answer
 properly, but some quick notes:
 - I don't think it makes sense to assign an expected offset without
 already having assigned a partition. If the producer code is doing the
 partition assignment, it should probably do the offset assignment
 too... or we could just let application code handle both.
 - I'm not aware of any case where reassigning offsets to messages
 automatically after an offset mismatch makes sense: in the cases we've
 discussed, it seems like either it's safe to drop duplicates, or we
 want to handle the error at the application level.

 I'm going to try and come up with an idempotent-producer-type example
 that works with the draft patch in the next few days, so hopefully
 we'll have something more concrete to discuss. Otherwise -- if you
 have a clear idea of how eg. sequence number assignment would work in
 the idempotent-producer proposal, we could probably translate that
 over to get the equivalent for the conditional publish API.



 On Fri, Jul 24, 2015 at 2:16 AM, Ewen Cheslack-Postava
 e...@confluent.io wrote:
  @Becket - for compressed batches, I think this just works out given the
 KIP
  as described. Without the change you're referring to, it still only makes
  sense to batch messages with this KIP if all the expected offsets are
  sequential (else some messages are guaranteed to 

Re: [DISCUSS] KIP-27 - Conditional Publish

2015-07-26 Thread Ben Kirwin
So I had another look at the 'Idempotent Producer' proposal this
afternoon, and made a few notes on how I think they compare; if I've
made any mistakes, I'd be delighted if someone with more context on
the idempotent producer design would correct me.

As a first intuition, you can think of the 'conditional publish'
proposal as the special case of the 'idempotent producer' idea, where
there's just a single producer per-partition. The key observation here
is: if there's only one producer, you can conflate the 'sequence
number' and the expected offset. The conditional publish proposal uses
existing Kafka offset APIs for roughly the same things as the
idempotent producer proposal uses sequence numbers for -- eg. instead
of having a 'lease PID' API that returns the current sequence number,
we can use the existing 'offset API' to retrieve the upcoming offset.

Both proposals attempt to deal with the situation where there are
transiently multiple publishers for the same partition (and PID). The
idempotent producer setup tracks a generation id for each pid, and
discards any writes with a generation id smaller than the latest
value. Conditional publish is 'first write wins' -- and instead of
dropping duplicates on the server, it returns an error to the client.
The duplicate-handling behaviour (dropping vs. erroring) has some
interesting consequences:

- If all producers are producing the same stream of messages, silently
dropping duplicates on the server is more convenient. (Suppose we have
a batch of messages 0-9, and the high-water mark on the server is 7.
Idempotent producer, as I read it, would append 7-9 to the partition
and return success; meanwhile, conditional publish would fail the
entire batch.)

- If producers might be writing different streams of messages, the
proposed behaviour of the idempotent producer is probably worse --
since it can silently interleave messages from two different
producers. This can be a problem for some commit-log style use-cases,
since it can transform a valid series of operations into an invalid
one.

- Given the error-on-duplicate behaviour, it's possible to implement
deduplication on the client. (Sketch: if a publish returns an error
for some partition, fetch the upcoming offset / sequence number for
that partition, and discard all messages with a smaller offset on the
client before republishing.)
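
For concreteness, a rough Java rendering of that sketch (the broker calls
here are hypothetical; the proposed API never specified them, and this
assumes the mismatch was caused by our own earlier writes, i.e. a single
producer):

    import java.util.Deque;

    class Deduplicator {
        static class Message {
            final long expectedOffset;
            final byte[] payload;
            Message(long expectedOffset, byte[] payload) {
                this.expectedOffset = expectedOffset;
                this.payload = payload;
            }
        }

        // Hypothetical broker operations for a single partition.
        interface Broker {
            boolean tryPublish(Deque<Message> batch); // false => offset mismatch
            long upcomingOffset();                    // next offset to be assigned
        }

        // On a mismatch, fetch the upcoming offset and drop every message
        // below it before republishing: those were already committed.
        static void publishExactlyOnce(Broker broker, Deque<Message> batch) {
            while (!batch.isEmpty() && !broker.tryPublish(batch)) {
                long upcoming = broker.upcomingOffset();
                while (!batch.isEmpty()
                        && batch.peekFirst().expectedOffset < upcoming) {
                    batch.pollFirst(); // duplicate: already on the log
                }
            }
        }
    }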

I think this makes the erroring behaviour more general, though
deduplicating saves a roundtrip or two at conflict time.

I'm less clear about the behaviour of the generation id, or what
happens when (say) two producers with the same generation id are spun
up at the same time. I'd be interested in hearing other folks'
comments on this.

Ewen: I'm not sure I understand the questions well enough to answer
properly, but some quick notes:
- I don't think it makes sense to assign an expected offset without
already having assigned a partition. If the producer code is doing the
partition assignment, it should probably do the offset assignment
too... or we could just let application code handle both.
- I'm not aware of any case where reassigning offsets to messages
automatically after an offset mismatch makes sense: in the cases we've
discussed, it seems like either it's safe to drop duplicates, or we
want to handle the error at the application level.

I'm going to try and come up with an idempotent-producer-type example
that works with the draft patch in the next few days, so hopefully
we'll have something more concrete to discuss. Otherwise -- if you
have a clear idea of how eg. sequence number assignment would work in
the idempotent-producer proposal, we could probably translate that
over to get the equivalent for the conditional publish API.



On Fri, Jul 24, 2015 at 2:16 AM, Ewen Cheslack-Postava
e...@confluent.io wrote:
 @Becket - for compressed batches, I think this just works out given the KIP
 as described. Without the change you're referring to, it still only makes
 sense to batch messages with this KIP if all the expected offsets are
 sequential (else some messages are guaranteed to fail). I think that
 probably just works out, but raises an issue I brought up on the KIP call.

 Batching can be a bit weird with this proposal. If you try to write key A
 and key B, the second operation is dependent on the first. Which means to
 make an effective client for this, we need to keep track of per-partition
 offsets so we can set expected offsets properly. For example, if A was
 expected to publish at offset 10, then if B was published to the same
 partition, we need to make sure it's marked as expected offset 11 (assuming
 no subpartition high water marks). We either need to have the application
 keep track of this itself and set the offsets, which requires that it know
 about how keys map to partitions, or the client needs to manage this
 process. But if the client manages it, I think the client gets quite a bit
 more complicated. If the produce request containing A fails, what happens
 to 

Re: [DISCUSS] KIP-27 - Conditional Publish

2015-07-24 Thread Ewen Cheslack-Postava
@Becket - for compressed batches, I think this just works out given the KIP
as described. Without the change you're referring to, it still only makes
sense to batch messages with this KIP if all the expected offsets are
sequential (else some messages are guaranteed to fail). I think that
probably just works out, but raises an issue I brought up on the KIP call.

Batching can be a bit weird with this proposal. If you try to write key A
and key B, the second operation is dependent on the first. Which means to
make an effective client for this, we need to keep track of per-partition
offsets so we can set expected offsets properly. For example, if A was
expected to publish at offset 10, then if B was published to the same
partition, we need to make sure it's marked as expected offset 11 (assuming
no subpartition high water marks). We either need to have the application
keep track of this itself and set the offsets, which requires that it know
about how keys map to partitions, or the client needs to manage this
process. But if the client manages it, I think the client gets quite a bit
more complicated. If the produce request containing A fails, what happens
to B? Are there retries that somehow update the expected offset, or do we
just give up since we know it's always going to fail with the expected
offset that was automatically assigned to it?
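
To make the bookkeeping concrete, here is a toy sketch of that per-partition
tracking (purely illustrative; the KIP never specified the client side):

    import java.util.HashMap;
    import java.util.Map;

    class ExpectedOffsetTracker {
        private final Map<Integer, Long> nextOffset = new HashMap<>();

        // The first record for a partition takes the upcoming offset fetched
        // from the broker; later records extend our own queued writes by one.
        long assign(int partition, long upcomingFromBroker) {
            long offset = nextOffset.getOrDefault(partition, upcomingFromBroker);
            nextOffset.put(partition, offset + 1);
            return offset;
        }

        // If an in-flight request for the partition fails, every record queued
        // behind it carries a stale expected offset and must be reassigned
        // (or failed) -- which is exactly the complication described above.
        void invalidate(int partition) {
            nextOffset.remove(partition);
        }
    }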

One way to handle this is to use Yasuhiro's idea of increasing the
granularity of high watermarks using subpartitions. But I guess my question
is: if one producer client is writing many keys, and some of those keys are
produced to the same partition, and those messages are batched, what
happens? Do we end up with lots of failed messages? Or do we have
complicated logic in the producer to figure out what the right expected
offset for each message is? Or do they all share the same base expected
offset as in the compressed case, in which case they all share the same
fate and subpartitioning doesn't help? Or is there a simpler solution I'm
just not seeing? Maybe this just disables batching entirely and throughput
isn't an issue in these cases?

Sorry, I know that's probably not entirely clear, but that's because I'm
very uncertain of how batching works with this KIP.


On how this relates to other proposals: I think it might also be helpful to
get an overview of all the proposals for relevant modifications to
producers/produce requests since many of these proposals are possibly
alternatives (though some may not be mutually exclusive). Many people don't
have all the context from the past couple of years of the project. Are
there any other relevant wikis or docs besides the following?

https://cwiki.apache.org/confluence/display/KAFKA/Idempotent+Producer
https://cwiki.apache.org/confluence/display/KAFKA/Transactional+Messaging+in+Kafka
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Enriched+Message+Metadata

-Ewen

On Wed, Jul 22, 2015 at 11:18 AM, Gwen Shapira gshap...@cloudera.com
wrote:

 Tangent: I think we should complete the move of Produce / Fetch RPC to
 the client libraries before we add more revisions to this protocol.

 On Wed, Jul 22, 2015 at 11:02 AM, Jiangjie Qin
 j...@linkedin.com.invalid wrote:
  I missed yesterday's KIP hangout. I'm currently working on another KIP
 for
  enriched metadata of messages. Guozhang has already created a wiki page
  before (
 
 https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Enriched+Message+Metadata
 ).
  We plan to fill the offset field in the batch sent by the producer with
  the relative offset, to avoid broker-side re-compression. The message
  offset would become the batch base offset + the relative offset. I guess
  maybe the expected offset in KIP-27 can only be set for the base offset?
  Would that affect certain use cases?
 
  For Jun's comments, I am not sure I completely get it. I think the
 producer
  only sends one batch per partition in a request. So either that batch is
  appended or not. Why would a batch be partially committed?
 
  Thanks,
 
  Jiangjie (Becket) Qin
 
  On Tue, Jul 21, 2015 at 10:42 AM, Ben Kirwin b...@kirw.in wrote:
 
  That's a fair point. I've added some imagined job logic to the KIP, so
  we can make sure the proposal stays in sync with the usages we're
  discussing. (The logic is just a quick sketch for now -- I expect I'll
  need to elaborate it as we get into more detail, or to address other
  concerns...)
 
  On Tue, Jul 21, 2015 at 11:45 AM, Jun Rao j...@confluent.io wrote:
   For 1, yes, when there is a transient leader change, it's guaranteed
  that a
   prefix of the messages in a request will be committed. However, it
 seems
   that the client needs to know what subset of messages are committed in
   order to resume the sending. Then the question is how.
  
   As Flavio indicated, for the use cases that you listed, it would be
  useful
   to figure out the exact logic by using this feature. For example, in
 the
   partition K/V store example, when we fail over to a new writer to the
  

Re: [DISCUSS] KIP-27 - Conditional Publish

2015-07-22 Thread Gwen Shapira
Tangent: I think we should complete the move of Produce / Fetch RPC to
the client libraries before we add more revisions to this protocol.

On Wed, Jul 22, 2015 at 11:02 AM, Jiangjie Qin
j...@linkedin.com.invalid wrote:
 I missed yesterday's KIP hangout. I'm currently working on another KIP for
 enriched metadata of messages. Guozhang has already created a wiki page
 before (
 https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Enriched+Message+Metadata).
 We plan to fill the offset field in the batch sent by the producer with the
 relative offset, to avoid broker-side re-compression. The message offset
 would become the batch base offset + the relative offset. I guess maybe the
 expected offset in KIP-27 can only be set for the base offset? Would that
 affect certain use cases?

 For Jun's comments, I am not sure I completely get it. I think the producer
 only sends one batch per partition in a request. So either that batch is
 appended or not. Why would a batch be partially committed?

 Thanks,

 Jiangjie (Becket) Qin

 On Tue, Jul 21, 2015 at 10:42 AM, Ben Kirwin b...@kirw.in wrote:

 That's a fair point. I've added some imagined job logic to the KIP, so
 we can make sure the proposal stays in sync with the usages we're
 discussing. (The logic is just a quick sketch for now -- I expect I'll
 need to elaborate it as we get into more detail, or to address other
 concerns...)

 On Tue, Jul 21, 2015 at 11:45 AM, Jun Rao j...@confluent.io wrote:
  For 1, yes, when there is a transient leader change, it's guaranteed
 that a
  prefix of the messages in a request will be committed. However, it seems
  that the client needs to know what subset of messages are committed in
  order to resume the sending. Then the question is how.
 
  As Flavio indicated, for the use cases that you listed, it would be
 useful
  to figure out the exact logic by using this feature. For example, in the
  partition K/V store example, when we fail over to a new writer to the
  commit log, the zombie writer can publish new messages to the log after
 the
  new writer takes over, but before it publishes any message. We probably
  need to outline how this case can be handled properly.
 
  Thanks,
 
  Jun
 
  On Mon, Jul 20, 2015 at 12:05 PM, Ben Kirwin b...@kirw.in wrote:
 
  Hi Jun,
 
  Thanks for the close reading! Responses inline.
 
   Thanks for the write-up. The single producer use case you mentioned
 makes
   sense. It would be useful to include that in the KIP wiki.
 
  Great -- I'll make sure that the wiki is clear about this.
 
   1. What happens when the leader of the partition changes in the middle
  of a
   produce request? In this case, the producer client is not sure whether
  the
   request succeeds or not. If there is only a single message in the
  request,
   the producer can just resend the request. If it sees an OffsetMismatch
   error, it knows that the previous send actually succeeded and can
 proceed
   with the next write. This is nice since it not only allows the
 producer
  to
   proceed during transient failures in the broker, it also avoids
  duplicates
   during producer resend. One caveat is when there are multiple
 messages in
   the same partition in a produce request. The issue is that in our
 current
   replication protocol, it's possible for some, but not all messages in
 the
   request to be committed. This makes resend a bit harder to deal with
  since
   on receiving an OffsetMismatch error, it's not clear which messages
 have
   been committed. One possibility is to expect that compression is
 enabled,
   in which case multiple messages are compressed into a single message.
 I
  was
   thinking that another possibility is for the broker to return the
 current
   high watermark when sending an OffsetMismatch error. Based on this
 info,
   the producer can resend the subset of messages that have not been
   committed. However, this may not work in a compacted topic since there
  can
   be holes in the offset.
 
  This is an excellent question. It's my understanding that at least a
  *prefix* of messages will be committed (right?) -- which seems to be
  enough for many cases. I'll try and come up with a more concrete
  answer here.
 
   2. Is this feature only intended to be used with ack = all? The client
   doesn't get the offset with ack = 0. With ack = 1, it's possible for a
   previously acked message to be lost during leader transition, which
 will
   make the client logic more complicated.
 
  It's true that acks = 0 doesn't seem to be particularly useful; in all
  the cases I've come across, the client eventually wants to know about
  the mismatch error. However, it seems like there are some cases where
  acks = 1 would be fine -- eg. in a bulk load of a fixed dataset,
  losing messages during a leader transition just means you need to
  rewind / restart the load, which is not especially catastrophic. For
  many other interesting cases, acks = all is probably preferable.
 
   3. How does the producer client know 

Re: [DISCUSS] KIP-27 - Conditional Publish

2015-07-22 Thread Jiangjie Qin
I missed yesterday's KIP hangout. I'm currently working on another KIP for
enriched metadata of messages. Guozhang has already created a wiki page
before (
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Enriched+Message+Metadata).
We plan to fill the offset field in the batch sent by the producer with the
relative offset, to avoid broker-side re-compression. The message offset
would become the batch base offset + the relative offset. I guess maybe the
expected offset in KIP-27 can only be set for the base offset? Would that
affect certain use cases?
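
A two-line illustration of the proposed arithmetic (the values are made up):

    class RelativeOffsetExample {
        public static void main(String[] args) {
            long batchBaseOffset = 500L; // assigned by the broker on append
            int relativeOffset = 3;      // filled in by the producer, 0..n-1
            // The consumer-visible offset of the record:
            System.out.println(batchBaseOffset + relativeOffset); // prints 503
        }
    }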

For Jun's comments, I am not sure I completely get it. I think the producer
only sends one batch per partition in a request. So either that batch is
appended or not. Why would a batch be partially committed?

Thanks,

Jiangjie (Becket) Qin

On Tue, Jul 21, 2015 at 10:42 AM, Ben Kirwin b...@kirw.in wrote:

 That's a fair point. I've added some imagined job logic to the KIP, so
 we can make sure the proposal stays in sync with the usages we're
 discussing. (The logic is just a quick sketch for now -- I expect I'll
 need to elaborate it as we get into more detail, or to address other
 concerns...)

 On Tue, Jul 21, 2015 at 11:45 AM, Jun Rao j...@confluent.io wrote:
  For 1, yes, when there is a transient leader change, it's guaranteed
 that a
  prefix of the messages in a request will be committed. However, it seems
  that the client needs to know what subset of messages are committed in
  order to resume the sending. Then the question is how.
 
  As Flavio indicated, for the use cases that you listed, it would be
 useful
  to figure out the exact logic by using this feature. For example, in the
  partition K/V store example, when we fail over to a new writer to the
  commit log, the zombie writer can publish new messages to the log after
 the
  new writer takes over, but before it publishes any message. We probably
  need to outline how this case can be handled properly.
 
  Thanks,
 
  Jun
 
  On Mon, Jul 20, 2015 at 12:05 PM, Ben Kirwin b...@kirw.in wrote:
 
  Hi Jun,
 
  Thanks for the close reading! Responses inline.
 
   Thanks for the write-up. The single producer use case you mentioned
 makes
   sense. It would be useful to include that in the KIP wiki.
 
  Great -- I'll make sure that the wiki is clear about this.
 
   1. What happens when the leader of the partition changes in the middle
  of a
   produce request? In this case, the producer client is not sure whether
  the
   request succeeds or not. If there is only a single message in the
  request,
   the producer can just resend the request. If it sees an OffsetMismatch
   error, it knows that the previous send actually succeeded and can
 proceed
   with the next write. This is nice since it not only allows the
 producer
  to
   proceed during transient failures in the broker, it also avoids
  duplicates
   during producer resend. One caveat is when there are multiple
 messages in
   the same partition in a produce request. The issue is that in our
 current
   replication protocol, it's possible for some, but not all messages in
 the
   request to be committed. This makes resend a bit harder to deal with
  since
   on receiving an OffsetMismatch error, it's not clear which messages
 have
   been committed. One possibility is to expect that compression is
 enabled,
   in which case multiple messages are compressed into a single message.
 I
  was
   thinking that another possibility is for the broker to return the
 current
   high watermark when sending an OffsetMismatch error. Based on this
 info,
   the producer can resend the subset of messages that have not been
   committed. However, this may not work in a compacted topic since there
  can
   be holes in the offset.
 
  This is an excellent question. It's my understanding that at least a
  *prefix* of messages will be committed (right?) -- which seems to be
  enough for many cases. I'll try and come up with a more concrete
  answer here.
 
   2. Is this feature only intended to be used with ack = all? The client
   doesn't get the offset with ack = 0. With ack = 1, it's possible for a
   previously acked message to be lost during leader transition, which
 will
   make the client logic more complicated.
 
  It's true that acks = 0 doesn't seem to be particularly useful; in all
  the cases I've come across, the client eventually wants to know about
  the mismatch error. However, it seems like there are some cases where
  acks = 1 would be fine -- eg. in a bulk load of a fixed dataset,
  losing messages during a leader transition just means you need to
  rewind / restart the load, which is not especially catastrophic. For
  many other interesting cases, acks = all is probably preferable.
 
   3. How does the producer client know the offset to send the first
  message?
   Do we need to expose an API in producer to get the current high
  watermark?
 
  You're right, it might be irritating to have to go through the
  consumer API just for this. There are some 

Re: [DISCUSS] KIP-27 - Conditional Publish

2015-07-21 Thread Jun Rao
For 1, yes, when there is a transient leader change, it's guaranteed that a
prefix of the messages in a request will be committed. However, it seems
that the client needs to know what subset of messages are committed in
order to resume the sending. Then the question is how.

As Flavio indicated, for the use cases that you listed, it would be useful
to figure out the exact logic by using this feature. For example, in the
partition K/V store example, when we fail over to a new writer to the
commit log, the zombie writer can publish new messages to the log after the
new writer takes over, but before it publishes any message. We probably
need to outline how this case can be handled properly.

Thanks,

Jun

On Mon, Jul 20, 2015 at 12:05 PM, Ben Kirwin b...@kirw.in wrote:

 Hi Jun,

 Thanks for the close reading! Responses inline.

  Thanks for the write-up. The single producer use case you mentioned makes
  sense. It would be useful to include that in the KIP wiki.

 Great -- I'll make sure that the wiki is clear about this.

  1. What happens when the leader of the partition changes in the middle
 of a
  produce request? In this case, the producer client is not sure whether
 the
  request succeeds or not. If there is only a single message in the
 request,
  the producer can just resend the request. If it sees an OffsetMismatch
  error, it knows that the previous send actually succeeded and can proceed
  with the next write. This is nice since it not only allows the producer
 to
  proceed during transient failures in the broker, it also avoids
 duplicates
  during producer resend. One caveat is when there are multiple messages in
  the same partition in a produce request. The issue is that in our current
  replication protocol, it's possible for some, but not all messages in the
  request to be committed. This makes resend a bit harder to deal with
 since
  on receiving an OffsetMismatch error, it's not clear which messages have
  been committed. One possibility is to expect that compression is enabled,
  in which case multiple messages are compressed into a single message. I
 was
  thinking that another possibility is for the broker to return the current
  high watermark when sending an OffsetMismatch error. Based on this info,
  the producer can resend the subset of messages that have not been
  committed. However, this may not work in a compacted topic since there
 can
  be holes in the offset.

 This is an excellent question. It's my understanding that at least a
 *prefix* of messages will be committed (right?) -- which seems to be
 enough for many cases. I'll try and come up with a more concrete
 answer here.

  2. Is this feature only intended to be used with ack = all? The client
  doesn't get the offset with ack = 0. With ack = 1, it's possible for a
  previously acked message to be lost during leader transition, which will
  make the client logic more complicated.

 It's true that acks = 0 doesn't seem to be particularly useful; in all
 the cases I've come across, the client eventually wants to know about
 the mismatch error. However, it seems like there are some cases where
 acks = 1 would be fine -- eg. in a bulk load of a fixed dataset,
 losing messages during a leader transition just means you need to
 rewind / restart the load, which is not especially catastrophic. For
 many other interesting cases, acks = all is probably preferable.

  3. How does the producer client know the offset to send the first
 message?
  Do we need to expose an API in producer to get the current high
 watermark?

 You're right, it might be irritating to have to go through the
 consumer API just for this. There are some cases where the offsets are
 already available -- like the commit-log-for-KV-store example -- but
 in general, being able to get the offsets from the producer interface
 does sound convenient.

  We plan to have a KIP discussion meeting tomorrow at 11am PST. Perhaps
 you
  can describe this KIP a bit then?

 Sure, happy to join.

  Thanks,
 
  Jun
 
 
 
  On Sat, Jul 18, 2015 at 10:37 AM, Ben Kirwin b...@kirw.in wrote:
 
  Just wanted to flag a little discussion that happened on the ticket:
 
 
 https://issues.apache.org/jira/browse/KAFKA-2260?focusedCommentId=14632259&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14632259
 
  In particular, Yasuhiro Matsuda proposed an interesting variant on
  this that performs the offset check on the message key (instead of
  just the partition), with bounded space requirements, at the cost of
  potentially some spurious failures. (ie. the produce request may fail
  even if that particular key hasn't been updated recently.) This
  addresses a couple of the drawbacks of the per-key approach mentioned
  at the bottom of the KIP.
 
  On Fri, Jul 17, 2015 at 6:47 PM, Ben Kirwin b...@kirw.in wrote:
   Hi all,
  
   So, perhaps it's worth adding a couple specific examples of where this
   feature is useful, to make this a bit more concrete:
  
   

Re: [DISCUSS] KIP-27 - Conditional Publish

2015-07-21 Thread Yasuhiro Matsuda
In KV-store usage, all instances are writers, aren't they? There is no
leader or master, thus there is no failover. The offset-based CAS ensures
an update is based on the latest value and doesn't care who is writing the
new value.

I think the idea of the offset-based CAS is great. I think it works very
well with Event Sourcing. It may be a bit weak for ensuring a single
writer, though.
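
As a sketch, an offset-based CAS loop over a log-backed value might look
like this (the Log interface is hypothetical):

    import java.util.function.LongUnaryOperator;

    class LogCas {
        interface Log {
            long upcomingOffset();   // next offset to be assigned
            long latestValue();      // state folded from the log
            boolean publish(long expectedOffset, long newValue); // false => mismatch
        }

        // Read the latest value and the upcoming offset, publish conditioned
        // on that offset, and retry from a fresh read if another writer got
        // there first -- so every update is based on the latest value.
        static void update(Log log, LongUnaryOperator f) {
            while (true) {
                long offset = log.upcomingOffset();
                long next = f.applyAsLong(log.latestValue());
                if (log.publish(offset, next)) {
                    return; // our write landed exactly at 'offset'
                }
            }
        }
    }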


On Tue, Jul 21, 2015 at 8:45 AM, Jun Rao j...@confluent.io wrote:

 For 1, yes, when there is a transient leader change, it's guaranteed that a
 prefix of the messages in a request will be committed. However, it seems
 that the client needs to know what subset of messages are committed in
 order to resume the sending. Then the question is how.

 As Flavio indicated, for the use cases that you listed, it would be useful
 to figure out the exact logic by using this feature. For example, in the
 partition K/V store example, when we fail over to a new writer to the
 commit log, the zombie writer can publish new messages to the log after the
 new writer takes over, but before it publishes any message. We probably
 need to outline how this case can be handled properly.

 Thanks,

 Jun

 On Mon, Jul 20, 2015 at 12:05 PM, Ben Kirwin b...@kirw.in wrote:

  Hi Jun,
 
  Thanks for the close reading! Responses inline.
 
   Thanks for the write-up. The single producer use case you mentioned
 makes
   sense. It would be useful to include that in the KIP wiki.
 
  Great -- I'll make sure that the wiki is clear about this.
 
   1. What happens when the leader of the partition changes in the middle
  of a
   produce request? In this case, the producer client is not sure whether
  the
   request succeeds or not. If there is only a single message in the
  request,
   the producer can just resend the request. If it sees an OffsetMismatch
   error, it knows that the previous send actually succeeded and can
 proceed
   with the next write. This is nice since it not only allows the producer
  to
   proceed during transient failures in the broker, it also avoids
  duplicates
   during producer resend. One caveat is when there are multiple messages
 in
   the same partition in a produce request. The issue is that in our
 current
   replication protocol, it's possible for some, but not all messages in
 the
   request to be committed. This makes resend a bit harder to deal with
  since
   on receiving an OffsetMismatch error, it's not clear which messages
 have
   been committed. One possibility is to expect that compression is
 enabled,
   in which case multiple messages are compressed into a single message. I
  was
   thinking that another possibility is for the broker to return the
 current
   high watermark when sending an OffsetMismatch error. Based on this
 info,
   the producer can resend the subset of messages that have not been
   committed. However, this may not work in a compacted topic since there
  can
   be holes in the offset.
 
  This is an excellent question. It's my understanding that at least a
  *prefix* of messages will be committed (right?) -- which seems to be
  enough for many cases. I'll try and come up with a more concrete
  answer here.
 
   2. Is this feature only intended to be used with ack = all? The client
   doesn't get the offset with ack = 0. With ack = 1, it's possible for a
   previously acked message to be lost during leader transition, which
 will
   make the client logic more complicated.
 
  It's true that acks = 0 doesn't seem to be particularly useful; in all
  the cases I've come across, the client eventually wants to know about
  the mismatch error. However, it seems like there are some cases where
  acks = 1 would be fine -- eg. in a bulk load of a fixed dataset,
  losing messages during a leader transition just means you need to
  rewind / restart the load, which is not especially catastrophic. For
  many other interesting cases, acks = all is probably preferable.
 
   3. How does the producer client know the offset to send the first
  message?
   Do we need to expose an API in producer to get the current high
  watermark?
 
  You're right, it might be irritating to have to go through the
  consumer API just for this. There are some cases where the offsets are
  already available -- like the commit-log-for-KV-store example -- but
  in general, being able to get the offsets from the producer interface
  does sound convenient.
 
   We plan to have a KIP discussion meeting tomorrow at 11am PST. Perhaps
  you
   can describe this KIP a bit then?
 
  Sure, happy to join.
 
   Thanks,
  
   Jun
  
  
  
   On Sat, Jul 18, 2015 at 10:37 AM, Ben Kirwin b...@kirw.in wrote:
  
   Just wanted to flag a little discussion that happened on the ticket:
  
  
 
 https://issues.apache.org/jira/browse/KAFKA-2260?focusedCommentId=14632259&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14632259
  
   In particular, Yasuhiro Matsuda proposed an interesting variant on
   this that performs 

Re: [DISCUSS] KIP-27 - Conditional Publish

2015-07-20 Thread Ben Kirwin
Hi Jun,

Thanks for the close reading! Responses inline.

 Thanks for the write-up. The single producer use case you mentioned makes
 sense. It would be useful to include that in the KIP wiki.

Great -- I'll make sure that the wiki is clear about this.

 1. What happens when the leader of the partition changes in the middle of a
 produce request? In this case, the producer client is not sure whether the
 request succeeds or not. If there is only a single message in the request,
 the producer can just resend the request. If it sees an OffsetMismatch
 error, it knows that the previous send actually succeeded and can proceed
 with the next write. This is nice since it not only allows the producer to
 proceed during transient failures in the broker, it also avoids duplicates
 during producer resend. One caveat is when there are multiple messages in
 the same partition in a produce request. The issue is that in our current
 replication protocol, it's possible for some, but not all messages in the
 request to be committed. This makes resend a bit harder to deal with since
 on receiving an OffsetMismatch error, it's not clear which messages have
 been committed. One possibility is to expect that compression is enabled,
 in which case multiple messages are compressed into a single message. I was
 thinking that another possibility is for the broker to return the current
 high watermark when sending an OffsetMismatch error. Based on this info,
 the producer can resend the subset of messages that have not been
 committed. However, this may not work in a compacted topic since there can
 be holes in the offset.

This is an excellent question. It's my understanding that at least a
*prefix* of messages will be committed (right?) -- which seems to be
enough for many cases. I'll try and come up with a more concrete
answer here.
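
For the single-message case, the resend rule Jun describes is mechanical; a
hypothetical sketch (assuming a single producer, so a mismatch can only mean
an earlier attempt of ours was committed):

    import java.io.IOException;

    class ResendOnLeaderChange {
        interface Partition {
            // true => appended at expectedOffset; false => offset mismatch.
            boolean produce(long expectedOffset, byte[] message) throws IOException;
        }

        static void sendOnce(Partition p, long expectedOffset, byte[] msg)
                throws IOException {
            while (true) {
                try {
                    // Either outcome means the message is now committed:
                    // true => this attempt appended it; false (mismatch) =>
                    // an earlier, timed-out attempt already did.
                    p.produce(expectedOffset, msg);
                    return;
                } catch (IOException timeout) {
                    // Leader change mid-request: outcome unknown. Retrying is
                    // safe, since the offset condition prevents duplicates.
                }
            }
        }
    }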

 2. Is this feature only intended to be used with ack = all? The client
 doesn't get the offset with ack = 0. With ack = 1, it's possible for a
 previously acked message to be lost during leader transition, which will
 make the client logic more complicated.

It's true that acks = 0 doesn't seem to be particularly useful; in all
the cases I've come across, the client eventually wants to know about
the mismatch error. However, it seems like there are some cases where
acks = 1 would be fine -- eg. in a bulk load of a fixed dataset,
losing messages during a leader transition just means you need to
rewind / restart the load, which is not especially catastrophic. For
many other interesting cases, acks = all is probably preferable.

 3. How does the producer client know the offset to send the first message?
 Do we need to expose an API in producer to get the current high watermark?

You're right, it might be irritating to have to go through the
consumer API just for this. There are some cases where the offsets are
already available -- like the commit-log-for-KV-store example -- but
in general, being able to get the offsets from the producer interface
does sound convenient.
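
For reference, the consumer-side detour looks roughly like this with a
modern client (endOffsets arrived in later client releases; at the time of
this thread it would have been seekToEnd plus position):

    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.ByteArrayDeserializer;

    class UpcomingOffset {
        // Fetch the next offset the broker will assign for a partition.
        static long fetch(String bootstrap, String topic, int partition) {
            Properties props = new Properties();
            props.put("bootstrap.servers", bootstrap);
            props.put("key.deserializer", ByteArrayDeserializer.class.getName());
            props.put("value.deserializer", ByteArrayDeserializer.class.getName());
            TopicPartition tp = new TopicPartition(topic, partition);
            try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
                return consumer.endOffsets(Collections.singletonList(tp)).get(tp);
            }
        }
    }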

 We plan to have a KIP discussion meeting tomorrow at 11am PST. Perhaps you
 can describe this KIP a bit then?

Sure, happy to join.

 Thanks,

 Jun



 On Sat, Jul 18, 2015 at 10:37 AM, Ben Kirwin b...@kirw.in wrote:

 Just wanted to flag a little discussion that happened on the ticket:

 https://issues.apache.org/jira/browse/KAFKA-2260?focusedCommentId=14632259&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14632259

 In particular, Yasuhiro Matsuda proposed an interesting variant on
 this that performs the offset check on the message key (instead of
 just the partition), with bounded space requirements, at the cost of
 potentially some spurious failures. (ie. the produce request may fail
 even if that particular key hasn't been updated recently.) This
 addresses a couple of the drawbacks of the per-key approach mentioned
 at the bottom of the KIP.

 On Fri, Jul 17, 2015 at 6:47 PM, Ben Kirwin b...@kirw.in wrote:
  Hi all,
 
  So, perhaps it's worth adding a couple specific examples of where this
  feature is useful, to make this a bit more concrete:
 
  - Suppose I'm using Kafka as a commit log for a partitioned KV store,
  like Samza or Pistachio (?) do. We bootstrap the process state by
  reading from that partition, and log all state updates to that
  partition when we're running. Now imagine that one of my processes
  locks up -- GC or similar -- and the system transitions that partition
  over to another node. When the GC is finished, the old 'owner' of that
  partition might still be trying to write to the commit log at the same time
  as the new one is. A process might detect this by noticing that the
  offset of the published message is bigger than it thought the upcoming
  offset was, which implies someone else has been writing to the log...
  but by then it's too late, and the commit log is already corrupt. With
  a 'conditional produce', one of those processes will have its publish
  request 

Re: [DISCUSS] KIP-27 - Conditional Publish

2015-07-20 Thread Jun Rao
The per-key generalization is useful. As Jiangjie mentioned in KAFKA-2260,
one thing that we need to sort out is what happens if a produce request has
messages with different keys and some of the messages have expected offsets
while some others don't. Currently, the produce response has an error code
per partition, not per message. One way is to just define the semantics as:
the produce request will only go through if all keys in the request pass
the offset test.
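
For reference, Yasuhiro's bounded-space variant from the ticket could be
sketched like this: a fixed-size table of high-water marks indexed by a hash
of the key, where colliding keys share a slot and so may fail spuriously
(illustrative only; this is my reading of the proposal, not its actual code):

    import java.util.Arrays;

    class HashedKeyWatermarks {
        private final long[] highWaterMark; // one watermark per hash slot

        HashedKeyWatermarks(int slots) {
            this.highWaterMark = new long[slots];
        }

        private int slot(byte[] key) {
            return Math.floorMod(Arrays.hashCode(key), highWaterMark.length);
        }

        // Accept iff the key's slot hasn't advanced past the expected offset;
        // on append, move the slot's watermark just past the new message.
        synchronized boolean checkAndAppend(byte[] key, long expectedOffset,
                                            long appendedAtOffset) {
            int s = slot(key);
            if (expectedOffset < highWaterMark[s]) {
                return false; // possibly spurious: a colliding key advanced it
            }
            highWaterMark[s] = appendedAtOffset + 1;
            return true;
        }
    }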

Thanks,

Jun

On Sat, Jul 18, 2015 at 10:37 AM, Ben Kirwin b...@kirw.in wrote:

 Just wanted to flag a little discussion that happened on the ticket:

 https://issues.apache.org/jira/browse/KAFKA-2260?focusedCommentId=14632259&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14632259

 In particular, Yasuhiro Matsuda proposed an interesting variant on
 this that performs the offset check on the message key (instead of
 just the partition), with bounded space requirements, at the cost of
 potentially some spurious failures. (ie. the produce request may fail
 even if that particular key hasn't been updated recently.) This
 addresses a couple of the drawbacks of the per-key approach mentioned
 at the bottom of the KIP.

 On Fri, Jul 17, 2015 at 6:47 PM, Ben Kirwin b...@kirw.in wrote:
  Hi all,
 
  So, perhaps it's worth adding a couple specific examples of where this
  feature is useful, to make this a bit more concrete:
 
  - Suppose I'm using Kafka as a commit log for a partitioned KV store,
  like Samza or Pistachio (?) do. We bootstrap the process state by
  reading from that partition, and log all state updates to that
  partition when we're running. Now imagine that one of my processes
  locks up -- GC or similar -- and the system transitions that partition
  over to another node. When the GC is finished, the old 'owner' of that
  partition might still be trying to write to the commit log at the same time
  as the new one is. A process might detect this by noticing that the
  offset of the published message is bigger than it thought the upcoming
  offset was, which implies someone else has been writing to the log...
  but by then it's too late, and the commit log is already corrupt. With
  a 'conditional produce', one of those processes will have its publish
  request refused -- so we've avoided corrupting the state.
 
  - Envision some copycat-like system, where we have some sharded
  postgres setup and we're tailing each shard into its own partition.
  Normally, it's fairly easy to avoid duplicates here: we can track
  which offset in the WAL corresponds to which offset in Kafka, and we
  know how many messages we've written to Kafka already, so the state is
  very simple. However, it is possible that for a moment -- due to
  rebalancing or operator error or some other thing -- two different
  nodes are tailing the same postgres shard at once! Normally this would
  introduce duplicate messages, but by specifying the expected offset,
  we can avoid this.
 
  So perhaps it's better to say that this is useful when a single
  producer is *expected*, but multiple producers are *possible*? (In the
  same way that the high-level consumer normally has 1 consumer in a
  group reading from a partition, but there are small windows where more
  than one might be reading at the same time.) This is also the spirit
  of the 'runtime cost' comment -- in the common case, where there is
  little to no contention, there's no performance overhead either. I
  mentioned this a little in the Motivation section -- maybe I should
  flesh that out a little bit?
 
  For me, the motivation to work this up was that I kept running into
  cases, like the above, where the existing API was almost-but-not-quite
  enough to give the guarantees I was looking for -- and the extension
  needed to handle those cases too was pretty small and natural-feeling.
 
  On Fri, Jul 17, 2015 at 4:49 PM, Ashish Singh asi...@cloudera.com
 wrote:
  Good concept. I have a question though.
 
  Say there are two producers A and B. Both producers are producing to
 same
  partition.
  - A sends a message with expected offset, x1
  - Broker accepts it and sends an Ack
  - B sends a message with expected offset, x1
  - Broker rejects it, sends nack
  - B sends message again with expected offset, x1+1
  - Broker accepts it and sends Ack
  I guess this is what this KIP suggests, right? If yes, then how does this
  ensure that the same message will not be written twice when two producers
  are producing to the same partition? A producer, on receiving a nack, will
  try again with the next offset and will keep doing so until the message is
  accepted. Am I missing something?
 
  Also, you have mentioned on the KIP that it imposes little to no runtime
  cost in memory or time; I think that is not true for time. With this
  approach, producers' performance will degrade in proportion to the number
  of producers writing to the same partition. Please correct me if I am
  missing something.
 
 
  On Fri, Jul 

Re: [DISCUSS] KIP-27 - Conditional Publish

2015-07-20 Thread Jun Rao
Hi, Ben,

Thanks for the write-up. The single producer use case you mentioned makes
sense. It would be useful to include that in the KIP wiki. A couple
questions on the design details.

1. What happens when the leader of the partition changes in the middle of a
produce request? In this case, the producer client is not sure whether the
request succeeds or not. If there is only a single message in the request,
the producer can just resend the request. If it sees an OffsetMismatch
error, it knows that the previous send actually succeeded and can proceed
with the next write. This is nice since it not only allows the producer to
proceed during transient failures in the broker, it also avoids duplicates
during producer resend. One caveat is when there are multiple messages in
the same partition in a produce request. The issue is that in our current
replication protocol, it's possible for some, but not all messages in the
request to be committed. This makes resend a bit harder to deal with since
on receiving an OffsetMismatch error, it's not clear which messages have
been committed. One possibility is to expect that compression is enabled,
in which case multiple messages are compressed into a single message. I was
thinking that another possibility is for the broker to return the current
high watermark when sending an OffsetMismatch error. Based on this info,
the producer can resend the subset of messages that have not been
committed. However, this may not work in a compacted topic since there can
be holes in the offset.

2. Is this feature only intended to be used with ack = all? The client
doesn't get the offset with ack = 0. With ack = 1, it's possible for a
previously acked message to be lost during leader transition, which will
make the client logic more complicated.

3. How does the producer client know the offset to send the first message?
Do we need to expose an API in producer to get the current high watermark?

We plan to have a KIP discussion meeting tomorrow at 11am PST. Perhaps you
can describe this KIP a bit then?

Thanks,

Jun



On Sat, Jul 18, 2015 at 10:37 AM, Ben Kirwin b...@kirw.in wrote:

 Just wanted to flag a little discussion that happened on the ticket:

 https://issues.apache.org/jira/browse/KAFKA-2260?focusedCommentId=14632259&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14632259

 In particular, Yasuhiro Matsuda proposed an interesting variant on
 this that performs the offset check on the message key (instead of
 just the partition), with bounded space requirements, at the cost of
 potentially some spurious failures. (ie. the produce request may fail
 even if that particular key hasn't been updated recently.) This
 addresses a couple of the drawbacks of the per-key approach mentioned
 at the bottom of the KIP.

 On Fri, Jul 17, 2015 at 6:47 PM, Ben Kirwin b...@kirw.in wrote:
  Hi all,
 
  So, perhaps it's worth adding a couple specific examples of where this
  feature is useful, to make this a bit more concrete:
 
  - Suppose I'm using Kafka as a commit log for a partitioned KV store,
  like Samza or Pistachio (?) do. We bootstrap the process state by
  reading from that partition, and log all state updates to that
  partition when we're running. Now imagine that one of my processes
  locks up -- GC or similar -- and the system transitions that partition
  over to another node. When the GC is finished, the old 'owner' of that
  partition might still be trying to write to the commit log at the same time
  as the new one is. A process might detect this by noticing that the
  offset of the published message is bigger than it thought the upcoming
  offset was, which implies someone else has been writing to the log...
  but by then it's too late, and the commit log is already corrupt. With
  a 'conditional produce', one of those processes will have its publish
  request refused -- so we've avoided corrupting the state.
 
  - Envision some copycat-like system, where we have some sharded
  postgres setup and we're tailing each shard into its own partition.
  Normally, it's fairly easy to avoid duplicates here: we can track
  which offset in the WAL corresponds to which offset in Kafka, and we
  know how many messages we've written to Kafka already, so the state is
  very simple. However, it is possible that for a moment -- due to
  rebalancing or operator error or some other thing -- two different
  nodes are tailing the same postgres shard at once! Normally this would
  introduce duplicate messages, but by specifying the expected offset,
  we can avoid this.
 
  So perhaps it's better to say that this is useful when a single
  producer is *expected*, but multiple producers are *possible*? (In the
  same way that the high-level consumer normally has 1 consumer in a
  group reading from a partition, but there are small windows where more
  than one might be reading at the same time.) This is also the spirit
  of the 'runtime cost' comment -- in the common case, 

Re: [DISCUSS] KIP-27 - Conditional Publish

2015-07-20 Thread Flavio P JUNQUEIRA
I'm with you on the races that could happen in the scenarios you describe,
but I'm still not convinced that conditionally updating is the best call.
Instead of conditionally updating, the broker could fence off the old owner
to avoid spurious writes, and that's valid for all attempts. The advantage
of fencing is that the broker does not accept requests from others at all,
while the conditional update is a bit fragile for protecting streams of
publishes.
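
To contrast the two mechanisms concretely, broker-side fencing might be
sketched as follows (illustrative only; neither mechanism existed in Kafka
at the time):

    import java.util.concurrent.atomic.AtomicLong;

    class FencedPartition {
        // Highest writer epoch this partition has ever seen.
        private final AtomicLong highestEpoch = new AtomicLong(-1);

        // Unlike a per-request offset check, one successful request from a
        // new owner permanently fences off every older writer.
        boolean tryAppend(long writerEpoch, byte[][] batch) {
            long seen = highestEpoch.updateAndGet(e -> Math.max(e, writerEpoch));
            if (writerEpoch < seen) {
                return false; // stale owner: all its requests are rejected
            }
            append(batch);
            return true;
        }

        private void append(byte[][] batch) { /* write to the log */ }
    }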

-Flavio

On Fri, Jul 17, 2015 at 11:47 PM, Ben Kirwin b...@kirw.in wrote:

 Hi all,

 So, perhaps it's worth adding a couple specific examples of where this
 feature is useful, to make this a bit more concrete:

 - Suppose I'm using Kafka as a commit log for a partitioned KV store,
 like Samza or Pistachio (?) do. We bootstrap the process state by
 reading from that partition, and log all state updates to that
 partition when we're running. Now imagine that one of my processes
 locks up -- GC or similar -- and the system transitions that partition
 over to another node. When the GC is finished, the old 'owner' of that
 partition might still be trying to write to the commit log at the same time
 as the new one is. A process might detect this by noticing that the
 offset of the published message is bigger than it thought the upcoming
 offset was, which implies someone else has been writing to the log...
 but by then it's too late, and the commit log is already corrupt. With
 a 'conditional produce', one of those processes will have its publish
 request refused -- so we've avoided corrupting the state.

 - Envision some copycat-like system, where we have some sharded
 postgres setup and we're tailing each shard into its own partition.
 Normally, it's fairly easy to avoid duplicates here: we can track
 which offset in the WAL corresponds to which offset in Kafka, and we
 know how many messages we've written to Kafka already, so the state is
 very simple. However, it is possible that for a moment -- due to
 rebalancing or operator error or some other thing -- two different
 nodes are tailing the same postgres shard at once! Normally this would
 introduce duplicate messages, but by specifying the expected offset,
 we can avoid this.

 So perhaps it's better to say that this is useful when a single
 producer is *expected*, but multiple producers are *possible*? (In the
 same way that the high-level consumer normally has 1 consumer in a
 group reading from a partition, but there are small windows where more
 than one might be reading at the same time.) This is also the spirit
 of the 'runtime cost' comment -- in the common case, where there is
 little to no contention, there's no performance overhead either. I
 mentioned this a little in the Motivation section -- maybe I should
 flesh that out a little bit?

 For me, the motivation to work this up was that I kept running into
 cases, like the above, where the existing API was almost-but-not-quite
 enough to give the guarantees I was looking for -- and the extension
 needed to handle those cases too was pretty small and natural-feeling.

 On Fri, Jul 17, 2015 at 4:49 PM, Ashish Singh asi...@cloudera.com wrote:
  Good concept. I have a question though.
 
  Say there are two producers A and B. Both producers are producing to same
  partition.
  - A sends a message with expected offset, x1
  - Broker accepts it and sends an Ack
  - B sends a message with expected offset, x1
  - Broker rejects it, sends nack
  - B sends message again with expected offset, x1+1
  - Broker accepts it and sends Ack
  I guess this is what this KIP suggests, right? If yes, then how does this
  ensure that the same message will not be written twice when two producers
  are producing to the same partition? A producer, on receiving a nack, will
  try again with the next offset and will keep doing so until the message is
  accepted. Am I missing something?
 
  Also, you have mentioned on the KIP that it imposes little to no runtime
  cost in memory or time; I think that is not true for time. With this
  approach, producers' performance will degrade in proportion to the number
  of producers writing to the same partition. Please correct me if I am
  missing something.
 
 
  On Fri, Jul 17, 2015 at 11:32 AM, Mayuresh Gharat 
  gharatmayures...@gmail.com wrote:
 
  If we have 2 producers producing to a partition, they can be out of
  order; then how does one producer know what offset to expect, as it does
  not interact with the other producer?
 
  Can you give an example flow that explains how it works with a single
  producer and with multiple producers?
 
 
  Thanks,
 
  Mayuresh
 
  On Fri, Jul 17, 2015 at 10:28 AM, Flavio Junqueira 
  fpjunque...@yahoo.com.invalid wrote:
 
   I like this feature; it reminds me of conditional updates in ZooKeeper.
   I'm not sure if it'd be best to have some mechanism for fencing rather
  than
   a conditional write like you're proposing. The reason I'm saying this
 is
   that the conditional write applies to requests individually, while it
   sounds 

Re: [DISCUSS] KIP-27 - Conditional Publish

2015-07-20 Thread Jay Kreps
It would be worth fleshing out the use cases a bit more and thinking
through the overlap with the other proposals for transactions and
idempotence (since likely we will end up with both).

The advantage of this proposal is that it is really simple.

If we go through use cases:
1. Stream processing: I suspect in this case data is partitioned over
multiple partitions/topics by multiple writers so it needs a more general
atomicity across partitions.
2. Copycat: This is the case where you're publishing data from an external
system. For some external systems I think this mechanism could provide an
exactly-once publication mechanism however there are some details about
retries to think through.
3. Key-value store/event sourcing: This is the case where you are building
a log-centric key-value store or an event sourced application. I think this
could potentially use this feature but it needs thinking through.

One subtlety to think through is the relationship with request pipelining
and retries.

-Jay

On Mon, Jul 20, 2015 at 12:05 PM, Ben Kirwin b...@kirw.in wrote:

 Hi Jun,

 Thanks for the close reading! Responses inline.

  Thanks for the write-up. The single producer use case you mentioned makes
  sense. It would be useful to include that in the KIP wiki.

 Great -- I'll make sure that the wiki is clear about this.

  1. What happens when the leader of the partition changes in the middle
 of a
  produce request? In this case, the producer client is not sure whether
 the
  request succeeds or not. If there is only a single message in the
 request,
  the producer can just resend the request. If it sees an OffsetMismatch
  error, it knows that the previous send actually succeeded and can proceed
  with the next write. This is nice since it not only allows the producer
 to
  proceed during transient failures in the broker, it also avoids
 duplicates
  during producer resend. One caveat is when there are multiple messages in
  the same partition in a produce request. The issue is that in our current
  replication protocol, it's possible for some, but not all messages in the
  request to be committed. This makes resend a bit harder to deal with
 since
  on receiving an OffsetMismatch error, it's not clear which messages have
  been committed. One possibility is to expect that compression is enabled,
  in which case multiple messages are compressed into a single message. I
 was
  thinking that another possibility is for the broker to return the current
  high watermark when sending an OffsetMismatch error. Based on this info,
  the producer can resend the subset of messages that have not been
  committed. However, this may not work in a compacted topic since there
 can
  be holes in the offset.

 This is an excellent question. It's my understanding that at least a
 *prefix* of messages will be committed (right?) -- which seems to be
 enough for many cases. I'll try and come up with a more concrete
 answer here.
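
 To sketch what that could look like from the client side: suppose the
 broker did return the high watermark with the mismatch error, and suppose
 the producer had a conditional-send call. Neither exists today;
 sendConditional and OffsetMismatchException below are made-up names, and
 the arithmetic assumes a non-compacted topic with no offset holes:

     import java.util.List;
     import org.apache.kafka.common.TopicPartition;

     // Hypothetical recovery loop for a multi-message conditional batch.
     static void publishBatch(TopicPartition tp, long expectedOffset,
                              List<byte[]> batch) {
         long base = expectedOffset;
         List<byte[]> pending = batch;
         while (!pending.isEmpty()) {
             try {
                 sendConditional(tp, base, pending);  // assumed API
                 return;
             } catch (OffsetMismatchException e) {    // assumed error type
                 long committed = e.highWatermark() - base;
                 if (committed <= 0 || committed > pending.size()) {
                     throw e;  // another writer got in; let the app decide
                 }
                 // A prefix of the batch was committed before the failure:
                 // drop it and resend the rest at the new expected offset.
                 pending = pending.subList((int) committed, pending.size());
                 base = e.highWatermark();
             }
         }
     }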

  2. Is this feature only intended to be used with ack = all? The client
  doesn't get the offset with ack = 0. With ack = 1, it's possible for a
  previously acked message to be lost during leader transition, which will
  make the client logic more complicated.

 It's true that acks = 0 doesn't seem to be particularly useful; in all
 the cases I've come across, the client eventually wants to know about
 the mismatch error. However, it seems like there are some cases where
 acks = 1 would be fine -- eg. in a bulk load of a fixed dataset,
 losing messages during a leader transition just means you need to
 rewind / restart the load, which is not especially catastrophic. For
 many other interesting cases, acks = all is probably preferable.
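
 In config terms the trade-off might look like this, extending a stock
 producer Properties object like the one in the sketch above (the
 conditional-publish mechanism itself is orthogonal to these settings):

     // Bulk load of a fixed dataset: acks=1 can be tolerable, since a
     // message lost in a leader transition just means rewinding the load.
     props.put("acks", "1");

     // Commit-log usage: acks=all, so an acked offset cannot silently
     // disappear and invalidate every later expected-offset calculation.
     props.put("acks", "all");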

  3. How does the producer client know the offset to send the first
 message?
  Do we need to expose an API in producer to get the current high
 watermark?

 You're right, it might be irritating to have to go through the
 consumer API just for this. There are some cases where the offsets are
 already available -- like the commit-log-for-KV-store example -- but
 in general, being able to get the offsets from the producer interface
 does sound convenient.
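
 For reference, a client can already recover the log-end offset through the
 consumer; endOffsets below is from the later Java consumer (at the time of
 this thread the same information came from an OffsetRequest):

     import java.util.Collections;
     import java.util.Map;
     import org.apache.kafka.clients.consumer.KafkaConsumer;
     import org.apache.kafka.common.TopicPartition;

     // Ask the broker for the partition's log-end offset: that is the
     // offset the next conditional publish should expect.
     static long nextExpectedOffset(KafkaConsumer<?, ?> consumer,
                                    String topic, int partition) {
         TopicPartition tp = new TopicPartition(topic, partition);
         Map<TopicPartition, Long> end =
             consumer.endOffsets(Collections.singleton(tp));
         return end.get(tp);
     }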

  We plan to have a KIP discussion meeting tomorrow at 11am PST. Perhaps
 you
  can describe this KIP a bit then?

 Sure, happy to join.

  Thanks,
 
  Jun
 
 
 
  On Sat, Jul 18, 2015 at 10:37 AM, Ben Kirwin b...@kirw.in wrote:
 
  Just wanted to flag a little discussion that happened on the ticket:
 
 
 https://issues.apache.org/jira/browse/KAFKA-2260?focusedCommentId=14632259&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14632259
 
  In particular, Yasuhiro Matsuda proposed an interesting variant on
  this that performs the offset check on the message key (instead of
  just the partition), with bounded space requirements, at the cost of
  potentially some spurious failures. (ie. the produce request may fail
  even if that particular key hasn't been updated recently.) This
  addresses a couple of the drawbacks of the per-key approach mentioned at
  the bottom of the KIP.

Re: [DISCUSS] KIP-27 - Conditional Publish

2015-07-20 Thread Flavio P JUNQUEIRA
I'm with you on the races that could happen in the scenarios you describe,
but I'm still not convinced that conditionally updating is the best call.
Instead of conditionally updating, the broker could fence off the old owner
to avoid spurious writes, and the fence would hold for all subsequent
attempts. The advantage of fencing is that the broker does not accept
requests from others at all, while the conditional update is a somewhat
fragile way to protect a stream of publishes.

-Flavio
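
Neither guard exists in the broker, but the contrast reads roughly like
this as two hypothetical server-side checks (every name below is
illustrative only):

    // Fencing: the broker tracks the highest writer epoch seen for the
    // partition and refuses anything older -- a deposed writer fails on
    // every request, no matter what offsets it carries.
    if (request.writerEpoch < partition.highestSeenEpoch) {
        return Errors.WRITER_FENCED;        // hypothetical error code
    }
    partition.highestSeenEpoch = request.writerEpoch;

    // Conditional write: each request is judged on its own expected
    // offset -- a deposed writer is caught only once its view of the
    // log end has gone stale.
    if (request.expectedOffset != partition.logEndOffset) {
        return Errors.OFFSET_MISMATCH;      // hypothetical error code
    }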

On Fri, Jul 17, 2015 at 11:47 PM, Ben Kirwin b...@kirw.in wrote:

 Hi all,

 So, perhaps it's worth adding a couple specific examples of where this
 feature is useful, to make this a bit more concrete:

 - Suppose I'm using Kafka as a commit log for a partitioned KV store,
 like Samza or Pistachio (?) do. We bootstrap the process state by
 reading from that partition, and log all state updates to that
 partition when we're running. Now imagine that one of my processes
 locks up -- GC or similar -- and the system transitions that partition
 over to another node. When the GC is finished, the old 'owner' of that
 partition might still be trying to write to the commit log at the same
 time as the new one is. A process might detect this by noticing that the
 offset of the published message is bigger than it thought the upcoming
 offset was, which implies someone else has been writing to the log...
 but by then it's too late, and the commit log is already corrupt. With
 a 'conditional produce', one of those processes will have its publish
 request refused -- so we've avoided corrupting the state.

 - Envision some copycat-like system, where we have some sharded
 postgres setup and we're tailing each shard into its own partition.
 Normally, it's fairly easy to avoid duplicates here: we can track
 which offset in the WAL corresponds to which offset in Kafka, and we
 know how many messages we've written to Kafka already, so the state is
 very simple. However, it is possible that for a moment -- due to
 rebalancing or operator error or some other thing -- two different
 nodes are tailing the same postgres shard at once! Normally this would
 introduce duplicate messages, but by specifying the expected offset,
 we can avoid this.

 So perhaps it's better to say that this is useful when a single
 producer is *expected*, but multiple producers are *possible*? (In the
 same way that the high-level consumer normally has 1 consumer in a
 group reading from a partition, but there are small windows where more
 than one might be reading at the same time.) This is also the spirit
 of the 'runtime cost' comment -- in the common case, where there is
 little to no contention, there's no performance overhead either. I
 mentioned this a little in the Motivation section -- maybe I should
 flesh that out a little bit?

 For me, the motivation to work this up was that I kept running into
 cases, like the above, where the existing API was almost-but-not-quite
 enough to give the guarantees I was looking for -- and the extension
 needed to handle those cases too was pretty small and natural-feeling.
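
 From the writer's side, the commit-log guard in the first example might
 look like the sketch below; sendConditional, OffsetMismatchException, and
 both helpers are made-up names, with the surrounding state assumed:

     // Replay the partition to rebuild state, remember the offset the
     // next write should land at, and stop the moment another owner's
     // write makes that expectation stale.
     long nextOffset = bootstrapStateAndGetLogEnd(partition); // assumed helper
     while (running) {
         byte[] update = nextStateUpdate();                   // assumed helper
         try {
             sendConditional(partition, nextOffset, update);  // assumed API
             nextOffset += 1;
         } catch (OffsetMismatchException e) {
             // Someone else wrote at nextOffset: ownership has moved, so
             // stop publishing rather than corrupt the commit log.
             throw new IllegalStateException("lost ownership", e);
         }
     }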

 On Fri, Jul 17, 2015 at 4:49 PM, Ashish Singh asi...@cloudera.com wrote:
  Good concept. I have a question though.
 
  Say there are two producers, A and B. Both producers are producing to
  the same partition.
  - A sends a message with expected offset x1
  - Broker accepts it and sends an ack
  - B sends a message with expected offset x1
  - Broker rejects it and sends a nack
  - B sends the message again with expected offset x1+1
  - Broker accepts it and sends an ack
  I guess this is what this KIP suggests, right? If yes, then how does
  this ensure that the same message will not be written twice when two
  producers are producing to the same partition? A producer, on receiving
  a nack, will try again with the next offset and will keep doing so till
  the message is accepted. Am I missing something?
 
  Also, you have mentioned on the KIP that it imposes little to no
  runtime cost in memory or time; I think that is not true for time. With
  this approach, producers' performance will degrade in proportion to the
  number of producers writing to the same partition. Please correct me if
  I am missing something.
 
 
  On Fri, Jul 17, 2015 at 11:32 AM, Mayuresh Gharat 
  gharatmayures...@gmail.com wrote:
 
  If we have 2 producers producing to a partition, they can be out of
  order, so how does one producer know what offset to expect when it does
  not interact with the other producer?
 
  Can you give an example flow that explains how it works with single
  producer and with multiple producers?
 
 
  Thanks,
 
  Mayuresh
 
  On Fri, Jul 17, 2015 at 10:28 AM, Flavio Junqueira 
  fpjunque...@yahoo.com.invalid wrote:
 
    I like this feature; it reminds me of conditional updates in
    zookeeper. I'm not sure whether it'd be better to have some mechanism
    for fencing rather than a conditional write like you're proposing. The
    reason I'm saying this is that the conditional write applies to
    requests individually, while it sounds like you want to make sure that
    there is a single client writing over multiple requests.

Re: [DISCUSS] KIP-27 - Conditional Publish

2015-07-20 Thread Flavio P JUNQUEIRA
Up to Ben to clarify, but I'd think that in this case, it is up to the
logic of B to decide what to do. B knows that the offset isn't what it
expects, so it can react accordingly. If it chooses to try again, then it
should not violate any application invariant.

-Flavio

On Fri, Jul 17, 2015 at 9:49 PM, Ashish Singh asi...@cloudera.com wrote:

 Good concept. I have a question though.

 Say there are two producers, A and B. Both producers are producing to the
 same partition.
 - A sends a message with expected offset x1
 - Broker accepts it and sends an ack
 - B sends a message with expected offset x1
 - Broker rejects it and sends a nack
 - B sends the message again with expected offset x1+1
 - Broker accepts it and sends an ack
 I guess this is what this KIP suggests, right? If yes, then how does this
 ensure that the same message will not be written twice when two producers
 are producing to the same partition? A producer, on receiving a nack, will
 try again with the next offset and will keep doing so till the message is
 accepted. Am I missing something?

 Also, you have mentioned on the KIP that it imposes little to no runtime
 cost in memory or time; I think that is not true for time. With this
 approach, producers' performance will degrade in proportion to the number
 of producers writing to the same partition. Please correct me if I am
 missing something.


 On Fri, Jul 17, 2015 at 11:32 AM, Mayuresh Gharat 
 gharatmayures...@gmail.com wrote:

  If we have 2 producers producing to a partition, they can be out of
  order, so how does one producer know what offset to expect when it does
  not interact with the other producer?
 
  Can you give an example flow that explains how it works with single
  producer and with multiple producers?
 
 
  Thanks,
 
  Mayuresh
 
  On Fri, Jul 17, 2015 at 10:28 AM, Flavio Junqueira 
  fpjunque...@yahoo.com.invalid wrote:
 
    I like this feature; it reminds me of conditional updates in
    zookeeper. I'm not sure whether it'd be better to have some mechanism
    for fencing rather than a conditional write like you're proposing. The
    reason I'm saying this is that the conditional write applies to
    requests individually, while it sounds like you want to make sure that
    there is a single client writing over multiple requests.
  
   -Flavio
  
On 17 Jul 2015, at 07:30, Ben Kirwin b...@kirw.in wrote:
   
Hi there,
   
I just added a KIP for a 'conditional publish' operation: a simple
CAS-like mechanism for the Kafka producer. The wiki page is here:
   
   
  
 
 https://cwiki.apache.org/confluence/display/KAFKA/KIP-27+-+Conditional+Publish
   
And there's some previous discussion on the ticket and the users
 list:
   
https://issues.apache.org/jira/browse/KAFKA-2260
   
   
  
 
 https://mail-archives.apache.org/mod_mbox/kafka-users/201506.mbox/%3CCAAeOB6ccyAA13YNPqVQv2o-mT5r=c9v7a+55sf2wp93qg7+...@mail.gmail.com%3E
   
As always, comments and suggestions are very welcome.
   
Thanks,
Ben
  
  
 
 
  --
  -Regards,
  Mayuresh R. Gharat
  (862) 250-7125
 



 --

 Regards,
 Ashish



Re: [DISCUSS] KIP-27 - Conditional Publish

2015-07-20 Thread Ben Kirwin
Yes, sorry, I think this is right -- it's pretty application-specific.
One thing to note: in a large subset of cases (ie. bulk load,
copycat-type, mirrormaker) the correct response is not to resend the
message at all; if there's already a message at that offset, it's
because another instance of the same process already sent the exact
same message.
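
In code, that policy is just a different catch block from the single-writer
case (same made-up sendConditional/OffsetMismatchException names as in the
earlier sketches, with the surrounding state assumed):

    // Copycat / bulk-load style: offsets are derived deterministically
    // from the source, so a mismatch means a twin instance already wrote
    // this exact record -- skip it instead of resending.
    try {
        sendConditional(partition, expectedOffset, record);  // assumed API
    } catch (OffsetMismatchException e) {
        // The record at expectedOffset is byte-for-byte the one we hold.
    }
    expectedOffset += 1;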

On Mon, Jul 20, 2015 at 4:38 PM, Flavio P JUNQUEIRA f...@apache.org wrote:
 Up to Ben to clarify, but I'd think that in this case, it is up to the
 logic of B to decide what to do. B knows that the offset isn't what it
 expects, so it can react accordingly. If it chooses to try again, then it
 should not violate any application invariant.

 -Flavio

 On Fri, Jul 17, 2015 at 9:49 PM, Ashish Singh asi...@cloudera.com wrote:

 Good concept. I have a question though.

 Say there are two producers, A and B. Both producers are producing to the
 same partition.
 - A sends a message with expected offset x1
 - Broker accepts it and sends an ack
 - B sends a message with expected offset x1
 - Broker rejects it and sends a nack
 - B sends the message again with expected offset x1+1
 - Broker accepts it and sends an ack
 I guess this is what this KIP suggests, right? If yes, then how does this
 ensure that the same message will not be written twice when two producers
 are producing to the same partition? A producer, on receiving a nack, will
 try again with the next offset and will keep doing so till the message is
 accepted. Am I missing something?

 Also, you have mentioned on the KIP that it imposes little to no runtime
 cost in memory or time; I think that is not true for time. With this
 approach, producers' performance will degrade in proportion to the number
 of producers writing to the same partition. Please correct me if I am
 missing something.


 On Fri, Jul 17, 2015 at 11:32 AM, Mayuresh Gharat 
 gharatmayures...@gmail.com wrote:

  If we have 2 producers producing to a partition, they can be out of
  order, so how does one producer know what offset to expect when it does
  not interact with the other producer?
 
  Can you give an example flow that explains how it works with single
  producer and with multiple producers?
 
 
  Thanks,
 
  Mayuresh
 
  On Fri, Jul 17, 2015 at 10:28 AM, Flavio Junqueira 
  fpjunque...@yahoo.com.invalid wrote:
 
    I like this feature; it reminds me of conditional updates in
    zookeeper. I'm not sure whether it'd be better to have some mechanism
    for fencing rather than a conditional write like you're proposing. The
    reason I'm saying this is that the conditional write applies to
    requests individually, while it sounds like you want to make sure that
    there is a single client writing over multiple requests.
  
   -Flavio
  
On 17 Jul 2015, at 07:30, Ben Kirwin b...@kirw.in wrote:
   
Hi there,
   
I just added a KIP for a 'conditional publish' operation: a simple
CAS-like mechanism for the Kafka producer. The wiki page is here:
   
   
  
 
 https://cwiki.apache.org/confluence/display/KAFKA/KIP-27+-+Conditional+Publish
   
And there's some previous discussion on the ticket and the users
 list:
   
https://issues.apache.org/jira/browse/KAFKA-2260
   
   
  
 
 https://mail-archives.apache.org/mod_mbox/kafka-users/201506.mbox/%3CCAAeOB6ccyAA13YNPqVQv2o-mT5r=c9v7a+55sf2wp93qg7+...@mail.gmail.com%3E
   
As always, comments and suggestions are very welcome.
   
Thanks,
Ben
  
  
 
 
  --
  -Regards,
  Mayuresh R. Gharat
  (862) 250-7125
 



 --

 Regards,
 Ashish



Re: [DISCUSS] KIP-27 - Conditional Publish

2015-07-18 Thread Ben Kirwin
Just wanted to flag a little discussion that happened on the ticket:
https://issues.apache.org/jira/browse/KAFKA-2260?focusedCommentId=14632259&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14632259

In particular, Yasuhiro Matsuda proposed an interesting variant on
this that performs the offset check on the message key (instead of
just the partition), with bounded space requirements, at the cost of
potentially some spurious failures. (ie. the produce request may fail
even if that particular key hasn't been updated recently.) This
addresses a couple of the drawbacks of the per-key approach mentioned
at the bottom of the KIP.
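
One shape such a bounded check might take -- a reading of the ticket
discussion, not Yasuhiro's actual patch -- is a fixed-size table of last
offsets indexed by key hash; distinct keys sharing a slot are what makes
the occasional spurious failure possible:

    import java.util.Arrays;

    class BoundedKeyOffsets {
        private final long[] lastOffset;

        BoundedKeyOffsets(int slots) {
            lastOffset = new long[slots];
            Arrays.fill(lastOffset, -1L);  // -1 = no write seen yet
        }

        // Accept the publish only if the caller's expected offset is at
        // least the last offset recorded for any key sharing this slot.
        synchronized boolean checkAndRecord(byte[] key, long expected,
                                            long newOffset) {
            int slot = Math.floorMod(Arrays.hashCode(key), lastOffset.length);
            if (expected < lastOffset[slot]) {
                return false;  // possibly spurious: a colliding key moved it
            }
            lastOffset[slot] = newOffset;
            return true;
        }
    }

Memory is bounded by the slot count regardless of how many distinct keys
the partition sees, which is the point of the variant.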

On Fri, Jul 17, 2015 at 6:47 PM, Ben Kirwin b...@kirw.in wrote:
 Hi all,

 So, perhaps it's worth adding a couple specific examples of where this
 feature is useful, to make this a bit more concrete:

 - Suppose I'm using Kafka as a commit log for a partitioned KV store,
 like Samza or Pistachio (?) do. We bootstrap the process state by
 reading from that partition, and log all state updates to that
 partition when we're running. Now imagine that one of my processes
 locks up -- GC or similar -- and the system transitions that partition
 over to another node. When the GC is finished, the old 'owner' of that
 partition might still be trying to write to the commit log at the same
 time as the new one is. A process might detect this by noticing that the
 offset of the published message is bigger than it thought the upcoming
 offset was, which implies someone else has been writing to the log...
 but by then it's too late, and the commit log is already corrupt. With
 a 'conditional produce', one of those processes will have its publish
 request refused -- so we've avoided corrupting the state.

 - Envision some copycat-like system, where we have some sharded
 postgres setup and we're tailing each shard into its own partition.
 Normally, it's fairly easy to avoid duplicates here: we can track
 which offset in the WAL corresponds to which offset in Kafka, and we
 know how many messages we've written to Kafka already, so the state is
 very simple. However, it is possible that for a moment -- due to
 rebalancing or operator error or some other thing -- two different
 nodes are tailing the same postgres shard at once! Normally this would
 introduce duplicate messages, but by specifying the expected offset,
 we can avoid this.

 So perhaps it's better to say that this is useful when a single
 producer is *expected*, but multiple producers are *possible*? (In the
 same way that the high-level consumer normally has 1 consumer in a
 group reading from a partition, but there are small windows where more
 than one might be reading at the same time.) This is also the spirit
 of the 'runtime cost' comment -- in the common case, where there is
 little to no contention, there's no performance overhead either. I
 mentioned this a little in the Motivation section -- maybe I should
 flesh that out a little bit?

 For me, the motivation to work this up was that I kept running into
 cases, like the above, where the existing API was almost-but-not-quite
 enough to give the guarantees I was looking for -- and the extension
 needed to handle those cases too was pretty small and natural-feeling.

 On Fri, Jul 17, 2015 at 4:49 PM, Ashish Singh asi...@cloudera.com wrote:
 Good concept. I have a question though.

 Say there are two producers, A and B. Both producers are producing to the
 same partition.
 - A sends a message with expected offset x1
 - Broker accepts it and sends an ack
 - B sends a message with expected offset x1
 - Broker rejects it and sends a nack
 - B sends the message again with expected offset x1+1
 - Broker accepts it and sends an ack
 I guess this is what this KIP suggests, right? If yes, then how does this
 ensure that the same message will not be written twice when two producers
 are producing to the same partition? A producer, on receiving a nack, will
 try again with the next offset and will keep doing so till the message is
 accepted. Am I missing something?

 Also, you have mentioned on the KIP that it imposes little to no runtime
 cost in memory or time; I think that is not true for time. With this
 approach, producers' performance will degrade in proportion to the number
 of producers writing to the same partition. Please correct me if I am
 missing something.


 On Fri, Jul 17, 2015 at 11:32 AM, Mayuresh Gharat 
 gharatmayures...@gmail.com wrote:

 If we have 2 producers producing to a partition, they can be out of order,
 so how does one producer know what offset to expect when it does not
 interact with the other producer?

 Can you give an example flow that explains how it works with single
 producer and with multiple producers?


 Thanks,

 Mayuresh

 On Fri, Jul 17, 2015 at 10:28 AM, Flavio Junqueira 
 fpjunque...@yahoo.com.invalid wrote:

  I like this feature; it reminds me of conditional updates in zookeeper.
  I'm not sure whether it'd be better to have some mechanism for fencing
  rather than a conditional write like you're proposing.

[DISCUSS] KIP-27 - Conditional Publish

2015-07-17 Thread Ben Kirwin
Hi there,

I just added a KIP for a 'conditional publish' operation: a simple
CAS-like mechanism for the Kafka producer. The wiki page is here:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-27+-+Conditional+Publish

And there's some previous discussion on the ticket and the users list:

https://issues.apache.org/jira/browse/KAFKA-2260

https://mail-archives.apache.org/mod_mbox/kafka-users/201506.mbox/%3CCAAeOB6ccyAA13YNPqVQv2o-mT5r=c9v7a+55sf2wp93qg7+...@mail.gmail.com%3E

As always, comments and suggestions are very welcome.

Thanks,
Ben


Re: [DISCUSS] KIP-27 - Conditional Publish

2015-07-17 Thread Flavio Junqueira
I like this feature; it reminds me of conditional updates in zookeeper. I'm
not sure whether it'd be better to have some mechanism for fencing rather
than a conditional write like you're proposing. The reason I'm saying this
is that the conditional write applies to requests individually, while it
sounds like you want to make sure that there is a single client writing
over multiple requests.

-Flavio

 On 17 Jul 2015, at 07:30, Ben Kirwin b...@kirw.in wrote:
 
 Hi there,
 
 I just added a KIP for a 'conditional publish' operation: a simple
 CAS-like mechanism for the Kafka producer. The wiki page is here:
 
 https://cwiki.apache.org/confluence/display/KAFKA/KIP-27+-+Conditional+Publish
 
 And there's some previous discussion on the ticket and the users list:
 
 https://issues.apache.org/jira/browse/KAFKA-2260
 
 https://mail-archives.apache.org/mod_mbox/kafka-users/201506.mbox/%3CCAAeOB6ccyAA13YNPqVQv2o-mT5r=c9v7a+55sf2wp93qg7+...@mail.gmail.com%3E
 
 As always, comments and suggestions are very welcome.
 
 Thanks,
 Ben



Re: [DISCUSS] KIP-27 - Conditional Publish

2015-07-17 Thread Mayuresh Gharat
If we have 2 producers producing to a partition, they can be out of order,
so how does one producer know what offset to expect when it does not
interact with the other producer?

Can you give an example flow that explains how it works with single
producer and with multiple producers?


Thanks,

Mayuresh

On Fri, Jul 17, 2015 at 10:28 AM, Flavio Junqueira 
fpjunque...@yahoo.com.invalid wrote:

  I like this feature; it reminds me of conditional updates in zookeeper.
  I'm not sure whether it'd be better to have some mechanism for fencing
  rather than a conditional write like you're proposing. The reason I'm
  saying this is that the conditional write applies to requests
  individually, while it sounds like you want to make sure that there is a
  single client writing over multiple requests.

 -Flavio

  On 17 Jul 2015, at 07:30, Ben Kirwin b...@kirw.in wrote:
 
  Hi there,
 
  I just added a KIP for a 'conditional publish' operation: a simple
  CAS-like mechanism for the Kafka producer. The wiki page is here:
 
 
 https://cwiki.apache.org/confluence/display/KAFKA/KIP-27+-+Conditional+Publish
 
  And there's some previous discussion on the ticket and the users list:
 
  https://issues.apache.org/jira/browse/KAFKA-2260
 
 
 https://mail-archives.apache.org/mod_mbox/kafka-users/201506.mbox/%3CCAAeOB6ccyAA13YNPqVQv2o-mT5r=c9v7a+55sf2wp93qg7+...@mail.gmail.com%3E
 
  As always, comments and suggestions are very welcome.
 
  Thanks,
  Ben




-- 
-Regards,
Mayuresh R. Gharat
(862) 250-7125


Re: [DISCUSS] KIP-27 - Conditional Publish

2015-07-17 Thread Ashish Singh
Good concept. I have a question though.

Say there are two producers, A and B. Both producers are producing to the
same partition.
- A sends a message with expected offset x1
- Broker accepts it and sends an ack
- B sends a message with expected offset x1
- Broker rejects it and sends a nack
- B sends the message again with expected offset x1+1
- Broker accepts it and sends an ack
I guess this is what this KIP suggests, right? If yes, then how does this
ensure that the same message will not be written twice when two producers
are producing to the same partition? A producer, on receiving a nack, will
try again with the next offset and will keep doing so till the message is
accepted. Am I missing something?

Also, you have mentioned on the KIP that it imposes little to no runtime
cost in memory or time; I think that is not true for time. With this
approach, producers' performance will degrade in proportion to the number
of producers writing to the same partition. Please correct me if I am
missing something.


On Fri, Jul 17, 2015 at 11:32 AM, Mayuresh Gharat 
gharatmayures...@gmail.com wrote:

 If we have 2 producers producing to a partition, they can be out of order,
 so how does one producer know what offset to expect when it does not
 interact with the other producer?

 Can you give an example flow that explains how it works with single
 producer and with multiple producers?


 Thanks,

 Mayuresh

 On Fri, Jul 17, 2015 at 10:28 AM, Flavio Junqueira 
 fpjunque...@yahoo.com.invalid wrote:

   I like this feature; it reminds me of conditional updates in zookeeper.
   I'm not sure whether it'd be better to have some mechanism for fencing
   rather than a conditional write like you're proposing. The reason I'm
   saying this is that the conditional write applies to requests
   individually, while it sounds like you want to make sure that there is
   a single client writing over multiple requests.
 
  -Flavio
 
   On 17 Jul 2015, at 07:30, Ben Kirwin b...@kirw.in wrote:
  
   Hi there,
  
   I just added a KIP for a 'conditional publish' operation: a simple
   CAS-like mechanism for the Kafka producer. The wiki page is here:
  
  
 
 https://cwiki.apache.org/confluence/display/KAFKA/KIP-27+-+Conditional+Publish
  
   And there's some previous discussion on the ticket and the users list:
  
   https://issues.apache.org/jira/browse/KAFKA-2260
  
  
 
 https://mail-archives.apache.org/mod_mbox/kafka-users/201506.mbox/%3CCAAeOB6ccyAA13YNPqVQv2o-mT5r=c9v7a+55sf2wp93qg7+...@mail.gmail.com%3E
  
   As always, comments and suggestions are very welcome.
  
   Thanks,
   Ben
 
 


 --
 -Regards,
 Mayuresh R. Gharat
 (862) 250-7125




-- 

Regards,
Ashish


Re: [DISCUSS] KIP-27 - Conditional Publish

2015-07-17 Thread Ben Kirwin
Hi all,

So, perhaps it's worth adding a couple specific examples of where this
feature is useful, to make this a bit more concrete:

- Suppose I'm using Kafka as a commit log for a partitioned KV store,
like Samza or Pistachio (?) do. We bootstrap the process state by
reading from that partition, and log all state updates to that
partition when we're running. Now imagine that one of my processes
locks up -- GC or similar -- and the system transitions that partition
over to another node. When the GC is finished, the old 'owner' of that
partition might still be trying to write to the commit log at the same
time as the new one is. A process might detect this by noticing that the
offset of the published message is bigger than it thought the upcoming
offset was, which implies someone else has been writing to the log...
but by then it's too late, and the commit log is already corrupt. With
a 'conditional produce', one of those processes will have its publish
request refused -- so we've avoided corrupting the state.

- Envision some copycat-like system, where we have some sharded
postgres setup and we're tailing each shard into its own partition.
Normally, it's fairly easy to avoid duplicates here: we can track
which offset in the WAL corresponds to which offset in Kafka, and we
know how many messages we've written to Kafka already, so the state is
very simple. However, it is possible that for a moment -- due to
rebalancing or operator error or some other thing -- two different
nodes are tailing the same postgres shard at once! Normally this would
introduce duplicate messages, but by specifying the expected offset,
we can avoid this.

So perhaps it's better to say that this is useful when a single
producer is *expected*, but multiple producers are *possible*? (In the
same way that the high-level consumer normally has 1 consumer in a
group reading from a partition, but there are small windows where more
than one might be reading at the same time.) This is also the spirit
of the 'runtime cost' comment -- in the common case, where there is
little to no contention, there's no performance overhead either. I
mentioned this a little in the Motivation section -- maybe I should
flesh that out a little bit?

For me, the motivation to work this up was that I kept running into
cases, like the above, where the existing API was almost-but-not-quite
enough to give the guarantees I was looking for -- and the extension
needed to handle those cases too was pretty small and natural-feeling.

On Fri, Jul 17, 2015 at 4:49 PM, Ashish Singh asi...@cloudera.com wrote:
 Good concept. I have a question though.

 Say there are two producers, A and B. Both producers are producing to the
 same partition.
 - A sends a message with expected offset x1
 - Broker accepts it and sends an ack
 - B sends a message with expected offset x1
 - Broker rejects it and sends a nack
 - B sends the message again with expected offset x1+1
 - Broker accepts it and sends an ack
 I guess this is what this KIP suggests, right? If yes, then how does this
 ensure that the same message will not be written twice when two producers
 are producing to the same partition? A producer, on receiving a nack, will
 try again with the next offset and will keep doing so till the message is
 accepted. Am I missing something?

 Also, you have mentioned on the KIP that it imposes little to no runtime
 cost in memory or time; I think that is not true for time. With this
 approach, producers' performance will degrade in proportion to the number
 of producers writing to the same partition. Please correct me if I am
 missing something.


 On Fri, Jul 17, 2015 at 11:32 AM, Mayuresh Gharat 
 gharatmayures...@gmail.com wrote:

 If we have 2 producers producing to a partition, they can be out of order,
 so how does one producer know what offset to expect when it does not
 interact with the other producer?

 Can you give an example flow that explains how it works with single
 producer and with multiple producers?


 Thanks,

 Mayuresh

 On Fri, Jul 17, 2015 at 10:28 AM, Flavio Junqueira 
 fpjunque...@yahoo.com.invalid wrote:

   I like this feature; it reminds me of conditional updates in zookeeper.
   I'm not sure whether it'd be better to have some mechanism for fencing
   rather than a conditional write like you're proposing. The reason I'm
   saying this is that the conditional write applies to requests
   individually, while it sounds like you want to make sure that there is
   a single client writing over multiple requests.
 
  -Flavio
 
   On 17 Jul 2015, at 07:30, Ben Kirwin b...@kirw.in wrote:
  
   Hi there,
  
   I just added a KIP for a 'conditional publish' operation: a simple
   CAS-like mechanism for the Kafka producer. The wiki page is here:
  
  
 
 https://cwiki.apache.org/confluence/display/KAFKA/KIP-27+-+Conditional+Publish
  
   And there's some previous discussion on the ticket and the users list:
  
   https://issues.apache.org/jira/browse/KAFKA-2260