squah-confluent commented on code in PR #22466:
URL: https://github.com/apache/kafka/pull/22466#discussion_r3356966455


##########
coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java:
##########
@@ -656,14 +667,21 @@ private void freeCurrentBatch() {
          */
         private void enqueueAdaptiveFlush(int expectedBatchEpoch) {
             enqueueLast(new CoordinatorInternalEvent("FlushBatch", tp, () -> {
-                withActiveContextOrThrow(tp, context -> {
-                    // The batch could have already been flushed because it reached the maximum
-                    // batch size or a transactional write came in. When this happens, we want
-                    // to avoid flushing the next batch early.
-                    if (context.currentBatch != null && context.batchEpoch == expectedBatchEpoch) {
-                        context.flushCurrentBatch();
-                    }
-                });
+                try {
+                    withActiveContextOrThrow(tp, context -> {
+                        // The batch could have already been flushed because it reached the maximum
+                        // batch size or a transactional write came in. When this happens, we want
+                        // to avoid flushing the next batch early.
+                        if (context.currentBatch != null && context.batchEpoch == expectedBatchEpoch) {
+                            context.flushCurrentBatch();
+                        }
+                    });
+                } catch (Throwable t) {
+                    // Any failure has already been handled and logged by flushCurrentBatch, and
+                    // there is nothing to flush if the coordinator is no longer active. The
+                    // exception is not propagated to avoid a redundant error being logged.
+                    log.debug("Failed to flush the pending records of {}: {}.", tp, t.getMessage());

Review Comment:
   Do you have a good way to resolve this in mind? I agree it's annoying to log 
twice.
   
   `flushCurrentBatch` must throw to signal failure; otherwise, the normal append 
path can do unwanted things when overflowing onto the next batch.
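
For reference, here is a minimal, self-contained sketch of the pattern in the hunk above: a flush that is skipped when the batch epoch is stale, and a caller that swallows the failure because the flush has already logged it. This is not the Kafka implementation. `AdaptiveFlushSketch`, `Context`, `append`, `failNextFlush`, `errorLog`, and `adaptiveFlush` are hypothetical stand-ins, and only the names `currentBatch`, `batchEpoch`, and `flushCurrentBatch` are borrowed from the diff.

```java
import java.util.ArrayList;
import java.util.List;

public class AdaptiveFlushSketch {
    // Hypothetical stand-in for the coordinator context; not the real Kafka class.
    static class Context {
        List<String> currentBatch = null;
        int batchEpoch = 0;
        int flushCount = 0;
        boolean failNextFlush = false;                    // test hook, assumed for illustration
        final List<String> errorLog = new ArrayList<>();  // stands in for the logger

        // Starts a new batch (and a new epoch) if none is open, then adds the record.
        void append(String record) {
            if (currentBatch == null) {
                currentBatch = new ArrayList<>();
                batchEpoch++;
            }
            currentBatch.add(record);
        }

        // Records the failure once, then throws so that callers can tell the flush failed.
        void flushCurrentBatch() {
            if (failNextFlush) {
                failNextFlush = false;
                currentBatch = null;
                errorLog.add("flush failed");
                throw new IllegalStateException("simulated write failure");
            }
            flushCount++;
            currentBatch = null;
        }
    }

    // Flushes only if the batch captured at scheduling time is still the open one.
    // Returns true if a flush happened and succeeded.
    static boolean adaptiveFlush(Context context, int expectedBatchEpoch) {
        try {
            if (context.currentBatch != null && context.batchEpoch == expectedBatchEpoch) {
                context.flushCurrentBatch();
                return true;
            }
            return false;
        } catch (Throwable t) {
            // Already recorded by flushCurrentBatch; not rethrown, to avoid a second log entry.
            return false;
        }
    }
}
```

In this sketch the throw from `flushCurrentBatch` is kept so that other callers can still react to a failed flush, and only the timer-style caller chooses to swallow it.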


