Victoria Xia created KAFKA-14723:
------------------------------------

             Summary: Do not write expired store records to changelog
                 Key: KAFKA-14723
                 URL: https://issues.apache.org/jira/browse/KAFKA-14723
             Project: Kafka
          Issue Type: Improvement
          Components: streams
            Reporter: Victoria Xia


Window stores and versioned stores both have concepts of "retention" and 
"expiration." Records which are expired are not written to the store, e.g., 
[this example|https://github.com/apache/kafka/blob/b9754747d6eaa029c4bb69b073d749ff8df15908/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java#L265-L266]
 for segmented stores. However, these expired records are still written to the 
changelog topic, in the case of persistent stores. This does not cause any 
correctness problems, because the records are dropped again during 
restoration, but it wastes changelog space and restoration time. It would be 
better to avoid writing expired records to the changelog topic in the first 
place. Another benefit is that doing so 
would allow us to simplify the restoration code for versioned stores (see 
[relevant discussion|https://github.com/apache/kafka/pull/13243#discussion_r1106568364]). 
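To make the expiry check concrete, here is a minimal Java sketch of how a segmented store might drop expired records. It is a simplified model, not Kafka's implementation: the class `SegmentExpirySketch`, its methods, and the exact segment arithmetic are hypothetical, and cleanup of old segments is omitted.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of a segmented store's expiry check.
// Names and arithmetic are illustrative, not Kafka's real internals;
// dropping of old segments is omitted for brevity.
final class SegmentExpirySketch {
    private final long retentionPeriodMs;
    private final long segmentIntervalMs;
    private final Map<String, String> records = new HashMap<>();
    // Stands in for the innermost layer's observedStreamTime; -1 means "nothing seen yet".
    private long observedStreamTime = -1L;

    SegmentExpirySketch(long retentionPeriodMs, long segmentIntervalMs) {
        this.retentionPeriodMs = retentionPeriodMs;
        this.segmentIntervalMs = segmentIntervalMs;
    }

    private long segmentId(long timestamp) {
        return timestamp / segmentIntervalMs;
    }

    // A record is expired if its segment is older than the oldest live segment.
    boolean isExpired(long timestamp) {
        long minLiveTimestamp = observedStreamTime - retentionPeriodMs;
        return segmentId(timestamp) < segmentId(minLiveTimestamp);
    }

    // Returns void, so a caller cannot tell whether the record was stored
    // or silently dropped as expired.
    void put(String key, String value, long timestamp) {
        observedStreamTime = Math.max(observedStreamTime, timestamp);
        if (isExpired(timestamp)) {
            return; // dropped: too old relative to the observed stream time
        }
        records.put(key, value);
    }

    boolean contains(String key) {
        return records.containsKey(key);
    }

    public static void main(String[] args) {
        SegmentExpirySketch store = new SegmentExpirySketch(100L, 50L);
        store.put("a", "v1", 0L);
        store.put("b", "v2", 300L); // advances observed stream time to 300
        store.put("c", "v3", 120L); // segment 2 < oldest live segment 4: dropped
        System.out.println(store.contains("c")); // false
    }
}
```

With a retention of 100 and a segment interval of 50, once stream time reaches 300 the oldest live segment is 4, so a late record at timestamp 120 (segment 2) is never stored.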

Expired records are still written to the changelog topic because whether a 
record is expired is only tracked at the innermost store layer, not at any of 
the outer store layers such as the changelogging layer. 
The innermost store layer keeps its own `observedStreamTime`, which is advanced 
on calls to put() and during restoration, and uses it to determine whether a 
record is expired. Because the return type of put() is void, the 
changelogging layer has no way to tell whether the inner store's put() actually 
put the record or dropped it as expired, and always writes to the changelog 
topic regardless.
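The layering problem described above can be sketched as follows. This is a hypothetical model, not Kafka's code: `InnerStore`, `ChangeLoggingLayer`, and `LayeringDemo` are illustrative names, and the expiry rule used here (a record is expired if its timestamp is more than the retention period behind the observed stream time) is a simplifying assumption.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical innermost layer: tracks its own observedStreamTime and
// silently drops expired records.
final class InnerStore {
    private final long retentionMs;
    private long observedStreamTime = -1L;
    private final Map<String, Long> data = new HashMap<>();

    InnerStore(long retentionMs) { this.retentionMs = retentionMs; }

    // Returns void, so outer layers cannot see whether the record was dropped.
    void put(String key, long timestamp) {
        observedStreamTime = Math.max(observedStreamTime, timestamp);
        if (timestamp < observedStreamTime - retentionMs) {
            return; // expired: dropped without telling the caller
        }
        data.put(key, timestamp);
    }

    int size() { return data.size(); }
}

// Hypothetical outer changelogging layer: forwards to the inner store, then always logs.
final class ChangeLoggingLayer {
    private final InnerStore inner;
    final List<Map.Entry<String, Long>> changelog = new ArrayList<>();

    ChangeLoggingLayer(InnerStore inner) { this.inner = inner; }

    void put(String key, long timestamp) {
        inner.put(key, timestamp);
        changelog.add(Map.entry(key, timestamp)); // logged even if the inner store dropped it
    }
}

final class LayeringDemo {
    public static void main(String[] args) {
        ChangeLoggingLayer store = new ChangeLoggingLayer(new InnerStore(100L));
        store.put("a", 500L);
        store.put("b", 200L); // 200 < 500 - 100: expired, yet still logged
        System.out.println(store.changelog.size()); // 2

        // Restoration replays the changelog; the expired record is dropped again.
        InnerStore restored = new InnerStore(100L);
        for (Map.Entry<String, Long> e : store.changelog) {
            restored.put(e.getKey(), e.getValue());
        }
        System.out.println(restored.size()); // 1
    }
}
```

Running `LayeringDemo` shows the changelog holding two records while the restored store holds only one: the expired record costs a changelog write and a replay but never reaches the store.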

In order to avoid this, we could:
 * update the put() interface to return a boolean indicating whether the record 
was actually put or not, or
 * move the logic for determining when a record is expired into an outer store 
layer, or
 * reorder/restructure the wrapped store layers.
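The first option might look roughly like the sketch below, again hypothetical: `ReportingInnerStore` and `ConditionalChangeLoggingLayer` are illustrative names, and the expiry rule is the same simplifying assumption as before (expired if the timestamp is more than the retention period behind the observed stream time).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical inner store for option 1: put() reports whether the record was kept.
final class ReportingInnerStore {
    private final long retentionMs;
    private long observedStreamTime = -1L;
    private final Map<String, Long> data = new HashMap<>();

    ReportingInnerStore(long retentionMs) { this.retentionMs = retentionMs; }

    /** Returns true if the record was stored, false if it was dropped as expired. */
    boolean put(String key, long timestamp) {
        observedStreamTime = Math.max(observedStreamTime, timestamp);
        if (timestamp < observedStreamTime - retentionMs) {
            return false;
        }
        data.put(key, timestamp);
        return true;
    }

    int size() { return data.size(); }
}

// Hypothetical changelogging layer that only logs records the inner store accepted.
final class ConditionalChangeLoggingLayer {
    private final ReportingInnerStore inner;
    final List<Map.Entry<String, Long>> changelog = new ArrayList<>();

    ConditionalChangeLoggingLayer(ReportingInnerStore inner) { this.inner = inner; }

    void put(String key, long timestamp) {
        if (inner.put(key, timestamp)) {
            changelog.add(Map.entry(key, timestamp));
        }
    }

    public static void main(String[] args) {
        ConditionalChangeLoggingLayer store =
            new ConditionalChangeLoggingLayer(new ReportingInnerStore(100L));
        store.put("a", 500L);
        store.put("b", 200L); // expired: neither stored nor logged
        System.out.println(store.changelog.size()); // 1
    }
}
```

One trade-off of this option is that every store layer between the changelogging layer and the innermost store would need to propagate the return value, whereas the other two options leave the put() signature unchanged.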



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
