[jira] [Created] (KAFKA-8949) MockConsumer.assign and ConsumerRebalanceListener

2019-09-26 Thread David Hay (Jira)
David Hay created KAFKA-8949:


 Summary: MockConsumer.assign and ConsumerRebalanceListener
 Key: KAFKA-8949
 URL: https://issues.apache.org/jira/browse/KAFKA-8949
 Project: Kafka
  Issue Type: Bug
Reporter: David Hay


Using MockConsumer, one must use either {{assign}} or {{subscribe}}, not both.  
However, if you decide to use the {{assign}} method, there is no easy way 
to register a ConsumerRebalanceListener that will get called when {{assign}} is 
called.  The current workaround is to do the following:
{code:java}
MockConsumer<String, String> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
consumer.subscribe(Collections.singletonList("test.topic"), listener);
consumer.unsubscribe();
consumer.assign(Arrays.asList(new TopicPartition("test.topic", 1), ...));{code}
It would be nice to be able to do something like:
{code:java}
MockConsumer<String, String> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
consumer.registerRebalanceListener(listener);
consumer.assign(...);{code}
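In the meantime, a small test helper can hide the dance. A minimal sketch (the helper and the manual callback invocation are my own workaround, not part of MockConsumer):
{code:java}
// Hypothetical helper: registers the listener via the subscribe/unsubscribe
// workaround, assigns the partitions, then fires the callback by hand, since
// MockConsumer.assign() never invokes a ConsumerRebalanceListener itself.
static void assignWithListener(MockConsumer<String, String> consumer,
                               ConsumerRebalanceListener listener,
                               List<TopicPartition> partitions) {
    Set<String> topics = new HashSet<>();
    for (TopicPartition tp : partitions) {
        topics.add(tp.topic());
    }
    consumer.subscribe(topics, listener);
    consumer.unsubscribe();
    consumer.assign(partitions);
    listener.onPartitionsAssigned(partitions);  // simulate the rebalance notification
}
{code}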





[jira] [Created] (KAFKA-6388) Error while trying to roll a segment that already exists

2017-12-19 Thread David Hay (JIRA)
David Hay created KAFKA-6388:


 Summary: Error while trying to roll a segment that already exists
 Key: KAFKA-6388
 URL: https://issues.apache.org/jira/browse/KAFKA-6388
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8.0
Reporter: David Hay
Assignee: Neha Narkhede
Priority: Blocker


I tried setting up a five-broker 0.8 cluster and sending messages to hundreds of 
topics on it. For a couple of topic partitions, the produce requests never 
succeed, since they fail on the leader with the following error:

{noformat}
[2012-12-05 22:54:05,711] WARN [Kafka Log on Broker 2], Newly rolled segment file 00000000000000000000.log already exists; deleting it first (kafka.log.Log)
[2012-12-05 22:54:05,711] WARN [Kafka Log on Broker 2], Newly rolled segment file 00000000000000000000.index already exists; deleting it first (kafka.log.Log)
[2012-12-05 22:54:05,715] ERROR [ReplicaFetcherThread-1-0-on-broker-2], Error due to  (kafka.server.ReplicaFetcherThread)
kafka.common.KafkaException: Trying to roll a new log segment for topic partition NusWriteEvent-4 with start offset 0 while it already exsits
        at kafka.log.Log.rollToOffset(Log.scala:456)
        at kafka.log.Log.roll(Log.scala:434)
        at kafka.log.Log.maybeRoll(Log.scala:423)
        at kafka.log.Log.append(Log.scala:257)
        at kafka.server.ReplicaFetcherThread.processPartitionData(ReplicaFetcherThread.scala:51)
        at kafka.server.AbstractFetcherThread$$anonfun$doWork$5.apply(AbstractFetcherThread.scala:125)
        at kafka.server.AbstractFetcherThread$$anonfun$doWork$5.apply(AbstractFetcherThread.scala:108)
        at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:125)
        at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:344)
        at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:344)
        at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:108)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:50)
{noformat}







[jira] [Commented] (KAFKA-1835) Kafka new producer needs options to make blocking behavior explicit

2015-07-16 Thread David Hay (JIRA)

[ https://issues.apache.org/jira/browse/KAFKA-1835?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14629874#comment-14629874 ]

David Hay commented on KAFKA-1835:
--

If I understand correctly, the proposal in KIP-19 is to add some additional 
timeout parameters around sending messages (e.g. the amount of time a message 
can sit in the accumulator waiting for a broker to send it), as well as the max 
time to block waiting for things like broker metadata.

While this is closer, I would still feel obligated to wrap the send() request 
with my own threading, as I want to give the send request the best opportunity 
to succeed without delaying the user's request.  That is, I wouldn't want to set 
max.block.ms to 0, as I'd want to give the producer the opportunity to fetch 
the broker metadata, if needed, rather than fail right away.  I just want this 
request to run in a separate thread and then propagate the TimeoutException to 
the Callback provided to the send() method.
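Roughly what I have in mind (a sketch only; the wrapper names are mine, not a proposed API): hand send() to an executor so any blocking up to max.block.ms happens off the caller's thread, and route failures through the same Callback:
{code:java}
ExecutorService sendExecutor = Executors.newSingleThreadExecutor();

void sendAsync(final Producer<String, String> producer,
               final ProducerRecord<String, String> record,
               final Callback callback) {
    sendExecutor.execute(new Runnable() {
        public void run() {
            try {
                // May block up to max.block.ms fetching metadata, but off the user's thread.
                producer.send(record, callback);
            } catch (Exception ex) {
                callback.onCompletion(null, ex);  // e.g. propagate a TimeoutException
            }
        }
    });
}
{code}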

 Kafka new producer needs options to make blocking behavior explicit
 ---

 Key: KAFKA-1835
 URL: https://issues.apache.org/jira/browse/KAFKA-1835
 Project: Kafka
  Issue Type: Improvement
  Components: clients
Affects Versions: 0.8.2.0, 0.8.3, 0.9.0
Reporter: Paul Pearcy
 Fix For: 0.8.3

 Attachments: KAFKA-1835-New-producer--blocking_v0.patch, 
 KAFKA-1835.patch

   Original Estimate: 504h
  Remaining Estimate: 504h

 The new (0.8.2 standalone) producer will block the first time it attempts to 
 retrieve metadata for a topic. This is not the desired behavior in some use 
 cases where async non-blocking guarantees are required and message loss is 
 acceptable in known cases. Also, most developers will assume an API that 
 returns a future is safe to call in a critical request path. 
 From discussion on the mailing list, the most viable option is to have the 
 following settings:
  pre.initialize.topics=x,y,z
  pre.initialize.timeout=x
  
 This moves potential blocking to the init of the producer and outside of some 
 random request. The potential will still exist for blocking in a corner case 
 where connectivity with Kafka is lost and a topic not included in pre-init 
 has a message sent for the first time. 
 There is the question of what to do when initialization fails. There are a 
 couple of options that I'd like available:
 - Fail creation of the client 
 - Fail all sends until the meta is available 
 Open to input on how the above option should be expressed. 
 It is also worth noting that more nuanced solutions exist that could work 
 without the extra settings; they just end up having extra complications and, 
 at the end of the day, not adding much value. For instance, the producer could 
 accept and queue messages (note: more complicated than I am making it sound, 
 due to storing all accepted messages in pre-partitioned compact binary form), 
 but you're still going to be forced to choose to either start blocking or 
 dropping messages at some point. 
 I have some test cases I am going to port over to the Kafka producer 
 integration ones and start from there. My current impl is in scala, but 
 porting to Java shouldn't be a big deal (was using a promise to track init 
 status, but will likely need to make that an atomic bool). 





[jira] [Commented] (KAFKA-1835) Kafka new producer needs options to make blocking behavior explicit

2015-07-15 Thread David Hay (JIRA)

[ https://issues.apache.org/jira/browse/KAFKA-1835?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14628656#comment-14628656 ]

David Hay commented on KAFKA-1835:
--

With this example, we would still be required to wrap send() with our own 
threading code to make it completely asynchronous.  Since send() returns a 
Future, there is an implicit assumption that it will never block for anything.  
In our case, we want the call to send() to be completely asynchronous so that 
we don't impact end user performance.  We deal with the exception in a reactive 
manner, using the callback variant of send().

As long as send() has the potential to block, even if it's just the first send, 
we'll have to wrap the call with our own threading logic.



[jira] [Commented] (KAFKA-1835) Kafka new producer needs options to make blocking behavior explicit

2015-07-15 Thread David Hay (JIRA)

[ https://issues.apache.org/jira/browse/KAFKA-1835?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14628122#comment-14628122 ]

David Hay commented on KAFKA-1835:
--

It seems to me that there are two aspects to sending a message: (a) selecting a 
partition and (b) sending the message to the partition leader.  Currently, (b) 
is performed asynchronously, while (a) is not, and (a) contains a potentially 
blocking call to fetch (or refresh) partition metadata.

Maybe I'm missing something, but why couldn't the producer just queue the 
ProducerRecord and have a separate thread take care of querying for broker 
metadata, serializing the key and message, selecting a partition, and pushing 
the result onto the send queue?  In other words, run the current implementation 
of {{send()}} in a separate thread.  This is essentially what we've had to do 
in order to work around the possibility of {{send()}} blocking when we don't 
want it to.  In addition, it moves the serialization work to a separate 
thread, which is important when we need to send a message and respond to user 
input as quickly as possible.
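A rough sketch of the idea, just to make it concrete (class and method names are mine, not a proposed API):
{code:java}
// Illustrative only: records are accepted without blocking; a single worker
// thread absorbs the potentially blocking metadata fetch inside producer.send().
class QueuedProducer {
    private final BlockingQueue<ProducerRecord<String, String>> queue =
            new LinkedBlockingQueue<>();

    QueuedProducer(final Producer<String, String> producer) {
        Thread worker = new Thread(new Runnable() {
            public void run() {
                try {
                    while (true) {
                        producer.send(queue.take());  // blocks here, not in the caller
                    }
                } catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                }
            }
        });
        worker.setDaemon(true);
        worker.start();
    }

    void send(ProducerRecord<String, String> record) {
        queue.add(record);  // returns immediately; no metadata lookup on this thread
    }
}
{code}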



[jira] [Commented] (KAFKA-2135) New Kafka Producer Client: Send requests wait indefinitely if no broker is available.

2015-05-04 Thread David Hay (JIRA)

[ https://issues.apache.org/jira/browse/KAFKA-2135?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14527467#comment-14527467 ]

David Hay commented on KAFKA-2135:
--

[~ewencp] This does appear to be a duplicate of KAFKA-1788



[jira] [Commented] (KAFKA-1835) Kafka new producer needs options to make blocking behavior explicit

2015-04-29 Thread David Hay (JIRA)

[ https://issues.apache.org/jira/browse/KAFKA-1835?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14519506#comment-14519506 ]

David Hay commented on KAFKA-1835:
--

Fixed some syntax errors in my code sample.  This example isn't intended to 
prevent the timeout problems when fetching the broker metadata.  Instead, it is 
intended to isolate the calling application from the latency of actually 
sending a message to the Kafka brokers.



[jira] [Commented] (KAFKA-1835) Kafka new producer needs options to make blocking behavior explicit

2015-04-28 Thread David Hay (JIRA)

[ https://issues.apache.org/jira/browse/KAFKA-1835?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14518184#comment-14518184 ]

David Hay commented on KAFKA-1835:
--

[~smiklosovic] I've worked around the issue by creating my own thread pool and 
wrapping the producer send requests with my own Callable, as follows.  This 
also uses Guava's ListenableFuture class:
{code:java}
// Note: Futures.dereference needs a ListeningExecutorService, so the pool is
// wrapped with MoreExecutors.listeningDecorator.
ListeningExecutorService executorService =
    MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(5));

ListenableFuture<RecordMetadata> future = Futures.dereference(
    executorService.submit(new Callable<ListenableFuture<RecordMetadata>>() {
        public ListenableFuture<RecordMetadata> call() throws Exception {
            final SettableFuture<RecordMetadata> responseFuture = SettableFuture.create();
            try {
                producer.send(new ProducerRecord(topic, key, message), new Callback() {
                    public void onCompletion(RecordMetadata metadata, Exception ex) {
                        if (ex == null) { responseFuture.set(metadata); }
                        else { responseFuture.setException(ex); }
                    }
                });
            }
            catch (Exception ex) { responseFuture.setException(ex); }
            return responseFuture;
        }
    }));
{code}
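(For what it's worth: {{Futures.dereference}} is what flattens the nested {{ListenableFuture<ListenableFuture<RecordMetadata>>}} returned by {{submit()}} into a single {{ListenableFuture<RecordMetadata>}}.)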




[jira] [Created] (KAFKA-2137) New Kafka Producer not fully asynchronous

2015-04-21 Thread David Hay (JIRA)
David Hay created KAFKA-2137:


 Summary: New Kafka Producer not fully asynchronous
 Key: KAFKA-2137
 URL: https://issues.apache.org/jira/browse/KAFKA-2137
 Project: Kafka
  Issue Type: Improvement
Reporter: David Hay


The new Producer client attempts to be fully asynchronous.  However, it still 
has the potential to block at the start of the {{send}} method when it asks for 
the metadata for the topic ({{waitOnMetadata(record.topic(), 
this.metadataFetchTimeoutMs)}}).

There is a timeout (60 seconds by default), but it would be nice if this 
lookup were performed in the background thread as well.  That way producers 
could fire and forget without any potential to block the sending thread.
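Until something like that exists, one partial mitigation (my own workaround, nothing built into the client) is to warm the metadata cache at startup so the first real {{send()}} doesn't pay the lookup cost:
{code:java}
// Force a blocking metadata fetch for the topics we know about during
// application startup, off the request path. Topic names are illustrative.
for (String topic : Arrays.asList("topic.a", "topic.b")) {
    producer.partitionsFor(topic);
}
{code}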







[jira] [Commented] (KAFKA-1835) Kafka new producer needs options to make blocking behavior explicit

2015-04-21 Thread David Hay (JIRA)

[ https://issues.apache.org/jira/browse/KAFKA-1835?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14505238#comment-14505238 ]

David Hay commented on KAFKA-1835:
--

This solution doesn't seem ideal to me.  It requires an update to 
{{pre.initialize.topics}} every time we add a new topic to our system.  
Otherwise, if I publish to a topic that is not in the list, the behavior 
is the same as it is now: blocking until the metadata is returned the first time.

Ideally, as I mentioned in KAFKA-2137, the metadata refresh would happen in a 
background thread.

Perhaps a better solution would be to have the entire body of the 
{{send(ProducerRecord, Callback)}} method run in a separate thread (or 
thread pool)?  Alternatively, is there a way to submit the request to the Sender 
without (yet) knowing which partition we want to send to?  



[jira] [Created] (KAFKA-2135) New Kafka Producer Client: Send requests wait indefinitely if no broker is available.

2015-04-20 Thread David Hay (JIRA)
David Hay created KAFKA-2135:


 Summary: New Kafka Producer Client: Send requests wait 
indefinitely if no broker is available.
 Key: KAFKA-2135
 URL: https://issues.apache.org/jira/browse/KAFKA-2135
 Project: Kafka
  Issue Type: Bug
  Components: producer 
Affects Versions: 0.8.2.0
Reporter: David Hay
Assignee: Jun Rao
Priority: Critical


I'm seeing issues when sending a message with the new producer client API.  The 
future returned from Producer.send() will block indefinitely if the cluster is 
unreachable for some reason.  Here are the steps:

# Start up a single node kafka cluster locally.
# Start up application and create a KafkaProducer with the following config:
{noformat}
KafkaProducerWrapper values: 
compression.type = snappy
metric.reporters = []
metadata.max.age.ms = 300000
metadata.fetch.timeout.ms = 60000
acks = all
batch.size = 16384
reconnect.backoff.ms = 10
bootstrap.servers = [localhost:9092]
receive.buffer.bytes = 32768
retry.backoff.ms = 100
buffer.memory = 33554432
timeout.ms = 30000
key.serializer = class com.mycompany.kafka.serializer.ToStringEncoder
retries = 3
max.request.size = 1048576
block.on.buffer.full = true
value.serializer = class com.mycompany.kafka.serializer.JsonEncoder
metrics.sample.window.ms = 30000
send.buffer.bytes = 131072
max.in.flight.requests.per.connection = 5
metrics.num.samples = 2
linger.ms = 0
client.id = site-json
{noformat}
# Send some messages...they are successfully sent
# Shut down the kafka broker
# Send another message.

At this point, calling {{get()}} on the returned Future will block indefinitely 
until the broker is restarted.
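As a caller-side guard (a workaround sketch only, not a fix for the underlying bug), the timed variant of {{Future.get()}} at least bounds the wait:
{code:java}
Future<RecordMetadata> future = producer.send(record);
try {
    RecordMetadata metadata = future.get(30, TimeUnit.SECONDS);  // bound the wait ourselves
} catch (TimeoutException ex) {
    // Give up waiting for the cluster to return; the send may still complete later.
} catch (InterruptedException | ExecutionException ex) {
    // Interrupted, or the send failed outright.
}
{code}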

It appears that there is some logic in 
{{org.apache.kafka.clients.producer.internal.Sender}} that is supposed to mark 
the Future as done in response to a disconnect event (towards the end of the 
run(long) method).  However, the while loop earlier in that method seems to 
remove the broker from consideration entirely, so the final loop over 
ClientResponse objects is never executed.

It seems like the timeout.ms configuration should be honored in this case, or 
perhaps another timeout should be introduced, indicating that we should give up 
waiting for the cluster to return.


