This is an automated email from the ASF dual-hosted git repository. mjsax pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 683d0bbc4c MINOR: Guard against decrementing `totalCommittedSinceLastSummary` during rebalancing. (#12299) 683d0bbc4c is described below commit 683d0bbc4ca7223e0ade55be58497af8aded6823 Author: James Hughes <jnh...@gmail.com> AuthorDate: Thu Jun 16 12:40:08 2022 -0400 MINOR: Guard against decrementing `totalCommittedSinceLastSummary` during rebalancing. (#12299) Reviewers: Matthias J. Sax <matth...@confluent.io> --- .../java/org/apache/kafka/streams/processor/internals/StreamThread.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index 7d3be3b1a6..64a4ff5433 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -818,10 +818,10 @@ public class StreamThread extends Thread { final long beforeCommitMs = now; final int committed = maybeCommit(); - totalCommittedSinceLastSummary += committed; final long commitLatency = Math.max(now - beforeCommitMs, 0); totalCommitLatency += commitLatency; if (committed > 0) { + totalCommittedSinceLastSummary += committed; commitSensor.record(commitLatency / (double) committed, now); if (log.isDebugEnabled()) {