[ 
https://issues.apache.org/jira/browse/KAFKA-10659?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17907046#comment-17907046
 ] 

Sebastien Viale commented on KAFKA-10659:
-----------------------------------------

The issue arises in the {{CogroupedStreamAggregateBuilder#processRepartitions}} 
method.

In this method, there is a loop that processes each input stream requiring 
repartitioning. If no name is provided, the repartition topic names (and the 
sink and source processors attached to these topics) will use the store name of 
the aggregate operator.

As a result, if multiple streams need to be repartitioned, multiple processors 
will share the same name, which is not allowed.

One possible solution could be to add a counter, similar to what is done for 
the coGroup processor. However, this approach risks breaking backward 
compatibility.

 

For example, consider this topology:
 
 
{code:java}
final KStream<String, String> streamOne = builder.stream(STREAM_TOPIC);
final KStream<String, String> streamTwo = builder.stream(STREAM_TOPIC_TWO);

final KGroupedStream<String, String> groupedOne = streamOne.groupBy((k, v) -> 
v); // marks the stream for repartitioning
final KGroupedStream<String, String> groupedTwo = streamTwo.groupBy((k, v) -> 
v); // marks the stream for repartitioning

Aggregator<String, String, String> agg1 = (key, value, aggregate) -> aggregate 
+ value;
Aggregator<String, String, String> agg2 = (key, value, aggregate) -> aggregate 
+ value;

final KTable<String, String> coGroupedStream = groupedOne
.cogroup(agg1)
.cogroup(groupedTwo, agg2)
.aggregate(() -> "");

coGroupedStream.toStream().to("output", Produced.as("sink"));{code}
 

 

would give two distinct names: 
_COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000004-1-repartition-filter and_ 
_COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000004-2-repartition-filter_

 
{code:java}
 
Topologies:
   Sub-topology: 0
    Source: KSTREAM-SOURCE-0000000000 (topics: [stream-topic])
      --> KSTREAM-KEY-SELECT-0000000002
    Processor: KSTREAM-KEY-SELECT-0000000002 (stores: [])
      --> COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000004-1-repartition-filter
      <-- KSTREAM-SOURCE-0000000000
    Processor: 
COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000004-1-repartition-filter (stores: 
[])
      --> COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000004-1-repartition-sink
      <-- KSTREAM-KEY-SELECT-0000000002
    Sink: COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000004-1-repartition-sink 
(topic: COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000004-1-repartition)
      <-- COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000004-1-repartition-filter
  Sub-topology: 1
    Source: KSTREAM-SOURCE-0000000001 (topics: [stream-topic-two])
      --> KSTREAM-KEY-SELECT-0000000003
    Processor: KSTREAM-KEY-SELECT-0000000003 (stores: [])
      --> COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000004-2-repartition-filter
      <-- KSTREAM-SOURCE-0000000001
    Processor: 
COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000004-2-repartition-filter (stores: 
[])
      --> COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000004-2-repartition-sink
      <-- KSTREAM-KEY-SELECT-0000000003
    Sink: COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000004-2-repartition-sink 
(topic: COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000004-2-repartition)
      <-- COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000004-2-repartition-filter
  Sub-topology: 2
    Source: 
COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000004-1-repartition-source (topics: 
[COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000004-1-repartition])
      --> test-cogroup-agg-0
    Source: 
COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000004-2-repartition-source (topics: 
[COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000004-2-repartition])
      --> test-cogroup-agg-1
    Processor: test-cogroup-agg-0 (stores: 
[COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000004])
      --> test-cogroup-merge
      <-- COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000004-1-repartition-source
    Processor: test-cogroup-agg-1 (stores: 
[COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000004])
      --> test-cogroup-merge
      <-- COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000004-2-repartition-source
    Processor: test-cogroup-merge (stores: [])
      --> KTABLE-TOSTREAM-0000000014
      <-- test-cogroup-agg-0, test-cogroup-agg-1
    Processor: KTABLE-TOSTREAM-0000000014 (stores: [])
      --> sink
      <-- test-cogroup-merge
    Sink: sink (topic: output)
      <-- KTABLE-TOSTREAM-0000000014
{code}
 

> Cogroup topology generation fails if input streams are repartitioned
> --------------------------------------------------------------------
>
>                 Key: KAFKA-10659
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10659
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.6.0, 2.5.1
>            Reporter: blueedgenick
>            Priority: Major
>
> Example to reproduce:
>  
> {code:java}
> KGroupedStream<String, A> groupedA = builder
>   .stream(topicA, Consumed.with(Serdes.String(), serdeA))
>   .selectKey((aKey, aVal) -> aVal.someId)
>   .groupByKey();
> KGroupedStream<String, B> groupedB = builder
>   .stream(topicB, Consumed.with(Serdes.String(), serdeB))
>   .selectKey((bKey, bVal) -> bVal.someId)
>   .groupByKey();
> KGroupedStream<String, C> groupedC = builder
>   .stream(topicC, Consumed.with(Serdes.String(), serdeC))
>   .selectKey((cKey, cVal) -> cVal.someId)
>   .groupByKey();
> CogroupedKStream<String, ABC> cogroup = groupedA.cogroup(AggregatorA)
>   .cogroup(groupedB, AggregatorB)
>  .  cogroup(groupedC, AggregatorC);
> // Aggregate all streams of the cogroup
>  KTable<String, ABC> agg = cogroup.aggregate(
>   () -> new ABC(),
>   Named.as("my-agg-proc-name"),
>   Materialized.<String, ABC, KeyValueStore<Bytes, byte[]>>as(
>  "abc-agg-store") 
>  .withKeySerde(Serdes.String())
>  .withValueSerde(serdeABC)
>  );
> {code}
>  
>  
> This throws an exception during topology generation: 
>  
> {code:java}
> org.apache.kafka.streams.errors.TopologyException: Invalid topology: 
> Processor abc-agg-store-repartition-filter is already added. at 
> org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.addProcessor(Inter
> nalTopologyBuilder.java:485)`
>  at 
> org.apache.kafka.streams.kstream.internals.graph.OptimizableRepartitionNode.writeToTopology(OptimizableRepartitionNode.java:70)
>  at 
> org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder.buildAndOptimizeTopology(InternalStreamsBuilder.java:307)
>  at org.apache.kafka.streams.StreamsBuilder.build(StreamsBuilder.java:564)
>  at org.apache.kafka.streams.StreamsBuilder.build(StreamsBuilder.java:553)
>  at ...
> {code}
>  
> The same exception is observed if the `selectKey(...).groupByKey()`  pattern 
> is replaced with `groupBy(...)`.
> This behavior is observed with topology optimization at default state, 
> explicitly set off, or explicitly set on.
> Interestingly the problem is avoided, and a workable topology produced,, if 
> the grouping step is named by passing a `Grouped.with(...)` expression to 
> either `groupByKey`` or `groupBy`.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to