chickenchickenlove commented on code in PR #20910:
URL: https://github.com/apache/kafka/pull/20910#discussion_r2577246796


##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableProcessValues.java:
##########
@@ -192,8 +181,13 @@ private ValueAndTimestamp<VOut> transformValue(final K key, final ValueAndTimest
                 new RecordHeaders()
             ));
 
+            captureContext.forward = false;
+            fixedKeyProcessor.process(InternalFixedKeyRecordFactory.create(
+                new Record<>(key, getValueOrNull(valueAndTimestamp), timestamp) // TODO: we might pass in -1L here, which would lead to an exception

Review Comment:
   Hi @mjsax,
   
   While trying to reason about `ForwardCaptureProcessorContext`, I was 
wondering if we could avoid reprocessing `oldValue` altogether when 
`sendOldValues` is enabled.
   
   Right now, for the non-queryable case, we recompute `f(oldV)` by running the 
user processor a second time with the old value and using the `forward` flag to 
suppress the actual downstream forwarding. Functionally this works, but it 
makes the control flow in the context quite subtle.
   
   An alternative design could be to back `transformValues` with an internal 
result store whenever `sendOldValues` is true, even if `queryableName` is null. 
In that case, the flow would be roughly:
   
   - read `oldResult` from the internal store
   - compute `newResult` by invoking the `FixedKeyProcessor` once on the 
current record
   - write `newResult` back to the store
   - forward `Change(newResult, oldResult, …)` downstream
   
   This would remove the need for the separate `capture-only` vs 
`capture-and-forward` modes and the `forward` flag on the context, at the cost 
of an internal state store when `sendOldValues` is requested.
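   
   To make the trade-off concrete, here is a toy, Kafka-free sketch of the two 
approaches. Every name in it (`OldValueSketch`, `recompute`, `fromStore`, the 
simplified `Change`) is made up for illustration and is not a Kafka Streams 
API; the `HashMap` only stands in for the internal result store.
   
   ```java
   import java.util.HashMap;
   import java.util.Map;
   import java.util.function.Function;
   
   // Toy model only: none of these names are Kafka Streams APIs.
   public class OldValueSketch {
   
       // Simplified stand-in for Change<VOut>: new and old result for one key.
       public static final class Change {
           public final String newValue;
           public final String oldValue;
   
           Change(final String newValue, final String oldValue) {
               this.newValue = newValue;
               this.oldValue = oldValue;
           }
       }
   
       private final Function<Integer, String> userFn;
       // Hypothetical stand-in for the internal result store.
       private final Map<String, String> resultStore = new HashMap<>();
       // Number of times the user function was asked to process a value.
       public int invocations = 0;
   
       public OldValueSketch(final Function<Integer, String> userFn) {
           this.userFn = userFn;
       }
   
       // Counts each invocation; a null input maps to a null result here.
       private String invoke(final Integer value) {
           invocations++;
           return value == null ? null : userFn.apply(value);
       }
   
       // Current approach: recompute f(oldV) by running the user function again.
       public Change recompute(final Integer newV, final Integer oldV) {
           final String oldResult = invoke(oldV); // captured only, not forwarded
           final String newResult = invoke(newV);
           return new Change(newResult, oldResult);
       }
   
       // Proposed approach: look up the previous result instead of recomputing.
       public Change fromStore(final String key, final Integer newV) {
           final String oldResult = resultStore.get(key); // read oldResult
           final String newResult = invoke(newV);         // single invocation
           resultStore.put(key, newResult);               // write newResult back
           return new Change(newResult, oldResult);       // Change(new, old)
       }
   
       public static void main(final String[] args) {
           final OldValueSketch a = new OldValueSketch(v -> "v" + v);
           a.recompute(1, null);
           final Change ca = a.recompute(2, 1);
           System.out.println(ca.newValue + " <- " + ca.oldValue
               + ", invocations=" + a.invocations);
   
           final OldValueSketch b = new OldValueSketch(v -> "v" + v);
           b.fromStore("k", 1);
           final Change cb = b.fromStore("k", 2);
           System.out.println(cb.newValue + " <- " + cb.oldValue
               + ", invocations=" + b.invocations);
       }
   }
   ```
   
   In this toy run both variants produce the same `Change`, but the first one 
invokes the user function twice per update, while the second invokes it once 
and pays for that with the extra store.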
   
   For example, assuming we always initialize `store` when `sendOldValues` is 
true (even without a `queryableName`):
   
   ```java
   // Very roughly...
   // ForwardCaptureProcessorContext.java
   
   public ForwardCaptureProcessorContext(final ProcessorContext<K, Change<VOut>> context,
                                         final String queryableName,
                                         final boolean sendOldValues,
                                         final FixedKeyProcessor<? super K, ? super V, ? extends VOut> fixedKeyProcessor) {
       ...
   
       if (queryableName != null) {
           // materialized store wrapper
           store = new KeyValueStoreWrapper<>(context, queryableName);
       } else if (sendOldValues) {
           // non-queryable, but still needed to look up old values
           store = new InternalResultStoreWrapper<>(...);
       }
   }
   
   @Override
   public <KForward extends K, VForward extends VOut> void forward(
           final FixedKeyRecord<KForward, VForward> record,
           final String childName
   ) {
       capturedKey = record.key();
       capturedValue = record.value();
       capturedTimestamp = record.timestamp();
   
       final VOut oldValue = sendOldValues ? getValueOrNull(store.get(capturedKey)) : null;
       final long putReturnCode = store.put(capturedKey, capturedValue, capturedTimestamp);
       if (putReturnCode == PUT_RETURN_CODE_NOT_PUT) {
           return;
       }
   
       final boolean isLatest = sendOldValues
               ? putReturnCode == PUT_RETURN_CODE_IS_LATEST
               : inputRecord.value().isLatest;
       final Change<VOut> change = new Change<>(capturedValue, oldValue, isLatest);
   
       if (queryableName != null) {
           tupleForwarder.maybeForward(
                   new Record<>(capturedKey, change, capturedTimestamp, record.headers())
           );
       } else {
           context.forward(
                   new Record<>(capturedKey, change, capturedTimestamp, record.headers()),
                   childName
           );
       }
   }
   ```
      
   I am not sure how acceptable this extra store would be from a resource or 
semantics point of view, but it feels closer to how other `KTable` operators 
compute old values and would simplify the context logic quite a bit. Does this 
direction make sense, or would we prefer to keep `transformValues` free of any 
additional internal state store?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
