[ https://issues.apache.org/jira/browse/KAFKA-8377?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Matthias J. Sax updated KAFKA-8377:
-----------------------------------
    Description: 
Kafka Streams uses an optimization that avoids materializing every result 
KTable. If a non-materialized KTable is input to a join, a lookup into that 
table is performed as a lookup into its parent table plus a call to the 
operator. For example,
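{code:java}
// materializedTable is backed by a state store; the filter() result is not
KTable nonMaterialized = materializedTable.filter(...);
KTable table2 = ...;

// a lookup into nonMaterialized = a lookup into materializedTable + re-applying filter()
table2.join(nonMaterialized, ...);
{code}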
If there is a table2 input record, the lookup to the other side is performed as 
a lookup into materializedTable plus applying the filter().
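
A minimal plain-Java sketch of this lookup path, with no Kafka dependency: a 
`HashMap` stands in for the store of materializedTable, and the hypothetical 
`ValueGetter` interface and `filterGetter()` helper only mimic the idea behind 
Kafka's internal `KTableValueGetter`; they are not its actual API. Because the 
predicate is a pure function, recomputing it on every lookup is harmless.

{code:java}
import java.util.HashMap;
import java.util.Map;
import java.util.function.Predicate;

public class FilterGetterSketch {
    // Hypothetical, simplified stand-in for the idea behind KTableValueGetter (not Kafka's API).
    public interface ValueGetter<K, V> {
        V get(K key);
    }

    // Value getter for a non-materialized filter() result:
    // look up the materialized parent, then re-apply the predicate.
    public static <K, V> ValueGetter<K, V> filterGetter(Map<K, V> parentStore, Predicate<V> predicate) {
        return key -> {
            V value = parentStore.get(key);
            return (value != null && predicate.test(value)) ? value : null;
        };
    }

    public static void main(String[] args) {
        Map<String, Integer> materializedTable = new HashMap<>();
        materializedTable.put("a", 5);
        materializedTable.put("b", 50);

        ValueGetter<String, Integer> nonMaterialized = filterGetter(materializedTable, v -> v > 10);

        // filter() is stateless and deterministic, so repeated lookups agree.
        System.out.println(nonMaterialized.get("a")); // null (filtered out)
        System.out.println(nonMaterialized.get("b")); // 50
        System.out.println(nonMaterialized.get("b")); // 50
    }
}
{code}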

For stateless operations like filter(), this is safe. However, 
#transformValues() might have an attached state store. Hence, when an input 
record r is processed by #transformValues() with current state S, it might 
produce an output record r' (that is not materialized). When the join later 
does a lookup to get r from the parent table, there is no guarantee that 
#transformValues() produces r' again, because its state might have changed in 
the meantime. A similar issue applies to a stateless #transformValues() that 
accesses the `ProcessorContext`: when the `ProcessorContext` is accessed a 
second time (while processing the data from the upstream lookup, to recompute 
the store content), it returns different data, i.e., the data of the record 
currently being processed (the table2 record that triggered the lookup).
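
Both failure modes can be reproduced without Kafka. In this plain-Java sketch 
(all names hypothetical), `HashMap` stands in for the parent table's store, 
`CountingTransformer` stands in for a transformer whose counter plays the role 
of an attached state store, and `Context` stands in for a `ProcessorContext` 
that always describes the record currently being processed. In both cases, 
re-applying the operator during the join's lookup yields a value that differs 
from the r' that was originally emitted.

{code:java}
import java.util.HashMap;
import java.util.Map;

public class TransformValuesGetterSketch {
    // Hypothetical stateful transformer: its counter plays the role of an attached state store.
    static final class CountingTransformer {
        private int count = 0;

        String transform(String value) {
            count++; // every call, including a recomputation during a lookup, mutates the state
            return value + "#" + count;
        }
    }

    // Hypothetical stand-in for ProcessorContext: describes the record currently being processed.
    static final class Context {
        long timestamp;
    }

    // A transformer without its own store that nevertheless reads the context.
    static String stamp(Context ctx, String value) {
        return value + "@" + ctx.timestamp;
    }

    // Returns {value emitted downstream, value recomputed by the join's lookup}.
    public static String[] statefulCase() {
        Map<String, String> parentStore = new HashMap<>(); // materialized parent table
        CountingTransformer transformer = new CountingTransformer();

        parentStore.put("k", "x");
        String emitted = transformer.transform(parentStore.get("k")); // r' = "x#1", not materialized

        parentStore.put("other", "y");
        transformer.transform(parentStore.get("other")); // state advances ("y#2")

        // A table2 record triggers the join: look up "k" in the parent and re-apply transform().
        String recomputed = transformer.transform(parentStore.get("k")); // "x#3", not "x#1"
        return new String[] {emitted, recomputed};
    }

    public static String[] contextCase() {
        Map<String, String> parentStore = new HashMap<>();
        Context ctx = new Context();

        ctx.timestamp = 100L; // processing r itself
        parentStore.put("k", "x");
        String emitted = stamp(ctx, parentStore.get("k")); // "x@100"

        ctx.timestamp = 200L; // now processing the table2 record that triggers the lookup
        String recomputed = stamp(ctx, parentStore.get("k")); // "x@200"
        return new String[] {emitted, recomputed};
    }

    public static void main(String[] args) {
        String[] s = statefulCase();
        String[] c = contextCase();
        System.out.println("stateful: emitted " + s[0] + ", join sees " + s[1]);
        System.out.println("context:  emitted " + c[0] + ", join sees " + c[1]);
    }
}
{code}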

Hence, it seems necessary to always materialize the result of a 
KTable#transformValues() operation if there is state or if the 
`ProcessorContext` is used. One issue is that we don't know upfront whether the 
`ProcessorContext` is used, so we might need to be conservative and always 
materialize the result (though there may be some way to still optimize 
operations like `filter`). Note that if a filter() directly follows 
transformValues(), it would also be OK to materialize the filter() result 
instead of the transformValues() result. Furthermore, if there is no downstream 
join(), materialization is not required.
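
Materializing the result removes the recomputation: the operator runs once per 
input record, r' is written to a store, and the join's lookup reads that store 
directly. The plain-Java sketch below assumes a `HashMap` per store and a 
hypothetical `CountingTransformer` standing in for a stateful transformer; it 
does not model Kafka's internals. In the DSL, the `KTable#transformValues()` 
overload that takes a `Materialized` argument is the user-facing way to request 
such a store, which presumably sidesteps the problem for that operator.

{code:java}
import java.util.HashMap;
import java.util.Map;

public class MaterializedTransformSketch {
    // Hypothetical stateful transformer: its counter plays the role of an attached state store.
    static final class CountingTransformer {
        private int count = 0;

        String transform(String value) {
            count++;
            return value + "#" + count;
        }
    }

    // Returns {value emitted downstream, value seen by the join's lookup}.
    public static String[] run() {
        Map<String, String> parentStore = new HashMap<>(); // materialized parent table
        Map<String, String> resultStore = new HashMap<>(); // materialized transformValues() result
        CountingTransformer transformer = new CountingTransformer();

        // Processing r: transform once and store r' before forwarding it.
        parentStore.put("k", "x");
        String emitted = transformer.transform(parentStore.get("k")); // "x#1"
        resultStore.put("k", emitted);

        // Another record advances the transformer's state.
        parentStore.put("other", "y");
        resultStore.put("other", transformer.transform(parentStore.get("other"))); // "y#2"

        // Join lookup: read the stored r'; transform() is not called again.
        String seenByJoin = resultStore.get("k");
        return new String[] {emitted, seenByJoin};
    }

    public static void main(String[] args) {
        String[] r = run();
        System.out.println("emitted " + r[0] + ", join sees " + r[1]); // emitted x#1, join sees x#1
    }
}
{code}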

Basically, it seems to be unsafe to apply `KTableValueGetter` on a 
`#transformValues()` operator.

  was:
Kafka Streams uses an optimization that avoids materializing every result 
KTable. If a non-materialized KTable is input to a join, a lookup into that 
table is performed as a lookup into its parent table plus a call to the 
operator. For example,
{code:java}
KTable nonMaterialized = materializedTable.filter(...);
KTable table2 = ...;

table2.join(nonMaterialized, ...);
{code}
If there is a table2 input record, the lookup to the other side is performed as 
a lookup into materializedTable plus applying the filter().

For stateless operations like filter(), this is safe. However, 
#transformValues() might have an attached state store. Hence, when an input 
record r is processed by #transformValues() with current state S, it might 
produce an output record r' (that is not materialized). When the join later 
does a lookup to get r from the parent table, there is no guarantee that 
#transformValues() produces r' again, because its state might have changed in 
the meantime.

Hence, it seems necessary to always materialize the result of a 
KTable#transformValues() operation if there is state. Note that if a filter() 
directly follows transformValues(), it would also be OK to materialize the 
filter() result instead of the transformValues() result. Furthermore, if there 
is no downstream join(), materialization is not required.

Basically, it seems to be unsafe to apply `KTableValueGetter` on a stateful 
`#transformValues()` operator.


> KTable#transformValue might lead to incorrect result in joins
> -------------------------------------------------------------
>
>                 Key: KAFKA-8377
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8377
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.0.0
>            Reporter: Matthias J. Sax
>            Assignee: Aishwarya Pradeep Kumar
>            Priority: Major
>              Labels: newbie++



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
