This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 683d0bbc4c MINOR: Guard against decrementing `totalCommittedSinceLastSummary` during rebalancing. (#12299)
683d0bbc4c is described below

commit 683d0bbc4ca7223e0ade55be58497af8aded6823
Author: James Hughes <jnh...@gmail.com>
AuthorDate: Thu Jun 16 12:40:08 2022 -0400

    MINOR: Guard against decrementing `totalCommittedSinceLastSummary` during rebalancing. (#12299)
    
    Reviewers: Matthias J. Sax <matth...@confluent.io>
---
 .../java/org/apache/kafka/streams/processor/internals/StreamThread.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 7d3be3b1a6..64a4ff5433 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -818,10 +818,10 @@ public class StreamThread extends Thread {
 
                 final long beforeCommitMs = now;
                 final int committed = maybeCommit();
-                totalCommittedSinceLastSummary += committed;
                 final long commitLatency = Math.max(now - beforeCommitMs, 0);
                 totalCommitLatency += commitLatency;
                 if (committed > 0) {
+                    totalCommittedSinceLastSummary += committed;
                     commitSensor.record(commitLatency / (double) committed, now);
 
                     if (log.isDebugEnabled()) {

Reply via email to