lianetm commented on code in PR #22035:
URL: https://github.com/apache/kafka/pull/22035#discussion_r3381359706


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java:
##########
@@ -268,6 +268,10 @@ private void processApplicationEvents() {
                 applicationEventProcessor.process(event);
             } catch (Throwable t) {
                 log.warn("Error processing event {}", t.getMessage(), t);

Review Comment:
   Should we change this to error level?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventHandler.java:
##########
@@ -155,4 +158,27 @@ public void close(final Duration timeout) {
                 () -> log.warn("The application event handler was already closed")
         );
     }
+
+    /**
+     * Best-effort check that the consumer network thread is still alive. If the thread has
+     * already terminated (due to a failure or shutdown), it will never process any events from
+     * the queue. Rather than blocking indefinitely or timing out with a misleading error, this
+     * fails fast with a clear error message.
+     *
+     * <p>Note: this is inherently racy — the thread could die between this check and the
+     * subsequent {@code applicationEventQueue.add()}. That narrow window is acceptable because
+     * any subsequent call to {@code add()} will detect the dead thread immediately, and any
+     * orphaned events will be expired by the {@link CompletableEventReaper} during consumer
+     * {@link #close() close}.
+     *
+     * @throws KafkaException if the background thread is not alive
+     */
+    private void ensureNetworkThreadAlive() {
+        if (networkThread == null || !networkThread.isAlive()) {
+            throw new KafkaException(
+                "The async consumer background thread is not running and cannot process requests. " +
+                "This may be due to a previous fatal error in the background thread. " +
+                "Check earlier logs for details.");

Review Comment:
   Can we be more explicit here about what failed?
   
   We could keep the "fatalError" in the `ConsumerNetworkThread` so we can get it here (same as we do for metadata, for instance, and even in the same `ConsumerNetworkThread` for the initialization error):
   
https://github.com/apache/kafka/blob/8cfc1cc97f522a9d8ece5cba47edf1a00757cd8c/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java#L83
   
   Then here we could tell the user which fatal error made the background thread die. wdyt?
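
   A minimal sketch of that idea, assuming a heavily simplified model: `NetworkThread`, its `fatalError()` accessor and the local `KafkaException` are hypothetical stand-ins, not the real Kafka classes, and the actual `ConsumerNetworkThread` may record the error differently. The thread stores the first `Throwable` that ends its run loop, and the liveness check attaches it as the cause, so the user sees what actually failed instead of only a pointer to earlier logs.

```java
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;

public class NetworkThreadFatalErrorSketch {

    // Hypothetical stand-in for org.apache.kafka.common.KafkaException so the sketch compiles on its own.
    public static class KafkaException extends RuntimeException {
        public KafkaException(String message) { super(message); }
        public KafkaException(String message, Throwable cause) { super(message, cause); }
    }

    // Hypothetical, heavily simplified stand-in for ConsumerNetworkThread: it remembers the
    // error that terminated its run loop so that other threads can report it later.
    public static class NetworkThread extends Thread {
        private final Runnable runLoop;
        private final AtomicReference<Throwable> fatalError = new AtomicReference<>();

        public NetworkThread(Runnable runLoop) {
            super("consumer-network-thread-sketch");
            this.runLoop = runLoop;
        }

        @Override
        public void run() {
            try {
                runLoop.run();
            } catch (Throwable t) {
                // Record only the first fatal error; the thread then exits.
                fatalError.compareAndSet(null, t);
            }
        }

        public Optional<Throwable> fatalError() {
            return Optional.ofNullable(fatalError.get());
        }
    }

    // Hypothetical counterpart of ensureNetworkThreadAlive() that surfaces the recorded cause.
    public static void ensureNetworkThreadAlive(NetworkThread networkThread) {
        if (networkThread == null || !networkThread.isAlive()) {
            String message = "The consumer background thread is not running and cannot process requests";
            Optional<Throwable> cause = networkThread == null
                ? Optional.empty()
                : networkThread.fatalError();
            throw cause
                .map(t -> new KafkaException(message + " due to a fatal error: " + t, t))
                .orElseGet(() -> new KafkaException(message + ". Check earlier logs for details."));
        }
    }

    public static void main(String[] args) throws InterruptedException {
        NetworkThread thread = new NetworkThread(() -> {
            throw new IllegalStateException("simulated failure in the run loop");
        });
        thread.start();
        thread.join(); // after join(), isAlive() is false and the recorded error is visible
        try {
            ensureNetworkThreadAlive(thread);
        } catch (KafkaException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

   Reading the error after the thread has stopped is safe in this sketch: the JLS guarantees that a thread's final action synchronizes-with another thread detecting its termination via `isAlive()` or `join()`, and the `AtomicReference` makes the write visible in any case.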



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java:
##########
@@ -268,6 +268,10 @@ private void processApplicationEvents() {
                 applicationEventProcessor.process(event);
             } catch (Throwable t) {
                 log.warn("Error processing event {}", t.getMessage(), t);
+                if (event instanceof CompletableEvent) {
+                    ((CompletableEvent<?>) event).future().completeExceptionally(
+                        ConsumerUtils.maybeWrapAsKafkaException(t));

Review Comment:
   I'm not sure this is really needed; on the app thread we already wrap the future result here:
https://github.com/apache/kafka/blob/6208dfc014ba7ae48804c5ae92e3bf4f1306b057/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerUtils.java#L226
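
   A minimal sketch of the pattern being referred to, assuming the app-thread helper unwraps the `ExecutionException` and wraps its cause with a `maybeWrapAsKafkaException`-style function. `getResult`, the local `KafkaException` and the timeout/interrupt handling are hypothetical simplifications; the real `ConsumerUtils` code may map those cases to Kafka-specific exceptions. If wrapping already happens once on the application thread, completing the future with the raw `Throwable` in the network thread is enough, and wrapping there as well is redundant.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class AppThreadWrappingSketch {

    // Hypothetical stand-in for org.apache.kafka.common.KafkaException.
    public static class KafkaException extends RuntimeException {
        public KafkaException(String message) { super(message); }
        public KafkaException(Throwable cause) { super(cause); }
    }

    // Assumed behaviour of a maybeWrapAsKafkaException-style helper:
    // KafkaExceptions pass through unchanged, anything else is wrapped.
    public static KafkaException maybeWrapAsKafkaException(Throwable t) {
        return t instanceof KafkaException ? (KafkaException) t : new KafkaException(t);
    }

    // Hypothetical application-thread helper that waits on an event's future.
    // Wrapping happens here, once, so the background thread can complete the
    // future with the raw Throwable.
    public static <T> T getResult(CompletableFuture<T> future, long timeoutMs) {
        try {
            return future.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (ExecutionException e) {
            throw maybeWrapAsKafkaException(e.getCause());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            throw new KafkaException(e);
        } catch (TimeoutException e) {
            throw new KafkaException("Timed out waiting for the event result");
        }
    }

    public static void main(String[] args) {
        // Background thread side: complete the event's future with the raw error.
        CompletableFuture<String> future = new CompletableFuture<>();
        future.completeExceptionally(new IllegalStateException("boom"));

        // Application thread side: the raw error surfaces as a KafkaException.
        try {
            getResult(future, 100);
        } catch (KafkaException e) {
            System.out.println(e.getCause());
        }
    }
}
```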
 
   



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventHandler.java:
##########
@@ -155,4 +158,27 @@ public void close(final Duration timeout) {
                 () -> log.warn("The application event handler was already closed")
         );
     }
+
+    /**
+     * Best-effort check that the consumer network thread is still alive. If the thread has
+     * already terminated (due to a failure or shutdown), it will never process any events from
+     * the queue. Rather than blocking indefinitely or timing out with a misleading error, this
+     * fails fast with a clear error message.
+     *
+     * <p>Note: this is inherently racy — the thread could die between this check and the
+     * subsequent {@code applicationEventQueue.add()}. That narrow window is acceptable because
+     * any subsequent call to {@code add()} will detect the dead thread immediately, and any
+     * orphaned events will be expired by the {@link CompletableEventReaper} during consumer
+     * {@link #close() close}.
+     *
+     * @throws KafkaException if the background thread is not alive
+     */
+    private void ensureNetworkThreadAlive() {
+        if (networkThread == null || !networkThread.isAlive()) {
+            throw new KafkaException(
+                "The async consumer background thread is not running and cannot process requests. " +

Review Comment:
   Should we keep it generic and refer to "the consumer" instead of "async consumer"? This message will propagate to the user on API calls, and in the end async vs. classic is really an internal implementation detail. Also, this is used by the share consumer too.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
