lucasbru commented on code in PR #14746:
URL: https://github.com/apache/kafka/pull/14746#discussion_r1394418301


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -415,6 +439,27 @@ public ConsumerRecords<K, V> poll(final Duration timeout) {
             return ConsumerRecords.empty();
         } finally {
             kafkaConsumerMetrics.recordPollEnd(timer.currentTimeMs());
+            wakeupTrigger.clearActiveTask();

Review Comment:
   Hmm, I'm not 100% sure we are providing the guarantee stated in the 
consumer's javadoc here. 
   
   The relevant sentence is: 
   
   ```
        * If no thread is blocking in a method which can throw {@link org.apache.kafka.common.errors.WakeupException}, the next call to such a method will raise it instead.
   ```
   
   But if the current fetch returns records, we will never throw a 
`WakeupException`, neither from this `poll` nor from the next one, since 
`clearActiveTask()` in the `finally` block appears to discard the pending 
wakeup. Should we keep the "activeTask" around strictly until we are ready 
to throw a `WakeupException`?
   
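To make the concern concrete, here is a rough sketch of the behaviour the javadoc seems to ask for. All names in it (`WakeupSketch`, `StickyWakeupTrigger`, `SketchConsumer`, `maybeTriggerWakeup`, the nested `WakeupException`) are hypothetical stand-ins, not the actual `WakeupTrigger` or `AsyncKafkaConsumer` code. It assumes the pending wakeup is tracked separately from the per-call task, so that clearing the task in `finally` does not lose the wakeup.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch: illustrative stand-ins only, not Kafka's
// WakeupTrigger or AsyncKafkaConsumer.
final class WakeupSketch {

    // Stand-in for org.apache.kafka.common.errors.WakeupException.
    static class WakeupException extends RuntimeException { }

    static class StickyWakeupTrigger {
        private final AtomicBoolean wakeupPending = new AtomicBoolean(false);
        private volatile Object activeTask;

        // May be called from any thread; remembers that a wakeup was requested.
        void wakeup() {
            wakeupPending.set(true);
        }

        void setActiveTask(Object task) {
            activeTask = task;
        }

        // Consumes the pending wakeup only at the moment it is thrown.
        void maybeTriggerWakeup() {
            if (wakeupPending.compareAndSet(true, false)) {
                throw new WakeupException();
            }
        }

        // Clears the per-call task but deliberately leaves wakeupPending alone.
        void clearActiveTask() {
            activeTask = null;
        }
    }

    static class SketchConsumer {
        final StickyWakeupTrigger trigger = new StickyWakeupTrigger();

        void wakeup() {
            trigger.wakeup();
        }

        // duringFetch simulates something happening while the fetch is in flight.
        String poll(Runnable duringFetch) {
            trigger.maybeTriggerWakeup();
            try {
                trigger.setActiveTask(new Object());
                duringFetch.run();
                return "records"; // the fetch completed with data
            } finally {
                trigger.clearActiveTask();
            }
        }
    }

    public static void main(String[] args) {
        SketchConsumer consumer = new SketchConsumer();
        // wakeup() arrives while the first poll is about to return records.
        System.out.println("first poll returned: " + consumer.poll(consumer::wakeup));
        try {
            consumer.poll(() -> { });
        } catch (WakeupException e) {
            System.out.println("second poll threw WakeupException");
        }
    }
}
```

In this sketch a wakeup that arrives during a poll which still returns records is raised by the next call rather than dropped. Whether a separate flag or keeping the "activeTask" around is the better fit for the real `WakeupTrigger` is a design choice for the PR.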



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
