guozhangwang commented on a change in pull request #10720: URL: https://github.com/apache/kafka/pull/10720#discussion_r635597785
########## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableTransformValues.java ########## @@ -141,16 +144,32 @@ public void close() { @Override public void init(final ProcessorContext context) { + internalProcessorContext = (InternalProcessorContext) context; parentGetter.init(context); valueTransformer.init(new ForwardingDisabledProcessorContext(context)); } @Override public ValueAndTimestamp<V1> get(final K key) { final ValueAndTimestamp<V> valueAndTimestamp = parentGetter.get(key); - return ValueAndTimestamp.make( + + final ProcessorRecordContext currentContext = internalProcessorContext.recordContext(); + + internalProcessorContext.setRecordContext(new ProcessorRecordContext( Review comment: For the longer term, I feel that we either need to 1) store the topic / offset information into the upstream materialized store as well, or 2) just disable this optimization for KTable.transformValues(), or at least allow users to either opt-in or opt-out given their knowledge on the context. As for now, I think leaving the offset as -1 and topic as null seems okay -- admittedly this would break someone who's using the context for offset / topic, as they would get unexpected values or even NPE, but that's still a fix forward then getting incorrect values silently. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org