I have a simple Kafka Streams topology of the form “Source Topic -> Processor
-> Sink Topic” that is configured to use exactly-once processing guarantees.
The Processor performs a reduce operation on the incoming messages and stores
the results in a key-value state store (with logging enabled).  The contents of
the key-value state store are forwarded to the sink topic by a Punctuator
implementation that is scheduled to run every 10 seconds (using wall-clock
time).  It’s also important to note that the commit interval for the stream
application is set to 1 second.  The problem I’m seeing is that if no new
records are being consumed from the source topic, then the
Producer#commitTransaction() method is never invoked inside the
StreamTask#commitOffsets() method, because the call is guarded by the
“if (commitOffsetNeeded)” condition, and the commitOffsetNeeded field appears
to stay false whenever no new records are consumed.  This is a problem in my
case because the punctuator can flush items from the state store to the sink
topic even when no new records are being consumed from the source topic.
However, because the transaction is never committed, these messages can’t be
consumed by downstream consumers configured with an isolation level of
read_committed.  Below is a more detailed description of the steps that take
place.
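For reference, the setup described above roughly corresponds to the configuration below.  This is a minimal sketch.  The key strings "processing.guarantee", "commit.interval.ms" and "isolation.level" are real Kafka configs, and "exactly_once" was the EOS value at the time (newer releases deprecate it in favour of "exactly_once_v2").  The application id, broker address, group id and the PUNCTUATE_INTERVAL_MS constant are hypothetical placeholders.  The punctuator itself would be registered in Processor#init through ProcessorContext#schedule with PunctuationType.WALL_CLOCK_TIME.  Its exact signature varies between Kafka Streams versions, so it appears only as a comment.

```java
import java.util.Properties;

// Sketch of the configuration described above. The config key strings are real
// Kafka configs; the application id, broker address and group id are placeholders.
class TopologySettings {
    // Hypothetical constant: interval of the wall-clock punctuator. In Processor#init
    // it would be passed to ProcessorContext#schedule(..., PunctuationType.WALL_CLOCK_TIME, ...);
    // the exact schedule() signature depends on the Kafka Streams version.
    static final long PUNCTUATE_INTERVAL_MS = 10_000L;

    static Properties streamsProps() {
        Properties p = new Properties();
        p.put("application.id", "reduce-punctuate-app"); // hypothetical id
        p.put("bootstrap.servers", "localhost:9092");     // assumed broker address
        p.put("processing.guarantee", "exactly_once");    // exactly-once, as in the post
        p.put("commit.interval.ms", "1000");              // commit every 1 second
        return p;
    }

    static Properties downstreamConsumerProps() {
        Properties p = new Properties();
        p.put("bootstrap.servers", "localhost:9092");     // assumed broker address
        p.put("group.id", "sink-reader");                 // hypothetical group id
        p.put("isolation.level", "read_committed");       // only committed transactional data
        return p;
    }

    public static void main(String[] args) {
        System.out.println(streamsProps());
        System.out.println(downstreamConsumerProps());
    }
}
```

With these settings, the downstream consumer only sees sink records once the Streams producer has committed the transaction they were written in.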

1. The source topic only has a few messages on it and no new messages are 
arriving.   
2. All the messages are consumed by the Processor and the results are placed 
into the state store before the first commit occurs at the 1 second mark.  
3. The first commit occurs at the 1 second mark, causing the
StreamTask#commitOffsets() method to be invoked.  Since messages were
processed in step #2, the StreamTask#commitOffsetNeeded field is set to
true, which results in the Producer#commitTransaction() method being called on
line 358 of StreamTask.  However, at this point all the results are still in
the state store and have not yet been sent to the sink topic.
4. The Punctuator is called at the 10 second mark and it forwards all the 
entries in the state store to the sink topic.
5. The next commit interval elapses.  However, because no new incoming events
have been processed, the StreamTask#commitOffsetNeeded field is now false, so
when the StreamTask#commitOffsets() method is invoked it exits without doing
any work.  As a result, the Producer#commitTransaction() method is never
called.
6. Since the Producer#commitTransaction() method is never called, the messages
that the Punctuator sent to the sink topic in step #4 can never be seen by
consumers configured with an isolation level of read_committed.
7. Furthermore, calling the ProcessorContext#commit() method within the
Punctuator#punctuate() method does not seem to help, since the
StreamTask#commitOffsets() method does not take the value of the
StreamTask#commitRequested field into consideration.
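The sequence of steps above can be reproduced with a toy model.  This is a hypothetical, heavily simplified sketch, not the real org.apache.kafka.streams StreamTask.  The class CommitGateModel, its fields and its methods are illustrative names only.  The model assumes the reduce is a sum and that "committing" means moving records from an open transaction to a list that a read_committed consumer would see.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, heavily simplified model of the behaviour described in steps 1-7.
// It is NOT the real StreamTask; all names here are illustrative.
class CommitGateModel {
    private final Map<String, Integer> stateStore = new LinkedHashMap<>(); // reduce results
    private final List<String> openTxn = new ArrayList<>();   // sent, transaction still open
    private final List<String> committed = new ArrayList<>(); // visible to read_committed
    private boolean commitOffsetNeeded = false;
    private boolean commitRequested = false;

    // Step 2: consume a record, reduce (sum) it into the store, mark offsets dirty.
    void process(String key, int value) {
        stateStore.merge(key, value, Integer::sum);
        commitOffsetNeeded = true;
    }

    // Steps 4 and 7: the punctuator forwards every store entry to the sink (the sends
    // join the open transaction) and then calls context.commit().
    void punctuate() {
        stateStore.forEach((k, v) -> openTxn.add(k + "=" + v));
        commitRequested = true;
    }

    // Steps 3 and 5: the transaction is committed only if new input was consumed.
    // commitRequested is deliberately not consulted here (step 7).
    void commitOffsets() {
        if (commitOffsetNeeded) {
            committed.addAll(openTxn); // stands in for Producer#commitTransaction()
            openTxn.clear();
            commitOffsetNeeded = false;
        }
    }

    List<String> visibleToReadCommitted() { return committed; }
    List<String> stuckInOpenTransaction() { return openTxn; }
    boolean commitWasRequested() { return commitRequested; }

    static CommitGateModel runTimeline() {
        CommitGateModel task = new CommitGateModel();
        task.process("a", 1);  // steps 1-2: the few records on the source topic
        task.process("a", 2);
        task.process("b", 5);
        task.commitOffsets();  // step 3, t=1s: commits, but nothing has been sent yet
        task.punctuate();      // step 4, t=10s: results sent inside the open transaction
        task.commitOffsets();  // step 5, t=11s: flag is false, so nothing is committed
        return task;
    }

    public static void main(String[] args) {
        CommitGateModel task = runTimeline();
        System.out.println("read_committed sees: " + task.visibleToReadCommitted()); // []
        System.out.println("still uncommitted:   " + task.stuckInOpenTransaction()); // [a=3, b=5]
    }
}
```

In this model the commit request from the punctuator is recorded but ignored, so the punctuated results stay in the open transaction indefinitely.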

So my question is: would it be beneficial to update the logic in the
StreamTask#commitOffsets() method so that, if ProcessorContext#commit() has
been called and exactly-once processing is enabled, the
Producer#commitTransaction() method is still called even if no records were
consumed from the topic?  This would handle the case where the punctuate call
itself produces messages to a downstream topic.
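The proposed condition can be illustrated with the same kind of toy model.  This is a hypothetical sketch of the suggestion only, not actual or planned Kafka Streams code.  ProposedCommitModel and its methods are illustrative names.  Under the proposal, the transaction is committed if input offsets changed, or if exactly-once is enabled and a commit was explicitly requested.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the proposed rule, not actual Kafka Streams code:
// commit the transaction if input offsets changed OR (EOS is enabled AND
// ProcessorContext#commit() was requested, e.g. from a punctuator).
class ProposedCommitModel {
    private final boolean eosEnabled;
    private final List<String> openTxn = new ArrayList<>();   // sent, transaction still open
    private final List<String> committed = new ArrayList<>(); // visible to read_committed
    private boolean commitOffsetNeeded = false;
    private boolean commitRequested = false;

    ProposedCommitModel(boolean eosEnabled) { this.eosEnabled = eosEnabled; }

    // A source record was consumed, so there are new offsets to commit.
    void recordConsumed() { commitOffsetNeeded = true; }

    // The punctuator sends its output and then calls context.commit().
    void punctuateAndRequestCommit(List<String> output) {
        openTxn.addAll(output);
        commitRequested = true;
    }

    // The proposed condition guarding Producer#commitTransaction().
    boolean shouldCommitTransaction() {
        return commitOffsetNeeded || (eosEnabled && commitRequested);
    }

    void commitOffsets() {
        if (shouldCommitTransaction()) {
            committed.addAll(openTxn); // stands in for Producer#commitTransaction()
            openTxn.clear();
        }
        commitOffsetNeeded = false;
        commitRequested = false;
    }

    List<String> visibleToReadCommitted() { return committed; }

    static ProposedCommitModel runTimeline() {
        ProposedCommitModel task = new ProposedCommitModel(true);
        task.recordConsumed();                                  // input arrives before t=1s
        task.commitOffsets();                                   // t=1s: regular commit
        task.punctuateAndRequestCommit(List.of("a=3", "b=5")); // t=10s: punctuator output
        task.commitOffsets();                                   // t=11s: committed via the request
        return task;
    }

    public static void main(String[] args) {
        System.out.println(runTimeline().visibleToReadCommitted()); // [a=3, b=5]
    }
}
```

The commit request is tied to exactly-once here because, without EOS, the producer does not use transactions, so there is nothing extra to commit.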

Thanks,
David
