UladzislauBlok commented on code in PR #21536:
URL: https://github.com/apache/kafka/pull/21536#discussion_r2898012150
##########
streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWindowStoreKeyValueIterator.java:
##########
@@ -76,7 +78,7 @@ KeyValue<Windowed<Bytes>, byte[]> deserializeStorePair(final KeyValue<Windowed<B
     @Override
     Windowed<Bytes> deserializeCacheKey(final Bytes cacheKey) {
         final byte[] binaryKey = cacheFunction.key(cacheKey).get();
-        return storeKeyToWindowKey.toWindowKey(binaryKey, windowSize, serdes.keyDeserializer(), serdes.topic());
+        return storeKeyToWindowKey.toWindowKey(binaryKey, windowSize, serdes.keyDeserializer(), new RecordHeaders(), serdes.topic());
Review Comment:
From what I saw, this would be quite complicated. We would need to update
`AbstractMergedSortedCacheStoreIterator`, because currently we only have
access to the following:
```
final Bytes nextCacheKey = cacheIterator.hasNext() ?
cacheIterator.peekNextKey() : null;
final KS nextStoreKey = storeIterator.hasNext() ?
storeIterator.peekNextKey() : null;
```
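To make the constraint concrete, here is a minimal, self-contained sketch of a peek-based merge. It is not the Kafka implementation: `MergedPeekSketch`, `PeekingKeys`, and `merge` are hypothetical names, keys are plain strings instead of `Bytes`, and the "cache wins on equal keys" rule is an assumption made for illustration. The point is that the choice between the two sources is made from the peeked keys alone, so no headers are in scope when the key is deserialized.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class MergedPeekSketch {

    // Hypothetical stand-in for an iterator that can peek at its next key.
    static final class PeekingKeys {
        private final List<String> keys;
        private int pos = 0;

        PeekingKeys(final List<String> keys) {
            this.keys = keys;
        }

        boolean hasNext() {
            return pos < keys.size();
        }

        String peekNextKey() {
            return keys.get(pos);
        }

        String next() {
            return keys.get(pos++);
        }
    }

    // Merges two sorted key sequences, tagging each result with its source.
    public static List<String> merge(final List<String> cacheKeys, final List<String> storeKeys) {
        final PeekingKeys cache = new PeekingKeys(cacheKeys);
        final PeekingKeys store = new PeekingKeys(storeKeys);
        final List<String> out = new ArrayList<>();
        while (cache.hasNext() || store.hasNext()) {
            // Only the next keys are visible here; nothing else (e.g. headers) is available.
            final String nextCacheKey = cache.hasNext() ? cache.peekNextKey() : null;
            final String nextStoreKey = store.hasNext() ? store.peekNextKey() : null;
            if (nextCacheKey == null) {
                out.add("store:" + store.next());
            } else if (nextStoreKey == null) {
                out.add("cache:" + cache.next());
            } else {
                final int cmp = nextCacheKey.compareTo(nextStoreKey);
                if (cmp < 0) {
                    out.add("cache:" + cache.next());
                } else if (cmp > 0) {
                    out.add("store:" + store.next());
                } else {
                    // Same key in both: assume the cache entry wins and skip the store entry.
                    out.add("cache:" + cache.next());
                    store.next();
                }
            }
        }
        return out;
    }

    public static void main(final String[] args) {
        System.out.println(merge(Arrays.asList("a", "c"), Arrays.asList("b", "c", "d")));
    }
}
```

Carrying headers through a merge like this would mean widening what each source exposes at the peek step, which is why the change reaches beyond this one method.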
We can get the cache value bytes from the cache (I still need to find out
whether those bytes include headers).
The problem is that, from what I see, `storeIterator` doesn't expose headers.
We could try to propagate headers when creating the iterator, e.g. in
`CachingWindowStore # KeyValueIterator<Windowed<Bytes>, byte[]> fetch`, but
that would require quite a lot of code changes.
Any thoughts?