[jira] [Resolved] (KAFKA-5084) Allow Kafka connect source tasks to commit offsets without messages being sent

2017-04-18 Thread Chris Riccomini (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5084?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Riccomini resolved KAFKA-5084.

Resolution: Duplicate

> Allow Kafka connect source tasks to commit offsets without messages being sent
> --
>
> Key: KAFKA-5084
> URL: https://issues.apache.org/jira/browse/KAFKA-5084
> Project: Kafka
>  Issue Type: New Feature
>  Components: KafkaConnect
>Affects Versions: 0.10.2.0
>Reporter: Chris Riccomini
>
> We are currently running [Debezium|http://debezium.io/] connectors in Kafka 
> connect. These connectors consume from MySQL's binlog, and produce into Kafka.
> One of the things we've observed is that some of our Debezium connectors are 
> not honoring the {{offset.flush.interval.ms}} setting (which is set to 60 
> seconds). Some of our connectors seem to be committing only sporadically. For 
> low-volume connectors, the commits seem to happen once every hour or two, and 
> sometimes even longer.
> It sounds like the issue is that Kafka connect will only commit source task 
> offsets when the source task produces new source records. This is because 
> Kafka connect gets the offset to commit from an incoming source record. The 
> problem with this approach is that there are (in my opinion) valid reasons to 
> want to commit consumed offsets WITHOUT sending any new messages. Taking 
> Debezium as an example, there are cases where Debezium consumes messages, but 
> filters out messages based on a regex or filter rule (e.g. table blacklists). 
> In such a case, Debezium is consuming messages from MySQL's binlog, 
> and dropping them before they get to the Kafka connect framework. As such, 
> Kafka connect never sees these messages, and doesn't commit any progress. 
> This results in several problems:
> # In the event of a failure, the connector could fall WAY back, since the 
> last committed offset might be from hours ago, even though it *has* 
> processed all recent messages--it just hasn't sent anything to Kafka.
> # For connectors like Debezium that consume from a source that has a 
> *limited* window to fetch messages (MySQL's binlog has time/size based 
> retention), you can actually fall off the edge of the binlog because the last 
> commit can actually happen farther back than the binlog goes, even though 
> Debezium has fetched every single message in the binlog--it just hasn't 
> produced anything due to filtering.
> Again, I don't see this as a Debezium-specific issue. I could imagine a 
> similar scenario with an [SST-based Cassandra 
> source|https://github.com/datamountaineer/stream-reactor/issues/162].
> It would be nice if Kafka connect allowed us a way to commit offsets for 
> source tasks even when messages haven't been sent recently. This would allow 
> source tasks to log their progress even if they're opting not to send 
> messages to Kafka due to filtering or for some other reason.
> (See https://issues.jboss.org/browse/DBZ-220 for more context.)





[jira] [Commented] (KAFKA-5084) Allow Kafka connect source tasks to commit offsets without messages being sent

2017-04-18 Thread Chris Riccomini (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5084?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15973509#comment-15973509
 ] 

Chris Riccomini commented on KAFKA-5084:


[~rhauch], hah, this does indeed look like a dupe of KAFKA-3821. I'll close 
this then.

> Allow Kafka connect source tasks to commit offsets without messages being sent
> --
>
> Key: KAFKA-5084
> URL: https://issues.apache.org/jira/browse/KAFKA-5084
> Project: Kafka
>  Issue Type: New Feature
>  Components: KafkaConnect
>Affects Versions: 0.10.2.0
>Reporter: Chris Riccomini
>
> We are currently running [Debezium|http://debezium.io/] connectors in Kafka 
> connect. These connectors consume from MySQL's binlog, and produce into Kafka.
> One of the things we've observed is that some of our Debezium connectors are 
> not honoring the {{offset.flush.interval.ms}} setting (which is set to 60 
> seconds). Some of our connectors seem to be committing only sporadically. For 
> low-volume connectors, the commits seem to happen once every hour or two, and 
> sometimes even longer.
> It sounds like the issue is that Kafka connect will only commit source task 
> offsets when the source task produces new source records. This is because 
> Kafka connect gets the offset to commit from an incoming source record. The 
> problem with this approach is that there are (in my opinion) valid reasons to 
> want to commit consumed offsets WITHOUT sending any new messages. Taking 
> Debezium as an example, there are cases where Debezium consumes messages, but 
> filters out messages based on a regex or filter rule (e.g. table blacklists). 
> In such a case, Debezium is consuming messages from MySQL's binlog, 
> and dropping them before they get to the Kafka connect framework. As such, 
> Kafka connect never sees these messages, and doesn't commit any progress. 
> This results in several problems:
> # In the event of a failure, the connector could fall WAY back, since the 
> last committed offset might be from hours ago, even though it *has* 
> processed all recent messages--it just hasn't sent anything to Kafka.
> # For connectors like Debezium that consume from a source that has a 
> *limited* window to fetch messages (MySQL's binlog has time/size based 
> retention), you can actually fall off the edge of the binlog because the last 
> commit can actually happen farther back than the binlog goes, even though 
> Debezium has fetched every single message in the binlog--it just hasn't 
> produced anything due to filtering.
> Again, I don't see this as a Debezium-specific issue. I could imagine a 
> similar scenario with an [SST-based Cassandra 
> source|https://github.com/datamountaineer/stream-reactor/issues/162].
> It would be nice if Kafka connect allowed us a way to commit offsets for 
> source tasks even when messages haven't been sent recently. This would allow 
> source tasks to log their progress even if they're opting not to send 
> messages to Kafka due to filtering or for some other reason.
> (See https://issues.jboss.org/browse/DBZ-220 for more context.)





[jira] [Created] (KAFKA-5084) Allow Kafka connect source tasks to commit offsets without messages being sent

2017-04-18 Thread Chris Riccomini (JIRA)
Chris Riccomini created KAFKA-5084:
--

 Summary: Allow Kafka connect source tasks to commit offsets 
without messages being sent
 Key: KAFKA-5084
 URL: https://issues.apache.org/jira/browse/KAFKA-5084
 Project: Kafka
  Issue Type: New Feature
  Components: KafkaConnect
Affects Versions: 0.10.2.0
Reporter: Chris Riccomini


We are currently running [Debezium|http://debezium.io/] connectors in Kafka 
connect. These connectors consume from MySQL's binlog, and produce into Kafka.

One of the things we've observed is that some of our Debezium connectors are 
not honoring the {{offset.flush.interval.ms}} setting (which is set to 60 
seconds). Some of our connectors seem to be committing only sporadically. For 
low-volume connectors, the commits seem to happen once every hour or two, and 
sometimes even longer.

It sounds like the issue is that Kafka connect will only commit source task 
offsets when the source task produces new source records. This is because Kafka 
connect gets the offset to commit from an incoming source record. The problem 
with this approach is that there are (in my opinion) valid reasons to want to 
commit consumed offsets WITHOUT sending any new messages. Taking Debezium as an 
example, there are cases where Debezium consumes messages, but filters out 
messages based on a regex or filter rule (e.g. table blacklists). In such a 
case, Debezium is consuming messages from MySQL's binlog, and dropping them 
before they get to the Kafka connect framework. As such, Kafka connect never 
sees these messages, and doesn't commit any progress. This results in several 
problems:

# In the event of a failure, the connector could fall WAY back, since the last 
committed offset might be from hours ago, even though it *has* processed all 
recent messages--it just hasn't sent anything to Kafka.
# For connectors like Debezium that consume from a source that has a *limited* 
window to fetch messages (MySQL's binlog has time/size based retention), you 
can actually fall off the edge of the binlog because the last commit can 
actually happen farther back than the binlog goes, even though Debezium has 
fetched every single message in the binlog--it just hasn't produced anything 
due to filtering.

Again, I don't see this as a Debezium-specific issue. I could imagine a similar 
scenario with an [SST-based Cassandra 
source|https://github.com/datamountaineer/stream-reactor/issues/162].

It would be nice if Kafka connect allowed us a way to commit offsets for source 
tasks even when messages haven't been sent recently. This would allow source 
tasks to log their progress even if they're opting not to send messages to 
Kafka due to filtering or for some other reason.

(See https://issues.jboss.org/browse/DBZ-220 for more context.)
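For illustration only (not part of the original JIRA text): a minimal Java sketch of the failure mode described above, under assumed names such as FilteringBinlogSourceTask, BinlogEvent, and readNextBatch(). It shows a source task that drops filtered events before returning them from poll(), so the Connect framework never receives a SourceRecord carrying their offsets and therefore has nothing new to commit.

{code}
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public abstract class FilteringBinlogSourceTask extends SourceTask {

    @Override
    public List<SourceRecord> poll() throws InterruptedException {
        List<SourceRecord> out = new ArrayList<>();
        for (BinlogEvent event : readNextBatch()) {   // assumed connector helper
            if (isFiltered(event)) {
                // Consumed from the binlog but dropped here: the framework never
                // sees a SourceRecord with this binlog position, so the periodic
                // offset flush has nothing new to write for it.
                continue;
            }
            out.add(new SourceRecord(
                    sourcePartition(),                // e.g. {"server": "db1"}
                    offsetFor(event),                 // e.g. {"pos": <binlog position>}
                    "mysql.changes",
                    Schema.BYTES_SCHEMA,
                    event.payload()));
        }
        return out;
    }

    // Assumed connector-specific helpers, not part of the Connect API.
    protected abstract List<BinlogEvent> readNextBatch();
    protected abstract boolean isFiltered(BinlogEvent event);
    protected abstract Map<String, ?> sourcePartition();
    protected abstract Map<String, ?> offsetFor(BinlogEvent event);

    interface BinlogEvent {
        byte[] payload();
    }
}
{code}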





[jira] [Commented] (KAFKA-2376) Add Copycat metrics

2017-02-03 Thread Chris Riccomini (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2376?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15851926#comment-15851926
 ] 

Chris Riccomini commented on KAFKA-2376:


One thing we'd like to see here is the ability to hook into the Kafka Connect 
metrics from our connectors. That way we can add connector-specific metrics, 
and have them flow through the same pipeline.
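For illustration only (not an existing Connect API): a hedged sketch of the kind of hook being described, using the client-side org.apache.kafka.common.metrics.Metrics registry. The registry instance, sensor name, metric group, and description below are all assumptions.

{code}
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Rate;

import java.util.Collections;

public final class ConnectorMetricsSketch {
    public static void main(String[] args) {
        // Ideally this would be the worker's own registry, handed to the
        // connector; here it is created locally just to show the calls.
        Metrics metrics = new Metrics();

        Sensor filtered = metrics.sensor("records-filtered");
        MetricName name = new MetricName(
                "records-filtered-rate",
                "my-connector-metrics",               // assumed group name
                "Rate of records dropped by the connector's filter",
                Collections.<String, String>emptyMap());
        filtered.add(name, new Rate());

        // Inside the connector, each dropped record would record against the sensor:
        filtered.record();

        metrics.close();
    }
}
{code}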

> Add Copycat metrics
> ---
>
> Key: KAFKA-2376
> URL: https://issues.apache.org/jira/browse/KAFKA-2376
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 0.9.0.0
>Reporter: Ewen Cheslack-Postava
>  Labels: needs-kip
>
> Copycat needs good metrics for monitoring since that will be the primary 
> insight into the health of connectors as they copy data.





[jira] [Commented] (KAFKA-4107) Support offset reset capability in Kafka Connect

2016-11-11 Thread Chris Riccomini (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4107?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15657561#comment-15657561
 ] 

Chris Riccomini commented on KAFKA-4107:


Agreed. We've had this need as well, and have been resorting to renaming the 
connector every time. It creates a ton of garbage in the commit topic, though.



> Support offset reset capability in Kafka Connect
> 
>
> Key: KAFKA-4107
> URL: https://issues.apache.org/jira/browse/KAFKA-4107
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Jason Gustafson
>Assignee: Ewen Cheslack-Postava
>
> It would be useful in some cases to be able to reset connector offsets. For 
> example, if a topic in Kafka corresponding to a source database is 
> accidentally deleted (or deleted because of corrupt data), an administrator 
> may want to reset offsets and reproduce the log from the beginning. It may 
> also be useful to have support for overriding offsets, but that seems like a 
> less likely use case.





[jira] [Commented] (KAFKA-4171) Kafka-connect prints outs keystone and truststore password in log2

2016-09-14 Thread Chris Riccomini (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4171?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15491164#comment-15491164
 ] 

Chris Riccomini commented on KAFKA-4171:


Wondering if KC is just copying the consumer.* configuration into the consumer's 
config without the consumer. prefix, but also leaving the prefixed entries 
floating around? Seems like the prefix should be stripped when forwarding 
config on to the underlying Kafka consumer.
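A minimal sketch (class and method names are mine, not the actual Connect worker code) of the prefix handling being suggested: copy the consumer.*-prefixed worker settings into the consumer config with the prefix stripped, and do not pass the prefixed keys through as well.

{code}
import java.util.HashMap;
import java.util.Map;

public final class PrefixedConfigSketch {

    static Map<String, Object> stripPrefix(Map<String, ?> workerProps, String prefix) {
        Map<String, Object> stripped = new HashMap<>();
        for (Map.Entry<String, ?> entry : workerProps.entrySet()) {
            if (entry.getKey().startsWith(prefix)) {
                // "consumer.ssl.truststore.password" -> "ssl.truststore.password"
                stripped.put(entry.getKey().substring(prefix.length()), entry.getValue());
            }
        }
        // Only the stripped keys are returned, so the consumer never sees the
        // prefixed entries and never logs them as unknown configs.
        return stripped;
    }
}
{code}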

> Kafka-connect prints outs keystone and truststore password in log2
> --
>
> Key: KAFKA-4171
> URL: https://issues.apache.org/jira/browse/KAFKA-4171
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 0.10.0.0
>Reporter: Akshath Patkar
>Assignee: Ewen Cheslack-Postava
>
> Kafka Connect prints out the keystore and truststore passwords in the log
> [2016-09-14 16:30:33,971] WARN The configuration 
> consumer.ssl.truststore.password = X was supplied but isn't a known 
> config. (org.apache.kafka.clients.consumer.ConsumerConfig:186)





[jira] [Created] (KAFKA-2020) I expect ReplicaNotAvailableException to have proper Javadocs

2015-03-13 Thread Chris Riccomini (JIRA)
Chris Riccomini created KAFKA-2020:
--

 Summary: I expect ReplicaNotAvailableException to have proper 
Javadocs
 Key: KAFKA-2020
 URL: https://issues.apache.org/jira/browse/KAFKA-2020
 Project: Kafka
  Issue Type: Bug
  Components: consumer
Reporter: Chris Riccomini
Assignee: Neha Narkhede


It looks like ReplicaNotAvailableException was copied and pasted from 
LeaderNotAvailableException. The Javadocs were never changed. This means that 
users think that ReplicaNotAvailableException signifies that leaders are not 
available. This is very different from "I can ignore this exception," which is 
what the Kafka protocol docs say to do with ReplicaNotAvailableException.

Related: what's the point of ReplicaNotAvailableException if it's supposed to 
be ignored?





[jira] [Commented] (KAFKA-1886) SimpleConsumer swallowing ClosedByInterruptException

2015-01-21 Thread Chris Riccomini (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14286326#comment-14286326
 ] 

Chris Riccomini commented on KAFKA-1886:


IMO, the SimpleConsumer should at least throw the proper exception.

 SimpleConsumer swallowing ClosedByInterruptException
 

 Key: KAFKA-1886
 URL: https://issues.apache.org/jira/browse/KAFKA-1886
 Project: Kafka
  Issue Type: Bug
  Components: producer 
Reporter: Aditya A Auradkar
Assignee: Jun Rao

 This issue was originally reported by a Samza developer. I've included an 
 exchange of mine with Chris Riccomini. I'm trying to reproduce the problem on 
 my dev setup.
 From: criccomi
 Hey all,
 Samza's BrokerProxy [1] threads appear to be wedging randomly when we try to 
 interrupt its fetcher thread. I noticed that SimpleConsumer.scala catches 
 Throwable in its sendRequest method [2]. I'm wondering: if 
 blockingChannel.send/receive throws a ClosedByInterruptException
 when the thread is interrupted, what happens? It looks like sendRequest will 
 catch the exception (which I
 think clears the thread's interrupted flag), and then retries the send. If 
 the send succeeds on the retry, I think that the ClosedByInterruptException 
 exception is effectively swallowed, and the BrokerProxy will continue
 fetching messages as though its thread was never interrupted.
 Am I misunderstanding how things work?
 Cheers,
 Chris
 [1] 
 https://github.com/apache/incubator-samza/blob/master/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala#L126
 [2] 
 https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/consumer/SimpleConsumer.scala#L75
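For illustration only (not from the original thread): a hedged Java sketch of the pattern described in the quoted report above. Catching Throwable around a blocking channel call and retrying means a ClosedByInterruptException is swallowed, so the caller never learns its thread was interrupted; the second method shows the kind of "throw the proper exception" behavior asked for in the comment. The BlockingChannel interface and method names here are assumptions, not the actual SimpleConsumer code.

{code}
import java.nio.channels.ClosedByInterruptException;

public final class InterruptHandlingSketch {

    interface BlockingChannel {
        void sendAndReceive() throws Exception;   // stand-in for the blocking send/receive call
    }

    // Swallowing variant: an interrupt surfaces as ClosedByInterruptException,
    // is caught along with everything else, and the request is simply retried,
    // hiding the interruption from the caller if the retry succeeds.
    static void sendSwallowingInterrupt(BlockingChannel channel) throws Exception {
        try {
            channel.sendAndReceive();
        } catch (Throwable t) {
            channel.sendAndReceive();
        }
    }

    // Variant that throws the proper exception: make sure the interrupt status
    // is set and let the caller decide what to do with it.
    static void sendPreservingInterrupt(BlockingChannel channel) throws Exception {
        try {
            channel.sendAndReceive();
        } catch (ClosedByInterruptException e) {
            Thread.currentThread().interrupt();
            throw e;
        }
    }
}
{code}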





[jira] [Comment Edited] (KAFKA-1863) Exception categories / hierarchy in clients

2015-01-14 Thread Chris Riccomini (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1863?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14277973#comment-14277973
 ] 

Chris Riccomini edited comment on KAFKA-1863 at 1/15/15 12:30 AM:
--

Here's how Samza's KafkaSystemProducer currently works. We use a sync Kafka 
producer to send messages. We batch inside Samza's KafkaSystemProducer, so 
calling KafkaSystemProducer.send() prepends your message to a buffer inside the 
KafkaSystemProducer (we don't send it to the underlying Kafka producer at this 
point). When the buffer reaches some max (say, 200), the send() call will 
trigger a sync KafkaSystemProducer.flush() call, which will call send() on the 
underlying producer. Since the send() call is blocking, any exception that 
occurs happens on the thread that's calling send(), so we just simply wrap the 
send() call in a try/catch. Flush is also called before Samza checkpoints its 
offsets, to make sure that any outstanding output is sent (so we don't lose 
data if there's a failure).

Samza's KafkaSystemProducer is currently set up to retry forever, no matter 
what, when sends to Kafka fail. It waits 10s, and retries. This behavior has 
come in handy in scenarios where we have (un)expected downtime or cluster 
maintenance on a Kafka grid. When the grid's offline, we don't want 50 Samza 
jobs to kill themselves. We just want them to chill out until the grid's back.

This strategy has some drawbacks, though. 

# The send is blocking. This hurts throughput. Sometimes, quite severely.
# Some exceptions are never going to get better. Retrying doesn't make sense in 
this case.

When we upgrade to the new Producer, we want to address these issues. For (1), 
everything in the new producer is async, so that gets fixed automatically. For 
(2), we'd like to only retry with RetriableException, and fail for the rest.

The trick is, if the sends are no longer sync, we have to have a mechanism by 
which we can verify that all outstanding messages have been successfully sent 
before we return.

{code}
send()
send()
send()
flush() // when this returns, all messages must be in Kafka.. inflight must be 
0 and buffer must be 0
{code}

The naive approach would be to just hold on to a future for each send() call, 
and stuff it in a list. When flush is invoked, simply iterate over the list, 
and call Future.get(). The confusing thing with this approach is what to do if 
Future.get() tells us that there was an exception when the message send was 
attempted. It seems that re-sending the message at this point would result in 
out of order messages. Is that correct?

{code}
send() // success
send() // failed
send() // enqueued
flush()
{code}

In the above example, if we call Future.get() on the second send()'s future, 
and we see it's a failure, the third send() could have already happened. In 
this case, if we re-send the failed message, then we get the third message 
before the second, right?

Another oddity of this approach is that it seems we would have to double-buffer 
the messages in order to resend them, since RecordMetadata doesn't have the 
actual message that we sent. In order to re-send, we'd have to keep the message 
in a buffer in KafkaSystemProducer, along with its future, until the 
Future.get() returns successfully.

We then thought that we would set:

{noformat}
retries=20
{noformat}

At this point, we thought that the producer would re-send forever from the send 
thread, which is pretty much what we wanted in the case of a failure. But then 
we realized that if we had multiple in-flight requests, this could lead to out 
of order messages, so we forced:

{noformat}
max.in.flight.requests.per.connection=1
{noformat}

So at this point, we believe that any message that gets into the send thread 
will be guaranteed to be sent, and in the right order, always.

But, talking with [~guozhang], it sounds like there are some cases where 
ApiExceptions are thrown from the main thread, not the send thread. In this 
case, it seems that our {{retries}} setting has no effect, because the message 
hasn't even made it into the queue for the send thread yet. So, if an exception 
is thrown in the main thread, we seem to have to catch the RetriableException 
that's thrown, and re-send the message.

This is the current implementation that [~navina] is working on. Part of me is 
thinking, "This can't be right. We must be misunderstanding something." So, I 
1) want to confirm that we're not misunderstanding anything, and 2) see if 
there is a better way to accomplish what we want:

# Async sends.
# A flush mechanism that blocks until all messages have been sent to Kafka.
# In-order retries when a RetriableFailure occurs.
# Forward a non-RetriableException when it occurs.

I'm pretty confident that my mental model is broken, so any help correcting it 
would be appreciated. Also, any tips on a better way to accomplish 

[jira] [Comment Edited] (KAFKA-1863) Exception categories / hierarchy in clients

2015-01-14 Thread Chris Riccomini (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1863?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14277973#comment-14277973
 ] 

Chris Riccomini edited comment on KAFKA-1863 at 1/15/15 12:38 AM:
--

Here's how Samza's KafkaSystemProducer currently works. We use a sync Kafka 
producer to send messages. We batch inside Samza's KafkaSystemProducer, so 
calling KafkaSystemProducer.send() prepends your message to a buffer inside the 
KafkaSystemProducer (we don't send it to the underlying Kafka producer at this 
point). When the buffer reaches some max (say, 200), the send() call will 
trigger a sync KafkaSystemProducer.flush() call, which will call send() on the 
underlying producer. Since the underlying kafka producer's send() call is 
blocking, any exception that occurs happens on the thread that's calling 
send(), so we just simply wrap the send() call in a try/catch. Flush is also 
called before Samza checkpoints its offsets, to make sure that any outstanding 
output is sent (so we don't lose data if there's a failure).

Samza's KafkaSystemProducer is currently set up to retry forever, no matter 
what, when sends to Kafka fail. It waits 10s, and retries. This behavior has 
come in handy in scenarios where we have (un)expected downtime or cluster 
maintenance on a Kafka grid. When the grid's offline, we don't want 50 Samza 
jobs to kill themselves. We just want them to chill out until the grid's back.

This strategy has some drawbacks, though. 

# The send is blocking. This hurts throughput. Sometimes, quite severely.
# Some exceptions are never going to get better. Retrying doesn't make sense in 
this case.

When we upgrade to the new Producer, we want to address these issues. For (1), 
everything in the new producer is async, so that gets fixed automatically. For 
(2), we'd like to only retry with RetriableException, and fail for the rest.

The trick is, if the sends are no longer sync, we have to have a mechanism by 
which we can verify that all outstanding messages have been successfully sent 
before we return.

{code}
send()
send()
send()
flush() // when this returns, all messages must be in Kafka.. inflight must be 
0 and buffer must be 0
{code}

The naive approach would be to just hold on to a future for each send() call, 
and stuff it in a list. When flush is invoked, simply iterate over the list, 
and call Future.get(). The confusing thing with this approach is what to do if 
Future.get() tells us that there was an exception when the message send was 
attempted. It seems that re-sending the message at this point would result in 
out of order messages. Is that correct?

{code}
send() // success
send() // failed
send() // enqueued
flush()
{code}

In the above example, if we call Future.get() on the second send()'s future, 
and we see it's a failure, the third send() could have already happened. In 
this case, if we re-send the failed message, then we get the third message 
before the second, right?

Another oddity of this approach is that it seems we would have to double-buffer 
the messages in order to resend them, since RecordMetadata doesn't have the 
actual message that we sent. In order to re-send, we'd have to keep the message 
in a buffer in KafkaSystemProducer, along with its future, until the 
Future.get() returns successfully.

We then thought that we would set:

{noformat}
retries=20
{noformat}

At this point, we thought that the producer would re-send forever from the send 
thread, which is pretty much what we wanted in the case of a failure. But then 
we realized that if we had multiple in-flight requests, this could lead to out 
of order messages, so we forced:

{noformat}
max.in.flight.requests.per.connection=1
{noformat}

So at this point, we believe that any message that gets into the send thread 
will be guaranteed to be sent, and in the right order, always.

But, talking with [~guozhang], it sounds like there are some cases where 
ApiExceptions are thrown from the main thread, not the send thread. In this 
case, it seems that our {{retries}} setting has no effect, because the message 
hasn't even made it into the queue for the send thread yet. So, if an exception 
is thrown in the main thread, we seem to have to catch the RetriableException 
that's thrown, and re-send the message.

This is the current implementation that [~navina] is working on. Part of me is 
thinking, "This can't be right. We must be misunderstanding something." So, I 
1) want to confirm that we're not misunderstanding anything, and 2) see if 
there is a better way to accomplish what we want:

# Async sends.
# A flush mechanism that blocks until all messages have been sent to Kafka.
# In-order retries when a RetriableFailure occurs.
# Forward a non-RetriableException when it occurs.

I'm pretty confident that my mental model is broken, so any help correcting it 
would be appreciated. Also, any tips on a 

[jira] [Comment Edited] (KAFKA-1863) Exception categories / hierarchy in clients

2015-01-14 Thread Chris Riccomini (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1863?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14277973#comment-14277973
 ] 

Chris Riccomini edited comment on KAFKA-1863 at 1/15/15 12:39 AM:
--

Here's how Samza's KafkaSystemProducer currently works. We use a sync Kafka 
producer to send messages. We batch inside Samza's KafkaSystemProducer, so 
calling KafkaSystemProducer.send() prepends your message to a buffer inside the 
KafkaSystemProducer (we don't send it to the underlying Kafka producer at this 
point). When the buffer reaches some max (say, 200), the send() call will 
trigger a sync KafkaSystemProducer.flush() call, which will call send() on the 
underlying producer. Since the underlying kafka producer's send() call is 
blocking, any exception that occurs happens on the thread that's calling 
send(), so we just simply wrap the send() call in a try/catch. Flush is also 
called before Samza checkpoints its offsets, to make sure that any outstanding 
output is sent (so we don't lose data if there's a failure).

Samza's KafkaSystemProducer is currently set up to retry forever, no matter 
what, when sends to Kafka fail. It waits 10s, and retries. This behavior has 
come in handy in scenarios where we have (un)expected downtime or cluster 
maintenance on a Kafka grid. When the grid's offline, we don't want 50 Samza 
jobs to kill themselves. We just want them to chill out until the grid's back.

This strategy has some drawbacks, though. 

# The KafkaSystemProducer.flush() call is blocking. This hurts throughput. 
Sometimes, quite severely.
# Some exceptions are never going to get better. Retrying doesn't make sense in 
this case.

When we upgrade to the new Producer, we want to address these issues. For (1), 
everything in the new producer is async, so that gets fixed automatically. For 
(2), we'd like to only retry with RetriableException, and fail for the rest.

The trick is, if the sends are no longer sync, we have to have a mechanism by 
which we can verify that all outstanding messages have been successfully sent 
before we return.

{code}
send()
send()
send()
flush() // when this returns, all messages must be in Kafka.. inflight must be 
0 and buffer must be 0
{code}

The naive approach would be to just hold on to a future for each send() call, 
and stuff it in a list. When flush is invoked, simply iterate over the list, 
and call Future.get(). The confusing thing with this approach is what to do if 
Future.get() tells us that there was an exception when the message send was 
attempted. It seems that re-sending the message at this point would result in 
out of order messages. Is that correct?

{code}
send() // success
send() // failed
send() // enqueued
flush()
{code}

In the above example, if we call Future.get() on the second send()'s future, 
and we see it's a failure, the third send() could have already happened. In 
this case, if we re-send the failed message, then we get the third message 
before the second, right?

Another oddity of this approach is that it seems we would have to double-buffer 
the messages in order to resend them, since RecordMetadata doesn't have the 
actual message that we sent. In order to re-send, we'd have to keep the message 
in a buffer in KafkaSystemProducer, along with its future, until the 
Future.get() returns successfully.

We then thought that we would set:

{noformat}
retries=20
{noformat}

At this point, we thought that the producer would re-send forever from the send 
thread, which is pretty much what we wanted in the case of a failure. But then 
we realized that if we had multiple in-flight requests, this could lead to out 
of order messages, so we forced:

{noformat}
max.in.flight.requests.per.connection=1
{noformat}

So at this point, we believe that any message that gets into the send thread 
will be guaranteed to be sent, and in the right order, always.

But, talking with [~guozhang], it sounds like there are some cases where 
ApiExceptions are thrown from the main thread, not the send thread. In this 
case, it seems that our {{retries}} setting has no effect, because the message 
hasn't even made it into the queue for the send thread yet. So, if an exception 
is thrown in the main thread, we seem to have to catch the RetriableException 
that's thrown, and re-send the message.

This is the current implementation that [~navina] is working on. Part of me is 
thinking, "This can't be right. We must be misunderstanding something." So, I 
1) want to confirm that we're not misunderstanding anything, and 2) see if 
there is a better way to accomplish what we want:

# Async sends.
# A flush mechanism that blocks until all messages have been sent to Kafka.
# In-order retries when a RetriableFailure occurs.
# Forward a non-RetriableException when it occurs.

I'm pretty confident that my mental model is broken, so any help correcting it 
would be 

[jira] [Commented] (KAFKA-1863) Exception categories / hierarchy in clients

2015-01-14 Thread Chris Riccomini (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1863?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14277973#comment-14277973
 ] 

Chris Riccomini commented on KAFKA-1863:


Here's how Samza's KafkaSystemProducer currently works. We use a sync Kafka 
producer to send messages. We batch inside Samza's KafkaSystemProducer, so 
calling KafkaSystemProducer.send() prepends your message to a buffer inside the 
KafkaSystemProducer (we don't send it to the underlying Kafka producer at this 
point). When the buffer reaches some max (say, 200), the send() call will 
trigger a sync KafkaSystemProducer.flush() call, which will call send() on the 
underlying producer. Since the send() call is blocking, any exception that 
occurs happens on the thread that's calling send(), so we just simply wrap the 
send() call in a try/catch. Flush is also called before Samza checkpoints its 
offsets, to make sure that any outstanding output is sent (so we don't lose 
data if there's a failure).

Samza's KafkaSystemProducer is currently set up to retry forever, no matter 
what, when sends to Kafka fail. It waits 10s, and retries. This behavior has 
come in handy in scenarios where we have (un)expected downtime or cluster 
maintenance on a Kafka grid. When the grid's offline, we don't want 50 Samza 
jobs to kill themselves. We just want them to chill out until the grid's back.

This strategy has some drawbacks, though. 

# The send is blocking. This hurts throughput. Sometimes, quite severely.
# Some exceptions are never going to get better. Retrying doesn't make sense in 
this case.

When we upgrade to the new Producer, we want to address these issues. For (1), 
everything in the new producer is async, so that gets fixed automatically. For 
(2), we'd like to only retry with RetriableException, and fail for the rest.

The trick is, if the sends are no longer sync, we have to have a mechanism by 
which we can verify that all outstanding messages have been successfully sent 
before we return.

{code}
send()
send()
send()
flush() // when this returns, all messages must be in Kafka.. inflight must be 
0 and buffer must be 0
{code}

The naive approach would be to just hold on to a future for each send() call, 
and stuff it in a list. When flush is invoked, simply iterate over the list, 
and call Future.get(). The confusing thing with this approach is what to do if 
Future.get() tells us that there was an exception when the message send was 
attempted. It seems that re-sending the message at this point would result in 
out of order messages. Is that correct?

{code}
send() // success
send() // failed
send() // enqueued
flush()
{code}

In the above example, if we call Future.get() on the second send()'s future, 
and we see it's a failure, the third send() could have already happened. In 
this case, if we re-send the failed message, then we get the third message 
before the second, right?

Another oddity of this approach is that it seems we would have to double-buffer 
the messages in order to resend them, since RecordMetadata doesn't have the 
actual message that we sent. In order to re-send, we'd have to keep the message 
in a buffer in KafkaSystemProducer, along with its future, until the 
Future.get() returns successfully.

We then thought that we would set:

{noformat}
retries=20
{noformat}

At this point, we thought that the producer would re-send forever from the send 
thread, which is pretty much what we wanted in the case of a failure. But then 
we realized that if we had multiple in-flight requests, this could lead to out 
of order messages, so we forced:

{noformat}
max.in.flight.requests.per.connection=1
{noformat}

So at this point, we believe that any message that gets into the send thread 
will be guaranteed to be sent, and in the right order, always.

But, talking with @guozhang, it sounds like there are some cases where 
ApiExceptions are thrown from the main thread, not the send thread. In this 
case, it seems that our {{retries}} setting has no effect, because the message 
hasn't even made it into the queue for the send thread yet. So, if an exception 
is thrown in the main thread, we seem to have to catch the RetriableException 
that's thrown, and re-send the message.

This is the current implementation that @navina is working on. Part of me is 
thinking, "This can't be right. We must be misunderstanding something." So, I 
1) want to confirm that we're not misunderstanding anything, and 2) see if 
there is a better way to accomplish what we want:

# Async sends.
# A flush mechanism that blocks until all messages have been sent to Kafka.
# In-order retries when a RetriableFailure occurs.
# Forward a non-RetriableException when it occurs.

I'm pretty confident that my mental model is broken, so any help correcting it 
would be appreciated. Also, any tips on a better way to accomplish what we want 
would be appreciated.
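For illustration only (not part of the original comment): a minimal Java sketch of the approach described above, holding a future per send() and blocking on all of them in flush(). Class and method names are assumptions; it shows only the mechanics and deliberately leaves open the retry and ordering questions raised here.

{code}
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

public class BufferedSystemProducerSketch {
    private final KafkaProducer<byte[], byte[]> producer;
    private final List<Future<RecordMetadata>> pending = new ArrayList<>();

    public BufferedSystemProducerSketch(KafkaProducer<byte[], byte[]> producer) {
        this.producer = producer;
    }

    public void send(String topic, byte[] key, byte[] value) {
        // Async send; remember the future so flush() can wait on it.
        pending.add(producer.send(new ProducerRecord<>(topic, key, value)));
    }

    public void flush() {
        // Block until every outstanding send has completed or failed.
        for (Future<RecordMetadata> future : pending) {
            try {
                future.get();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            } catch (ExecutionException e) {
                // A failed send surfaces here; re-sending at this point is exactly
                // the out-of-order concern discussed in the comment above.
                throw new RuntimeException(e.getCause());
            }
        }
        pending.clear();
    }
}
{code}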

 Exception 

[jira] [Commented] (KAFKA-1863) Exception categories / hierarchy in clients

2015-01-14 Thread Chris Riccomini (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1863?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14278107#comment-14278107
 ] 

Chris Riccomini commented on KAFKA-1863:


bq. It might be nice to add a flush() call that waits for all the currently 
buffered or in-flight requests to complete assuming this doesn't add 
inefficiency in the normal case.

+1 This would be very helpful.

 Exception categories / hierarchy in clients
 ---

 Key: KAFKA-1863
 URL: https://issues.apache.org/jira/browse/KAFKA-1863
 Project: Kafka
  Issue Type: Bug
Reporter: Guozhang Wang
Assignee: Guozhang Wang
 Fix For: 0.9.0


 In the new clients package we introduces a new set of exceptions, but its 
 hierarchy is not very clear as of today:
 {code}
 RuntimeException -> KafkaException -> BufferExhaustedException
                                     -> ConfigException
                                     -> SerializationException
                                     -> QuotaViolationException
                                     -> SchemaException
                                     -> ApiException

 ApiException -> InvalidTopicException
              -> OffsetMetadataTooLarge (probably needs to be renamed)
              -> RecordBatchTooLargeException
              -> RecordTooLargeException
              -> UnknownServerException
              -> RetriableException

 RetriableException -> CorruptRecordException
                    -> InvalidMetadataException
                    -> NotEnoughReplicasAfterAppendException
                    -> NotEnoughReplicasException
                    -> OffsetOutOfRangeException
                    -> TimeoutException
                    -> UnknownTopicOrPartitionException
 {code}
 KafkaProducer.send() may throw KafkaExceptions that are not ApiExceptions; 
 other exceptions will be set in the returned future metadata.
 We need to:
 1. Re-examine the hierarchy. For example, for producers only exceptions that 
 are thrown directly from the caller thread before it is appended to the batch 
 buffer should be ApiExceptions; some exceptions could be renamed / merged.
 2. Clearly document the exception category / hierarchy as part of the release.
 [~criccomini] may have some more feedback for this issue from Samza's usage 
 experience. [~jkreps]





[jira] [Commented] (KAFKA-1863) Exception categories / hierarchy in clients

2015-01-14 Thread Chris Riccomini (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1863?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14278113#comment-14278113
 ] 

Chris Riccomini commented on KAFKA-1863:


[~jkreps], so it sounds like we can fail our containers on all exceptions that 
we see, and rely on retries=20 to handle all RetriableExceptions, 
correct?


 Exception categories / hierarchy in clients
 ---

 Key: KAFKA-1863
 URL: https://issues.apache.org/jira/browse/KAFKA-1863
 Project: Kafka
  Issue Type: Bug
Reporter: Guozhang Wang
Assignee: Guozhang Wang
 Fix For: 0.9.0


 In the new clients package we introduces a new set of exceptions, but its 
 hierarchy is not very clear as of today:
 {code}
 RuntimeException -> KafkaException -> BufferExhaustedException
                                     -> ConfigException
                                     -> SerializationException
                                     -> QuotaViolationException
                                     -> SchemaException
                                     -> ApiException

 ApiException -> InvalidTopicException
              -> OffsetMetadataTooLarge (probably needs to be renamed)
              -> RecordBatchTooLargeException
              -> RecordTooLargeException
              -> UnknownServerException
              -> RetriableException

 RetriableException -> CorruptRecordException
                    -> InvalidMetadataException
                    -> NotEnoughReplicasAfterAppendException
                    -> NotEnoughReplicasException
                    -> OffsetOutOfRangeException
                    -> TimeoutException
                    -> UnknownTopicOrPartitionException
 {code}
 KafkaProducer.send() may throw KafkaExceptions that are not ApiExceptions; 
 other exceptions will be set in the returned future metadata.
 We need to:
 1. Re-examine the hierarchy. For example, for producers only exceptions that 
 are thrown directly from the caller thread before it is appended to the batch 
 buffer should be ApiExceptions; some exceptions could be renamed / merged.
 2. Clearly document the exception category / hierarchy as part of the release.
 [~criccomini] may have some more feedback for this issue from Samza's usage 
 experience. [~jkreps]





[jira] [Commented] (KAFKA-1755) Log cleaner thread should not exit on errors

2014-11-05 Thread Chris Riccomini (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1755?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14198578#comment-14198578
 ] 

Chris Riccomini commented on KAFKA-1755:


It might also be desirable to allow log compaction to continue on the topic in 
question, and simply keep the messages that lack keys without doing any 
compaction on them.

 Log cleaner thread should not exit on errors
 

 Key: KAFKA-1755
 URL: https://issues.apache.org/jira/browse/KAFKA-1755
 Project: Kafka
  Issue Type: Bug
Reporter: Joel Koshy
 Fix For: 0.8.3


 The log cleaner is a critical process when using compacted topics.
 However, if there is any error in any topic (notably if a key is missing) 
 then the cleaner exits and all other compacted topics will also be adversely 
 affected - i.e., compaction stops across the board.
 This can be improved by just aborting compaction for a topic on any error and 
 keep the thread from exiting.
 Another improvement would be to reject messages without keys that are sent to 
 compacted topics although this is not enough by itself.





[jira] [Commented] (KAFKA-1639) Support control messages in Kafka

2014-09-17 Thread Chris Riccomini (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1639?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14137751#comment-14137751
 ] 

Chris Riccomini commented on KAFKA-1639:


One point of discussion is whether the control message should be a full-blown 
message, or just a field in an existing message's payload. A full message seems 
like a more general solution to me.

 Support control messages in Kafka
 -

 Key: KAFKA-1639
 URL: https://issues.apache.org/jira/browse/KAFKA-1639
 Project: Kafka
  Issue Type: Improvement
Reporter: Chris Riccomini

 The current transactionality proposal 
 (https://cwiki.apache.org/confluence/display/KAFKA/Transactional+Messaging+in+Kafka)
  and implementation use control messages to handle transactions in Kafka. 
 Kafka traditionally hasn't had control messages in its topics. 
 Transactionality (as it's implemented) introduces this pattern, but appears 
 to do so in a very specific fashion (control messages only for transactions).
 It seems to me that a good approach to control messages would be to 
 generalize the control message model in Kafka to support not just transaction 
 control messages, but arbitrary control messages. On the producer side, 
 arbitrary control messages should be allowed to be sent, and on the consumer 
 side, these control messages should be dropped by default.
 Just like transactionality, this would let frameworks (e.g. Samza) and other 
 app-specific implementations take advantage of in-topic control messages (as 
 opposed to out of band control messages) without any impact on existing 
 consumers.





[jira] [Commented] (KAFKA-560) Garbage Collect obsolete topics

2014-09-10 Thread Chris Riccomini (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14128771#comment-14128771
 ] 

Chris Riccomini commented on KAFKA-560:
---

bq. It would be good to have a tool that could delete any topic that had not 
been written to in a configurable period of time and had no active consumer 
groups. 

I would prefer not to depend on consumer groups. Samza, for example, doesn't 
have consumer groups, so doing things like looking at the last offset commit of 
a consumer group in ZK/OffsetManager will not help if the consumer is using 
Samza (or some other offset checkpoint mechanism). The better approach, to me, 
seems to be to just have brokers keep track of approximate last-reads for each 
topic/partition based on FetchRequests.

 Garbage Collect obsolete topics
 ---

 Key: KAFKA-560
 URL: https://issues.apache.org/jira/browse/KAFKA-560
 Project: Kafka
  Issue Type: New Feature
Reporter: Jay Kreps
  Labels: project

 Old junk topics tend to accumulate over time. Code may migrate to use new 
 topics leaving the old ones orphaned. Likewise there are some use cases for 
 temporary transient topics. It would be good to have a tool that could delete 
 any topic that had not been written to in a configurable period of time and 
 had no active consumer groups. Something like
./bin/delete-unused-topics.sh --last-write [date] --zookeeper [zk_connect]
 This requires API support to get the last update time. I think it may be 
 possible to do this through the OffsetRequest now?





[jira] [Commented] (KAFKA-1171) Gradle build for Kafka

2013-12-10 Thread Chris Riccomini (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1171?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13844763#comment-13844763
 ] 

Chris Riccomini commented on KAFKA-1171:


For cross-building Scala versions, have a look at:

https://issues.apache.org/jira/browse/SAMZA-34

[~szczepiq] provided the example zip file.

 Gradle build for Kafka
 --

 Key: KAFKA-1171
 URL: https://issues.apache.org/jira/browse/KAFKA-1171
 Project: Kafka
  Issue Type: Improvement
  Components: packaging
Affects Versions: 0.8.1, 0.9.0
Reporter: David Arthur
Assignee: David Arthur
 Attachments: 0001-Adding-basic-Gradle-build.patch


 We have previously discussed moving away from SBT to an 
 easier-to-comprehend-and-debug build system such as Ant or Gradle. I put up a 
 patch for an Ant+Ivy build a while ago[1], and it sounded like people wanted 
 to check out Gradle as well.
 1. https://issues.apache.org/jira/browse/KAFKA-855





[jira] [Commented] (KAFKA-1171) Gradle build for Kafka

2013-12-10 Thread Chris Riccomini (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1171?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13844780#comment-13844780
 ] 

Chris Riccomini commented on KAFKA-1171:


For the record, a good set of examples on how to build OSS multi-module Gradle 
projects is Netflix's GitHub account:

  https://github.com/Netflix

Their account contains a number of projects that use Gradle (Lipstick, 
astyanax, servo, etc). In addition, they have a stub repo for bootstrapping new 
projects with Gradle.

  https://github.com/Netflix/gradle-template

 Gradle build for Kafka
 --

 Key: KAFKA-1171
 URL: https://issues.apache.org/jira/browse/KAFKA-1171
 Project: Kafka
  Issue Type: Improvement
  Components: packaging
Affects Versions: 0.8.1, 0.9.0
Reporter: David Arthur
Assignee: David Arthur
 Attachments: 0001-Adding-basic-Gradle-build.patch


 We have previously discussed moving away from SBT to an 
 easier-to-comprehend-and-debug build system such as Ant or Gradle. I put up a 
 patch for an Ant+Ivy build a while ago[1], and it sounded like people wanted 
 to check out Gradle as well.
 1. https://issues.apache.org/jira/browse/KAFKA-855





[jira] [Commented] (KAFKA-1171) Gradle build for Kafka

2013-12-10 Thread Chris Riccomini (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1171?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13844808#comment-13844808
 ] 

Chris Riccomini commented on KAFKA-1171:


That sounds a lot like a version incompatibility between the version of Gradle 
you're using, and what the project was built against (it includes no gradlew 
script).

I was able to successfully compile it using Samza's gradlew script. You might 
set up a gradlew script in the project, and give that a shot.

 Gradle build for Kafka
 --

 Key: KAFKA-1171
 URL: https://issues.apache.org/jira/browse/KAFKA-1171
 Project: Kafka
  Issue Type: Improvement
  Components: packaging
Affects Versions: 0.8.1, 0.9.0
Reporter: David Arthur
Assignee: David Arthur
 Attachments: 0001-Adding-basic-Gradle-build.patch


 We have previously discussed moving away from SBT to an 
 easier-to-comprehend-and-debug build system such as Ant or Gradle. I put up a 
 patch for an Ant+Ivy build a while ago[1], and it sounded like people wanted 
 to check out Gradle as well.
 1. https://issues.apache.org/jira/browse/KAFKA-855





[jira] [Commented] (KAFKA-1016) Broker should limit purgatory size

2013-08-30 Thread Chris Riccomini (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1016?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13755167#comment-13755167
 ] 

Chris Riccomini commented on KAFKA-1016:


Another interesting behavior is seen when a FetchRequest is made with Int.MAX.

You start seeing high request purgatory count, AND you start seeing very high 
CLOSED_WAIT count on sockets on the broker. The reason for this appears to be 
that a connection can die or time out, but the connection is never closed 
because it's just sitting in purgatory for Int.MAX milliseconds. If no new 
messages appear for the topic the consumer is reading from (due to an upstream 
producer being down, for instance), then the connection never gets closed.

 Broker should limit purgatory size
 --

 Key: KAFKA-1016
 URL: https://issues.apache.org/jira/browse/KAFKA-1016
 Project: Kafka
  Issue Type: Bug
  Components: purgatory
Affects Versions: 0.8
Reporter: Chris Riccomini
Assignee: Joel Koshy

 I recently ran into a case where a poorly configured Kafka consumer was able 
 to trigger out of memory exceptions in multiple Kafka brokers. The consumer 
 was configured to have a fetcher.max.wait of Int.MaxInt.
 For low-volume topics, this configuration causes the consumer to block 
 frequently, and for long periods of time. [~junrao] informs me that the fetch 
 request will time out after the socket timeout is reached. In our case, this 
 was set to 30s.
 With several thousand consumer threads, the fetch request purgatory got into 
 the 100,000-400,000 range, which we believe triggered the out of memory 
 exception. [~nehanarkhede] claims to have seen similar behavior in other high 
 volume clusters.
 It kind of seems like a bad thing that a poorly configured consumer can 
 trigger out of memory exceptions in the broker. I was thinking maybe it makes 
 sense to have the broker try and protect itself from this situation. Here are 
 some potential solutions:
 1. Have a broker-side max wait config for fetch requests.
 2. Threshold the purgatory size, and either drop the oldest connections in 
 purgatory, or reject the newest fetch requests when purgatory is full.



[jira] [Commented] (KAFKA-1012) Implement an Offset Manager and hook offset requests to it

2013-08-26 Thread Chris Riccomini (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1012?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13750328#comment-13750328
 ] 

Chris Riccomini commented on KAFKA-1012:


Regarding transactionality, as described in 
https://cwiki.apache.org/confluence/display/KAFKA/Offset+Management, how do you 
guarantee that the offset messages in a single transaction will be sequential 
with no gaps? As far as I understand, your assumption is that if a transaction 
has 5 messages, they will be materialized into the topic/partition as M1, M2, 
M3, M4, M5, with no messages in between.

Is there a single-writer that's making this guarantee? I don't see how this is 
possible if you allow clients to send produce requests directly to the topic. 
Without a single writer, you could end up with M1, M2, .. other offset messages 
.., M3, M4, M5. In such a case, how do you determine if the transaction has 
failed? Is there some timeout ("If I don't get all 5 messages within 60s, the 
transaction is rolled back")?

 Implement an Offset Manager and hook offset requests to it
 --

 Key: KAFKA-1012
 URL: https://issues.apache.org/jira/browse/KAFKA-1012
 Project: Kafka
  Issue Type: Sub-task
  Components: consumer
Reporter: Tejas Patil
Assignee: Tejas Patil
Priority: Minor
 Attachments: KAFKA-1012.patch, KAFKA-1012-v2.patch


 After KAFKA-657, we have a protocol for consumers to commit and fetch offsets 
 from brokers. Currently, consumers are not using this API and directly 
 talking with Zookeeper. 
 This Jira will involve following:
 1. Add a special topic in kafka for storing offsets
 2. Add an OffsetManager interface which would handle storing, accessing, 
 loading and maintaining consumer offsets
 3. Implement offset managers for both of these 2 choices : existing ZK based 
 storage or inbuilt storage for offsets.
 4. Leader brokers would now maintain an additional hash table of offsets for 
 the group-topic-partitions that they lead
 5. Consumers should now use the OffsetCommit and OffsetFetch API



[jira] [Commented] (KAFKA-1012) Implement an Offset Manager and hook offset requests to it

2013-08-26 Thread Chris Riccomini (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1012?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13750345#comment-13750345
 ] 

Chris Riccomini commented on KAFKA-1012:


From the wiki: "It will likely have a tight size limit to avoid server impact." 
Does Kafka support a per-topic max message size? Without this, is it possible to 
enforce size limits on the offset topic? If writes are happening through the 
offset commit API it should be do-able, but if writes are sent as regular 
produce request, the broker would have to special case the offset topic to 
enforce size limits.

 Implement an Offset Manager and hook offset requests to it
 --

 Key: KAFKA-1012
 URL: https://issues.apache.org/jira/browse/KAFKA-1012
 Project: Kafka
  Issue Type: Sub-task
  Components: consumer
Reporter: Tejas Patil
Assignee: Tejas Patil
Priority: Minor
 Attachments: KAFKA-1012.patch, KAFKA-1012-v2.patch


 After KAFKA-657, we have a protocol for consumers to commit and fetch offsets 
 from brokers. Currently, consumers are not using this API and directly 
 talking with Zookeeper. 
 This Jira will involve following:
 1. Add a special topic in kafka for storing offsets
 2. Add an OffsetManager interface which would handle storing, accessing, 
 loading and maintaining consumer offsets
 3. Implement offset managers for both of these 2 choices : existing ZK based 
 storage or inbuilt storage for offsets.
 4. Leader brokers would now maintain an additional hash table of offsets for 
 the group-topic-partitions that they lead
 5. Consumers should now use the OffsetCommit and OffsetFetch API



[jira] [Commented] (KAFKA-1012) Implement an Offset Manager and hook offset requests to it

2013-08-26 Thread Chris Riccomini (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1012?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13750351#comment-13750351
 ] 

Chris Riccomini commented on KAFKA-1012:


Also, what's the guidance on partitioning the offset topic? It almost seems 
like the best solution is to have one partition per broker, so that each broker 
can share the load (both writes/sec and the memory required to keep the map in 
memory).

 Implement an Offset Manager and hook offset requests to it
 --

 Key: KAFKA-1012
 URL: https://issues.apache.org/jira/browse/KAFKA-1012
 Project: Kafka
  Issue Type: Sub-task
  Components: consumer
Reporter: Tejas Patil
Assignee: Tejas Patil
Priority: Minor
 Attachments: KAFKA-1012.patch, KAFKA-1012-v2.patch


 After KAFKA-657, we have a protocol for consumers to commit and fetch offsets 
 from brokers. Currently, consumers are not using this API and are talking 
 directly to ZooKeeper. 
 This JIRA will involve the following:
 1. Add a special topic in Kafka for storing offsets
 2. Add an OffsetManager interface which would handle storing, accessing, 
 loading and maintaining consumer offsets
 3. Implement offset managers for both of these two choices: the existing 
 ZK-based storage and the inbuilt storage for offsets
 4. Leader brokers would now maintain an additional hash table of offsets for 
 the group-topic-partitions that they lead
 5. Consumers should now use the OffsetCommit and OffsetFetch APIs

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Created] (KAFKA-1016) Broker should limit purgatory size

2013-08-19 Thread Chris Riccomini (JIRA)
Chris Riccomini created KAFKA-1016:
--

 Summary: Broker should limit purgatory size
 Key: KAFKA-1016
 URL: https://issues.apache.org/jira/browse/KAFKA-1016
 Project: Kafka
  Issue Type: Bug
  Components: purgatory
Affects Versions: 0.8
Reporter: Chris Riccomini
Assignee: Joel Koshy


I recently ran into a case where a poorly configured Kafka consumer was able to 
trigger out of memory exceptions in multiple Kafka brokers. The consumer was 
configured with a fetcher.max.wait of Int.MaxValue.

For low-volume topics, this configuration causes the consumer to block 
frequently, and for long periods of time. [~junrao] informs me that the fetch 
request will time out once the socket timeout is reached. In our case, this 
was set to 30s.

With several thousand consumer threads, the fetch request purgatory got into 
the 100,000-400,000 range, which we believe triggered the out of memory 
exception. [~nehanarkhede] claims to have seen similar behavior in other 
high-volume clusters.

It seems like a bad thing that a poorly configured consumer can trigger out of 
memory exceptions in the broker. I think it makes sense to have the broker try 
to protect itself from this situation. Here are some potential solutions:

1. Have a broker-side max wait config for fetch requests.
2. Threshold the purgatory size, and either drop the oldest connections in 
purgatory, or reject the newest fetch requests when purgatory is full (a rough 
sketch of this option follows below).
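
(Illustrative only.) A minimal sketch of option 2, assuming a hypothetical cap on 
delayed fetch requests; the class and method names below are made up for illustration 
and are not broker code:

// Hypothetical sketch of option 2: cap the number of delayed fetch requests and
// reject (answer immediately) any new ones once the purgatory is full.
import java.util.concurrent.atomic.AtomicInteger

class BoundedFetchPurgatory(maxDelayedRequests: Int) {
  private val watched = new AtomicInteger(0)

  // Returns false if the purgatory is full, in which case the request should be
  // answered right away instead of being parked.
  def tryWatch[T](request: T): Boolean = {
    if (watched.incrementAndGet() > maxDelayedRequests) {
      watched.decrementAndGet()
      false
    } else true
  }

  // Called when a parked request is satisfied or expires.
  def onComplete(): Unit = watched.decrementAndGet()
}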

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-982) Logo for Kafka

2013-07-23 Thread Chris Riccomini (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-982?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13716567#comment-13716567
 ] 

Chris Riccomini commented on KAFKA-982:
---

+1 301

 Logo for Kafka
 --

 Key: KAFKA-982
 URL: https://issues.apache.org/jira/browse/KAFKA-982
 Project: Kafka
  Issue Type: Improvement
Reporter: Jay Kreps
 Attachments: 289.jpeg, 294.jpeg, 296.png, 298.jpeg, 301.png


 We should have a logo for kafka.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-925) Add optional partition key override in producer

2013-07-17 Thread Chris Riccomini (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-925?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13711676#comment-13711676
 ] 

Chris Riccomini commented on KAFKA-925:
---

Hey Jay,

Seems pretty reasonable to me. Is the reason for the type change in the 
Partitioner so that you can handle either keys of type K (key) or keys of any 
type (part key) using the same partitioner?

Cheers,
Chris

 Add optional partition key override in producer
 ---

 Key: KAFKA-925
 URL: https://issues.apache.org/jira/browse/KAFKA-925
 Project: Kafka
  Issue Type: New Feature
  Components: producer 
Affects Versions: 0.8.1
Reporter: Jay Kreps
Assignee: Jay Kreps
 Attachments: KAFKA-925-v1.patch, KAFKA-925-v2.patch


 We have a key that is used for partitioning in the producer and stored with 
 the message. Actually these uses, though often the same, could be different. 
 The two meanings are effectively:
 1. Assignment to a partition
 2. Deduplication within a partition
 In cases where we want to allow the client to take advantage of both of these 
 and they aren't the same it would be nice to allow them to be specified 
 separately.
 To implement this I added an optional partition key to KeyedMessage. When 
 specified this key is used for partitioning rather than the message key. This 
 key is of type Any and the parametric typing is removed from the partitioner 
 to allow it to work with either key.
 An alternative would be to allow the partition id to be specified in the 
 KeyedMessage. This would be slightly more convenient in the case where there 
 is no partition key but you instead know the partition number a priori--that 
 case must be handled by giving the partition id as the partition key and 
 using an identity partitioner, which is slightly more roundabout. However, 
 this is inconsistent with the normal partitioning, which requires a key when 
 the partition is determined by a key--in that case you would be manually 
 calling your partitioner in user code. It seems best to me to always use 
 either a key or a partition, and since we currently take a key I stuck 
 with that.
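
(Illustrative only.) A toy sketch of how the proposed partKey plus untyped partitioner 
might be used; the case class and trait below are self-contained stand-ins, not the 
real kafka.producer API:

// Toy stand-ins for the proposed API, kept self-contained for illustration;
// the real KeyedMessage and Partitioner live in kafka.producer and may differ.
case class KeyedMessage[K, V](topic: String, key: K, partKey: Any, message: V)

trait Partitioner { def partition(key: Any, numPartitions: Int): Int }

object HashPartitioner extends Partitioner {
  // Works for either the message key or the separate partition key, since the
  // parameter is untyped (Any).
  def partition(key: Any, numPartitions: Int): Int =
    (key.hashCode & 0x7fffffff) % numPartitions
}

// Partition by "user-42" while keeping "dedup-key-123" as the stored key.
val msg = KeyedMessage("clicks", "dedup-key-123", "user-42", "payload")
val targetPartition = HashPartitioner.partition(msg.partKey, numPartitions = 8)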

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Created] (KAFKA-725) Broker Exception: Attempt to read with a maximum offset less than start offset

2013-01-22 Thread Chris Riccomini (JIRA)
Chris Riccomini created KAFKA-725:
-

 Summary: Broker Exception: Attempt to read with a maximum offset 
less than start offset
 Key: KAFKA-725
 URL: https://issues.apache.org/jira/browse/KAFKA-725
 Project: Kafka
  Issue Type: Bug
  Components: log
Affects Versions: 0.8
Reporter: Chris Riccomini
Assignee: Jay Kreps


I have a simple consumer that's reading from a single topic/partition pair. 
Running it seems to trigger these messages on the broker periodically:

2013/01/22 23:04:54.936 ERROR [KafkaApis] [kafka-request-handler-4] [kafka] []  [KafkaApi-466] error when processing request (MyTopic,4,7951732,2097152)
java.lang.IllegalArgumentException: Attempt to read with a maximum offset (7951715) less than the start offset (7951732).
at kafka.log.LogSegment.read(LogSegment.scala:105)
at kafka.log.Log.read(Log.scala:390)
at kafka.server.KafkaApis.kafka$server$KafkaApis$$readMessageSet(KafkaApis.scala:372)
at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.apply(KafkaApis.scala:330)
at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.apply(KafkaApis.scala:326)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
at scala.collection.immutable.Map$Map1.foreach(Map.scala:105)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
at scala.collection.immutable.Map$Map1.map(Map.scala:93)
at kafka.server.KafkaApis.kafka$server$KafkaApis$$readMessageSets(KafkaApis.scala:326)
at kafka.server.KafkaApis$$anonfun$maybeUnblockDelayedFetchRequests$2.apply(KafkaApis.scala:165)
at kafka.server.KafkaApis$$anonfun$maybeUnblockDelayedFetchRequests$2.apply(KafkaApis.scala:164)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:57)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:43)
at kafka.server.KafkaApis.maybeUnblockDelayedFetchRequests(KafkaApis.scala:164)
at kafka.server.KafkaApis$$anonfun$handleProducerRequest$2.apply(KafkaApis.scala:186)
at kafka.server.KafkaApis$$anonfun$handleProducerRequest$2.apply(KafkaApis.scala:185)
at scala.collection.immutable.Map$Map2.foreach(Map.scala:127)
at kafka.server.KafkaApis.handleProducerRequest(KafkaApis.scala:185)
at kafka.server.KafkaApis.handle(KafkaApis.scala:58)
at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:41)
at java.lang.Thread.run(Thread.java:619)

When I shut the consumer down, I don't see the exceptions anymore.

This is the code that my consumer is running:
  while (true) {
    // We believe the consumer to be connected, so try to use it for a fetch request.
    val request = new FetchRequestBuilder()
      .addFetch(topic, partition, nextOffset, fetchSize)
      .maxWait(Int.MaxValue)
      // TODO for super high-throughput, might be worth waiting for more bytes
      .minBytes(1)
      .build

    debug("Fetching messages for stream %s and offset %s." format (streamPartition, nextOffset))
    val messages = connectedConsumer.fetch(request)
    debug("Fetch complete for stream %s and offset %s. Got messages: %s" format (streamPartition, nextOffset, messages))

    if (messages.hasError) {
      warn("Got error code from broker for %s: %s. Shutting down consumer to trigger a reconnect." format (streamPartition, messages.errorCode(topic, partition)))
      ErrorMapping.maybeThrowException(messages.errorCode(topic, partition))
    }

    // Hand each message to the watchers and advance the offset for the next fetch.
    messages.messageSet(topic, partition).foreach(msg => {
      watchers.foreach(_.onMessagesReady(msg.offset.toString, msg.message.payload))
      nextOffset = msg.nextOffset
    })
  }

Any idea what might be causing this error?

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Created] (KAFKA-715) NumberFormatException in PartitionStateInfo

2013-01-21 Thread Chris Riccomini (JIRA)
Chris Riccomini created KAFKA-715:
-

 Summary: NumberFormatException in PartitionStateInfo
 Key: KAFKA-715
 URL: https://issues.apache.org/jira/browse/KAFKA-715
 Project: Kafka
  Issue Type: Bug
  Components: replication
Affects Versions: 0.8
Reporter: Chris Riccomini
Assignee: Neha Narkhede


Hey Guys,

During a broker restart, I got this exception:

2013/01/21 19:21:10.918 INFO [ZooKeeper] [main] [kafka] []  Client 
environment:zookeeper.version=3.3.3-1203054, built on 11/17/2011 05:47 GMT
2013/01/21 19:21:10.918 INFO [ZooKeeper] [main] [kafka] []  Client 
environment:host.name=eat1-qa466.corp.linkedin.com
2013/01/21 19:21:10.918 INFO [ZooKeeper] [main] [kafka] []  Client 
environment:java.version=1.6.0_21
2013/01/21 19:21:10.918 INFO [ZooKeeper] [main] [kafka] []  Client 
environment:java.vendor=Sun Microsystems Inc.
2013/01/21 19:21:10.918 INFO [ZooKeeper] [main] [kafka] []  Client 
environment:java.home=/export/apps/jdk/JDK-1_6_0_21/jre
2013/01/21 19:21:10.918 INFO [ZooKeeper] [main] [kafka] []  Client 
environment:java.class.path=/export/apps/jdk/JDK-1_6_0_21/lib/tools.jar:lib/activation-1.0.2.jar:lib/ant-1.6.5.jar:lib/aopalliance-1.0.jar:lib/cfg-2.8.0.jar:lib/cfg-api-6.6.6.jar:lib/cfg-impl-6.6.6.jar:lib/com.linkedin.customlibrary.j2ee-1.0.jar:lib/com.linkedin.customlibrary.mx4j-3.0.2.jar:lib/com.linkedin.customlibrary.xmsg-0.6.jar:lib/commons-beanutils-1.7.0.jar:lib/commons-cli-1.0.jar:lib/commons-lang-2.4.jar:lib/commons-logging-1.1.jar:lib/configuration-api-1.4.8.jar:lib/configuration-repository-impl-1.4.8.jar:lib/container-management-impl-1.1.15.jar:lib/container-server-1.1.15.jar:lib/emweb-impl-1.1.15.jar:lib/jaxen-1.1.1.jar:lib/jdom-1.0.jar:lib/jetty-6.1.26.jar:lib/jetty-management-6.1.26.jar:lib/jetty-naming-6.1.26.jar:lib/jetty-plus-6.1.26.jar:lib/jetty-util5-6.1.26.jar:lib/jetty-util-6.1.26.jar:lib/jmx-impl-1.4.8.jar:lib/json-simple-1.1.jar:lib/jsp-2.1-6.1.1.jar:lib/jsp-api-2.1-6.1.1.jar:lib/lispring-lispring-core-1.4.8.jar:lib/lispring-lispring-servlet-1.4.8.jar:lib/log4j-1.2.15.jar:lib/mail-1.3.0.jar:lib/mx4j-tools-3.0.2.jar:lib/servlet-api-2.5.jar:lib/spring-aop-3.0.3.jar:lib/spring-asm-3.0.3.jar:lib/spring-aspects-3.0.3.jar:lib/spring-beans-3.0.3.jar:lib/spring-context-3.0.3.jar:lib/spring-context-support-3.0.3.jar:lib/spring-core-3.0.3.jar:lib/spring-expression-3.0.3.jar:lib/spring-jdbc-3.0.3.jar:lib/spring-jms-3.0.3.jar:lib/spring-orm-3.0.3.jar:lib/spring-transaction-3.0.3.jar:lib/spring-web-3.0.3.jar:lib/spring-web-servlet-3.0.3.jar:lib/util-core-4.0.40.jar:lib/util-i18n-4.0.40.jar:lib/util-jmx-4.0.22.jar:lib/util-log-4.0.40.jar:lib/util-servlet-4.0.40.jar:lib/util-xmsg-4.0.40.jar:lib/xml-apis-1.3.04.jar
2013/01/21 19:21:10.918 INFO [ZooKeeper] [main] [kafka] []  Client 
environment:java.library.path=/export/apps/jdk/JDK-1_6_0_21/jre/lib/amd64/server:/export/apps/jdk/JDK-1_6_0_21/jre/lib/amd64:/export/apps/jdk/JDK-1_6_0_21/jre/../lib/amd64:/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib
2013/01/21 19:21:10.918 INFO [ZooKeeper] [main] [kafka] []  Client 
environment:java.io.tmpdir=/export/content/glu/apps/kafka/i001/tmp
2013/01/21 19:21:10.918 INFO [ZooKeeper] [main] [kafka] []  Client 
environment:java.compiler=NA
2013/01/21 19:21:10.918 INFO [ZooKeeper] [main] [kafka] []  Client 
environment:os.name=Linux
2013/01/21 19:21:10.918 INFO [ZooKeeper] [main] [kafka] []  Client 
environment:os.arch=amd64
2013/01/21 19:21:10.918 INFO [ZooKeeper] [main] [kafka] []  Client 
environment:os.version=2.6.32-220.13.1.el6.x86_64
2013/01/21 19:21:10.918 INFO [ZooKeeper] [main] [kafka] []  Client 
environment:user.name=app
2013/01/21 19:21:10.918 INFO [ZooKeeper] [main] [kafka] []  Client 
environment:user.home=/home/app
2013/01/21 19:21:10.918 INFO [ZooKeeper] [main] [kafka] []  Client 
environment:user.dir=/export/content/glu/apps/kafka/i001
2013/01/21 19:21:10.919 INFO [ZooKeeper] [main] [kafka] []  Initiating client 
connection, 
connectString=eat1-app309.corp.linkedin.com:12913,eat1-app310.corp.linkedin.com:12913,eat1-app311.corp.linkedin.com:12913,eat1-app312.corp.linkedin.com:12913,eat1-app313.corp.linkedin.com:12913/kafka-samsa
 sessionTimeout=6000 watcher=org.I0Itec.zkclient.ZkClient@1bfdbab5
2013/01/21 19:21:10.932 INFO [ClientCnxn] [main-SendThread()] [kafka] []  
Opening socket connection to server 
eat1-app313.corp.linkedin.com/172.20.72.73:12913
2013/01/21 19:21:10.933 INFO [ClientCnxn] 
[main-SendThread(eat1-app313.corp.linkedin.com:12913)] [kafka] []  Socket 
connection established to eat1-app313.corp.linkedin.com/172.20.72.73:12913, 
initiating session
2013/01/21 19:21:10.963 INFO [ClientCnxn] 
[main-SendThread(eat1-app313.corp.linkedin.com:12913)] [kafka] []  Session 
establishment complete on server 
eat1-app313.corp.linkedin.com/172.20.72.73:12913, sessionid = 
0x53afd073784059c, negotiated timeout = 6000
2013/01/21 19:21:10.964 INFO 

[jira] [Created] (KAFKA-709) Default queue.enqueue.timeout.ms to -1

2013-01-16 Thread Chris Riccomini (JIRA)
Chris Riccomini created KAFKA-709:
-

 Summary: Default queue.enqueue.timeout.ms to -1
 Key: KAFKA-709
 URL: https://issues.apache.org/jira/browse/KAFKA-709
 Project: Kafka
  Issue Type: Bug
  Components: producer 
Affects Versions: 0.8
Reporter: Chris Riccomini
Assignee: Jun Rao


Hey Guys,

It seems that, by default, producers in 0.8 are async, and have a default 
queue.enqueue.timeout.ms of 0. This means that anyone who reads messages faster 
than they can produce them will likely end up hitting this exception eventually:

Exception in thread "Thread-3" kafka.common.QueueFullException: Event queue is full of unsent messages, could not send event: KeyedMessage(PageViewEventByGroupJson,Missing Page Group,java.nio.HeapByteBuffer[pos=0 lim=125 cap=125])
at kafka.producer.Producer$$anonfun$asyncSend$1.apply(Producer.scala:111)
at kafka.producer.Producer$$anonfun$asyncSend$1.apply(Producer.scala:89)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:32)
at kafka.producer.Producer.asyncSend(Producer.scala:89)
at kafka.producer.Producer.send(Producer.scala:77)

As it says in https://cwiki.apache.org/KAFKA/kafka-mirroring.html, this can 
result in lost messages and nasty exceptions in the logs. I think the better 
default is setting queue.enqueue.timeout.ms to -1, which will simply block 
until the queue frees up.
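
For reference, a minimal sketch of the producer properties being discussed (the broker 
list is a placeholder, and the property names are the ones this issue refers to rather 
than a verified config reference):

// Sketch: async 0.8 producer configured to block, rather than throw
// QueueFullException, when its internal queue is full.
import java.util.Properties

val props = new Properties()
props.put("metadata.broker.list", "broker1:9092,broker2:9092") // placeholder brokers
props.put("producer.type", "async")
props.put("queue.enqueue.timeout.ms", "-1") // block until the queue frees up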

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira