mjsax commented on code in PR #21639:
URL: https://github.com/apache/kafka/pull/21639#discussion_r2944080543
##########
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreWithHeaders.java:
##########
@@ -81,6 +82,46 @@ public void put(final Windowed<K> sessionKey, final
AggregationWithHeaders<AGG>
}
+ @Override
+ public void remove(final Windowed<K> sessionKey) {
+ Objects.requireNonNull(sessionKey, "sessionKey can't be null");
+ Objects.requireNonNull(sessionKey.key(), "sessionKey.key() can't be
null");
+ Objects.requireNonNull(sessionKey.window(), "sessionKey.window() can't
be null");
+
+ try {
+ maybeMeasureLatency(
+ () -> {
+ final ProcessorRecordContext currentContext =
internalContext.recordContext();
+
+ // Create new headers object to isolate delete operation
from input record
+ final Headers newHeaders = new
RecordHeaders(currentContext.headers());
+
+ // Create temporary context with new headers
+ final ProcessorRecordContext temporaryContext = new
ProcessorRecordContext(
+ currentContext.timestamp(),
+ currentContext.offset(),
+ currentContext.partition(),
+ currentContext.topic(),
+ newHeaders
+ );
+
+ try {
+ internalContext.setRecordContext(temporaryContext);
+ final Bytes key = keyBytes(sessionKey, newHeaders,
serdes);
Review Comment:
We still have some code-mess here -- it's something I'll clean up with the
pending "metered stores need to pass headers correctly"...
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]