[ 
https://issues.apache.org/jira/browse/KAFKA-17263?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17870998#comment-17870998
 ] 

Kuradeon commented on KAFKA-17263:
----------------------------------

I assume these two flags in HeartbeatThread should be volatile:
{code:java}
private class HeartbeatThread extends KafkaThread implements AutoCloseable {
    private boolean enabled = false;
    private boolean closed = false;
    // ...
}
{code}
Otherwise there is a small chance that the heartbeat thread does not observe 
'closed=true' after it is notified by the close() method, and carries on with 
the code that follows instead of exiting.
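
A minimal sketch of the visibility concern, not the Kafka code: StoppableWorker, lock and closeAndJoin are hypothetical names made up for illustration. It assumes the worker reads the flag at least once without holding the lock, which is the case where volatile matters. Whether the real HeartbeatThread does that is not shown in this comment.

```java
/** Hypothetical stand-in for a heartbeat-style worker; this is NOT the Kafka class. */
class StoppableWorker extends Thread implements AutoCloseable {
    private final Object lock = new Object();

    // volatile makes the write in close() visible to the unsynchronized
    // read in run(); without it that read has no visibility guarantee.
    private volatile boolean closed = false;

    StoppableWorker() {
        super("stoppable-worker");
        setDaemon(true);
    }

    @Override
    public void run() {
        while (!closed) {                 // read without holding the lock
            synchronized (lock) {
                if (closed) {
                    break;                // re-check under the lock before waiting
                }
                try {
                    lock.wait(50);        // bounded wait, so a missed notify cannot park us forever
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
    }

    @Override
    public void close() {
        closed = true;
        synchronized (lock) {
            lock.notifyAll();             // wake the worker if it is waiting
        }
    }

    /** Signals shutdown and waits up to timeoutMs; returns true if the worker has exited. */
    boolean closeAndJoin(long timeoutMs) {
        close();
        try {
            join(timeoutMs);              // bounded join instead of an unbounded join()
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return !isAlive();
    }
}
```

The bounded join is a separate design choice in this sketch: it lets the caller detect a worker that did not stop rather than waiting indefinitely in Thread.join.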

> Sometimes KafkaConsumer.close will block the thread
> ---------------------------------------------------
>
>                 Key: KAFKA-17263
>                 URL: https://issues.apache.org/jira/browse/KAFKA-17263
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 2.8.2
>         Environment: Java 8
>            Reporter: Kuradeon
>            Priority: Major
>
> Sometimes the calling thread blocks and never continues (state: WAITING) 
> after invoking KafkaConsumer.close(). Here is the related thread dump:
> {code:java}
> Thread 73 (pool-5-thread-1):
>   State: WAITING
>   Blocked count: 25
>   Waited count: 22
>   Waiting on org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread@565a2927
>   Stack:
>     java.lang.Object.wait(Native Method)
>     java.lang.Thread.join(Thread.java:1257)
>     java.lang.Thread.join(Thread.java:1331)
>     org.apache.kafka.clients.consumer.internals.AbstractCoordinator.closeHeartbeatThread(AbstractCoordinator.java:385)
>     org.apache.kafka.clients.consumer.internals.AbstractCoordinator.close(AbstractCoordinator.java:1010)
>     org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.close(ConsumerCoordinator.java:926)
>     org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:2366)
>     org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:2333)
>     org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:2283)
>     com.ebay.msg.numessage.kafka.client.NuMsgConsumer.close(NuMsgConsumer.java:103)
>     sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     java.lang.reflect.Method.invoke(Method.java:498)
>     com.ebay.msg.numessage.kafka.client.NuMsgConsumerDelegate$ThreadSafeDelegate.invoke(NuMsgConsumerDelegate.java:146)
>     com.sun.proxy.$Proxy192.close(Unknown Source)
>     sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     java.lang.reflect.Method.invoke(Method.java:498)
> Thread 687 (kafka-coordinator-heartbeat-thread | numsg-shared-stream-rno.numsg-st-c35):
>   State: WAITING
>   Blocked count: 0
>   Waited count: 1
>   Waiting on org.apache.kafka.clients.consumer.internals.ConsumerCoordinator@51f1e632
>   Stack:
>     java.lang.Object.wait(Native Method)
>     java.lang.Object.wait(Object.java:502)
>     org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:1360)
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
