[ https://issues.apache.org/jira/browse/KAFKA-8413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16846844#comment-16846844 ]
Matthias J. Sax commented on KAFKA-8413:
----------------------------------------
{quote}I've enabled topology optimization, but in this particular example,
there're still 2 repartition topics created (Kafka Streams version 2.2.0).
{quote}
cc [~bbejeck]: Can you look into this? This would be a bug.
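For anyone reproducing this, here is a minimal sketch of the configuration that enables topology optimization. The config key `topology.optimization` and the value `all` correspond to `StreamsConfig.TOPOLOGY_OPTIMIZATION` and `StreamsConfig.OPTIMIZE`; the class name, application id, and bootstrap address are placeholders assumed for illustration and are not taken from the report. Note that, as far as I know, the optimization is only applied if the same properties are also passed to `StreamsBuilder#build(Properties)`, so that is worth checking first.

```java
import java.util.Properties;

// Hypothetical helper class; the name is an assumption made for this sketch.
class OptimizationConfigSketch {

    // Builds the Streams configuration with topology optimization switched on.
    static Properties optimizedConfig() {
        final Properties props = new Properties();
        // Placeholder values, not taken from the original report.
        props.put("application.id", "repartition-demo");
        props.put("bootstrap.servers", "localhost:9092");
        // Same key/value as StreamsConfig.TOPOLOGY_OPTIMIZATION / StreamsConfig.OPTIMIZE.
        props.put("topology.optimization", "all");
        return props;
    }

    public static void main(final String[] args) {
        final Properties props = optimizedConfig();
        // The same Properties object should be handed to both
        // streamsBuilder.build(props) and new KafkaStreams(topology, props).
        System.out.println(props.getProperty("topology.optimization"));
    }
}
```

If the properties are passed only to the `KafkaStreams` constructor and not to `build(...)`, the two repartition topics would be expected rather than a bug.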
About the other request: I agree that this might be helpful, and in fact there
is a similar ticket, including a KIP draft for this:
* https://issues.apache.org/jira/browse/KAFKA-6037
* https://cwiki.apache.org/confluence/display/KAFKA/KIP-221%3A+Enhance+KStream+with+Connecting+Topic+Creation+and+Repartition+Hint
The KIP is inactive, so feel free to pick it up.
I would not add a `repartition()` operation though, but stick with `through()`
and make the topic name optional to let Kafka Streams manage the topic.
> Add possibility to do repartitioning on KStream
> -----------------------------------------------
>
> Key: KAFKA-8413
> URL: https://issues.apache.org/jira/browse/KAFKA-8413
> Project: Kafka
> Issue Type: New Feature
> Components: streams
> Reporter: Levani Kokhreidze
> Priority: Minor
> Attachments: topology-1.png, topology-2.png
>
>
> Consider the following code:
> {code:java}
> final KStream<String, String> streamByProfileId = streamsBuilder
>     .stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()))
>     .selectKey((key, value) -> value);
> streamByProfileId
>     .groupByKey()
>     .aggregate(
>         () -> 0d,
>         (key, value, aggregate) -> aggregate,
>         Materialized.as("store-1")
>     );
> streamByProfileId
>     .groupByKey()
>     .aggregate(
>         () -> 0d,
>         (key, value, aggregate) -> aggregate,
>         Materialized.as("store-2")
>     );
> {code}
>
> This code will generate the following topology:
> {code:java}
> Topologies:
> Sub-topology: 0
> Source: KSTREAM-SOURCE-0000000000 (topics: [input-topic])
> --> KSTREAM-KEY-SELECT-0000000001
> Processor: KSTREAM-KEY-SELECT-0000000001 (stores: [])
> --> KSTREAM-FILTER-0000000004, KSTREAM-FILTER-0000000008
> <-- KSTREAM-SOURCE-0000000000
> Processor: KSTREAM-FILTER-0000000004 (stores: [])
> --> KSTREAM-SINK-0000000003
> <-- KSTREAM-KEY-SELECT-0000000001
> Processor: KSTREAM-FILTER-0000000008 (stores: [])
> --> KSTREAM-SINK-0000000007
> <-- KSTREAM-KEY-SELECT-0000000001
> Sink: KSTREAM-SINK-0000000003 (topic: store-1-repartition)
> <-- KSTREAM-FILTER-0000000004
> Sink: KSTREAM-SINK-0000000007 (topic: store-2-repartition)
> <-- KSTREAM-FILTER-0000000008
> Sub-topology: 1
> Source: KSTREAM-SOURCE-0000000005 (topics: [store-1-repartition])
> --> KSTREAM-AGGREGATE-0000000002
> Processor: KSTREAM-AGGREGATE-0000000002 (stores: [store-1])
> --> none
> <-- KSTREAM-SOURCE-0000000005
> Sub-topology: 2
> Source: KSTREAM-SOURCE-0000000009 (topics: [store-2-repartition])
> --> KSTREAM-AGGREGATE-0000000006
> Processor: KSTREAM-AGGREGATE-0000000006 (stores: [store-2])
> --> none
> <-- KSTREAM-SOURCE-0000000009
>
> {code}
> Kafka Streams creates a separate repartition topic for each `groupByKey`
> operation, two in total. In this example, two repartition topics are not
> really necessary: a single repartition topic feeding one aggregation
> sub-topology would be enough.
>
> In the DSL, a Kafka Streams user may specify the repartition topic manually
> using the *KStream#through* method:
> {code:java}
> final KStream<Object, Object> streamByProfileId = streamsBuilder
>     .stream("input-topic")
>     .selectKey((key, value) -> value)
>     .through("repartition-topic");
> streamByProfileId
>     .groupByKey()
>     .aggregate(
>         () -> 0d,
>         (key, value, aggregate) -> aggregate,
>         Materialized.as("store-1")
>     );
> streamByProfileId
>     .groupByKey()
>     .aggregate(
>         () -> 0d,
>         (key, value, aggregate) -> aggregate,
>         Materialized.as("store-2")
>     );
> {code}
>
>
> {code:java}
> Topologies:
> Sub-topology: 0
> Source: KSTREAM-SOURCE-0000000000 (topics: [input-topic])
> --> KSTREAM-KEY-SELECT-0000000001
> Processor: KSTREAM-KEY-SELECT-0000000001 (stores: [])
> --> KSTREAM-SINK-0000000002
> <-- KSTREAM-SOURCE-0000000000
> Sink: KSTREAM-SINK-0000000002 (topic: repartition-topic)
> <-- KSTREAM-KEY-SELECT-0000000001
> Sub-topology: 1
> Source: KSTREAM-SOURCE-0000000003 (topics: [repartition-topic])
> --> KSTREAM-AGGREGATE-0000000004, KSTREAM-AGGREGATE-0000000005
> Processor: KSTREAM-AGGREGATE-0000000004 (stores: [store-1])
> --> none
> <-- KSTREAM-SOURCE-0000000003
> Processor: KSTREAM-AGGREGATE-0000000005 (stores: [store-2])
> --> none
> <-- KSTREAM-SOURCE-0000000003
> {code}
>
> While this makes it possible to optimize a Kafka Streams application, the
> user still has to manually create the repartition topic with the correct
> number of partitions based on the input topic. It would be great if the DSL
> had something like a *repartition()* operation on *KStream* that generates
> the repartition topic at the user's request.