dajac commented on code in PR #22466:
URL: https://github.com/apache/kafka/pull/22466#discussion_r3361163535


##########
coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java:
##########
@@ -656,14 +667,21 @@ private void freeCurrentBatch() {
          */
         private void enqueueAdaptiveFlush(int expectedBatchEpoch) {
             enqueueLast(new CoordinatorInternalEvent("FlushBatch", tp, () -> {
-                withActiveContextOrThrow(tp, context -> {
-                    // The batch could have already been flushed because it reached the maximum
-                    // batch size or a transactional write came in. When this happens, we want
-                    // to avoid flushing the next batch early.
-                    if (context.currentBatch != null && context.batchEpoch == expectedBatchEpoch) {
-                        context.flushCurrentBatch();
-                    }
-                });
+                try {
+                    withActiveContextOrThrow(tp, context -> {
+                        // The batch could have already been flushed because it reached the maximum
+                        // batch size or a transactional write came in. When this happens, we want
+                        // to avoid flushing the next batch early.
+                        if (context.currentBatch != null && context.batchEpoch == expectedBatchEpoch) {
+                            context.flushCurrentBatch();
+                        }
+                    });
+                } catch (Throwable t) {
+                    // Any failure has already been handled and logged by flushCurrentBatch, and
+                    // there is nothing to flush if the coordinator is no longer active. The
+                    // exception is not propagated to avoid a redundant error being logged.
+                    log.debug("Failed to flush the pending records of {}: {}.", tp, t.getMessage());

Review Comment:
   I am not really comfortable with having a catch-all without any logging. At 
the moment, it works because `flushCurrentBatch` catches all exceptions, but 
this invariant is not enforced. My concern is that it could change in the 
future and we may forget to update the callers. The double logging is not too 
bad at debug level. It may also be helpful for tracing the caller of 
`flushCurrentBatch`.
   
   One thing that we could consider is not using `withActiveContextOrThrow` 
because we don't really need to log anything when the coordinator is not active.
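
To make the alternative concrete, here is a minimal, self-contained sketch. It is not the actual `CoordinatorRuntime` code: `AdaptiveFlushSketch`, `Context`, `State`, `register` and `withActiveContextOrSkip` are hypothetical stand-ins, and locking and event queueing are left out. The idea is that an inactive coordinator is skipped silently, so no catch-all is needed and any unexpected exception still propagates to the caller.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical, simplified model of the runtime; all names are assumptions for illustration.
class AdaptiveFlushSketch {
    enum State { ACTIVE, CLOSED }

    // Stand-in for the per-partition coordinator context.
    static final class Context {
        State state = State.ACTIVE;
        Object currentBatch = new Object();
        int batchEpoch = 0;
        int flushCount = 0;

        // Stand-in for the real flush: drops the batch and bumps the epoch.
        void flushCurrentBatch() {
            flushCount++;
            currentBatch = null;
            batchEpoch++;
        }
    }

    private final Map<String, Context> contexts = new HashMap<>();

    void register(String tp, Context context) {
        contexts.put(tp, context);
    }

    // Hypothetical helper: runs the action only when the coordinator is active.
    // Returns false instead of throwing when the context is missing or inactive.
    boolean withActiveContextOrSkip(String tp, Consumer<Context> action) {
        Context context = contexts.get(tp);
        if (context == null || context.state != State.ACTIVE) {
            return false; // Nothing to flush, nothing to log.
        }
        action.accept(context); // Unexpected exceptions propagate to the caller.
        return true;
    }

    // Body of the "FlushBatch" event, without a try/catch around it.
    void adaptiveFlush(String tp, int expectedBatchEpoch) {
        withActiveContextOrSkip(tp, context -> {
            // Skip if the batch was already flushed for another reason.
            if (context.currentBatch != null && context.batchEpoch == expectedBatchEpoch) {
                context.flushCurrentBatch();
            }
        });
    }
}
```

With this shape, the "not active" case never produces an exception, so the only errors that can surface from the event are ones that `flushCurrentBatch` did not handle itself.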
   
   Thoughts?


