Qingsheng Ren created KAFKA-14208:
-------------------------------------

             Summary: KafkaConsumer#commitAsync throws unexpected WakeupException
                 Key: KAFKA-14208
                 URL: https://issues.apache.org/jira/browse/KAFKA-14208
             Project: Kafka
          Issue Type: Bug
          Components: clients
    Affects Versions: 3.2.1
            Reporter: Qingsheng Ren


We recently encountered a bug after upgrading the Kafka client to 3.2.1 in the
Flink Kafka connector (FLINK-29153). Here's the exception:
{code:java}
org.apache.kafka.common.errors.WakeupException
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.maybeTriggerWakeup(ConsumerNetworkClient.java:514)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:278)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:215)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:252)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.coordinatorUnknownAndUnready(ConsumerCoordinator.java:493)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsAsync(ConsumerCoordinator.java:1055)
        at org.apache.kafka.clients.consumer.KafkaConsumer.commitAsync(KafkaConsumer.java:1573)
        at org.apache.flink.streaming.connectors.kafka.internals.KafkaConsumerThread.run(KafkaConsumerThread.java:226)
 {code}
Because {{WakeupException}} is not listed in the JavaDoc of
{{KafkaConsumer#commitAsync}}, the Flink Kafka connector does not catch
exceptions thrown directly from {{KafkaConsumer#commitAsync}}; it handles all
commit errors in the callback instead.

I checked the source code and suspect this was introduced by KAFKA-13563. We
never saw this exception from {{commitAsync}} when we used Kafka client 2.4.1
or 2.8.1.

I'm wondering whether this breaks the public API, since {{WakeupException}} is
not listed in the JavaDoc. It might be better to pass the {{WakeupException}}
to the callback rather than throwing it directly from the method itself.
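
For illustration only, here is a minimal caller-side sketch of that idea: a thin wrapper that catches anything thrown synchronously by an async commit call and forwards it to the same callback that receives asynchronous failures. It is not how Kafka or Flink implements this. {{SafeAsyncCommit}} and {{AsyncCommitter}} are hypothetical names, and the sketch uses only the JDK so that it stands on its own.

```java
import java.util.function.BiConsumer;

// Hypothetical helper, not part of Kafka or Flink.
final class SafeAsyncCommit {

    /**
     * Hypothetical stand-in for an async commit call of the shape
     * commitAsync(offsets, callback).
     */
    @FunctionalInterface
    interface AsyncCommitter<O> {
        void commitAsync(O offsets, BiConsumer<O, Exception> callback);
    }

    /**
     * Starts the commit and routes any exception thrown synchronously by the
     * call itself (for example a WakeupException) to the callback, so that
     * all commit failures are observed in one place.
     */
    static <O> void commit(AsyncCommitter<O> committer,
                           O offsets,
                           BiConsumer<O, Exception> callback) {
        try {
            committer.commitAsync(offsets, callback);
        } catch (RuntimeException e) {
            // Thrown before the request completed: report it through the callback.
            callback.accept(offsets, e);
        }
    }

    private SafeAsyncCommit() {
    }
}
```

With the real client, the committer would presumably be a lambda that delegates to {{KafkaConsumer#commitAsync(offsets, callback)}}. Note that swallowing a {{WakeupException}} this way also discards the wakeup signal, so a caller that relies on {{wakeup()}} to interrupt a blocking call would need to decide whether to re-raise it.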



