lucasbru commented on PR #14746: URL: https://github.com/apache/kafka/pull/14746#issuecomment-1814363083
> @philipnee @kirktrue @lucasbru I am concerned that when one calls `wakeup()` the application thread might stay blocked in waiting for a non-empty fetch buffer. Yes. The application thread remains blocked, so this doesn't really wake the application thread up. > Currently, I do not know how to interrupt the application thread with the current code. I cannot do it in `AsyncKafkaConsumer#wakeup()` because I do not have a handle on the application thread and I cannot do it in `WakeupTrigger#maybeTriggerWakeup()` because there it is too late. Any ideas? I'd say we signal `notEmptyCondition` in `FetchBuffer`. That could possibly be achieved this way: - We implement a new subclass of `Wakeupable` that is a `FetchAction(FetchBuffer)`, similar to my idea above. - In `AsynConsumer.poll`, `FetchAction` is registered before the poll and removed after the poll. - In `WakeupTrigger.wakeup()`, `FetchAction` is replaced by a `WakeupFuture`, and `FetchBuffer.wakeup` is called. - `FetchBuffer.wakeup` calls `notEmptyCondition.signalAll()`. There are other ideas, but I think they are more problematic: 1. If we wanted to stick more to the original architecture, we could attempt to solve "waiting for a non-empty fetch buffer" by putting a future in the background queue and let the background thread poll loop complete the future. But that would be a significant rewrite and may have other problems. 2. I'm implementing a change where only one thread can use the consumer API, just like in the original consumer. In principle, you could store a reference to the application thread in the consumer when it is opened and interrupt it that way. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org