dajac commented on code in PR #22466:
URL: https://github.com/apache/kafka/pull/22466#discussion_r3361163535
##########
coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java:
##########
@@ -656,14 +667,21 @@ private void freeCurrentBatch() {
*/
private void enqueueAdaptiveFlush(int expectedBatchEpoch) {
enqueueLast(new CoordinatorInternalEvent("FlushBatch", tp, () -> {
- withActiveContextOrThrow(tp, context -> {
- // The batch could have already been flushed because it
reached the maximum
- // batch size or a transactional write came in. When this
happens, we want
- // to avoid flushing the next batch early.
- if (context.currentBatch != null && context.batchEpoch ==
expectedBatchEpoch) {
- context.flushCurrentBatch();
- }
- });
+ try {
+ withActiveContextOrThrow(tp, context -> {
+ // The batch could have already been flushed because
it reached the maximum
+ // batch size or a transactional write came in. When
this happens, we want
+ // to avoid flushing the next batch early.
+ if (context.currentBatch != null && context.batchEpoch
== expectedBatchEpoch) {
+ context.flushCurrentBatch();
+ }
+ });
+ } catch (Throwable t) {
+ // Any failure has already been handled and logged by
flushCurrentBatch, and
+ // there is nothing to flush if the coordinator is no
longer active. The
+ // exception is not propagated to avoid a redundant error
being logged.
+ log.debug("Failed to flush the pending records of {}:
{}.", tp, t.getMessage());
Review Comment:
I am not really confortable with having a catch-all without any logging. At
the moment, it works because `flushCurrentBatch` catches all the exceptions but
this invariant is not enforced. My concern is that it could change in the
future and we may miss to update the callers. The double logging is not too bad
in debug. It may also be helpful to trace the caller of `flushCurrentBatch`.
One thing that we could consider is not using `withActiveContextOrThrow`
because we don't really need to log anything when the coordinator is not active.
Thoughts?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]