mjsax commented on code in PR #21451:
URL: https://github.com/apache/kafka/pull/21451#discussion_r2818631295
##########
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java:
##########
@@ -420,11 +421,11 @@ public void close() {
}
protected V outerValue(final byte[] value) {
- return value != null ? serdes.valueFrom(value) : null;
+ return value != null ? serdes.valueFrom(value, new RecordHeaders()) : null;
}
protected Bytes keyBytes(final K key) {
Review Comment:
I also think we need to mess with the context inside `put()` if
`valueTimestampHeaders == null` (same for `putIfAbsent` and `delete`): (1)
Get the current `internalContext.recordContext()` and save it, so we can
restore it before `put()` returns. (2) Set a new record context via
`internalContext.setRecordContext(new ProcessorRecordContext(...))`, passing in
the timestamp, offset, partition, and topic from the original record context,
plus a new empty headers object (which we also pass into `keyBytes(...)`).
This allows us to pass the headers down correctly into the lower layers, i.e.,
the caching/changelogging stores (cf.
https://github.com/apache/kafka/pull/21454/changes#r2818572145). (3) Before
`put()` exits, restore the previously saved record context.
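
To make the three steps concrete, here is a rough, self-contained sketch of the save / swap / restore pattern. It does not use the real Kafka Streams classes: `SketchHeaders`, `SketchRecordContext`, `SketchInternalContext`, and `SketchStore` are hypothetical stand-ins, and the `try`/`finally` is an assumption on my side (so the context is restored even if the inner call throws), not something the PR requires. The `valueTimestampHeaders == null` check is left out to keep the example short.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a headers container; not the Kafka class.
final class SketchHeaders {
    final List<String> entries = new ArrayList<>();
}

// Hypothetical stand-in for a record context carrying record metadata.
final class SketchRecordContext {
    final long timestamp;
    final long offset;
    final int partition;
    final String topic;
    final SketchHeaders headers;

    SketchRecordContext(final long timestamp, final long offset, final int partition,
                        final String topic, final SketchHeaders headers) {
        this.timestamp = timestamp;
        this.offset = offset;
        this.partition = partition;
        this.topic = topic;
        this.headers = headers;
    }
}

// Hypothetical stand-in for the internal processor context.
final class SketchInternalContext {
    private SketchRecordContext recordContext;

    SketchRecordContext recordContext() {
        return recordContext;
    }

    void setRecordContext(final SketchRecordContext recordContext) {
        this.recordContext = recordContext;
    }
}

// Hypothetical outer store that wraps a "lower layer" (here just innerPut).
final class SketchStore {
    private final SketchInternalContext internalContext;
    // Remembers which context the lower layer observed, for illustration only.
    SketchRecordContext contextSeenByInner;

    SketchStore(final SketchInternalContext internalContext) {
        this.internalContext = internalContext;
    }

    void put(final String key, final String value) {
        // (1) Save the current record context so it can be restored later.
        final SketchRecordContext saved = internalContext.recordContext();
        // (2) Install a temporary context: same metadata, fresh empty headers.
        final SketchHeaders emptyHeaders = new SketchHeaders();
        internalContext.setRecordContext(new SketchRecordContext(
            saved.timestamp, saved.offset, saved.partition, saved.topic, emptyHeaders));
        try {
            innerPut(key, value);
        } finally {
            // (3) Restore the saved context before put() returns.
            internalContext.setRecordContext(saved);
        }
    }

    // Stands in for the caching/changelogging layers that read the context.
    private void innerPut(final String key, final String value) {
        contextSeenByInner = internalContext.recordContext();
    }
}
```

With this shape, the lower layer only ever sees the empty headers, while the caller of `put()` still finds the original record context (including its original headers) in place afterwards.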