[
https://issues.apache.org/jira/browse/KAFKA-10659?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17907046#comment-17907046
]
Sebastien Viale commented on KAFKA-10659:
-----------------------------------------
The issue arises in the {{CogroupedStreamAggregateBuilder#processRepartitions}}
method.
This method loops over every input stream that requires repartitioning. If a
stream has no user-provided name, its repartition topic (and the sink and
source processors attached to that topic) is named after the store of the
aggregate operator.
As a result, if several streams need to be repartitioned, their processors end
up with the same name, which the topology builder rejects.
One possible solution could be to add a counter, similar to what is done for
the coGroup processor. However, this approach risks breaking backward
compatibility.
For example, consider this topology:
{code:java}
final KStream<String, String> streamOne = builder.stream(STREAM_TOPIC);
final KStream<String, String> streamTwo = builder.stream(STREAM_TOPIC_TWO);

final KGroupedStream<String, String> groupedOne =
    streamOne.groupBy((k, v) -> v); // marks the stream for repartitioning
final KGroupedStream<String, String> groupedTwo =
    streamTwo.groupBy((k, v) -> v); // marks the stream for repartitioning

final Aggregator<String, String, String> agg1 =
    (key, value, aggregate) -> aggregate + value;
final Aggregator<String, String, String> agg2 =
    (key, value, aggregate) -> aggregate + value;

final KTable<String, String> coGroupedStream = groupedOne
    .cogroup(agg1)
    .cogroup(groupedTwo, agg2)
    .aggregate(() -> "");

coGroupedStream.toStream().to("output", Produced.as("sink"));
{code}
With a counter, this topology would get two distinct repartition filter names:
_COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000004-1-repartition-filter_ and
_COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000004-2-repartition-filter_
{code:java}
Topologies:
  Sub-topology: 0
    Source: KSTREAM-SOURCE-0000000000 (topics: [stream-topic])
      --> KSTREAM-KEY-SELECT-0000000002
    Processor: KSTREAM-KEY-SELECT-0000000002 (stores: [])
      --> COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000004-1-repartition-filter
      <-- KSTREAM-SOURCE-0000000000
    Processor: COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000004-1-repartition-filter (stores: [])
      --> COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000004-1-repartition-sink
      <-- KSTREAM-KEY-SELECT-0000000002
    Sink: COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000004-1-repartition-sink (topic: COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000004-1-repartition)
      <-- COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000004-1-repartition-filter

  Sub-topology: 1
    Source: KSTREAM-SOURCE-0000000001 (topics: [stream-topic-two])
      --> KSTREAM-KEY-SELECT-0000000003
    Processor: KSTREAM-KEY-SELECT-0000000003 (stores: [])
      --> COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000004-2-repartition-filter
      <-- KSTREAM-SOURCE-0000000001
    Processor: COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000004-2-repartition-filter (stores: [])
      --> COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000004-2-repartition-sink
      <-- KSTREAM-KEY-SELECT-0000000003
    Sink: COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000004-2-repartition-sink (topic: COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000004-2-repartition)
      <-- COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000004-2-repartition-filter

  Sub-topology: 2
    Source: COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000004-1-repartition-source (topics: [COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000004-1-repartition])
      --> test-cogroup-agg-0
    Source: COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000004-2-repartition-source (topics: [COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000004-2-repartition])
      --> test-cogroup-agg-1
    Processor: test-cogroup-agg-0 (stores: [COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000004])
      --> test-cogroup-merge
      <-- COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000004-1-repartition-source
    Processor: test-cogroup-agg-1 (stores: [COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000004])
      --> test-cogroup-merge
      <-- COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000004-2-repartition-source
    Processor: test-cogroup-merge (stores: [])
      --> KTABLE-TOSTREAM-0000000014
      <-- test-cogroup-agg-0, test-cogroup-agg-1
    Processor: KTABLE-TOSTREAM-0000000014 (stores: [])
      --> sink
      <-- test-cogroup-merge
    Sink: sink (topic: output)
      <-- KTABLE-TOSTREAM-0000000014
{code}
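To make the naming problem concrete, here is a minimal, self-contained sketch
in plain Java. It does not use the Kafka Streams API: RepartitionNamingSketch,
filterNames and firstDuplicate are hypothetical names chosen for illustration,
and the name pattern (store name, optional counter, then "-repartition-filter")
is only inferred from the processor names in this issue. It shows that deriving
every name from the store name alone yields duplicates, that a per-stream
counter makes the names unique, and that the counter also changes the name in
the single-stream case, which is where the backward compatibility risk comes
from.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Illustration only: a simplified model of the naming scheme, not Kafka code.
class RepartitionNamingSketch {

    // Hypothetical helper: builds the "-repartition-filter" processor name for
    // each of the streamCount input streams that need repartitioning.
    static List<String> filterNames(String storeName, int streamCount, boolean useCounter) {
        List<String> names = new ArrayList<>();
        for (int i = 1; i <= streamCount; i++) {
            // Assumed pattern: the counter, if used, is appended to the store name.
            String prefix = useCounter ? storeName + "-" + i : storeName;
            names.add(prefix + "-repartition-filter");
        }
        return names;
    }

    // Mimics a uniqueness check: returns the first repeated name, or null.
    static String firstDuplicate(List<String> names) {
        Set<String> seen = new HashSet<>();
        for (String name : names) {
            if (!seen.add(name)) {
                return name;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        String store = "COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000004";

        // Two repartitioned streams, names derived from the store name only.
        System.out.println("without counter, duplicate: "
            + firstDuplicate(filterNames(store, 2, false)));

        // Same streams with a per-stream counter.
        System.out.println("with counter, duplicate: "
            + firstDuplicate(filterNames(store, 2, true)));

        // A single repartitioned stream: the counter still changes the name.
        System.out.println("single stream, current name: "
            + filterNames(store, 1, false).get(0));
        System.out.println("single stream, counter name: "
            + filterNames(store, 1, true).get(0));
    }
}
```

In this model, an application with a single repartitioned stream would see its
repartition-related names change once the counter is introduced, even though it
never hit the duplicate-name error.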
> Cogroup topology generation fails if input streams are repartitioned
> --------------------------------------------------------------------
>
> Key: KAFKA-10659
> URL: https://issues.apache.org/jira/browse/KAFKA-10659
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 2.6.0, 2.5.1
> Reporter: blueedgenick
> Priority: Major
>
> Example to reproduce:
>
> {code:java}
> KGroupedStream<String, A> groupedA = builder
>     .stream(topicA, Consumed.with(Serdes.String(), serdeA))
>     .selectKey((aKey, aVal) -> aVal.someId)
>     .groupByKey();
> KGroupedStream<String, B> groupedB = builder
>     .stream(topicB, Consumed.with(Serdes.String(), serdeB))
>     .selectKey((bKey, bVal) -> bVal.someId)
>     .groupByKey();
> KGroupedStream<String, C> groupedC = builder
>     .stream(topicC, Consumed.with(Serdes.String(), serdeC))
>     .selectKey((cKey, cVal) -> cVal.someId)
>     .groupByKey();
> CogroupedKStream<String, ABC> cogroup = groupedA.cogroup(AggregatorA)
>     .cogroup(groupedB, AggregatorB)
>     .cogroup(groupedC, AggregatorC);
> // Aggregate all streams of the cogroup
> KTable<String, ABC> agg = cogroup.aggregate(
>     () -> new ABC(),
>     Named.as("my-agg-proc-name"),
>     Materialized.<String, ABC, KeyValueStore<Bytes, byte[]>>as("abc-agg-store")
>         .withKeySerde(Serdes.String())
>         .withValueSerde(serdeABC)
> );
> {code}
>
>
> This throws an exception during topology generation:
>
> {code:java}
> org.apache.kafka.streams.errors.TopologyException: Invalid topology: Processor abc-agg-store-repartition-filter is already added.
>     at org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.addProcessor(InternalTopologyBuilder.java:485)
>     at org.apache.kafka.streams.kstream.internals.graph.OptimizableRepartitionNode.writeToTopology(OptimizableRepartitionNode.java:70)
>     at org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder.buildAndOptimizeTopology(InternalStreamsBuilder.java:307)
>     at org.apache.kafka.streams.StreamsBuilder.build(StreamsBuilder.java:564)
>     at org.apache.kafka.streams.StreamsBuilder.build(StreamsBuilder.java:553)
>     at ...
> {code}
>
> The same exception is observed if the `selectKey(...).groupByKey()` pattern
> is replaced with `groupBy(...)`.
> This behavior is observed whether topology optimization is left at its
> default, explicitly disabled, or explicitly enabled.
> Interestingly, the problem is avoided, and a workable topology produced, if
> the grouping step is named by passing a `Grouped.with(...)` expression to
> either `groupByKey` or `groupBy`.
>