[jira] [Created] (KAFKA-8949) MockConsumer.assign and ConsumerRebalanceListener
David Hay created KAFKA-8949:

Summary: MockConsumer.assign and ConsumerRebalanceListener
Key: KAFKA-8949
URL: https://issues.apache.org/jira/browse/KAFKA-8949
Project: Kafka
Issue Type: Bug
Reporter: David Hay

Using MockConsumer, one must use either {{assign}} or {{subscribe}}, not both. However, if you decide to use the {{assign}} method, there is no easy way to register a ConsumerRebalanceListener that will get called when {{assign}} is called. The current workaround is to do the following:

{code:java}
MockConsumer<String, String> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
// Subscribe only to register the listener, then unsubscribe so that assign() is allowed.
consumer.subscribe(Collections.singletonList("test.topic"), listener);
consumer.unsubscribe();
consumer.assign(Arrays.asList(new TopicPartition("test.topic", 1), ...));
{code}

It would be nice to be able to do something like:

{code:java}
MockConsumer<String, String> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
consumer.registerRebalanceListener();
consumer.assign(...);
{code}

--
This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-6388) Error while trying to roll a segment that already exists
David Hay created KAFKA-6388:

Summary: Error while trying to roll a segment that already exists
Key: KAFKA-6388
URL: https://issues.apache.org/jira/browse/KAFKA-6388
Project: Kafka
Issue Type: Bug
Affects Versions: 0.8.0
Reporter: David Hay
Assignee: Neha Narkhede
Priority: Blocker

I tried setting up a 5 broker 0.8 cluster and sending messages to hundreds of topics on it. For a couple of topic partitions, the produce requests never succeed, since they fail on the leader with the following error:

[2012-12-05 22:54:05,711] WARN [Kafka Log on Broker 2], Newly rolled segment file 000 0.log already exists; deleting it first (kafka.log.Log)
[2012-12-05 22:54:05,711] WARN [Kafka Log on Broker 2], Newly rolled segment file 000 0.index already exists; deleting it first (kafka.log.Log)
[2012-12-05 22:54:05,715] ERROR [ReplicaFetcherThread-1-0-on-broker-2], Error due to (kafka.server.ReplicaFetcherThread)
kafka.common.KafkaException: Trying to roll a new log segment for topic partition NusWriteEvent-4 with start offset 0 while it already exsits
    at kafka.log.Log.rollToOffset(Log.scala:456)
    at kafka.log.Log.roll(Log.scala:434)
    at kafka.log.Log.maybeRoll(Log.scala:423)
    at kafka.log.Log.append(Log.scala:257)
    at kafka.server.ReplicaFetcherThread.processPartitionData(ReplicaFetcherThread.scala:51)
    at kafka.server.AbstractFetcherThread$$anonfun$doWork$5.apply(AbstractFetcherThread.scala:125)
    at kafka.server.AbstractFetcherThread$$anonfun$doWork$5.apply(AbstractFetcherThread.scala:108)
    at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:125)
    at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:344)
    at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:344)
    at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:108)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:50)
[jira] [Commented] (KAFKA-1835) Kafka new producer needs options to make blocking behavior explicit
[ https://issues.apache.org/jira/browse/KAFKA-1835?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14629874#comment-14629874 ]

David Hay commented on KAFKA-1835:
----------------------------------

If I understand correctly, the proposal in KIP-19 is to add some additional timeout parameters around sending messages (e.g. the amount of time a message can sit in the accumulator waiting for a broker to send to) as well as the max time to block waiting for things like broker metadata, etc.

While this is closer, I would still feel obligated to wrap the send() request with my own threading, as I want to give the send request the best opportunity to succeed but not delay the user's request. That is, I wouldn't want to set max.block.ms to 0, as I'd want to give the producer the opportunity to get the broker metadata, if needed, and not fail right away. I just want this request to run in a separate thread and then propagate the TimeoutException to the Callback provided to the send() method.

Kafka new producer needs options to make blocking behavior explicit
-------------------------------------------------------------------

Key: KAFKA-1835
URL: https://issues.apache.org/jira/browse/KAFKA-1835
Project: Kafka
Issue Type: Improvement
Components: clients
Affects Versions: 0.8.2.0, 0.8.3, 0.9.0
Reporter: Paul Pearcy
Fix For: 0.8.3
Attachments: KAFKA-1835-New-producer--blocking_v0.patch, KAFKA-1835.patch
Original Estimate: 504h
Remaining Estimate: 504h

The new (0.8.2 standalone) producer will block the first time it attempts to retrieve metadata for a topic. This is not the desired behavior in some use cases where async non-blocking guarantees are required and message loss is acceptable in known cases. Also, most developers will assume an API that returns a future is safe to call in a critical request path.

Discussing on the mailing list, the most viable option is to have the following settings:

pre.initialize.topics=x,y,z
pre.initialize.timeout=x

This moves potential blocking to the init of the producer and outside of some random request. The potential will still exist for blocking in a corner case where connectivity with Kafka is lost and a topic not included in pre-init has a message sent for the first time.

There is the question of what to do when initialization fails. There are a couple of options that I'd like available:
- Fail creation of the client
- Fail all sends until the meta is available

Open to input on how the above option should be expressed.

It is also worth noting that more nuanced solutions exist that could work without the extra settings; they just end up having extra complications and, at the end of the day, not adding much value. For instance, the producer could accept and queue messages (note: more complicated than I am making it sound, due to storing all accepted messages in pre-partitioned compact binary form), but you're still going to be forced to choose to either start blocking or dropping messages at some point.

I have some test cases I am going to port over to the Kafka producer integration ones and start from there. My current impl is in Scala, but porting to Java shouldn't be a big deal (was using a promise to track init status, but will likely need to make that an atomic bool).
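The two failure options listed in the description (fail creation of the client, or fail all sends until the metadata is available) can be sketched with JDK types alone. This is only an illustration under stated assumptions, not the attached patch: {{PreInitGate}}, {{MetadataLoader}} and {{FailureMode}} are hypothetical names, and the loader stands in for whatever blocking call fetches topic metadata. The flag is an atomic boolean, as the description suggests a Java port would likely use.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch: none of these types belong to the Kafka client.
final class PreInitGate {
    /** Stand-in for a blocking metadata fetch for a single topic. */
    interface MetadataLoader {
        void load(String topic) throws Exception;
    }

    enum FailureMode { FAIL_CREATION, FAIL_SENDS }

    private final AtomicBoolean ready = new AtomicBoolean(false);

    PreInitGate(List<String> topics, long timeoutMs, FailureMode mode, MetadataLoader loader) {
        // Load metadata off the calling thread so that the wait can be bounded.
        CompletableFuture<Void> init = CompletableFuture.runAsync(() -> {
            for (String topic : topics) {
                try {
                    loader.load(topic);
                } catch (Exception e) {
                    throw new IllegalStateException("metadata fetch failed for " + topic, e);
                }
            }
            ready.set(true); // reached only if every topic loaded, possibly after the timeout
        });
        try {
            init.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                Thread.currentThread().interrupt();
            }
            if (mode == FailureMode.FAIL_CREATION) {
                throw new IllegalStateException("pre-initialization failed", e);
            }
            // FAIL_SENDS: construction succeeds; sends are rejected until the flag flips.
        }
    }

    boolean isReady() {
        return ready.get();
    }

    /** Call at the top of a send path to fail fast instead of blocking. */
    void ensureReady() {
        if (!ready.get()) {
            throw new IllegalStateException("metadata not yet available");
        }
    }

    public static void main(String[] args) {
        PreInitGate gate = new PreInitGate(Arrays.asList("x", "y", "z"), 1000,
                FailureMode.FAIL_SENDS, topic -> { /* pretend the fetch succeeded */ });
        System.out.println("ready: " + gate.isReady());
    }
}
```

In this sketch a load that finishes after the timeout still flips the flag, so a producer in the "fail sends" mode would start accepting sends once the metadata eventually arrives; a load that fails outright leaves the gate closed.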
[jira] [Commented] (KAFKA-1835) Kafka new producer needs options to make blocking behavior explicit
[ https://issues.apache.org/jira/browse/KAFKA-1835?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14628656#comment-14628656 ]

David Hay commented on KAFKA-1835:
----------------------------------

With this example, we would still be required to wrap send() with our own threading code to make it completely asynchronous. Since send() returns a Future, there is an implicit assumption that it will never block for anything. In our case, we want the call to send() to be completely asynchronous so that we don't impact end user performance. We deal with the exception in a reactive manner, using the callback variant of send(). As long as send() has the potential to block, even if it's just the first send, we'll have to wrap the call with our own threading logic.
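The kind of wrapping this comment describes can be sketched with JDK types only. The sketch is an assumption-laden illustration: {{OffThreadSender}} and {{BlockingSend}} are hypothetical names, and {{BlockingSend}} merely stands in for a callback-style {{send()}} that may block or throw before it returns; it is not the Kafka producer API.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeoutException;
import java.util.function.BiConsumer;

// Hypothetical sketch: runs a possibly blocking send on a pool thread and
// routes any exception thrown by the call itself to the caller's callback.
final class OffThreadSender implements AutoCloseable {
    /** Stand-in for a send() that may block or throw before returning. */
    interface BlockingSend {
        void send(String message, BiConsumer<String, Exception> callback) throws Exception;
    }

    private final ExecutorService pool = Executors.newFixedThreadPool(2);
    private final BlockingSend delegate;

    OffThreadSender(BlockingSend delegate) {
        this.delegate = delegate;
    }

    /** Returns immediately; the delegate runs on a pool thread. */
    void send(String message, BiConsumer<String, Exception> callback) {
        pool.execute(() -> {
            try {
                delegate.send(message, callback);
            } catch (Exception e) {
                // e.g. a timeout while waiting for metadata: report it reactively.
                callback.accept(null, e);
            }
        });
    }

    @Override
    public void close() {
        pool.shutdown(); // already submitted sends still run
    }

    public static void main(String[] args) {
        try (OffThreadSender sender = new OffThreadSender((msg, cb) -> {
            throw new TimeoutException("metadata not available");
        })) {
            sender.send("hello", (ack, error) -> System.out.println("error: " + error));
        }
    }
}
```

Note that the pool created by {{Executors.newFixedThreadPool}} queues work without bound, so this only moves the blocking off the caller's thread; the choice between blocking and dropping once the backlog grows still has to be made somewhere.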
[jira] [Commented] (KAFKA-1835) Kafka new producer needs options to make blocking behavior explicit
[ https://issues.apache.org/jira/browse/KAFKA-1835?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14628122#comment-14628122 ]

David Hay commented on KAFKA-1835:
----------------------------------

It seems to me that there are two aspects to sending a message: (a) selecting a partition and (b) sending the message to the partition leader. Currently, (b) is performed asynchronously, (a) is not, and contains a potentially blocking call to get partition metadata (or refresh it).

Maybe I'm missing something, but why couldn't the producer just queue the ProducerRecord and have a separate thread take care of querying for broker metadata, serializing the key and message, selecting a partition and pushing the result onto the send queue? In other words, put the current implementation of {{send()}} in a separate thread? This is essentially what we've had to do in order to work around the possibility of {{send()}} blocking when we don't want it to. In addition, it puts the serialization process in a separate thread, which is important when we need to send a message and respond to user input as quickly as possible.
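The queue-and-worker arrangement proposed in this comment could look roughly like the following JDK-only sketch. All names are hypothetical ({{QueueingProducerSketch}}, {{trySend}}, the {{sink}} consumer), partition selection is reduced to a hash of the key, and metadata lookup and serialization are omitted; the real producer's internals are not modelled here.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical sketch: callers only enqueue; one worker thread does the
// partition selection and hands the result to a sink.
final class QueueingProducerSketch implements AutoCloseable {
    private final BlockingQueue<String[]> pending;
    private final Thread worker;
    private volatile boolean running = true;

    QueueingProducerSketch(int capacity, int numPartitions, Consumer<String> sink) {
        pending = new ArrayBlockingQueue<>(capacity);
        worker = new Thread(() -> {
            try {
                // Keep draining after close() until the queue is empty.
                while (running || !pending.isEmpty()) {
                    String[] record = pending.poll(50, TimeUnit.MILLISECONDS);
                    if (record == null) {
                        continue;
                    }
                    // Stand-in for partitioning, done off the caller's thread.
                    int partition = Math.floorMod(record[0].hashCode(), numPartitions);
                    sink.accept(partition + ":" + record[1]);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "sketch-sender");
        worker.setDaemon(true);
        worker.start();
    }

    /** Never blocks; returns false when the queue is full and the record is dropped. */
    boolean trySend(String key, String value) {
        return pending.offer(new String[] {key, value});
    }

    @Override
    public void close() {
        running = false;
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        try (QueueingProducerSketch producer =
                 new QueueingProducerSketch(100, 4, System.out::println)) {
            producer.trySend("user-1", "clicked");
        }
    }
}
```

The bounded queue makes the trade-off from the issue description explicit: once it is full, {{trySend}} drops the record rather than blocking the caller.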
[jira] [Commented] (KAFKA-2135) New Kafka Producer Client: Send requests wait indefinitely if no broker is available.
[ https://issues.apache.org/jira/browse/KAFKA-2135?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14527467#comment-14527467 ]

David Hay commented on KAFKA-2135:
----------------------------------

[~ewencp] This does appear to be a duplicate of KAFKA-1788.
[jira] [Commented] (KAFKA-1835) Kafka new producer needs options to make blocking behavior explicit
[ https://issues.apache.org/jira/browse/KAFKA-1835?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14519506#comment-14519506 ]

David Hay commented on KAFKA-1835:
----------------------------------

Fixed some syntax errors in my code sample. This example isn't intended to prevent the timeout problems getting the broker metadata. Instead, it is intended to isolate the producer from the lag required to actually send a message to the Kafka brokers.
[jira] [Commented] (KAFKA-1835) Kafka new producer needs options to make blocking behavior explicit
[ https://issues.apache.org/jira/browse/KAFKA-1835?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14518184#comment-14518184 ]

David Hay commented on KAFKA-1835:
----------------------------------

[~smiklosovic] I've worked around the issue by creating my own thread pool and wrapping the producer send requests with my own Callable, as follows. This also uses Guava's ListenableFuture class:

{code:language=java}
// Classes used: java.util.concurrent.{Callable, Executors},
// com.google.common.util.concurrent.{Futures, ListenableFuture,
// ListeningExecutorService, MoreExecutors, SettableFuture}.

// Decorate the pool so that submit() returns a ListenableFuture.
ListeningExecutorService executorService =
    MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(5));

// Futures.dereference flattens the nested future returned by the Callable.
ListenableFuture<RecordMetadata> future = Futures.dereference(
    executorService.submit(new Callable<ListenableFuture<RecordMetadata>>() {
        public ListenableFuture<RecordMetadata> call() throws Exception {
            final SettableFuture<RecordMetadata> responseFuture = SettableFuture.create();
            try {
                // send() may block on metadata, so it runs on a pool thread.
                producer.send(new ProducerRecord<>(topic, key, message), new Callback() {
                    public void onCompletion(RecordMetadata metadata, Exception ex) {
                        if (ex == null) {
                            responseFuture.set(metadata);
                        } else {
                            responseFuture.setException(ex);
                        }
                    }
                });
            } catch (Exception ex) {
                responseFuture.setException(ex);
            }
            return responseFuture;
        }
    }));
{code}
[jira] [Created] (KAFKA-2137) New Kafka Producer not fully asynchronous
David Hay created KAFKA-2137:

Summary: New Kafka Producer not fully asynchronous
Key: KAFKA-2137
URL: https://issues.apache.org/jira/browse/KAFKA-2137
Project: Kafka
Issue Type: Improvement
Reporter: David Hay

The new Producer client attempts to be fully asynchronous. However, it still has the potential to block at the start of the {{send}} method when it asks for the metadata for the topic ({{waitOnMetadata(record.topic(), this.metadataFetchTimeoutMs)}}).

There is a timeout (60 seconds, by default), but it would be nice if this lookup were performed in the background thread as well. This way, producers could fire and forget without any potential to block the sending thread.
[jira] [Commented] (KAFKA-1835) Kafka new producer needs options to make blocking behavior explicit
[ https://issues.apache.org/jira/browse/KAFKA-1835?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14505238#comment-14505238 ]

David Hay commented on KAFKA-1835:
----------------------------------

This solution doesn't seem ideal to me. It requires an update to {{pre.initialize.topics}} every time we add a new topic to our system. Otherwise, if I publish to a topic that is not in the list, then the behavior is the same as now: blocking until the metadata is returned the first time.

Ideally, as I mentioned in KAFKA-2137, the metadata refresh would happen in a background thread. Perhaps a better solution would be to have the entire body of the {{send(ProducerRecord, Callback)}} method running in a separate thread (or thread pool)? Alternatively, is there a way to submit the request to the Sender without (yet) knowing what partition we want to send to?
[jira] [Created] (KAFKA-2135) New Kafka Producer Client: Send requests wait indefinitely if no broker is available.
David Hay created KAFKA-2135:

Summary: New Kafka Producer Client: Send requests wait indefinitely if no broker is available.
Key: KAFKA-2135
URL: https://issues.apache.org/jira/browse/KAFKA-2135
Project: Kafka
Issue Type: Bug
Components: producer
Affects Versions: 0.8.2.0
Reporter: David Hay
Assignee: Jun Rao
Priority: Critical

I'm seeing issues when sending a message with the new producer client API. The future returned from Producer.send() will block indefinitely if the cluster is unreachable for some reason. Here are the steps:

# Start up a single node kafka cluster locally.
# Start up application and create a KafkaProducer with the following config:
{noformat}
KafkaProducerWrapper values:
    compression.type = snappy
    metric.reporters = []
    metadata.max.age.ms = 30
    metadata.fetch.timeout.ms = 6
    acks = all
    batch.size = 16384
    reconnect.backoff.ms = 10
    bootstrap.servers = [localhost:9092]
    receive.buffer.bytes = 32768
    retry.backoff.ms = 100
    buffer.memory = 33554432
    timeout.ms = 3
    key.serializer = class com.mycompany.kafka.serializer.ToStringEncoder
    retries = 3
    max.request.size = 1048576
    block.on.buffer.full = true
    value.serializer = class com.mycompany.kafka.serializer.JsonEncoder
    metrics.sample.window.ms = 3
    send.buffer.bytes = 131072
    max.in.flight.requests.per.connection = 5
    metrics.num.samples = 2
    linger.ms = 0
    client.id = site-json
{noformat}
# Send some messages...they are successfully sent
# Shut down the kafka broker
# Send another message.

At this point, calling {{get()}} on the returned Future will block indefinitely until the broker is restarted.

It appears that there is some logic in {{org.apache.kafka.clients.producer.internals.Sender}} that is supposed to mark the Future as done in response to a disconnect event (towards the end of the run(long) method). However, the while loop earlier in this method seems to remove the broker from consideration entirely, so the final loop over ClientResponse objects is never executed.

It seems like the timeout.ms configuration should be honored in this case, or perhaps another timeout should be introduced, indicating that we should give up waiting for the cluster to return.
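Independent of any producer-side timeout, a caller can bound its own wait by using the timed variant of {{Future.get}} instead of the untimed one. The following is a minimal JDK-only sketch: {{BoundedGet}} and {{await}} are hypothetical names, and a {{CompletableFuture}} that never completes stands in for the future returned by {{send()}} while the broker is down. It only limits how long the caller waits; it does nothing about the pending request inside the producer.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper: waits a bounded time for a future instead of forever.
final class BoundedGet {
    /** Returns the result, or an empty Optional if it did not arrive in time. */
    static <T> Optional<T> await(Future<T> future, long timeoutMs) {
        try {
            return Optional.ofNullable(future.get(timeoutMs, TimeUnit.MILLISECONDS));
        } catch (TimeoutException e) {
            return Optional.empty(); // give up waiting; the request itself is untouched
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return Optional.empty();
        } catch (ExecutionException e) {
            throw new IllegalStateException("send failed", e.getCause());
        }
    }

    public static void main(String[] args) {
        // Stand-in for a send() future that never completes while the broker is down.
        Future<String> stuck = new CompletableFuture<>();
        System.out.println("completed in time: " + await(stuck, 100).isPresent());
    }
}
```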