[ https://issues.apache.org/jira/browse/KAFKA-8413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16846791#comment-16846791 ]
Matthias J. Sax commented on KAFKA-8413:
----------------------------------------
This should already be fixed. You need to turn on topology optimization though.
Compare https://issues.apache.org/jira/browse/KAFKA-6761
It seems we can close this ticket as "invalid"?
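
For reference, a minimal sketch of turning on topology optimization. It assumes the `topology.optimization` config key with the value `all` (exposed as `StreamsConfig.TOPOLOGY_OPTIMIZATION` and `StreamsConfig.OPTIMIZE`) and uses only `java.util.Properties`, so it runs without the Kafka jars. The class name, method name, application id and broker address are placeholders, not part of any Kafka API. As far as I know, the same `Properties` object also has to be passed to `StreamsBuilder#build(Properties)`; otherwise the optimization is not applied when the topology is built.

```java
import java.util.Properties;

// Hypothetical helper; the class and method names are illustrative only.
final class TopologyOptimizationConfig {

    static Properties optimizedConfig(final String applicationId, final String bootstrapServers) {
        final Properties props = new Properties();
        props.put("application.id", applicationId);
        props.put("bootstrap.servers", bootstrapServers);
        // Equivalent to props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE).
        props.put("topology.optimization", "all");
        return props;
    }

    public static void main(final String[] args) {
        // Placeholder application id and broker address.
        final Properties props = optimizedConfig("kafka-8413-example", "localhost:9092");
        // With kafka-streams on the classpath, the config would be used like this:
        //   Topology topology = streamsBuilder.build(props);
        //   KafkaStreams streams = new KafkaStreams(topology, props);
        System.out.println(props.getProperty("topology.optimization")); // prints "all"
    }
}
```

Printing `topology.describe()` with and without this setting is a quick way to check whether the two repartition topics from the example in the ticket get merged into one.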
> Add possibility to do repartitioning on KStream
> -----------------------------------------------
>
> Key: KAFKA-8413
> URL: https://issues.apache.org/jira/browse/KAFKA-8413
> Project: Kafka
> Issue Type: New Feature
> Components: streams
> Reporter: Levani Kokhreidze
> Priority: Minor
> Attachments: topology-1.png, topology-2.png
>
>
> Consider the following code:
> {code:java}
> final KStream<String, String> streamByProfileId = streamsBuilder
>     .stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()))
>     .selectKey((key, value) -> value);
>
> streamByProfileId
>     .groupByKey()
>     .aggregate(
>         () -> 0d,
>         (key, value, aggregate) -> aggregate,
>         Materialized.as("store-1")
>     );
>
> streamByProfileId
>     .groupByKey()
>     .aggregate(
>         () -> 0d,
>         (key, value, aggregate) -> aggregate,
>         Materialized.as("store-2")
>     );
> {code}
>
> This code will generate the following topology:
> {code:java}
> Topologies:
>   Sub-topology: 0
>     Source: KSTREAM-SOURCE-0000000000 (topics: [input-topic])
>       --> KSTREAM-KEY-SELECT-0000000001
>     Processor: KSTREAM-KEY-SELECT-0000000001 (stores: [])
>       --> KSTREAM-FILTER-0000000004, KSTREAM-FILTER-0000000008
>       <-- KSTREAM-SOURCE-0000000000
>     Processor: KSTREAM-FILTER-0000000004 (stores: [])
>       --> KSTREAM-SINK-0000000003
>       <-- KSTREAM-KEY-SELECT-0000000001
>     Processor: KSTREAM-FILTER-0000000008 (stores: [])
>       --> KSTREAM-SINK-0000000007
>       <-- KSTREAM-KEY-SELECT-0000000001
>     Sink: KSTREAM-SINK-0000000003 (topic: store-1-repartition)
>       <-- KSTREAM-FILTER-0000000004
>     Sink: KSTREAM-SINK-0000000007 (topic: store-2-repartition)
>       <-- KSTREAM-FILTER-0000000008
>   Sub-topology: 1
>     Source: KSTREAM-SOURCE-0000000005 (topics: [store-1-repartition])
>       --> KSTREAM-AGGREGATE-0000000002
>     Processor: KSTREAM-AGGREGATE-0000000002 (stores: [store-1])
>       --> none
>       <-- KSTREAM-SOURCE-0000000005
>   Sub-topology: 2
>     Source: KSTREAM-SOURCE-0000000009 (topics: [store-2-repartition])
>       --> KSTREAM-AGGREGATE-0000000006
>     Processor: KSTREAM-AGGREGATE-0000000006 (stores: [store-2])
>       --> none
>       <-- KSTREAM-SOURCE-0000000009
>
> {code}
> Kafka Streams creates a separate repartition topic for each `groupByKey`
> operation, two in total. In this example, two repartition topics are not
> really necessary; the processing could be done with a single repartition
> topic and a single downstream sub-topology.
>
> A Kafka Streams user may specify the repartition topic manually in the DSL
> using the *KStream#through* method:
> {code:java}
> final KStream<Object, Object> streamByProfileId = streamsBuilder
>     .stream("input-topic")
>     .selectKey((key, value) -> value)
>     .through("repartition-topic");
>
> streamByProfileId
>     .groupByKey()
>     .aggregate(
>         () -> 0d,
>         (key, value, aggregate) -> aggregate,
>         Materialized.as("store-1")
>     );
>
> streamByProfileId
>     .groupByKey()
>     .aggregate(
>         () -> 0d,
>         (key, value, aggregate) -> aggregate,
>         Materialized.as("store-2")
>     );
> {code}
>
> This code generates the following topology:
> {code:java}
> Topologies:
>   Sub-topology: 0
>     Source: KSTREAM-SOURCE-0000000000 (topics: [input-topic])
>       --> KSTREAM-KEY-SELECT-0000000001
>     Processor: KSTREAM-KEY-SELECT-0000000001 (stores: [])
>       --> KSTREAM-SINK-0000000002
>       <-- KSTREAM-SOURCE-0000000000
>     Sink: KSTREAM-SINK-0000000002 (topic: repartition-topic)
>       <-- KSTREAM-KEY-SELECT-0000000001
>   Sub-topology: 1
>     Source: KSTREAM-SOURCE-0000000003 (topics: [repartition-topic])
>       --> KSTREAM-AGGREGATE-0000000004, KSTREAM-AGGREGATE-0000000005
>     Processor: KSTREAM-AGGREGATE-0000000004 (stores: [store-1])
>       --> none
>       <-- KSTREAM-SOURCE-0000000003
>     Processor: KSTREAM-AGGREGATE-0000000005 (stores: [store-2])
>       --> none
>       <-- KSTREAM-SOURCE-0000000003
> {code}
>
> While this makes it possible to optimize a Kafka Streams application, the
> user still has to manually create the repartition topic with the correct
> number of partitions based on the input topic. It would be great if the DSL
> had something like a *repartition()* operation on *KStream* that generates
> the repartition topic at the user's request.