[ https://issues.apache.org/jira/browse/KAFKA-10114?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17128438#comment-17128438 ]

Itamar Benjamin commented on KAFKA-10114:
-----------------------------------------

Hi,

Our app calls flush() before closing the producer. When we tried to close the 
faulty producer and create a new one, the flush() call got stuck indefinitely. 
This likely happens because flush() assumes the pending messages will be 
completed at some point, but they never are: the network thread is waiting for 
a producer id and does not fail the batches.
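To illustrate why an unbounded flush() hangs in this state, here is a minimal model (not Kafka code; FlushModel and its names are hypothetical) in which the pending batch is a CompletableFuture that is never completed, just as the real batches are never failed while the sender waits for a producer id. A bounded wait, analogous to KafkaProducer.close(Duration), at least surfaces the stuck state instead of blocking forever:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class FlushModel {
    // Models a batch whose completion never arrives because the
    // network thread is stuck waiting for a producer id.
    static final CompletableFuture<Void> pendingBatch = new CompletableFuture<>();

    // flush() semantics: wait until the batch completes -- here, forever.
    // A bounded wait turns the hang into a detectable timeout instead.
    static boolean boundedFlush(long timeout, TimeUnit unit) {
        try {
            pendingBatch.get(timeout, unit);
            return true;           // batch completed in time
        } catch (TimeoutException e) {
            return false;          // still pending: the producer is stuck
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        // The batch is never completed, so a bounded flush times out.
        System.out.println(boundedFlush(100, TimeUnit.MILLISECONDS)); // prints false
    }
}
```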

> Kafka producer stuck after broker crash
> ---------------------------------------
>
>                 Key: KAFKA-10114
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10114
>             Project: Kafka
>          Issue Type: Bug
>          Components: producer 
>    Affects Versions: 2.3.1, 2.4.1
>            Reporter: Itamar Benjamin
>            Priority: Critical
>
> Today two of our kafka brokers crashed (cluster of 3 brokers), and producers 
> were not able to send new messages. After brokers started again all producers 
> resumed sending data except for a single one.
> At the beginning the producer rejected all new messages with a TimeoutException:
>  
> {code:java}
>  org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for 
> incoming-mutable-RuntimeIIL-1:120000 ms has passed since batch creation
> {code}
>  
> Then, after some time, the exception changed to:
>  
> {code:java}
> org.apache.kafka.common.errors.TimeoutException: Failed to allocate memory 
> within the configured max blocking time 60000 ms.
> {code}
>  
>  
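For context, the shift from the first error to the second is consistent with the producer's buffer.memory filling up once batches stop draining: send() then blocks for max.block.ms before failing allocation. The fragment below shows the producer settings involved with their defaults (illustrative, not the reporter's actual configuration), which match the 60000 ms and 120000 ms in the errors:

```properties
# Illustrative defaults, not the reporter's actual configuration.
# How long send()/partitionsFor() may block, e.g. waiting for buffer memory.
max.block.ms=60000
# Total time a record may wait before expiring after batch creation.
delivery.timeout.ms=120000
# Size of the record accumulator; once full, send() blocks up to max.block.ms.
buffer.memory=33554432
```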
> jstack shows kafka-producer-network-thread is waiting to get producer id:
>  
> {code:java}
> "kafka-producer-network-thread | producer-1" #767 daemon prio=5 os_prio=0 
> cpu=63594017.16ms elapsed=1511219.38s tid=0x00007fffd8353000 nid=0x4fa4 
> sleeping [0x00007ff55c177000]
>    java.lang.Thread.State: TIMED_WAITING (sleeping)
>         at java.lang.Thread.sleep(java.base@11.0.1/Native Method)
>         at org.apache.kafka.common.utils.Utils.sleep(Utils.java:296)
>         at org.apache.kafka.common.utils.SystemTime.sleep(SystemTime.java:41)
>         at 
> org.apache.kafka.clients.producer.internals.Sender.maybeWaitForProducerId(Sender.java:565)
>         at 
> org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:306)
>         at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:244)
>         at java.lang.Thread.run(java.base@11.0.1/Thread.java:834)
>
>    Locked ownable synchronizers:
>         - None
> {code}
>  
> Digging into maybeWaitForProducerId(), it waits until some broker is ready 
> (the awaitNodeReady function), which in turn calls leastLoadedNode() on 
> NetworkClient. This iterates over all brokers and checks whether a request 
> can be sent to each one using canSendRequest().
> This is the code for canSendRequest():
>  
> {code:java}
> return connectionStates.isReady(node, now) && selector.isChannelReady(node) 
> && inFlightRequests.canSendMore(node)
> {code}
>  
>  
> Using some debugging tools, I saw this expression always evaluates to false 
> because the last part (canSendMore) is false.
>  
> This is the code for canSendMore:
> {code:java}
> public boolean canSendMore(String node) {
>     Deque<NetworkClient.InFlightRequest> queue = requests.get(node);
>     return queue == null || queue.isEmpty() ||
>         (queue.peekFirst().send.completed() &&
>          queue.size() < this.maxInFlightRequestsPerConnection);
> }
> {code}
>  
>  
> I verified that
> {code:java}
> queue.peekFirst().send.completed()
> {code}
> is true, and that leads to the livelock: since the in-flight request queues 
> for all nodes are full, a new request to check a broker's availability and 
> reconnect to it can never be submitted.
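To make the livelock condition concrete, here is a small self-contained model of canSendMore() (a simplification of the NetworkClient logic quoted above; InFlightModel, the Deque-of-Boolean encoding, and the hard-coded limit are assumptions for illustration). With a node's queue at maxInFlightRequestsPerConnection, the method returns false even when the head request is completed, so leastLoadedNode() can never pick a node to request a producer id from:

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class InFlightModel {
    static final int MAX_IN_FLIGHT = 5; // maxInFlightRequestsPerConnection

    // Each element stands for one in-flight request; the boolean
    // mirrors send.completed() for that request.
    static boolean canSendMore(Deque<Boolean> queue) {
        // Same shape as NetworkClient.InFlightRequests.canSendMore().
        return queue == null || queue.isEmpty()
            || (queue.peekFirst() && queue.size() < MAX_IN_FLIGHT);
    }

    public static void main(String[] args) {
        Deque<Boolean> full = new ArrayDeque<>();
        for (int i = 0; i < MAX_IN_FLIGHT; i++) {
            full.add(true); // head is completed, as the reporter verified
        }
        // Queue is full: even with the head completed, no new request fits,
        // so the sender can never ask any broker for a producer id.
        System.out.println(canSendMore(full)); // prints false
    }
}
```

An empty queue (e.g. after a reconnect) would make canSendMore return true again, which is why the queues staying permanently full is what turns this into a livelock.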
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)