[ https://issues.apache.org/jira/browse/KAFKA-10494?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Andy Coates updated KAFKA-10494:
--------------------------------
Description:
On many nodes, e.g. `KTableFilter`, `enableSendingOldValues` unnecessarily
calls `enableSendingOldValues` on the parent, even when the processor itself
is materialized. This can force the parent table to be materialized
unnecessarily.
For example:
{code:java}
StreamsBuilder builder = new StreamsBuilder();
builder
    .table("t1", Consumed.with(...))
    .filter(predicate, Materialized.as("t2"))
    .<downStreamOps>
{code}
If `downStreamOps` results in a call to `enableSendingOldValues` on the table
returned by the `filter` call, i.e. `t2`, then `t1` will be materialized
unnecessarily.
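To make the difference concrete, here is a minimal sketch in Java. `Source` and `Filter` are hypothetical stand-ins, not Kafka Streams types. The sketch assumes, as described above, that a materialized node can serve old values from its own store, and that the only way an unmaterialized source can provide old values is to materialize itself.

```java
// Hypothetical, simplified model: these are NOT Kafka Streams classes. They only
// mimic how a request for old values travels up a chain of KTable nodes.
public class ParentMaterializationSketch {

    // Stands in for the source table "t1": the only way it can provide old
    // values is to materialize itself (i.e. create a state store).
    static final class Source {
        boolean materialized = false;

        void enableSendingOldValues() {
            materialized = true;
        }
    }

    // Stands in for the filtered table "t2".
    static final class Filter {
        final Source parent;
        final boolean materialized; // true models Materialized.as("t2")
        boolean sendOldValues = false;

        Filter(Source parent, boolean materialized) {
            this.parent = parent;
            this.materialized = materialized;
        }

        // Behaviour described in this ticket: always ask the parent.
        void enableSendingOldValuesAlwaysDelegating() {
            parent.enableSendingOldValues();
            sendOldValues = true;
        }

        // Proposed behaviour: a materialized node reads old values from its own
        // store, so the parent is only asked when this node is not materialized.
        void enableSendingOldValuesMaterializationAware() {
            if (!materialized) {
                parent.enableSendingOldValues();
            }
            sendOldValues = true;
        }
    }

    public static void main(String[] args) {
        Source t1a = new Source();
        new Filter(t1a, true).enableSendingOldValuesAlwaysDelegating();
        System.out.println("always delegating: t1 materialized = " + t1a.materialized);

        Source t1b = new Source();
        new Filter(t1b, true).enableSendingOldValuesMaterializationAware();
        System.out.println("materialization-aware: t1 materialized = " + t1b.materialized);
    }
}
```

Running `main` prints `true` for the always-delegating variant and `false` for the materialization-aware one: only the former forces `t1` to be materialized.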
This ticket was raised following [comments in a PR|#discussion_r490152263]
made while working on KAFKA-10077.
A test that highlights this, which could be added to `KTableFilterTest`:
{code:java}
@Test
public void shouldEnableSendOldValuesIfSourceTableNotMaterializedButFinalTableIsEvenIfNotForcedToMaterialize() {
    // `consumed`, `predicate` and `doTestSendingOldValue` are assumed to be the
    // existing fixtures/helpers of KTableFilterTest.
    final StreamsBuilder builder = new StreamsBuilder();
    final String topic1 = "topic1";

    // Source table: not materialized.
    final KTableImpl<String, Integer, Integer> table1 =
        (KTableImpl<String, Integer, Integer>) builder.table(topic1, consumed);
    // Filtered table: explicitly materialized as "store2".
    final KTableImpl<String, Integer, Integer> table2 =
        (KTableImpl<String, Integer, Integer>) table1.filter(predicate, Materialized.as("store2"));

    // Request old values without forcing materialization; table2 should still
    // enable them because it has its own store.
    table2.enableSendingOldValues(false);

    doTestSendingOldValue(builder, table1, table2, topic1);
}
{code}
This problem is not restricted to the filter call, however: other processor
suppliers suffer from the same issue.
In addition, once [https://github.com/apache/kafka/pull/9156] is merged,
calling {{enableSendingOldValues}} without forcing materialization on a table
that is itself materialized, but whose upstream is not, will _not_ enable
sending old values on that table, even though it should.
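The same reasoning applies to the {{forceMaterialization}} flag. The sketch below is again a hypothetical model, not the Kafka Streams code. It assumes that an unmaterialized upstream declines to send old values when materialization is not forced, and compares a materialized filter that consults its parent with one that does not.

```java
// Hypothetical, simplified model of an enableSendingOldValues(forceMaterialization)
// contract; these are NOT Kafka Streams classes.
public class ForceMaterializationSketch {

    interface Node {
        // Returns true if this node will forward old values downstream.
        boolean enableSendingOldValues(boolean forceMaterialization);
    }

    // An unmaterialized source: unless forced, it declines to create a store.
    static final class Source implements Node {
        boolean materialized = false;

        @Override
        public boolean enableSendingOldValues(boolean forceMaterialization) {
            if (forceMaterialization) {
                materialized = true;
            }
            return materialized;
        }
    }

    // A materialized filter whose upstream is not materialized.
    static final class MaterializedFilter implements Node {
        final Node parent;
        final boolean consultParent; // true models the behaviour described above
        boolean sendOldValues = false;

        MaterializedFilter(Node parent, boolean consultParent) {
            this.parent = parent;
            this.consultParent = consultParent;
        }

        @Override
        public boolean enableSendingOldValues(boolean forceMaterialization) {
            if (consultParent) {
                // Outcome depends on the parent, which says "no" when not forced.
                sendOldValues = parent.enableSendingOldValues(forceMaterialization);
            } else {
                // This node already has its own store, so it can send old values.
                sendOldValues = true;
            }
            return sendOldValues;
        }
    }

    public static void main(String[] args) {
        System.out.println(new MaterializedFilter(new Source(), true).enableSendingOldValues(false));
        System.out.println(new MaterializedFilter(new Source(), false).enableSendingOldValues(false));
    }
}
```

Here `main` prints `false` and then `true`: only the variant that does not consult its parent enables old values, and it does so without materializing the source.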
> Streams: enableSendingOldValues should not call parent if node is itself materialized
> -------------------------------------------------------------------------------------
>
> Key: KAFKA-10494
> URL: https://issues.apache.org/jira/browse/KAFKA-10494
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Reporter: Andy Coates
> Priority: Major
>
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)