dajac commented on code in PR #22466:
URL: https://github.com/apache/kafka/pull/22466#discussion_r3357084526


##########
coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java:
##########
@@ -656,14 +667,21 @@ private void freeCurrentBatch() {
          */
         private void enqueueAdaptiveFlush(int expectedBatchEpoch) {
             enqueueLast(new CoordinatorInternalEvent("FlushBatch", tp, () -> {
-                withActiveContextOrThrow(tp, context -> {
-                    // The batch could have already been flushed because it 
reached the maximum
-                    // batch size or a transactional write came in. When this 
happens, we want
-                    // to avoid flushing the next batch early.
-                    if (context.currentBatch != null && context.batchEpoch == 
expectedBatchEpoch) {
-                        context.flushCurrentBatch();
-                    }
-                });
+                try {
+                    withActiveContextOrThrow(tp, context -> {
+                        // The batch could have already been flushed because 
it reached the maximum
+                        // batch size or a transactional write came in. When 
this happens, we want
+                        // to avoid flushing the next batch early.
+                        if (context.currentBatch != null && context.batchEpoch 
== expectedBatchEpoch) {
+                            context.flushCurrentBatch();
+                        }
+                    });
+                } catch (Throwable t) {
+                    // Any failure has already been handled and logged by 
flushCurrentBatch, and
+                    // there is nothing to flush if the coordinator is no 
longer active. The
+                    // exception is not propagated to avoid a redundant error 
being logged.
+                    log.debug("Failed to flush the pending records of {}: 
{}.", tp, t.getMessage());

Review Comment:
   It is actually to catch both cases and log them in debug only. Otherwise, 
there are caught by the `CoordinatorInternalEvent` and logged are ERROR gain: 
`Execution of {} failed due to {}.`. It seems a bit better to explicitly handle 
them and log them in DEBUG with the appropriate message at least. What do you 
think?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to