mjsax commented on code in PR #21639:
URL: https://github.com/apache/kafka/pull/21639#discussion_r2944084553
##########
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeaders.java:
##########
@@ -133,6 +135,44 @@ public ValueTimestampHeaders<V> putIfAbsent(final K key,
return currentValue;
}
+    @Override
+    public ValueTimestampHeaders<V> delete(final K key) {
+        Objects.requireNonNull(key, "key cannot be null");
+        try {
+            return maybeMeasureLatency(
+                () -> {
+                    final ProcessorRecordContext currentContext = internalContext.recordContext();
+
+                    // Create new headers object to isolate delete operation from input record
+                    final Headers newHeaders = new RecordHeaders(currentContext.headers());
+
+                    // Create temporary context with new headers
+                    final ProcessorRecordContext temporaryContext = new ProcessorRecordContext(
+                        currentContext.timestamp(),
+                        currentContext.offset(),
+                        currentContext.partition(),
+                        currentContext.topic(),
+                        newHeaders
+                    );
+
+                    try {
+                        internalContext.setRecordContext(temporaryContext);
+                        final byte[] deletedValue = wrapped().delete(keyBytes(key, newHeaders));
+                        return deserializeValue(deletedValue);
Review Comment:
Yes, that's part of the fix -- we want to avoid "polluting" the current
input record's `Headers` object, so we pass these temp headers via the
temp context down to the changelogging layer.
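The isolation idea can be sketched without the Kafka internals: copy the headers before any layer mutates them, and the input record's object stays untouched. (`SimpleHeaders` below is a hypothetical stand-in for Kafka's `Headers`/`RecordHeaders`, not the real API; the copy constructor plays the role of `new RecordHeaders(currentContext.headers())`.)

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for Kafka's mutable Headers collection.
class SimpleHeaders {
    final List<String> keys = new ArrayList<>();

    SimpleHeaders() {}

    // Copy constructor, analogous to new RecordHeaders(currentContext.headers()):
    // downstream mutations only touch the copy, never the original.
    SimpleHeaders(final SimpleHeaders other) {
        keys.addAll(other.keys);
    }

    void add(final String key) {
        keys.add(key);
    }
}

public class HeaderIsolationSketch {
    public static void main(final String[] args) {
        final SimpleHeaders inputRecordHeaders = new SimpleHeaders();
        inputRecordHeaders.add("source");

        // Copy before mutating, so the delete path cannot pollute the input record.
        final SimpleHeaders tempHeaders = new SimpleHeaders(inputRecordHeaders);
        tempHeaders.add("changelog-marker"); // e.g. added by a downstream layer

        System.out.println(inputRecordHeaders.keys); // original headers untouched
        System.out.println(tempHeaders.keys);        // copy carries the extra header
    }
}
```

Without the copy, any header added on the delete path would also appear on the input record and leak into subsequent operations that reuse its context.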
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]