mjsax commented on code in PR #21639:
URL: https://github.com/apache/kafka/pull/21639#discussion_r2944080543


##########
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreWithHeaders.java:
##########
@@ -81,6 +82,46 @@ public void put(final Windowed<K> sessionKey, final AggregationWithHeaders<AGG>
 
     }
 
+    @Override
+    public void remove(final Windowed<K> sessionKey) {
+        Objects.requireNonNull(sessionKey, "sessionKey can't be null");
+        Objects.requireNonNull(sessionKey.key(), "sessionKey.key() can't be null");
+        Objects.requireNonNull(sessionKey.window(), "sessionKey.window() can't be null");
+
+        try {
+            maybeMeasureLatency(
+                () -> {
+                    final ProcessorRecordContext currentContext = internalContext.recordContext();
+
+                    // Create new headers object to isolate delete operation from input record
+                    final Headers newHeaders = new RecordHeaders(currentContext.headers());
+
+                    // Create temporary context with new headers
+                    final ProcessorRecordContext temporaryContext = new ProcessorRecordContext(
+                        currentContext.timestamp(),
+                        currentContext.offset(),
+                        currentContext.partition(),
+                        currentContext.topic(),
+                        newHeaders
+                    );
+
+                    try {
+                        internalContext.setRecordContext(temporaryContext);
+                        final Bytes key = keyBytes(sessionKey, newHeaders, serdes);

Review Comment:
   The code here is still somewhat messy -- I'll clean it up with the
pending "metered stores need to pass headers correctly"...


