chickenchickenlove commented on code in PR #20910:
URL: https://github.com/apache/kafka/pull/20910#discussion_r2577246796
##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableProcessValues.java:
##########
@@ -192,8 +181,13 @@ private ValueAndTimestamp<VOut> transformValue(final K
key, final ValueAndTimest
new RecordHeaders()
));
+ captureContext.forward = false;
+ fixedKeyProcessor.process(InternalFixedKeyRecordFactory.create(
+ new Record<>(key, getValueOrNull(valueAndTimestamp),
timestamp) // TODO: we might pass in -1L here, which would lead to an exception
Review Comment:
Hi @mjsax,
While trying to reason about `ForwardCaptureProcessorContext`, I was
wondering whether we could avoid reprocessing `oldValue` altogether when
`sendOldValues` is enabled.
Right now, for the non-queryable case, we recompute `f(oldV)` by running the
user processor a second time with the old value and using the `forward` flag to
suppress the actual downstream forwarding. Functionally this works, but it
makes the control flow in the context quite subtle.
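To make the current flow concrete, here is a minimal, self-contained sketch of how I understand it. None of these names are Kafka Streams classes: `ReprocessSketch`, `runProcessor`, and the simplified `Change` are hypothetical stand-ins, and the exact ordering of the two invocations is my assumption.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical stand-in for the current design; not actual Kafka Streams code.
final class ReprocessSketch {

    // Simplified stand-in for the internal Change type (no isLatest flag).
    static final class Change {
        final Integer newValue;
        final Integer oldValue;

        Change(final Integer newValue, final Integer oldValue) {
            this.newValue = newValue;
            this.oldValue = oldValue;
        }
    }

    private final Function<String, Integer> userFunction; // stand-in for the user's processor logic f
    private boolean forward = true;                       // mirrors the `forward` flag on the context
    private Integer captured;                             // last value the processor tried to forward
    final List<Change> downstream = new ArrayList<>();
    int invocations = 0;

    ReprocessSketch(final Function<String, Integer> userFunction) {
        this.userFunction = userFunction;
    }

    // Stand-in for one run of the user processor, which ends in a forward() call on the context.
    private void runProcessor(final String value, final Integer oldResult) {
        invocations++;
        captured = value == null ? null : userFunction.apply(value);
        if (forward) {
            downstream.add(new Change(captured, oldResult));
        }
    }

    void process(final String newValue, final String oldValue) {
        forward = false;                   // capture-only: compute f(oldV), forward nothing
        runProcessor(oldValue, null);
        final Integer oldResult = captured;
        forward = true;                    // capture-and-forward: compute f(newV) and emit the Change
        runProcessor(newValue, oldResult);
    }

    public static void main(final String[] args) {
        final ReprocessSketch sketch = new ReprocessSketch(String::length);
        sketch.process("xxxx", "xx");
        System.out.println("invocations per record: " + sketch.invocations);
    }
}
```

The point of the sketch is only that the user logic runs twice per input record, and that the context has to switch modes between the two runs.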
An alternative design could be to back `transformValues` with an internal
result store whenever `sendOldValues` is true, even if `queryableName` is null.
In that case, the flow would be roughly:
- read `oldResult` from the internal store
- compute `newResult` by invoking the `FixedKeyProcessor` once on the
current record
- write `newResult` back to the store
- forward `Change(newResult, oldResult, …)` downstream
This would remove the need for the separate `capture-only` vs
`capture-and-forward` modes and the `forward` flag on the context, at the cost
of an internal state store when `sendOldValues` is requested.
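The four steps above can be sketched in isolation as follows. This is a toy model under stated assumptions: a plain `HashMap` stands in for the internal result store, a `BiFunction` stands in for the `FixedKeyProcessor`, and `StoreBackedSketch` and the simplified `Change` are hypothetical names, not Kafka Streams classes.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

// Hypothetical stand-in for the store-backed design; not actual Kafka Streams code.
final class StoreBackedSketch {

    // Simplified stand-in for the internal Change type (no isLatest flag).
    static final class Change {
        final Integer newValue;
        final Integer oldValue;

        Change(final Integer newValue, final Integer oldValue) {
            this.newValue = newValue;
            this.oldValue = oldValue;
        }
    }

    private final Map<String, Integer> resultStore = new HashMap<>(); // stand-in for the internal result store
    private final BiFunction<String, String, Integer> processor;      // stand-in for the FixedKeyProcessor
    final List<Change> forwarded = new ArrayList<>();
    int invocations = 0;

    StoreBackedSketch(final BiFunction<String, String, Integer> processor) {
        this.processor = processor;
    }

    void process(final String key, final String value) {
        final Integer oldResult = resultStore.get(key);         // 1. read oldResult from the store
        invocations++;
        final Integer newResult = processor.apply(key, value);  // 2. invoke the processor once
        if (newResult == null) {                                // 3. write newResult back (null acts as a delete)
            resultStore.remove(key);
        } else {
            resultStore.put(key, newResult);
        }
        forwarded.add(new Change(newResult, oldResult));        // 4. forward Change(newResult, oldResult)
    }

    public static void main(final String[] args) {
        final StoreBackedSketch sketch =
            new StoreBackedSketch((k, v) -> v == null ? null : v.length());
        sketch.process("a", "xx");
        sketch.process("a", "xxxx");
        System.out.println("invocations: " + sketch.invocations);
    }
}
```

Here the processor runs exactly once per input record, and the old result comes from the store rather than from a second invocation.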
For example, assuming we always initialize `store` when `sendOldValues` is
true (even without a `queryableName`):
```java
// Very rough sketch.
// ForwardCaptureProcessorContext.java
public ForwardCaptureProcessorContext(final ProcessorContext<K, Change<VOut>> context,
                                      final String queryableName,
                                      final boolean sendOldValues,
                                      final FixedKeyProcessor<? super K, ? super V, ? extends VOut> fixedKeyProcessor) {
    ...
    if (queryableName != null) {
        // materialized store wrapper
        store = new KeyValueStoreWrapper<>(context, queryableName);
    } else if (sendOldValues) {
        // not queryable, but still needed to look up old values
        store = new InternalResultStoreWrapper<>(...);
    }
}

@Override
public <KForward extends K, VForward extends VOut> void forward(
    final FixedKeyRecord<KForward, VForward> record,
    final String childName
) {
    capturedKey = record.key();
    capturedValue = record.value();
    capturedTimestamp = record.timestamp();

    // read the previous result before overwriting it
    final VOut oldValue = sendOldValues ? getValueOrNull(store.get(capturedKey)) : null;

    final long putReturnCode = store.put(capturedKey, capturedValue, capturedTimestamp);
    if (putReturnCode == PUT_RETURN_CODE_NOT_PUT) {
        return;
    }

    final boolean isLatest = sendOldValues
        ? putReturnCode == PUT_RETURN_CODE_IS_LATEST
        : inputRecord.value().isLatest;
    final Change<VOut> change = new Change<>(capturedValue, oldValue, isLatest);

    if (queryableName != null) {
        tupleForwarder.maybeForward(
            new Record<>(capturedKey, change, capturedTimestamp, record.headers())
        );
    } else {
        context.forward(
            new Record<>(capturedKey, change, capturedTimestamp, record.headers()),
            childName
        );
    }
}
```
I am not sure how acceptable this extra store would be from a resource or
semantics point of view, but it feels closer to how other `KTable` operators
compute old values and would simplify the context logic quite a bit. Does this
direction make sense, or would we prefer to keep `transformValues` free of any
additional internal state store?