ableegoldman commented on a change in pull request #11283:
URL: https://github.com/apache/kafka/pull/11283#discussion_r698868805



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -565,7 +565,7 @@ public void closeCleanAndRecycleState() {
     protected void maybeWriteCheckpoint(final boolean enforceCheckpoint) {
         // commitNeeded indicates we may have processed some records since last commit
         // and hence we need to refresh checkpointable offsets regardless whether we should checkpoint or not
-        if (commitNeeded) {
+        if (commitNeeded || enforceCheckpoint) {

Review comment:
What if we just removed the check altogether? Updating the changelog offsets isn't a particularly "heavy" call, so we may as well future-proof things even more by updating the offsets every time.

In fact, why do we even have this weird split-brain logic to begin with? It would make more sense to just update the offsets inside the `StreamTask#maybeWriteCheckpoint` and `stateMgr.checkpoint()` methods, no?
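
To make the suggestion concrete, here is a rough, self-contained sketch of what dropping the guard could look like. It is not the actual Kafka Streams code: `FakeTask`, `FakeStateManager`, the `updateChangelogOffsets` call, and the "only write when enforced" checkpoint policy are all stand-ins assumed for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch only; none of these types are the real Kafka Streams classes.
public class CheckpointSketch {

    /** Stand-in for the state manager; it just records what it was asked to do. */
    static class FakeStateManager {
        final Map<String, Long> changelogOffsets = new HashMap<>();
        int checkpointsWritten = 0;

        void updateChangelogOffsets(final Map<String, Long> offsets) {
            changelogOffsets.putAll(offsets);
        }

        void checkpoint() {
            checkpointsWritten++;
        }
    }

    /** Stand-in for the task, with the commitNeeded guard removed. */
    static class FakeTask {
        final FakeStateManager stateMgr = new FakeStateManager();
        final Map<String, Long> checkpointableOffsets = new HashMap<>();
        boolean commitNeeded = false;

        void maybeWriteCheckpoint(final boolean enforceCheckpoint) {
            // Always refresh the offsets, regardless of commitNeeded.
            stateMgr.updateChangelogOffsets(checkpointableOffsets);
            // Simplified policy (an assumption): write only when enforced.
            if (enforceCheckpoint) {
                stateMgr.checkpoint();
            }
        }
    }

    public static void main(final String[] args) {
        final FakeTask task = new FakeTask();
        task.checkpointableOffsets.put("changelog-0", 42L);
        // commitNeeded is false, yet the offsets are still refreshed.
        task.maybeWriteCheckpoint(true);
        System.out.println(task.stateMgr.changelogOffsets);
        System.out.println(task.stateMgr.checkpointsWritten);
    }
}
```

With the guard gone, callers no longer have to reason about whether `commitNeeded` or `enforceCheckpoint` was set before the offsets were refreshed; the cost is one extra map update per call.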




