lucasbru commented on code in PR #14746:
URL: https://github.com/apache/kafka/pull/14746#discussion_r1394385232

##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -415,6 +439,27 @@ public ConsumerRecords<K, V> poll(final Duration timeout) {
             return ConsumerRecords.empty();
         } finally {
             kafkaConsumerMetrics.recordPollEnd(timer.currentTimeMs());
+            wakeupTrigger.clearActiveTask();
+        }
+    }
+
+    private CompletableFuture<Void> setupWakeupTrigger() {
+        final CompletableFuture<Void> wakeupFuture = new CompletableFuture<>();
+        return wakeupTrigger.setActiveTask(wakeupFuture);

Review Comment:
   I had to scratch my head a little bit over this. Since we already have the `Wakeupable` interface, I wonder if it wouldn't be cleaner to have a new static subclass:

   ```
   /** Placeholder for a task that delays wake-up until it's manually triggered */
   DelayedWakeupTask(final boolean wakeUpMarker)
   ```

   Then replace `DelayedWakeupTask(false)` by `DelayedWakeupTask(true)` in `WakeupTrigger.wakeup`, move `maybeTriggerWakeup` to `WakeupTrigger`, and rename it to `maybeTriggerDelayedWakeup`. Then we'd have fewer code paths to worry about (no non-exceptional completion, no other possible exceptions where we ourselves don't know what to do and just throw an `IllegalStateException`). It's just an idea; it could be that I overlooked something.

##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##########
@@ -168,6 +178,44 @@ public void testCommitted_ExceptionThrown() {
         }
     }

+    @Test
+    public void testWakeupBeforeCallingPoll() {
+        final TopicPartition tp = new TopicPartition("foo", 3);
+        consumer.assign(singleton(tp));
+
+        consumer.wakeup();
+
+        assertThrows(WakeupException.class, () -> consumer.poll(Duration.ofMinutes(1)));
+    }
+
+    @Test
+    public void testWakeupAfterEmptyFetch() {

Review Comment:
   Do we not want to test `wake-up` with a non-empty fetch?

--
This is an automated message from the Apache Git Service.
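
To make the `DelayedWakeupTask` suggestion from the first review comment more concrete, here is a rough, standalone sketch of how it might look. It is not the PR's code and not Kafka's actual `WakeupTrigger`: the local `WakeupException` and `Wakeupable` types are stand-ins so the example compiles on its own, and `setDelayedWakeupTask`, `clearTask`, and `isMarked` are hypothetical names chosen for illustration. Only `DelayedWakeupTask(final boolean wakeUpMarker)` and `maybeTriggerDelayedWakeup` come from the comment.

```java
import java.util.concurrent.atomic.AtomicReference;

class DelayedWakeupSketch {
    /** Stand-in for Kafka's WakeupException so the sketch is self-contained. */
    static class WakeupException extends RuntimeException { }

    /** Stand-in marker type playing the role of the existing Wakeupable interface. */
    interface Wakeupable { }

    /** Placeholder for a task that delays wake-up until it's manually triggered. */
    static final class DelayedWakeupTask implements Wakeupable {
        final boolean wakeUpMarker;

        DelayedWakeupTask(final boolean wakeUpMarker) {
            this.wakeUpMarker = wakeUpMarker;
        }
    }

    /** Simplified trigger that only models the delayed task (assumption: no other task types). */
    static final class WakeupTrigger {
        private final AtomicReference<Wakeupable> pendingTask = new AtomicReference<>(null);

        /** Hypothetical: called when poll() starts; keeps an already requested wake-up. */
        void setDelayedWakeupTask() {
            pendingTask.compareAndSet(null, new DelayedWakeupTask(false));
        }

        /** Swaps in a marked task instead of completing a future exceptionally. */
        void wakeup() {
            pendingTask.set(new DelayedWakeupTask(true));
        }

        /** Throws once if a wake-up was requested, then resets the state. */
        void maybeTriggerDelayedWakeup() {
            final Wakeupable task = pendingTask.get();
            if (isMarked(task) && pendingTask.compareAndSet(task, null)) {
                throw new WakeupException();
            }
        }

        /** Hypothetical: called in poll()'s finally block; a pending wake-up survives. */
        void clearTask() {
            pendingTask.updateAndGet(task -> isMarked(task) ? task : null);
        }

        private static boolean isMarked(final Wakeupable task) {
            return task instanceof DelayedWakeupTask && ((DelayedWakeupTask) task).wakeUpMarker;
        }
    }

    public static void main(final String[] args) {
        final WakeupTrigger trigger = new WakeupTrigger();
        trigger.setDelayedWakeupTask();
        trigger.wakeup();
        try {
            trigger.maybeTriggerDelayedWakeup();
        } catch (final WakeupException e) {
            System.out.println("woken up");
        }
    }
}
```

With this shape the only way out of the delayed state is a `WakeupException`, which is the reduction in code paths the comment is after. Whether it fits alongside the other task types the real `WakeupTrigger` tracks is left open here.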