[jira] [Resolved] (KAFKA-5084) Allow Kafka connect source tasks to commit offsets without messages being sent
[ https://issues.apache.org/jira/browse/KAFKA-5084?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chris Riccomini resolved KAFKA-5084. Resolution: Duplicate -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (KAFKA-5084) Allow Kafka connect source tasks to commit offsets without messages being sent
[ https://issues.apache.org/jira/browse/KAFKA-5084?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15973509#comment-15973509 ] Chris Riccomini commented on KAFKA-5084: [~rhauch], hah, this does indeed look like a dupe of KAFKA-3821. I'll close this then. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Created] (KAFKA-5084) Allow Kafka connect source tasks to commit offsets without messages being sent
Chris Riccomini created KAFKA-5084: -- Summary: Allow Kafka connect source tasks to commit offsets without messages being sent Key: KAFKA-5084 URL: https://issues.apache.org/jira/browse/KAFKA-5084 Project: Kafka Issue Type: New Feature Components: KafkaConnect Affects Versions: 0.10.2.0 Reporter: Chris Riccomini We are currently running [Debezium|http://debezium.io/] connectors in Kafka connect. These connectors consume from MySQL's binlog, and produce into Kafka. One of the things we've observed is that some of our Debezium connectors are not honoring the {{offset.flush.interval.ms}} setting (which is set to 60 seconds). Some of our connectors seem to be committing only sporadically. For low-volume connectors, the commits seem to happen once every hour or two, and sometimes even longer. It sounds like the issue is that Kafka connect will only commit source task offsets when the source task produces new source records. This is because Kafka connect gets the offset to commit from an incoming source record. The problem with this approach is that there are (in my opinion) valid reasons to want to commit consumed offsets WITHOUT sending any new messages. Taking Debezium as an example, there are cases where Debezium consumes messages, but filters out messages based on a regex, or filter rule (e.g. table black lists). In such a case, Debezium is consuming messages from MySQL's binlog, and dropping them before they get to the Kafka connect framework. As such, Kafka connect never sees these messages, and doesn't commit any progress. This results in several problems: # In the event of a failure, the connector could fall WAY back, since the last committed offset might be from hours ago, even though it *has* processed all recent messages--it just hasn't sent anything to Kafka. # For connectors like Debezium that consume from a source that has a *limited* window to fetch messages (MySQL's binlog has time/size-based retention), you can actually fall off the edge of the binlog because the last commit can actually happen farther back than the binlog goes, even though Debezium has fetched every single message in the binlog--it just hasn't produced anything due to filtering. Again, I don't see this as a Debezium-specific issue. I could imagine a similar scenario with an [SST-based Cassandra source|https://github.com/datamountaineer/stream-reactor/issues/162]. It would be nice if Kafka connect allowed us a way to commit offsets for source tasks even when messages haven't been sent recently. This would allow source tasks to log their progress even if they're opting not to send messages to Kafka due to filtering or for some other reason. (See https://issues.jboss.org/browse/DBZ-220 for more context.) -- This message was sent by Atlassian JIRA (v6.3.15#6346)
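To make the failure mode concrete, here is a minimal sketch (not Debezium's actual code; the connector, topic, offset keys, and the readNextEvent/matchesFilter helpers are all made up) of a filtering source task written against the public SourceTask API. Connect only learns about source offsets from the SourceRecords returned by poll(), so when every event is filtered out nothing is returned and there is nothing for the framework to commit, even though the task has advanced through the binlog.
{code}
import java.util.{List => JList, Map => JMap}
import scala.collection.JavaConverters._
import org.apache.kafka.connect.data.Schema
import org.apache.kafka.connect.source.{SourceRecord, SourceTask}

// Hypothetical filtering source task. The names, the Event type, and the
// readNextEvent/matchesFilter helpers are invented for illustration.
class FilteringSourceTask extends SourceTask {
  override def version(): String = "0.0.1"
  override def start(props: JMap[String, String]): Unit = {}
  override def stop(): Unit = {}

  override def poll(): JList[SourceRecord] = {
    val event = readNextEvent() // e.g. one binlog event
    val sourcePartition = Map[String, AnyRef]("server" -> "db1").asJava
    val sourceOffset = Map[String, AnyRef]("position" -> event.position.asInstanceOf[AnyRef]).asJava
    if (matchesFilter(event)) {
      // The offset rides along on the SourceRecord; the framework commits it later.
      List(new SourceRecord(sourcePartition, sourceOffset, "my-topic", Schema.STRING_SCHEMA, event.payload)).asJava
    } else {
      // Filtered out: nothing is returned, so the framework never sees this offset
      // and has nothing new to commit, even though the task has made progress.
      List.empty[SourceRecord].asJava
    }
  }

  // Stand-ins for the real source-reading logic.
  case class Event(position: Long, payload: String)
  private def readNextEvent(): Event = Event(0L, "")
  private def matchesFilter(e: Event): Boolean = false
}
{code}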
[jira] [Commented] (KAFKA-2376) Add Copycat metrics
[ https://issues.apache.org/jira/browse/KAFKA-2376?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15851926#comment-15851926 ] Chris Riccomini commented on KAFKA-2376: One thing we'd like to see here is the ability to hook into the Kafka Connect metrics from our connectors. That way we can add connector-specific metrics, and have them flow through the same pipeline. > Add Copycat metrics > --- > > Key: KAFKA-2376 > URL: https://issues.apache.org/jira/browse/KAFKA-2376 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Affects Versions: 0.9.0.0 >Reporter: Ewen Cheslack-Postava > Labels: needs-kip > > Copycat needs good metrics for monitoring since that will be the primary > insight into the health of connectors as they copy data. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
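Connect does not expose its metrics registry to connectors, so as a stopgap a connector has to keep its own. The sketch below (hypothetical names, using the client-side org.apache.kafka.common.metrics library) shows the kind of connector-specific metric the comment is asking to plug into the framework's pipeline so it flows through the same reporters.
{code}
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.metrics.stats.{Rate, Total}

// Hypothetical: a connector keeping its own registry because the framework does
// not hand it one. What the comment asks for is a way to register these with the
// worker's registry so they ship through the same reporters as framework metrics.
class ConnectorMetrics(connectorName: String) {
  private val metrics = new Metrics()
  private val sensor = metrics.sensor("source-records")
  sensor.add(metrics.metricName("record-rate", connectorName), new Rate())
  sensor.add(metrics.metricName("record-total", connectorName), new Total())

  def recordProduced(count: Int): Unit = sensor.record(count.toDouble)
  def close(): Unit = metrics.close()
}
{code}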
[jira] [Commented] (KAFKA-4107) Support offset reset capability in Kafka Connect
[ https://issues.apache.org/jira/browse/KAFKA-4107?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15657561#comment-15657561 ] Chris Riccomini commented on KAFKA-4107: Agreed. We've had this need as well, and have been resorting to renaming the connector every time. It creates a ton of garbage in the commit topic, though. > Support offset reset capability in Kafka Connect > > > Key: KAFKA-4107 > URL: https://issues.apache.org/jira/browse/KAFKA-4107 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Reporter: Jason Gustafson >Assignee: Ewen Cheslack-Postava > > It would be useful in some cases to be able to reset connector offsets. For > example, if a topic in Kafka corresponding to a source database is > accidentally deleted (or deleted because of corrupt data), an administrator > may want to reset offsets and reproduce the log from the beginning. It may > also be useful to have support for overriding offsets, but that seems like a > less likely use case. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
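For background on why renaming works and why it leaves garbage behind: source offsets are stored in the worker's offsets topic keyed by connector name plus source partition, so a new name simply starts from an empty offset while the old entries linger until compaction removes them. The sketch below produces a tombstone for one such key; the topic name "connect-offsets" and the ["<connector>", {...}] key layout are assumptions about Connect's internal storage format, not a public API, so treat this as illustrative only.
{code}
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

// Assumptions (internal, undocumented layout): offsets live in a compacted topic
// named "connect-offsets", keyed by JSON of the form ["<connector>", {<source partition>}].
// Producing a null value (tombstone) with the identical key lets compaction drop the entry.
object ResetSourceOffset extends App {
  val props = new Properties()
  props.put("bootstrap.servers", "localhost:9092")
  props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer")
  props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer")

  val producer = new KafkaProducer[Array[Byte], Array[Byte]](props)
  val key = """["my-connector",{"server":"db1"}]""".getBytes("UTF-8")
  // Partitioning is by key, so reusing the exact key bytes lands the tombstone in
  // the same partition Connect originally wrote to.
  producer.send(new ProducerRecord[Array[Byte], Array[Byte]]("connect-offsets", key, null)).get()
  producer.close()
}
{code}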
[jira] [Commented] (KAFKA-4171) Kafka-connect prints outs keystone and truststore password in log2
[ https://issues.apache.org/jira/browse/KAFKA-4171?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15491164#comment-15491164 ] Chris Riccomini commented on KAFKA-4171: Wondering if KC is just copying the consumer.* configuration into the consumer config without the consumer. prefix, but leaving the consumer.-prefixed entries floating around still? Seems like the prefix should be stripped when forwarding config on to the underlying Kafka consumer. > Kafka-connect prints outs keystone and truststore password in log2 > -- > > Key: KAFKA-4171 > URL: https://issues.apache.org/jira/browse/KAFKA-4171 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Affects Versions: 0.10.0.0 >Reporter: Akshath Patkar >Assignee: Ewen Cheslack-Postava > > Kafka-connect prints outs keystone and truststore password in log > [2016-09-14 16:30:33,971] WARN The configuration > consumer.ssl.truststore.password = X was supplied but isn't a known > config. (org.apache.kafka.clients.consumer.ConsumerConfig:186) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
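A rough sketch of what the comment suggests (a hypothetical helper, not actual Connect code): copy the consumer.*-prefixed worker settings into the consumer config with the prefix stripped, and drop the prefixed originals so ConsumerConfig never sees, or logs, them as unknown configs.
{code}
import java.util.Properties

// Hypothetical helper: forward only the consumer.*-prefixed worker settings to the
// consumer, with the prefix stripped, and drop everything else.
def consumerConfig(workerProps: Map[String, String], prefix: String = "consumer."): Properties = {
  val props = new Properties()
  workerProps.foreach {
    // e.g. "consumer.ssl.truststore.password" -> "ssl.truststore.password"
    case (k, v) if k.startsWith(prefix) => props.put(k.stripPrefix(prefix), v)
    case _ => // other worker settings are not forwarded to the consumer
  }
  props
}
{code}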
[jira] [Created] (KAFKA-2020) I expect ReplicaNotAvailableException to have proper Javadocs
Chris Riccomini created KAFKA-2020: -- Summary: I expect ReplicaNotAvailableException to have proper Javadocs Key: KAFKA-2020 URL: https://issues.apache.org/jira/browse/KAFKA-2020 Project: Kafka Issue Type: Bug Components: consumer Reporter: Chris Riccomini Assignee: Neha Narkhede It looks like ReplicaNotAvailableException was copied and pasted from LeaderNotAvailableException. The Javadocs were never changed. This means that users think that ReplicaNotAvailableException signifies leaders are not available. This is very different from "I can ignore this exception," which is what the Kafka protocol docs say to do with ReplicaNotAvailableException. Related: what's the point of ReplicaNotAvailableException if it's supposed to be ignored? -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1886) SimpleConsumer swallowing ClosedByInterruptException
[ https://issues.apache.org/jira/browse/KAFKA-1886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14286326#comment-14286326 ] Chris Riccomini commented on KAFKA-1886: IMO, the SimpleConsumer should at least throw the proper exception. SimpleConsumer swallowing ClosedByInterruptException Key: KAFKA-1886 URL: https://issues.apache.org/jira/browse/KAFKA-1886 Project: Kafka Issue Type: Bug Components: producer Reporter: Aditya A Auradkar Assignee: Jun Rao This issue was originally reported by a Samza developer. I've included an exchange of mine with Chris Riccomini. I'm trying to reproduce the problem on my dev setup. From: criccomi Hey all, Samza's BrokerProxy [1] threads appear to be wedging randomly when we try to interrupt its fetcher thread. I noticed that SimpleConsumer.scala catches Throwable in its sendRequest method [2]. I'm wondering: if blockingChannel.send/receive throws a ClosedByInterruptException when the thread is interrupted, what happens? It looks like sendRequest will catch the exception (which I think clears the thread's interrupted flag), and then retries the send. If the send succeeds on the retry, I think that the ClosedByInterruptException exception is effectively swallowed, and the BrokerProxy will continue fetching messages as though its thread was never interrupted. Am I misunderstanding how things work? Cheers, Chris [1] https://github.com/apache/incubator-samza/blob/master/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala#L126 [2] https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/consumer/SimpleConsumer.scala#L75 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
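A simplified sketch (placeholder send/reconnect functions, not the real kafka.consumer code) of the catch-Throwable-and-retry pattern described above, followed by a variant that lets the interruption propagate instead of swallowing it:
{code}
import java.nio.channels.ClosedByInterruptException

// Simplified stand-in for SimpleConsumer.sendRequest; `send` and `reconnect` are
// placeholders, not the real kafka.consumer API.
def sendRequestRetryingOnce[T](send: () => T, reconnect: () => Unit): T = {
  try {
    send()
  } catch {
    // Catching everything is the problem: if the fetcher thread is interrupted
    // mid-send, the channel throws ClosedByInterruptException, we reconnect and
    // retry, and the shutdown signal is effectively swallowed.
    case _: Throwable =>
      reconnect()
      send()
  }
}

// A safer variant: let the interruption propagate instead of retrying.
def sendRequestInterruptAware[T](send: () => T, reconnect: () => Unit): T = {
  try {
    send()
  } catch {
    case e: ClosedByInterruptException => throw e // preserve the interrupt
    case _: Throwable =>
      reconnect()
      send()
  }
}
{code}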
[jira] [Comment Edited] (KAFKA-1863) Exception categories / hierarchy in clients
[ https://issues.apache.org/jira/browse/KAFKA-1863?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14277973#comment-14277973 ] Chris Riccomini edited comment on KAFKA-1863 at 1/15/15 12:39 AM: -- Here's how Samza's KafkaSystemProducer currently works. We use a sync Kafka producer to send messages. We batch inside Samza's KafkaSystemProducer, so calling KafkaSystemProducer.send() prepends your message to a buffer inside the KafkaSystemProducer (we don't send it to the underlying Kafka producer at this point). When the buffer reaches some max (say, 200), the send() call will trigger a sync KafkaSystemProducer.flush() call, which will call send() on the underlying producer. Since the underlying Kafka producer's send() call is blocking, any exception that occurs happens on the thread that's calling send(), so we simply wrap the send() call in a try/catch. Flush is also called before Samza checkpoints its offsets, to make sure that any outstanding output is sent (so we don't lose data if there's a failure). Samza's KafkaSystemProducer is currently set up to retry forever, no matter what, when sends to Kafka fail. It waits 10s, and retries. This behavior has come in handy in scenarios where we have (un)expected downtime or cluster maintenance on a Kafka grid. When the grid's offline, we don't want 50 Samza jobs to kill themselves. We just want them to chill out until the grid's back. This strategy has some drawbacks, though. # The KafkaSystemProducer.flush() call is blocking. This hurts throughput. Sometimes, quite severely. # Some exceptions are never going to get better. Retrying doesn't make sense in this case. When we upgrade to the new Producer, we want to address these issues. For (1), with the new producer everything is async, so that gets fixed automatically. For (2), we'd like to only retry with RetriableException, and fail for the rest. The trick is, if the sends are no longer sync, we have to have a mechanism by which we can verify that all outstanding messages have been successfully sent before we return. {code} send() send() send() flush() // when this returns, all messages must be in Kafka.. inflight must be 0 and buffer must be 0 {code} The naive approach would be to just hold on to a future for each send() call, and stuff it in a list. When flush is invoked, simply iterate over the list, and call Future.get(). The confusing thing with this approach is what to do if Future.get() tells us that there was an exception when the message send was attempted. It seems that re-sending the message at this point would result in out of order messages. Is that correct? {code} send() // success send() // failed send() // enqueued flush() {code} In the above example, if we call Future.get() on the second send()'s future, and we see it's a failure, the third send() could have already happened. In this case, if we re-send the failed message, then we get the third message before the second, right? Another oddity of this approach is that it seems we would have to double-buffer the messages in order to resend them, since RecordMetadata doesn't have the actual message that we sent. In order to re-send, we'd have to keep the message in a buffer in KafkaSystemProducer, along with its future, until the Future.get() returns successfully. We then thought that we would set: {noformat} retries=20 {noformat} At this point, we thought that the producer would re-send forever from the send thread, which is pretty much what we wanted in the case of a failure.
But then we realized that if we had multiple in-flight requests, this could lead to out of order messages, so we forced: {noformat} max.in.flight.requests.per.connection=1 {noformat} So at this point, we believe that any message that gets into the send thread will be guaranteed to be sent, and in the right order, always. But, talking with [~guozhang], it sounds like there are some cases where ApiExceptions are thrown from the main thread, not the send thread. In this case, it seems that our {{retries}} setting has no effect, because the message hasn't even made it into the queue for the send thread yet. So, if an exception is thrown in the main thread, we seem to have to catch the RetriableException that's thrown, and re-send the message. This is the current implementation that [~navina] is working on. Part of me is thinking, "This can't be right. We must be misunderstanding something." So, I 1) want to confirm that we're not misunderstanding anything, and 2) see if there is a better way to accomplish what we want: # Async sends. # A flush mechanism that blocks until all messages have been sent to Kafka. # In-order retries when a RetriableFailure occurs. # Forward a non-RetriableException when it occurs. I'm pretty confident that my mental model is broken, so any help correcting it would be appreciated. Also, any tips on a better way to accomplish what we want would be appreciated.
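The "hold on to a future per send() and drain them in flush()" approach described above, as a sketch against the new producer API (not the actual KafkaSystemProducer):
{code}
import java.util.concurrent.Future
import scala.collection.mutable.ArrayBuffer
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, RecordMetadata}

// Sketch of the naive approach: keep a future per send() and drain them in flush().
class BufferingProducer(producer: KafkaProducer[Array[Byte], Array[Byte]], topic: String) {
  private val pending = ArrayBuffer[Future[RecordMetadata]]()

  def send(key: Array[Byte], value: Array[Byte]): Unit = {
    // send() is async; the future completes when the broker acks (or the send fails).
    pending += producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic, key, value))
  }

  def flush(): Unit = {
    // Blocks until every outstanding send has completed. get() rethrows any send
    // error; as discussed above, re-sending here could reorder messages, so the
    // failure is surfaced to the caller instead.
    pending.foreach(_.get())
    pending.clear()
  }
}
{code}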
[jira] [Commented] (KAFKA-1863) Exception categories / hierarchy in clients
[ https://issues.apache.org/jira/browse/KAFKA-1863?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14278107#comment-14278107 ] Chris Riccomini commented on KAFKA-1863: bq. It might be nice to add a flush() call that waits for all the currently buffered or in-flight requests to complete assuming this doesn't add inefficiency in the normal case. +1 This would be very helpful. Exception categories / hierarchy in clients --- Key: KAFKA-1863 URL: https://issues.apache.org/jira/browse/KAFKA-1863 Project: Kafka Issue Type: Bug Reporter: Guozhang Wang Assignee: Guozhang Wang Fix For: 0.9.0 In the new clients package we introduce a new set of exceptions, but its hierarchy is not very clear as of today: {code} RuntimeException - KafkaException - BufferExhaustedException - ConfigException - SerializationException - QuotaViolationException - SchemaException - ApiException ApiException - InvalidTopicException - OffsetMetadataTooLarge (probably need to be renamed) - RecordBatchTooLargeException - RecordTooLargeException - UnknownServerException - RetriableException RetriableException - CorruptRecordException - InvalidMetadataException - NotEnoughReplicasAfterAppendException - NotEnoughReplicasException - OffsetOutOfRangeException - TimeoutException - UnknownTopicOrPartitionException {code} KafkaProducer.send() may throw KafkaExceptions that are not ApiExceptions; other exceptions will be set in the returned future metadata. We need to: 1. Re-examine the hierarchy. For example, for producers, only exceptions that are thrown directly from the caller thread before it is appended to the batch buffer should be ApiExceptions; some exceptions could be renamed / merged. 2. Clearly document the exception category / hierarchy as part of the release. [~criccomini] may have some more feedback for this issue from Samza's usage experience. [~jkreps] -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1863) Exception categories / hierarchy in clients
[ https://issues.apache.org/jira/browse/KAFKA-1863?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14278113#comment-14278113 ] Chris Riccomini commented on KAFKA-1863: [~jkreps], so it sounds like we can fail our containers on all exceptions that we see, and rely on retries=20 to handle all RetriableExceptions, correct? -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1755) Log cleaner thread should not exit on errors
[ https://issues.apache.org/jira/browse/KAFKA-1755?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14198578#comment-14198578 ] Chris Riccomini commented on KAFKA-1755: It might also be desirable to allow the log compaction to continue on the topic in question, and simply retain any messages without keys rather than compacting them. Log cleaner thread should not exit on errors Key: KAFKA-1755 URL: https://issues.apache.org/jira/browse/KAFKA-1755 Project: Kafka Issue Type: Bug Reporter: Joel Koshy Fix For: 0.8.3 The log cleaner is a critical process when using compacted topics. However, if there is any error in any topic (notably if a key is missing) then the cleaner exits and all other compacted topics will also be adversely affected - i.e., compaction stops across the board. This can be improved by just aborting compaction for a topic on any error and keeping the thread from exiting. Another improvement would be to reject messages without keys that are sent to compacted topics although this is not enough by itself. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1639) Support control messages in Kafka
[ https://issues.apache.org/jira/browse/KAFKA-1639?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14137751#comment-14137751 ] Chris Riccomini commented on KAFKA-1639: One point of discussion is whether the control message should be a full-blown message, or just a field in an existing message's payload. A full message seems like a more general solution to me. Support control messages in Kafka - Key: KAFKA-1639 URL: https://issues.apache.org/jira/browse/KAFKA-1639 Project: Kafka Issue Type: Improvement Reporter: Chris Riccomini The current transactionality proposal (https://cwiki.apache.org/confluence/display/KAFKA/Transactional+Messaging+in+Kafka) and implementation use control messages to handle transactions in Kafka. Kafka traditionally hasn't had control messages in its topics. Transactionality (as it's implemented) introduces this pattern, but appears to do so in a very specific fashion (control messages only for transactions). It seems to me that a good approach to control messages would be to generalize the control message model in Kafka to support not just transaction control messages, but arbitrary control messages. On the producer side, arbitrary control messages should be allowed to be sent, and on the consumer side, these control messages should be dropped by default. Just like transactionality, this would let frameworks (e.g. Samza) and other app-specific implementations take advantage of in-topic control messages (as opposed to out of band control messages) without any impact on existing consumers. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
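As a purely hypothetical illustration of the two shapes being contrasted above (nothing here is Kafka API):
{code}
// Hypothetical shapes for the two options; nothing here is Kafka API.

// Option 1: a control flag carried as a field inside an existing message's payload envelope.
case class Envelope(isControl: Boolean, controlType: Option[String], payload: Array[Byte])

// Option 2: a full-blown control message, distinct from data messages, which the
// consumer-side framework drops by default and frameworks like Samza can opt into.
sealed trait TopicEntry
case class DataMessage(key: Array[Byte], value: Array[Byte]) extends TopicEntry
case class ControlMessage(controlType: String, value: Array[Byte]) extends TopicEntry
{code}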
[jira] [Commented] (KAFKA-560) Garbage Collect obsolete topics
[ https://issues.apache.org/jira/browse/KAFKA-560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14128771#comment-14128771 ] Chris Riccomini commented on KAFKA-560: --- bq. It would be good to have a tool that could delete any topic that had not been written to in a configurable period of time and had no active consumer groups. I would prefer not to depend on consumer groups. Samza, for example, doesn't have consumer groups, so doing things like looking at the last offset commit of a consumer group in ZK/OffsetManager will not help if the consumer is using Samza (or some other offset checkpoint mechanism). The better approach, to me, seems to be to just have brokers keep track of approximate last-reads for each topic/partition based on FetchRequests. Garbage Collect obsolete topics --- Key: KAFKA-560 URL: https://issues.apache.org/jira/browse/KAFKA-560 Project: Kafka Issue Type: New Feature Reporter: Jay Kreps Labels: project Old junk topics tend to accumulate over time. Code may migrate to use new topics leaving the old ones orphaned. Likewise there are some use cases for temporary transient topics. It would be good to have a tool that could delete any topic that had not been written to in a configurable period of time and had no active consumer groups. Something like ./bin/delete-unused-topics.sh --last-write [date] --zookeeper [zk_connect] This requires API support to get the last update time. I think it may be possible to do this through the OffsetRequest now? -- This message was sent by Atlassian JIRA (v6.3.4#6332)
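A tiny sketch of the broker-side bookkeeping suggested above; this is entirely hypothetical, not an existing broker feature:
{code}
import java.util.concurrent.ConcurrentHashMap

// Entirely hypothetical broker-side bookkeeping: remember roughly when each
// topic/partition was last fetched, so a GC tool could delete topics that have
// seen neither writes nor reads for a configurable period.
class LastReadTracker {
  private val lastRead = new ConcurrentHashMap[(String, Int), Long]()

  def onFetch(topic: String, partition: Int): Unit =
    lastRead.put((topic, partition), System.currentTimeMillis())

  def idleLongerThan(topic: String, partition: Int, maxIdleMs: Long): Boolean =
    System.currentTimeMillis() - lastRead.getOrDefault((topic, partition), 0L) > maxIdleMs
}
{code}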
[jira] [Commented] (KAFKA-1171) Gradle build for Kafka
[ https://issues.apache.org/jira/browse/KAFKA-1171?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13844763#comment-13844763 ] Chris Riccomini commented on KAFKA-1171: For cross-building Scala versions, have a look at: https://issues.apache.org/jira/browse/SAMZA-34 [~szczepiq] provided the example zip file. Gradle build for Kafka -- Key: KAFKA-1171 URL: https://issues.apache.org/jira/browse/KAFKA-1171 Project: Kafka Issue Type: Improvement Components: packaging Affects Versions: 0.8.1, 0.9.0 Reporter: David Arthur Assignee: David Arthur Attachments: 0001-Adding-basic-Gradle-build.patch We have previously discussed moving away from SBT to an easier-to-comprehend-and-debug build system such as Ant or Gradle. I put up a patch for an Ant+Ivy build a while ago[1], and it sounded like people wanted to check out Gradle as well. 1. https://issues.apache.org/jira/browse/KAFKA-855 -- This message was sent by Atlassian JIRA (v6.1.4#6159)
[jira] [Commented] (KAFKA-1171) Gradle build for Kafka
[ https://issues.apache.org/jira/browse/KAFKA-1171?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13844780#comment-13844780 ] Chris Riccomini commented on KAFKA-1171: For the record, a good set of examples on how to build OSS multi-module Gradle projects is Netflix's GitHub account: https://github.com/Netflix Their account contains a number of projects that use Gradle (Lipstick, astyanax, servo, etc). In addition, they have a stub repo for bootstrapping new projects with Gradle. https://github.com/Netflix/gradle-template -- This message was sent by Atlassian JIRA (v6.1.4#6159)
[jira] [Commented] (KAFKA-1171) Gradle build for Kafka
[ https://issues.apache.org/jira/browse/KAFKA-1171?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13844808#comment-13844808 ] Chris Riccomini commented on KAFKA-1171: That sounds a lot like a version incompatibility between the version of Gradle you're using, and what the project was built against (it includes no gradlew script). I was able to successfully compile it using Samza's gradlew script. Might set up a gradlew in the project, and give that a shot. -- This message was sent by Atlassian JIRA (v6.1.4#6159)
[jira] [Commented] (KAFKA-1016) Broker should limit purgatory size
[ https://issues.apache.org/jira/browse/KAFKA-1016?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13755167#comment-13755167 ] Chris Riccomini commented on KAFKA-1016: Another interesting behavior is seen when a FetchRequest is made with a maxWait of Int.MAX. You start seeing a high request purgatory count, AND you start seeing a very high CLOSED_WAIT count on sockets on the broker. The reason for this appears to be that a connection can die or time out, but the connection is never closed because it's just sitting in purgatory for Int.MAX milliseconds. If no new messages appear for the topic the consumer is reading from (due to an upstream producer being down, for instance), then the connection never gets closed. -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
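For contrast, here is the same kind of fetch built with a bounded maxWait (placeholder topic, partition, offset and fetch size), which avoids both the purgatory growth and the long-lived dead connections described above by keeping the wait below the socket timeout:
{code}
import kafka.api.FetchRequestBuilder

// Similar to the consumer loop quoted later in this digest, but with maxWait kept
// well below the 30s socket timeout so a request cannot park in purgatory (or hold
// a dead connection open) indefinitely. All values are placeholders.
val request = new FetchRequestBuilder()
  .addFetch("my-topic", 0, 12345L, 1024 * 1024)
  .maxWait(10000) // ms; keep below socket.timeout.ms
  .minBytes(1)
  .build()
{code}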
[jira] [Commented] (KAFKA-1012) Implement an Offset Manager and hook offset requests to it
[ https://issues.apache.org/jira/browse/KAFKA-1012?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13750328#comment-13750328 ] Chris Riccomini commented on KAFKA-1012: Regarding transactionality, as described in https://cwiki.apache.org/confluence/display/KAFKA/Offset+Management, how do you guarantee that the offset messages in a single transaction will be sequential with no gaps? As far as I understand, your assumption is that if a transaction has 5 messages, they will be materialized into the topic/partition as M1, M2, M3, M4, M5, with no messages in between. Is there a single writer that's making this guarantee? I don't see how this is possible if you allow clients to send produce requests directly to the topic. Without a single writer, you could end up with M1, M2, .. other offset messages .., M3, M4, M5. In such a case, how do you determine if the transaction has failed? Is there some timeout ("If I don't get all 5 messages within 60s, the transaction is rolled back")? Implement an Offset Manager and hook offset requests to it -- Key: KAFKA-1012 URL: https://issues.apache.org/jira/browse/KAFKA-1012 Project: Kafka Issue Type: Sub-task Components: consumer Reporter: Tejas Patil Assignee: Tejas Patil Priority: Minor Attachments: KAFKA-1012.patch, KAFKA-1012-v2.patch After KAFKA-657, we have a protocol for consumers to commit and fetch offsets from brokers. Currently, consumers are not using this API and directly talking with Zookeeper. This Jira will involve the following: 1. Add a special topic in kafka for storing offsets 2. Add an OffsetManager interface which would handle storing, accessing, loading and maintaining consumer offsets 3. Implement offset managers for both of these 2 choices : existing ZK based storage or inbuilt storage for offsets. 4. Leader brokers would now maintain an additional hash table of offsets for the group-topic-partitions that they lead 5. Consumers should now use the OffsetCommit and OffsetFetch API -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Commented] (KAFKA-1012) Implement an Offset Manager and hook offset requests to it
[ https://issues.apache.org/jira/browse/KAFKA-1012?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13750345#comment-13750345 ] Chris Riccomini commented on KAFKA-1012: From the wiki: "It will likely have a tight size limit to avoid server impact." Does Kafka support per-topic max message size? Without this, is it possible to enforce size limits on the offset topic? If writes are happening through the offset commit API it should be do-able, but if writes are sent as regular produce requests, the broker would have to special-case the offset topic to enforce size limits. -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Commented] (KAFKA-1012) Implement an Offset Manager and hook offset requests to it
[ https://issues.apache.org/jira/browse/KAFKA-1012?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13750351#comment-13750351 ] Chris Riccomini commented on KAFKA-1012: Also, what's the guidance on partitioning the offset topic? It almost seems like the best solution is to have one partition per broker, so that each broker can share the load (both writes/sec and the memory requirement to keep the map in memory). -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Created] (KAFKA-1016) Broker should limit purgatory size
Chris Riccomini created KAFKA-1016: -- Summary: Broker should limit purgatory size Key: KAFKA-1016 URL: https://issues.apache.org/jira/browse/KAFKA-1016 Project: Kafka Issue Type: Bug Components: purgatory Affects Versions: 0.8 Reporter: Chris Riccomini Assignee: Joel Koshy I recently ran into a case where a poorly configured Kafka consumer was able to trigger out of memory exceptions in multiple Kafka brokers. The consumer was configured to have a fetcher.max.wait of Int.MaxInt. For low volume topics, this configuration causes the consumer to block frequently, and for long periods of time. [~junrao] informs me that the fetch request will time out after the socket timeout is reached. In our case, this was set to 30s. With several thousand consumer threads, the fetch request purgatory got into the 100,000-400,000 range, which we believe triggered the out of memory exception. [~nehanarkhede] claims to have seen similar behavior in other high volume clusters. It kind of seems like a bad thing that a poorly configured consumer can trigger out of memory exceptions in the broker. I was thinking maybe it makes sense to have the broker try and protect itself from this situation. Here are some potential solutions: 1. Have a broker-side max wait config for fetch requests. 2. Threshold the purgatory size, and either drop the oldest connections in purgatory, or reject the newest fetch requests when purgatory is full. -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Commented] (KAFKA-982) Logo for Kafka
[ https://issues.apache.org/jira/browse/KAFKA-982?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13716567#comment-13716567 ] Chris Riccomini commented on KAFKA-982: --- +1 301 Logo for Kafka -- Key: KAFKA-982 URL: https://issues.apache.org/jira/browse/KAFKA-982 Project: Kafka Issue Type: Improvement Reporter: Jay Kreps Attachments: 289.jpeg, 294.jpeg, 296.png, 298.jpeg, 301.png We should have a logo for kafka. -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Commented] (KAFKA-925) Add optional partition key override in producer
[ https://issues.apache.org/jira/browse/KAFKA-925?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13711676#comment-13711676 ] Chris Riccomini commented on KAFKA-925: --- Hey Jay, Seems pretty reasonable to me. Is the reason for the type change in the Partitioner so that you can handle either keys of type K (key) or keys of any type (part key) using the same partitioner? Cheers, Chris Add optional partition key override in producer --- Key: KAFKA-925 URL: https://issues.apache.org/jira/browse/KAFKA-925 Project: Kafka Issue Type: New Feature Components: producer Affects Versions: 0.8.1 Reporter: Jay Kreps Assignee: Jay Kreps Attachments: KAFKA-925-v1.patch, KAFKA-925-v2.patch We have a key that is used for partitioning in the producer and stored with the message. Actually these uses, though often the same, could be different. The two meanings are effectively: 1. Assignment to a partition 2. Deduplication within a partition In cases where we want to allow the client to take advantage of both of these and they aren't the same it would be nice to allow them to be specified separately. To implement this I added an optional partition key to KeyedMessage. When specified, this key is used for partitioning rather than the message key. This key is of type Any and the parametric typing is removed from the partitioner to allow it to work with either key. An alternative would be to allow the partition id to be specified in the KeyedMessage. This would be slightly more convenient in the case where there is no partition key but instead you know a priori the partition number--this case must be handled by giving the partition id as the partition key and using an identity partitioner which is slightly more roundabout. However this is inconsistent with the normal partitioning which requires a key in the case where the partition is determined by a key--in that case you would be manually calling your partitioner in user code. It seems best to me to either always use a key or always use a partition, and since we currently take a key I stuck with that. -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
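For readers following along, here is a rough Scala sketch of the proposal as described above: KeyedMessage grows an optional partition key, and the partitioner loses its type parameter so it can accept either the message key (type K) or the separate partition key (Any). The names follow the patch description but may not match the final code exactly.

{code}
// Sketch only; field and class names are taken from the issue description, not verified against the patch.
case class KeyedMessage[K, V](topic: String, key: K, partKey: Any, message: V) {
  // the key used for partitioning falls back to the message key when no partKey is given
  def partitionKey: Any = if (partKey != null) partKey else key
}

trait Partitioner {
  def partition(key: Any, numPartitions: Int): Int
}

class HashPartitioner extends Partitioner {
  def partition(key: Any, numPartitions: Int): Int =
    (key.hashCode & 0x7fffffff) % numPartitions   // mask keeps the result non-negative
}
{code}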
[jira] [Created] (KAFKA-725) Broker Exception: Attempt to read with a maximum offset less than start offset
Chris Riccomini created KAFKA-725: - Summary: Broker Exception: Attempt to read with a maximum offset less than start offset Key: KAFKA-725 URL: https://issues.apache.org/jira/browse/KAFKA-725 Project: Kafka Issue Type: Bug Components: log Affects Versions: 0.8 Reporter: Chris Riccomini Assignee: Jay Kreps I have a simple consumer that's reading from a single topic/partition pair. Running it seems to trigger these messages on the broker periodically: 2013/01/22 23:04:54.936 ERROR [KafkaApis] [kafka-request-handler-4] [kafka] [] [KafkaApi-466] error when processing request (MyTopic,4,7951732,2097152) java.lang.IllegalArgumentException: Attempt to read with a maximum offset (7951715) less than the start offset (7951732). at kafka.log.LogSegment.read(LogSegment.scala:105) at kafka.log.Log.read(Log.scala:390) at kafka.server.KafkaApis.kafka$server$KafkaApis$$readMessageSet(KafkaApis.scala:372) at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.apply(KafkaApis.scala:330) at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.apply(KafkaApis.scala:326) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206) at scala.collection.immutable.Map$Map1.foreach(Map.scala:105) at scala.collection.TraversableLike$class.map(TraversableLike.scala:206) at scala.collection.immutable.Map$Map1.map(Map.scala:93) at kafka.server.KafkaApis.kafka$server$KafkaApis$$readMessageSets(KafkaApis.scala:326) at kafka.server.KafkaApis$$anonfun$maybeUnblockDelayedFetchRequests$2.apply(KafkaApis.scala:165) at kafka.server.KafkaApis$$anonfun$maybeUnblockDelayedFetchRequests$2.apply(KafkaApis.scala:164) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:57) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:43) at kafka.server.KafkaApis.maybeUnblockDelayedFetchRequests(KafkaApis.scala:164) at kafka.server.KafkaApis$$anonfun$handleProducerRequest$2.apply(KafkaApis.scala:186) at kafka.server.KafkaApis$$anonfun$handleProducerRequest$2.apply(KafkaApis.scala:185) at scala.collection.immutable.Map$Map2.foreach(Map.scala:127) at kafka.server.KafkaApis.handleProducerRequest(KafkaApis.scala:185) at kafka.server.KafkaApis.handle(KafkaApis.scala:58) at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:41) at java.lang.Thread.run(Thread.java:619) When I shut the consumer down, I don't see the exceptions anymore. This is the code that my consumer is running:
while(true) {
  // we believe the consumer to be connected, so try and use it for a fetch request
  val request = new FetchRequestBuilder()
    .addFetch(topic, partition, nextOffset, fetchSize)
    .maxWait(Int.MaxValue) // TODO for super high-throughput, might be worth waiting for more bytes
    .minBytes(1)
    .build

  debug("Fetching messages for stream %s and offset %s." format (streamPartition, nextOffset))
  val messages = connectedConsumer.fetch(request)
  debug("Fetch complete for stream %s and offset %s. Got messages: %s" format (streamPartition, nextOffset, messages))

  if (messages.hasError) {
    warn("Got error code from broker for %s: %s. Shutting down consumer to trigger a reconnect." format (streamPartition, messages.errorCode(topic, partition)))
    ErrorMapping.maybeThrowException(messages.errorCode(topic, partition))
  }

  messages.messageSet(topic, partition).foreach(msg => {
    watchers.foreach(_.onMessagesReady(msg.offset.toString, msg.message.payload))
    nextOffset = msg.nextOffset
  })
}
Any idea what might be causing this error? -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Created] (KAFKA-715) NumberFormatException in PartitionStateInfo
Chris Riccomini created KAFKA-715: - Summary: NumberFormatException in PartitionStateInfo Key: KAFKA-715 URL: https://issues.apache.org/jira/browse/KAFKA-715 Project: Kafka Issue Type: Bug Components: replication Affects Versions: 0.8 Reporter: Chris Riccomini Assignee: Neha Narkhede Hey Guys, During a broker restart, I got this exception: 2013/01/21 19:21:10.918 INFO [ZooKeeper] [main] [kafka] [] Client environment:zookeeper.version=3.3.3-1203054, built on 11/17/2011 05:47 GMT 2013/01/21 19:21:10.918 INFO [ZooKeeper] [main] [kafka] [] Client environment:host.name=eat1-qa466.corp.linkedin.com 2013/01/21 19:21:10.918 INFO [ZooKeeper] [main] [kafka] [] Client environment:java.version=1.6.0_21 2013/01/21 19:21:10.918 INFO [ZooKeeper] [main] [kafka] [] Client environment:java.vendor=Sun Microsystems Inc. 2013/01/21 19:21:10.918 INFO [ZooKeeper] [main] [kafka] [] Client environment:java.home=/export/apps/jdk/JDK-1_6_0_21/jre 2013/01/21 19:21:10.918 INFO [ZooKeeper] [main] [kafka] [] Client environment:java.class.path=/export/apps/jdk/JDK-1_6_0_21/lib/tools.jar:lib/activation-1.0.2.jar:lib/ant-1.6.5.jar:lib/aopalliance-1.0.jar:lib/cfg-2.8.0.jar:lib/cfg-api-6.6.6.jar:lib/cfg-impl-6.6.6.jar:lib/com.linkedin.customlibrary.j2ee-1.0.jar:lib/com.linkedin.customlibrary.mx4j-3.0.2.jar:lib/com.linkedin.customlibrary.xmsg-0.6.jar:lib/commons-beanutils-1.7.0.jar:lib/commons-cli-1.0.jar:lib/commons-lang-2.4.jar:lib/commons-logging-1.1.jar:lib/configuration-api-1.4.8.jar:lib/configuration-repository-impl-1.4.8.jar:lib/container-management-impl-1.1.15.jar:lib/container-server-1.1.15.jar:lib/emweb-impl-1.1.15.jar:lib/jaxen-1.1.1.jar:lib/jdom-1.0.jar:lib/jetty-6.1.26.jar:lib/jetty-management-6.1.26.jar:lib/jetty-naming-6.1.26.jar:lib/jetty-plus-6.1.26.jar:lib/jetty-util5-6.1.26.jar:lib/jetty-util-6.1.26.jar:lib/jmx-impl-1.4.8.jar:lib/json-simple-1.1.jar:lib/jsp-2.1-6.1.1.jar:lib/jsp-api-2.1-6.1.1.jar:lib/lispring-lispring-core-1.4.8.jar:lib/lispring-lispring-servlet-1.4.8.jar:lib/log4j-1.2.15.jar:lib/mail-1.3.0.jar:lib/mx4j-tools-3.0.2.jar:lib/servlet-api-2.5.jar:lib/spring-aop-3.0.3.jar:lib/spring-asm-3.0.3.jar:lib/spring-aspects-3.0.3.jar:lib/spring-beans-3.0.3.jar:lib/spring-context-3.0.3.jar:lib/spring-context-support-3.0.3.jar:lib/spring-core-3.0.3.jar:lib/spring-expression-3.0.3.jar:lib/spring-jdbc-3.0.3.jar:lib/spring-jms-3.0.3.jar:lib/spring-orm-3.0.3.jar:lib/spring-transaction-3.0.3.jar:lib/spring-web-3.0.3.jar:lib/spring-web-servlet-3.0.3.jar:lib/util-core-4.0.40.jar:lib/util-i18n-4.0.40.jar:lib/util-jmx-4.0.22.jar:lib/util-log-4.0.40.jar:lib/util-servlet-4.0.40.jar:lib/util-xmsg-4.0.40.jar:lib/xml-apis-1.3.04.jar 2013/01/21 19:21:10.918 INFO [ZooKeeper] [main] [kafka] [] Client environment:java.library.path=/export/apps/jdk/JDK-1_6_0_21/jre/lib/amd64/server:/export/apps/jdk/JDK-1_6_0_21/jre/lib/amd64:/export/apps/jdk/JDK-1_6_0_21/jre/../lib/amd64:/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib 2013/01/21 19:21:10.918 INFO [ZooKeeper] [main] [kafka] [] Client environment:java.io.tmpdir=/export/content/glu/apps/kafka/i001/tmp 2013/01/21 19:21:10.918 INFO [ZooKeeper] [main] [kafka] [] Client environment:java.compiler=NA 2013/01/21 19:21:10.918 INFO [ZooKeeper] [main] [kafka] [] Client environment:os.name=Linux 2013/01/21 19:21:10.918 INFO [ZooKeeper] [main] [kafka] [] Client environment:os.arch=amd64 2013/01/21 19:21:10.918 INFO [ZooKeeper] [main] [kafka] [] Client environment:os.version=2.6.32-220.13.1.el6.x86_64 2013/01/21 19:21:10.918 INFO [ZooKeeper] [main] [kafka] [] Client 
environment:user.name=app 2013/01/21 19:21:10.918 INFO [ZooKeeper] [main] [kafka] [] Client environment:user.home=/home/app 2013/01/21 19:21:10.918 INFO [ZooKeeper] [main] [kafka] [] Client environment:user.dir=/export/content/glu/apps/kafka/i001 2013/01/21 19:21:10.919 INFO [ZooKeeper] [main] [kafka] [] Initiating client connection, connectString=eat1-app309.corp.linkedin.com:12913,eat1-app310.corp.linkedin.com:12913,eat1-app311.corp.linkedin.com:12913,eat1-app312.corp.linkedin.com:12913,eat1-app313.corp.linkedin.com:12913/kafka-samsa sessionTimeout=6000 watcher=org.I0Itec.zkclient.ZkClient@1bfdbab5 2013/01/21 19:21:10.932 INFO [ClientCnxn] [main-SendThread()] [kafka] [] Opening socket connection to server eat1-app313.corp.linkedin.com/172.20.72.73:12913 2013/01/21 19:21:10.933 INFO [ClientCnxn] [main-SendThread(eat1-app313.corp.linkedin.com:12913)] [kafka] [] Socket connection established to eat1-app313.corp.linkedin.com/172.20.72.73:12913, initiating session 2013/01/21 19:21:10.963 INFO [ClientCnxn] [main-SendThread(eat1-app313.corp.linkedin.com:12913)] [kafka] [] Session establishment complete on server eat1-app313.corp.linkedin.com/172.20.72.73:12913, sessionid = 0x53afd073784059c, negotiated timeout = 6000 2013/01/21 19:21:10.964 INFO
[jira] [Created] (KAFKA-709) Default queue.enqueue.timeout.ms to -1
Chris Riccomini created KAFKA-709: - Summary: Default queue.enqueue.timeout.ms to -1 Key: KAFKA-709 URL: https://issues.apache.org/jira/browse/KAFKA-709 Project: Kafka Issue Type: Bug Components: producer Affects Versions: 0.8 Reporter: Chris Riccomini Assignee: Jun Rao Hey Guys, It seems that, by default, producers in 0.8 are async, and have a default queue.enqueue.timeout.ms of 0. This means that anyone who hands messages to the async producer faster than it can send them will likely end up eventually hitting this exception: Exception in thread "Thread-3" kafka.common.QueueFullException: Event queue is full of unsent messages, could not send event: KeyedMessage(PageViewEventByGroupJson,Missing Page Group,java.nio.HeapByteBuffer[pos=0 lim=125 cap=125]) at kafka.producer.Producer$$anonfun$asyncSend$1.apply(Producer.scala:111) at kafka.producer.Producer$$anonfun$asyncSend$1.apply(Producer.scala:89) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34) at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:32) at kafka.producer.Producer.asyncSend(Producer.scala:89) at kafka.producer.Producer.send(Producer.scala:77) As it says in https://cwiki.apache.org/KAFKA/kafka-mirroring.html, this can result in lost messages and nasty exceptions in the logs. I think the better default is to set queue.enqueue.timeout.ms to -1, which will just block until the queue frees up. -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
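Until the default changes, a minimal sketch of the producer-side workaround: explicitly set queue.enqueue.timeout.ms to -1 so the async producer blocks when its queue is full instead of throwing QueueFullException. The broker list and topic name below are placeholders; the property names follow the 0.8 producer configs.

{code}
import java.util.Properties
import kafka.producer.{KeyedMessage, Producer, ProducerConfig}

object BlockingAsyncProducerExample {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("metadata.broker.list", "broker1:9092,broker2:9092") // placeholder brokers
    props.put("serializer.class", "kafka.serializer.StringEncoder")
    props.put("producer.type", "async")
    props.put("queue.enqueue.timeout.ms", "-1") // block when the queue is full instead of throwing

    val producer = new Producer[String, String](new ProducerConfig(props))
    producer.send(new KeyedMessage[String, String]("some-topic", "example message")) // placeholder topic
    producer.close()
  }
}
{code}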