[jira] [Commented] (KAFKA-3335) Kafka Connect hangs in shutdown hook

2016-06-02 Thread Ben Kirwin (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3335?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15313267#comment-15313267
 ] 

Ben Kirwin commented on KAFKA-3335:
-----------------------------------

Excellent -- thanks for following up on this!

> Kafka Connect hangs in shutdown hook
> -------------------------------------
>
> Key: KAFKA-3335
> URL: https://issues.apache.org/jira/browse/KAFKA-3335
> Project: Kafka
> Issue Type: Bug
> Components: KafkaConnect
> Affects Versions: 0.9.0.1
> Reporter: Ben Kirwin
> Fix For: 0.10.0.0
>


[jira] [Commented] (KAFKA-2260) Allow specifying expected offset on produce

2016-06-02 Thread Ben Kirwin (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2260?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15313258#comment-15313258
 ] 

Ben Kirwin commented on KAFKA-2260:
-----------------------------------

Hi! I haven't had time to push on this at all in the last few months, but I'm 
still interested as well.

My understanding was that the core team was very focused on streams / the 
other features in the 0.10 release, but would be interested in looking at 
coordination proposals after that. (This / idempotent producer / etc.) Does 
anyone think working this up into a GitHub PR would be useful, or is there a 
better next step? The existing KIP is already fairly well fleshed-out, I think: 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-27+-+Conditional+Publish

> Allow specifying expected offset on produce
> --------------------------------------------
>
> Key: KAFKA-2260
> URL: https://issues.apache.org/jira/browse/KAFKA-2260
> Project: Kafka
> Issue Type: Improvement
> Reporter: Ben Kirwin
> Assignee: Ewen Cheslack-Postava
> Priority: Minor
> Attachments: KAFKA-2260.patch, expected-offsets.patch
>


[jira] [Created] (KAFKA-3335) Kafka Connect hangs in shutdown hook

2016-03-04 Thread Ben Kirwin (JIRA)
Ben Kirwin created KAFKA-3335:
------------------------------

 Summary: Kafka Connect hangs in shutdown hook
 Key: KAFKA-3335
 URL: https://issues.apache.org/jira/browse/KAFKA-3335
 Project: Kafka
  Issue Type: Bug
Reporter: Ben Kirwin


The `Connect` class can run into issues during start, such as:

{noformat}
Exception in thread "main" org.apache.kafka.connect.errors.ConnectException: 
Could not look up partition metadata for offset backing store topic in allotted 
period. This could indicate a connectivity issue, unavailable topic partitions, 
or if this is your first use of the topic it may have taken too long to create.
at 
org.apache.kafka.connect.util.KafkaBasedLog.start(KafkaBasedLog.java:130)
at 
org.apache.kafka.connect.storage.KafkaOffsetBackingStore.start(KafkaOffsetBackingStore.java:85)
at org.apache.kafka.connect.runtime.Worker.start(Worker.java:108)
at org.apache.kafka.connect.runtime.Connect.start(Connect.java:56)
at 
org.apache.kafka.connect.cli.ConnectDistributed.main(ConnectDistributed.java:62)
{noformat}

This exception halts the startup process. It also triggers the shutdown hook... 
which blocks waiting for the service to start up before calling stop. This 
causes the process to hang forever.

There are a few things that could be done here, but it would be nice to bound the 
amount of time the process spends trying to exit gracefully.
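
For illustration, here's a minimal sketch of one way to bound that wait. The latch, the timeout, and the hook shape are assumptions for this sketch, not the actual Connect shutdown-hook code:

{noformat}
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Sketch only: bound how long the shutdown hook waits for startup to
// finish. The names and the 30-second timeout are assumptions, not
// Connect's real implementation.
class BoundedShutdownHook extends Thread {
    private final CountDownLatch startLatch = new CountDownLatch(1);

    void onStartupComplete() { startLatch.countDown(); }

    @Override
    public void run() {
        try {
            // If start() threw partway through, the latch never counts
            // down; give up after a bounded wait instead of hanging.
            if (!startLatch.await(30, TimeUnit.SECONDS)) {
                return; // exit without a clean stop
            }
            // a clean stop() would run here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
{noformat}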





[jira] [Commented] (KAFKA-2260) Allow specifying expected offset on produce

2015-07-18 Thread Ben Kirwin (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2260?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14632537#comment-14632537
 ] 

Ben Kirwin commented on KAFKA-2260:
-----------------------------------

Ah, clever! Thanks for sharing this -- I've linked to it on the discussion 
thread.

{quote}
When changing the number of sub-partitions, the broker doesn't have to 
recompute sub-partition high water marks. It can initialize all array elements 
with the partition's high water mark.
{quote}

It seems we could also do this initialization every time the log is opened -- 
and avoid any persistent storage for the 'sub-partition' offsets at all. This 
would remove another major drawback of the per-key approach.
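
As a sketch of that open-time initialization (every name here is hypothetical, not actual broker code):

{noformat}
import java.util.Arrays;

// Hypothetical sketch: seed each sub-partition high-water mark from the
// partition's high-water mark whenever the log is opened, so the per-slot
// offsets never need to be persisted.
final class SubPartitionHwm {
    final long[] hwm;

    SubPartitionHwm(int numSlots, long partitionHwm) {
        hwm = new long[numSlots];
        // A safe over-approximation for every slot: after a restart, no
        // expected offset below the partition HWM can be accepted.
        Arrays.fill(hwm, partitionHwm);
    }
}
{noformat}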



[jira] [Commented] (KAFKA-2260) Allow specifying expected offset on produce

2015-07-17 Thread Ben Kirwin (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2260?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14630870#comment-14630870
 ] 

Ben Kirwin commented on KAFKA-2260:
-----------------------------------

Opened a KIP for this here: 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-27+-+Conditional+Publish

[~becket_qin]: Thanks! To #1 and #3 -- this feature is aimed at the special 
(but fairly common) case where we only expect one producer for a single 
partition at a given time. More complicated situations definitely require more 
elaborate coordination -- and while it might be possible to build such 
mechanisms on top of this, I've left that out of scope for now. To #2 -- there 
are definitely cases where key-based CAS feels handier, but the converse is 
also true, and tracking offsets for each key requires some auxiliary data with 
commensurate overhead. I discussed this a little bit in the KIP -- though the 
point that 'partial failure' becomes more likely is not something I'd 
considered; it's a very good point.




[jira] [Commented] (KAFKA-2260) Allow specifying expected offset on produce

2015-07-17 Thread Ben Kirwin (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2260?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14632120#comment-14632120
 ] 

Ben Kirwin commented on KAFKA-2260:
-----------------------------------

Interesting -- the idea is that we track the max offset per *hash* of the key, 
instead of the key itself? I guess that if you use an array of length 1, this 
reduces to the current proposal. :) It would be interesting to calculate how 
frequently different keys would conflict, given a good hash function.

It seems like, for this to work, you'd need to add an additional method to the 
API to get the current offset for the hash of a given key?
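
For illustration, a minimal sketch of such a per-hash check -- all names here are made up for this example:

{noformat}
import java.util.Arrays;

// Sketch: CAS against the max offset recorded for the *hash* of the key,
// not the key itself. With a single slot this reduces to the plain
// partition-offset check from the original proposal.
final class HashSlotCheck {
    private final long[] maxOffset;

    HashSlotCheck(int numSlots, long initialHwm) {
        maxOffset = new long[numSlots];
        Arrays.fill(maxOffset, initialHwm);
    }

    /** Returns true if the write for this key should be accepted. */
    boolean checkAndAdvance(byte[] key, long expectedOffset, long assignedOffset) {
        int slot = Math.floorMod(Arrays.hashCode(key), maxOffset.length);
        if (expectedOffset < maxOffset[slot]) {
            return false; // stale expectation: this slot has already advanced
        }
        maxOffset[slot] = assignedOffset;
        return true;
    }
}
{noformat}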



[jira] [Commented] (KAFKA-2260) Allow specifying expected offset on produce

2015-07-17 Thread Ben Kirwin (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2260?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14632113#comment-14632113
 ] 

Ben Kirwin commented on KAFKA-2260:
-----------------------------------

Ah, sorry! Let me try again.

Suppose you try to send a batch of messages to a partition, but get some 
network error -- it's possible that they were published successfully, or that 
they were lost before they made it to the broker. With the CAS, the simple 
thing to do is to resend the same batch with the same expected offsets. If the 
messages were published correctly last time, you'll get a check mismatch error; 
and if they weren't, they'll be appended correctly.

If the series of messages that the producer wants to send is fixed, the same 
mechanism would work even through producer restarts. If the set of messages 
isn't fixed -- the producer might have a completely different set of messages 
to send after restarting -- then what it means to be exactly-once becomes a lot 
more domain-dependent; you might want to write exactly one group of messages 
for each input message, or RPC request, or five-minute interval -- but that 
requires coordination between a bunch of different moving parts, and I don't 
think there's one coordination mechanism that handles all cases. (This 
'expected offset' thing is enough for some, but certainly not all of them...)
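
To make that concrete, here's a minimal sketch against a hypothetical conditional-send API -- sendWithExpectedOffset and CasMismatchException are made-up names, since no such producer API exists today:

{noformat}
import java.io.IOException;
import java.util.List;

// Sketch only: the send method and exception are stand-ins for the
// proposed conditional-publish API.
final class ConditionalRetry {
    static final class CasMismatchException extends Exception {}

    void appendExactlyOnce(List<byte[]> batch, long expectedOffset)
            throws InterruptedException {
        while (true) {
            try {
                sendWithExpectedOffset(batch, expectedOffset);
                return; // this attempt appended the batch
            } catch (CasMismatchException e) {
                return; // an earlier attempt already appended it
            } catch (IOException unknownOutcome) {
                // Network error: the write may or may not have landed.
                // Resend the same batch with the SAME expected offsets;
                // at most one attempt can ever succeed.
                Thread.sleep(100);
            }
        }
    }

    private void sendWithExpectedOffset(List<byte[]> batch, long expected)
            throws IOException, CasMismatchException {
        throw new UnsupportedOperationException("hypothetical API");
    }
}
{noformat}

The point is that the expected offset, rather than a retry counter or sequence number, is what makes the retry idempotent.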


[jira] [Commented] (KAFKA-2260) Allow specifying expected offset on produce

2015-07-14 Thread Ben Kirwin (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2260?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14627460#comment-14627460
 ] 

Ben Kirwin commented on KAFKA-2260:
-----------------------------------

Hi [~ewencp] -- thanks for the interest! I'd be glad to work this up into a 
KIP, but it looks like I don't have the permissions to create a wiki page... 
could you set that up?



[jira] [Commented] (KAFKA-2260) Allow specifying expected offset on produce

2015-07-14 Thread Ben Kirwin (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2260?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14627484#comment-14627484
 ] 

Ben Kirwin commented on KAFKA-2260:
-----------------------------------

Will do; thanks!



[jira] [Updated] (KAFKA-2260) Allow specifying expected offset on produce

2015-07-13 Thread Ben Kirwin (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2260?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ben Kirwin updated KAFKA-2260:
------------------------------
Status: Patch Available  (was: Open)

Worked up a draft of this over the weekend, implementing the 
cas-on-partition-offset feature outlined in the original post. This is enough 
to support many cases of 'idempotent producer', along with a bunch of other fun 
stuff.

I'm attaching the diff here -- if folks are interested in moving this forward, 
I'll post it to reviewboard as well?



[jira] [Updated] (KAFKA-2260) Allow specifying expected offset on produce

2015-07-13 Thread Ben Kirwin (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2260?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ben Kirwin updated KAFKA-2260:
------------------------------
Attachment: expected-offsets.patch



[jira] [Created] (KAFKA-2260) Allow specifying expected offset on produce

2015-06-08 Thread Ben Kirwin (JIRA)
Ben Kirwin created KAFKA-2260:
------------------------------

 Summary: Allow specifying expected offset on produce
 Key: KAFKA-2260
 URL: https://issues.apache.org/jira/browse/KAFKA-2260
 Project: Kafka
  Issue Type: Improvement
Reporter: Ben Kirwin
Priority: Minor


I'd like to propose a change that adds a simple CAS-like mechanism to the Kafka 
producer. This update has a small footprint, but enables a bunch of interesting 
uses in stream processing or as a commit log for process state.

h4. Proposed Change

In short:

- Allow the user to attach a specific offset to each message produced.

- The server assigns offsets to messages in the usual way. However, if the 
expected offset doesn't match the actual offset, the server should fail the 
produce request instead of completing the write.

This is a form of optimistic concurrency control, like the ubiquitous 
check-and-set -- but instead of checking the current value of some state, it 
checks the current offset of the log.

h4. Motivation

Much like check-and-set, this feature is only useful when there's very low 
contention. Happily, when Kafka is used as a commit log or as a 
stream-processing transport, it's common to have just one producer (or a small 
number) for a given partition -- and in many of these cases, predicting offsets 
turns out to be quite useful.

- We get the same benefits as the 'idempotent producer' proposal: a producer 
can retry a write indefinitely and be sure that at most one of those attempts 
will succeed; and if two producers accidentally write to the end of the 
partition at once, we can be certain that at least one of them will fail.

- It's possible to 'bulk load' Kafka this way -- you can write a list of n 
messages consecutively to a partition, even if the list is much larger than the 
buffer size or the producer has to be restarted.

- If a process is using Kafka as a commit log -- reading from a partition to 
bootstrap, then writing any updates to that same partition -- it can be sure 
that it's seen all of the messages in that partition at the moment it does its 
first (successful) write.

There's a bunch of other similar use-cases here, but they all have roughly the 
same flavour.

h4. Implementation

The major advantage of this proposal over other suggested transaction / 
idempotency mechanisms is its minimality: it gives the 'obvious' meaning to a 
currently-unused field, adds no new APIs, and requires very little new code or 
additional work from the server.

- Produced messages already carry an offset field, which is currently ignored 
by the server. This field could be used for the 'expected offset', with a sigil 
value for the current behaviour. (-1 is a natural choice, since it's already 
used to mean 'next available offset'.)

- We'd need a new error and error code for a 'CAS failure'.

- The server assigns offsets to produced messages in 
{{ByteBufferMessageSet.validateMessagesAndAssignOffsets}}. After this change, 
the method would assign offsets in the same way -- but if they don't match the 
offset in the message, we'd return an error instead of completing the write 
(see the sketch after this list).

- To avoid breaking existing clients, this behaviour would need to live behind 
some config flag. (Possibly global, but probably more useful per-topic?)
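
As a rough sketch of that server-side check (stand-in Java types for illustration, not the actual Scala in {{ByteBufferMessageSet}}):

{noformat}
import java.util.List;

// Sketch with stand-in types: assign offsets as usual, but reject the
// whole write on the first expected-offset mismatch.
final class AssignOffsets {
    static final class Message {
        long expectedOffset = -1L; // -1 keeps today's unconditional behaviour
        long assignedOffset;
    }

    static final class CasFailureException extends RuntimeException {
        CasFailureException(String msg) { super(msg); }
    }

    static void assign(List<Message> messages, long nextOffset) {
        for (Message m : messages) {
            if (m.expectedOffset != -1L && m.expectedOffset != nextOffset) {
                // Fail the produce request instead of completing the write;
                // this is where the new 'CAS failure' error code would apply.
                throw new CasFailureException(
                    "expected " + m.expectedOffset + ", next is " + nextOffset);
            }
            m.assignedOffset = nextOffset++;
        }
    }
}
{noformat}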

I understand all this is unsolicited and possibly strange: happy to answer 
questions, and if this seems interesting, I'd be glad to flesh this out into a 
full KIP or patch. (And apologies if this is the wrong venue for this sort of 
thing!)


