I have a simple Kafka Streams topology of the form "Source Topic -> Processor -> Sink Topic" that is configured with the exactly-once processing guarantee. The Processor performs a reduce operation on the incoming messages and stores the results in a key-value state store (with logging enabled). A Punctuator, scheduled to run every 10 seconds on wall-clock time, forwards the contents of the key-value state store to the sink topic. It's also important to note that the commit interval for the Streams application is set to 1 second.

The problem I'm seeing is that if no new records are being consumed from the source topic, the Producer#commitTransaction() method is never invoked inside the StreamTask#commitOffsets() method, because the call is guarded by the "if (commitOffsetNeeded)" condition, and the commitOffsetNeeded field always appears to be false when no new records are being consumed. This is problematic in my case because the punctuator can flush items from the state store to the sink topic even when no new records are arriving on the source topic. Because the transaction is never committed, those messages can't be consumed by downstream consumers configured with an isolation level of read_committed.

Below is a more detailed description of the steps which are taking place.
1. The source topic only has a few messages on it and no new messages are arriving.
2. All the messages are consumed by the Processor and the results are placed into the state store before the first commit occurs at the 1 second mark.
3. The first commit occurs at the 1 second mark, causing the StreamTask#commitOffsets() method to get invoked. Since messages were processed as part of step #2, the StreamTask#commitOffsetNeeded field is set to true, which results in the Producer#commitTransaction() method being called on line 358. However, at this point in time, all the results are still in the state store and have not yet been sent to the sink topic.
4. The Punctuator is called at the 10 second mark and it forwards all the entries in the state store to the sink topic.
5. The next commit interval elapses. However, because no new incoming events have been processed, the StreamTask#commitOffsetNeeded field is now set to false, so when the StreamTask#commitOffsets() method is invoked it exits without doing any work. As a result, the Producer#commitTransaction() method is never called.
6. Since the Producer#commitTransaction() method is never called, the messages which were placed onto the sink topic by the Punctuator in step #4 can never be seen by consumers configured with an isolation level of read_committed.
7. Furthermore, calling the ProcessorContext#commit() method within the Punctuator#punctuate() method does not seem to help the situation, since the StreamTask#commitOffsets() method does not take into consideration the value of the StreamTask#commitRequested field.

So my question is, would it be beneficial to update the logic in the StreamTask#commitOffsets() method so that if ProcessorContext#commit() has been called and exactly-once processing has been enabled, the Producer#commitTransaction() method is still called even if no records were consumed off the topic?
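
To make the suggestion concrete, here is a rough, self-contained Java sketch of the check in question. It is a toy model only and not the actual StreamTask source: the TaskModel class, the pendingSinkRecords and visibleSinkRecords counters, and the commitAsObserved() / commitAsProposed() methods are hypothetical names made up for illustration, and resetting commitRequested after every commit pass is an assumption. The only thing it is meant to show is the difference between guarding the transaction commit on commitOffsetNeeded alone and also honouring commitRequested when exactly-once is enabled.

```java
public class CommitDecisionSketch {

    /** Hypothetical stand-in for a stream task; NOT the real StreamTask. */
    public static final class TaskModel {
        public final boolean eosEnabled;
        public boolean commitOffsetNeeded; // set when a source record was processed
        public boolean commitRequested;    // set when ProcessorContext#commit() was called
        public int pendingSinkRecords;     // written inside an open transaction
        public int visibleSinkRecords;     // readable by read_committed consumers

        public TaskModel(boolean eosEnabled) {
            this.eosEnabled = eosEnabled;
        }

        /** Models consuming one record from the source topic. */
        public void process() {
            commitOffsetNeeded = true;
        }

        /** Models a punctuation that forwards records to the sink topic. */
        public void punctuate(int forwarded, boolean requestCommit) {
            if (eosEnabled) {
                pendingSinkRecords += forwarded; // hidden until the transaction commits
            } else {
                visibleSinkRecords += forwarded; // no transaction, visible right away
            }
            commitRequested |= requestCommit;
        }

        /** Behaviour as described in the steps above: only consumed records trigger a commit. */
        public void commitAsObserved() {
            if (commitOffsetNeeded) {
                if (eosEnabled) {
                    commitTransaction();
                }
                commitOffsetNeeded = false;
            }
            commitRequested = false; // assumption: the request flag is cleared each pass
        }

        /** Suggested behaviour: an explicit commit request also triggers a commit under EOS. */
        public void commitAsProposed() {
            if (commitOffsetNeeded || (eosEnabled && commitRequested)) {
                if (eosEnabled) {
                    commitTransaction();
                }
                commitOffsetNeeded = false;
            }
            commitRequested = false; // assumption: the request flag is cleared each pass
        }

        private void commitTransaction() {
            visibleSinkRecords += pendingSinkRecords;
            pendingSinkRecords = 0;
        }
    }

    /** Replays steps 1-7 and returns how many sink records a read_committed consumer could see. */
    public static int runScenario(boolean useProposedCheck) {
        TaskModel task = new TaskModel(true);
        task.process();                      // steps 1-2: source records are processed
        commit(task, useProposedCheck);      // step 3: first commit, nothing forwarded yet
        task.punctuate(3, true);             // steps 4 and 7: forward 3 entries, request a commit
        commit(task, useProposedCheck);      // step 5: next commit interval elapses
        return task.visibleSinkRecords;      // step 6: what downstream consumers can read
    }

    private static void commit(TaskModel task, boolean useProposedCheck) {
        if (useProposedCheck) {
            task.commitAsProposed();
        } else {
            task.commitAsObserved();
        }
    }

    public static void main(String[] args) {
        System.out.println("observed check, visible records: " + runScenario(false)); // 0
        System.out.println("proposed check, visible records: " + runScenario(true));  // 3
    }
}
```

In this model the three records forwarded by the punctuation stay invisible under the observed check and become visible under the proposed one. Whether the real fix should key off commitRequested or off some "transaction has pending writes" state is a design question the sketch does not try to answer.
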
This would help to handle the case where the punctuate call itself is producing messages to a downstream topic.

Thanks,
David