lianetm commented on code in PR #22035:
URL: https://github.com/apache/kafka/pull/22035#discussion_r3381359706
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java:
##########
@@ -268,6 +268,10 @@ private void processApplicationEvents() {
applicationEventProcessor.process(event);
} catch (Throwable t) {
log.warn("Error processing event {}", t.getMessage(), t);
Review Comment:
Should we change this to error level?
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventHandler.java:
##########
@@ -155,4 +158,27 @@ public void close(final Duration timeout) {
() -> log.warn("The application event handler was already closed")
);
}
+
+ /**
+ * Best-effort check that the consumer network thread is still alive. If the thread has
+ * already terminated (due to a failure or shutdown), it will never process any events from
+ * the queue. Rather than blocking indefinitely or timing out with a misleading error, this
+ * fails fast with a clear error message.
+ *
+ * <p>Note: this is inherently racy — the thread could die between this check and the
+ * subsequent {@code applicationEventQueue.add()}. That narrow window is acceptable because
+ * any subsequent call to {@code add()} will detect the dead thread immediately, and any
+ * orphaned events will be expired by the {@link CompletableEventReaper} during consumer
+ * {@link #close() close}.
+ *
+ * @throws KafkaException if the background thread is not alive
+ */
+ private void ensureNetworkThreadAlive() {
+ if (networkThread == null || !networkThread.isAlive()) {
+ throw new KafkaException(
+ "The async consumer background thread is not running and cannot process requests. " +
+ "This may be due to a previous fatal error in the background thread. " +
+ "Check earlier logs for details.");
Review Comment:
Can we be more explicit here about what failed?
We could keep the `fatalError` in the `ConsumerNetworkThread` so that we can get it here (same as we do for metadata, for instance, and even in `ConsumerNetworkThread` itself for the initialization error):
https://github.com/apache/kafka/blob/8cfc1cc97f522a9d8ece5cba47edf1a00757cd8c/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java#L83
Then here we could tell the user which fatal error made the background thread die. wdyt?
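A minimal sketch of the suggestion, assuming the network thread records the first `Throwable` that escapes its run loop in a field (called `fatalError` here, as in the comment) and the handler reads it when the liveness check fails. `NetworkThreadSketch` and `HandlerSketch` are hypothetical stand-ins, not the real `ConsumerNetworkThread` or `ApplicationEventHandler`, and `RuntimeException` replaces `KafkaException` so the snippet compiles without Kafka on the classpath.

```java
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for ConsumerNetworkThread: only the part that remembers
// why the thread died is modelled here.
class NetworkThreadSketch extends Thread {
    // Written by the background thread just before it exits; read by the app thread.
    private final AtomicReference<Throwable> fatalError = new AtomicReference<>();
    private final Runnable loopBody;

    NetworkThreadSketch(Runnable loopBody) {
        super("consumer-network-thread-sketch");
        this.loopBody = loopBody;
    }

    @Override
    public void run() {
        try {
            loopBody.run();
        } catch (Throwable t) {
            // Keep only the first error, which is usually the root cause.
            fatalError.compareAndSet(null, t);
        }
    }

    Optional<Throwable> fatalError() {
        return Optional.ofNullable(fatalError.get());
    }
}

// Hypothetical stand-in for the handler's liveness check.
class HandlerSketch {
    static void ensureNetworkThreadAlive(NetworkThreadSketch networkThread) {
        // Observing isAlive() == false happens-after the thread's last action (JLS 17.4.4),
        // so a recorded fatal error is visible here.
        if (networkThread == null || !networkThread.isAlive()) {
            Optional<Throwable> cause = networkThread == null
                ? Optional.empty()
                : networkThread.fatalError();
            String reason = cause.map(t -> " due to a fatal error: " + t).orElse(".");
            // RuntimeException stands in for KafkaException in this sketch.
            throw new RuntimeException(
                "The consumer background thread is not running and cannot process requests" + reason,
                cause.orElse(null));
        }
    }
}
```

With this shape, a thread that exited normally (shutdown) still produces the generic message, while one that died on an exception surfaces that exception both in the message and as the cause.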
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java:
##########
@@ -268,6 +268,10 @@ private void processApplicationEvents() {
applicationEventProcessor.process(event);
} catch (Throwable t) {
log.warn("Error processing event {}", t.getMessage(), t);
+ if (event instanceof CompletableEvent) {
+ ((CompletableEvent<?>) event).future().completeExceptionally(
+ ConsumerUtils.maybeWrapAsKafkaException(t));
Review Comment:
I'm not sure this is really needed: on the app thread we already wrap the future result here:
https://github.com/apache/kafka/blob/6208dfc014ba7ae48804c5ae92e3bf4f1306b057/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerUtils.java#L226
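To illustrate the point, a simplified model of the application-thread side, assuming it behaves like the linked helper: it waits on the event's future and wraps the `ExecutionException` cause, so a raw `Throwable` passed to `completeExceptionally` already reaches the user as a `KafkaException`, and one that is already a `KafkaException` is passed through rather than double-wrapped. `KafkaExceptionStandIn`, `ResultUnwrapSketch`, and their methods are hypothetical; the real helper may handle more cases (for example, mapping interrupts and timeouts to more specific Kafka exceptions).

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical stand-in for org.apache.kafka.common.KafkaException.
class KafkaExceptionStandIn extends RuntimeException {
    KafkaExceptionStandIn(String message) { super(message); }
    KafkaExceptionStandIn(Throwable cause) { super(cause); }
}

class ResultUnwrapSketch {
    // Same idea as maybeWrapAsKafkaException: pass through if already the right type, else wrap.
    static KafkaExceptionStandIn maybeWrap(Throwable t) {
        return t instanceof KafkaExceptionStandIn
            ? (KafkaExceptionStandIn) t
            : new KafkaExceptionStandIn(t);
    }

    // Simplified app-thread side: wait for the event's future and wrap whatever
    // the background thread completed it with.
    static <T> T getResult(Future<T> future, long timeoutMs) {
        try {
            return future.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (ExecutionException e) {
            throw maybeWrap(e.getCause());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new KafkaExceptionStandIn(e);
        } catch (TimeoutException e) {
            throw new KafkaExceptionStandIn(e);
        }
    }
}
```

Because the wrapping is idempotent, calling `maybeWrapAsKafkaException` in the background thread as well would not change what the user sees; it would only duplicate work.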
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventHandler.java:
##########
@@ -155,4 +158,27 @@ public void close(final Duration timeout) {
() -> log.warn("The application event handler was already closed")
);
}
+
+ /**
+ * Best-effort check that the consumer network thread is still alive. If the thread has
+ * already terminated (due to a failure or shutdown), it will never process any events from
+ * the queue. Rather than blocking indefinitely or timing out with a misleading error, this
+ * fails fast with a clear error message.
+ *
+ * <p>Note: this is inherently racy — the thread could die between this check and the
+ * subsequent {@code applicationEventQueue.add()}. That narrow window is acceptable because
+ * any subsequent call to {@code add()} will detect the dead thread immediately, and any
+ * orphaned events will be expired by the {@link CompletableEventReaper} during consumer
+ * {@link #close() close}.
+ *
+ * @throws KafkaException if the background thread is not alive
+ */
+ private void ensureNetworkThreadAlive() {
+ if (networkThread == null || !networkThread.isAlive()) {
+ throw new KafkaException(
+ "The async consumer background thread is not running and cannot process requests. " +
Review Comment:
Should we keep it generic and refer to "the consumer" instead of "async consumer"? This message will propagate to the user on API calls, and in the end async/classic is really just an internal representation. Also, this is used by the share consumer too.