dajac commented on code in PR #22466:
URL: https://github.com/apache/kafka/pull/22466#discussion_r3357084526
##########
coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java:
##########
@@ -656,14 +667,21 @@ private void freeCurrentBatch() {
*/
private void enqueueAdaptiveFlush(int expectedBatchEpoch) {
enqueueLast(new CoordinatorInternalEvent("FlushBatch", tp, () -> {
- withActiveContextOrThrow(tp, context -> {
- // The batch could have already been flushed because it
reached the maximum
- // batch size or a transactional write came in. When this
happens, we want
- // to avoid flushing the next batch early.
- if (context.currentBatch != null && context.batchEpoch ==
expectedBatchEpoch) {
- context.flushCurrentBatch();
- }
- });
+ try {
+ withActiveContextOrThrow(tp, context -> {
+ // The batch could have already been flushed because
it reached the maximum
+ // batch size or a transactional write came in. When
this happens, we want
+ // to avoid flushing the next batch early.
+ if (context.currentBatch != null && context.batchEpoch
== expectedBatchEpoch) {
+ context.flushCurrentBatch();
+ }
+ });
+ } catch (Throwable t) {
+ // Any failure has already been handled and logged by
flushCurrentBatch, and
+ // there is nothing to flush if the coordinator is no
longer active. The
+ // exception is not propagated to avoid a redundant error
being logged.
+ log.debug("Failed to flush the pending records of {}:
{}.", tp, t.getMessage());
Review Comment:
It is actually to catch both cases and log them in debug only. Otherwise,
there are caught by the `CoordinatorInternalEvent` and logged are ERROR gain:
`Execution of {} failed due to {}.`. It seems a bit better to explicitly handle
them and log them in DEBUG with the appropriate message at least. What do you
think?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]