guozhangwang commented on a change in pull request #10720:
URL: https://github.com/apache/kafka/pull/10720#discussion_r635597785



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableTransformValues.java
##########
@@ -141,16 +144,32 @@ public void close() {
 
         @Override
         public void init(final ProcessorContext context) {
+            internalProcessorContext = (InternalProcessorContext) context;
             parentGetter.init(context);
             valueTransformer.init(new 
ForwardingDisabledProcessorContext(context));
         }
 
         @Override
         public ValueAndTimestamp<V1> get(final K key) {
             final ValueAndTimestamp<V> valueAndTimestamp = 
parentGetter.get(key);
-            return ValueAndTimestamp.make(
+
+            final ProcessorRecordContext currentContext = 
internalProcessorContext.recordContext();
+
+            internalProcessorContext.setRecordContext(new 
ProcessorRecordContext(

Review comment:
       For the longer term, I feel that we either need to 1) store the topic / 
offset information into the upstream materialized store as well, or 2) just 
disable this optimization for KTable.transformValues(), or at least allow users 
to either opt-in or opt-out given their knowledge on the context.
   
   As for now, I think leaving the offset as -1 and topic as null seems okay -- 
admittedly this would break someone who's using the context for offset / topic, 
as they would get unexpected values or even NPE, but that's still a fix forward 
then getting incorrect values silently.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to