mjsax commented on a change in pull request #11675:
URL: https://github.com/apache/kafka/pull/11675#discussion_r791199995



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -1321,11 +1322,22 @@ int process(final int maxNumRecords, final Time time) {
                 totalProcessed += processed;
                 task.recordProcessBatchTime(now - then);
             }
+            successfullyProcessed.add(task);
         }
-
+        successfullyProcessed.clear();
         return totalProcessed;
     }
 
+    public int commitSuccessfullyProcessedTasks() {
+        final int committed = commit(successfullyProcessed);

Review comment:
       Correct. You need to distinguish between the read path and the write 
path here: the producer has no idea which input partitions we consume from, so 
it can only offer a generic `sendOffsetsToTransaction` API, and it is the 
user's (i.e., our) responsibility to provide the correct offsets. What we write 
to the output topic, however, is independent of the offsets we pass for the 
input: even if you omit the offsets for some input partitions, the 
corresponding pending writes on the output topic are still part of the 
transaction, so omitting those offsets from the map does not abort the pending 
writes that resulted from those partitions. Which writes get committed has 
nothing to do with which offsets you pass to the producer.
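
       To make the read/write distinction concrete, here is a minimal toy 
sketch. It deliberately does not use the Kafka client: `ToyTransaction`, 
`ToyTransactionDemo`, and all of their methods are hypothetical names invented 
for illustration. It only assumes that a transaction commits every pending 
write together with whatever offsets the caller chose to attach.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model only: these classes are hypothetical and are NOT Kafka's producer API.
class ToyTransaction {
    // Write path: output records produced inside the open transaction.
    private final List<String> pendingWrites = new ArrayList<>();
    // Read path: input offsets the caller explicitly attached to the transaction.
    private final Map<String, Long> pendingOffsets = new HashMap<>();

    final List<String> committedWrites = new ArrayList<>();
    final Map<String, Long> committedOffsets = new HashMap<>();

    void send(final String outputRecord) {
        pendingWrites.add(outputRecord);
    }

    void sendOffsets(final Map<String, Long> offsets) {
        pendingOffsets.putAll(offsets);
    }

    // Commit is all-or-nothing for the writes; the offsets map does not filter them.
    void commit() {
        committedWrites.addAll(pendingWrites);
        committedOffsets.putAll(pendingOffsets);
        pendingWrites.clear();
        pendingOffsets.clear();
    }
}

class ToyTransactionDemo {
    static ToyTransaction run() {
        final ToyTransaction txn = new ToyTransaction();

        // Two tasks each wrote one output record within the same transaction.
        txn.send("out-from-input-0");
        txn.send("out-from-input-1");

        // Only the offset for input-0 is attached; input-1 is omitted on purpose.
        final Map<String, Long> offsets = new HashMap<>();
        offsets.put("input-0", 42L);
        txn.sendOffsets(offsets);

        txn.commit();
        return txn;
    }

    public static void main(final String[] args) {
        final ToyTransaction txn = run();
        System.out.println("committed writes:  " + txn.committedWrites);
        System.out.println("committed offsets: " + txn.committedOffsets);
    }
}
```

       In this sketch both output records end up committed although only the 
offset for `input-0` was attached. Under the same assumption, the input from 
`input-1` would be re-read after a restart while its output is already 
committed, which is why omitting offsets cannot stand in for aborting writes.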



