[ 
https://issues.apache.org/jira/browse/KAFKA-8413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16846740#comment-16846740
 ] 

Levani Kokhreidze commented on KAFKA-8413:
------------------------------------------

Happy to work on a KIP if this feature makes sense.

> Add possibility to do repartitioning on KStream
> -----------------------------------------------
>
>                 Key: KAFKA-8413
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8413
>             Project: Kafka
>          Issue Type: New Feature
>          Components: streams
>            Reporter: Levani Kokhreidze
>            Priority: Minor
>         Attachments: topology-1.png, topology-2.png
>
>
> Consider the following code:
>  
> {code:java}
> final KStream<String, String> streamByProfileId = streamsBuilder
>    .stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()))
>    .selectKey((key, value) -> value);
> streamByProfileId
>    .groupByKey()
>    .aggregate(
>       () -> 0d,
>       (key, value, aggregate) -> aggregate,
>       Materialized.as("store-1")
>    );
> streamByProfileId
>    .groupByKey()
>    .aggregate(
>       () -> 0d,
>       (key, value, aggregate) -> aggregate,
>       Materialized.as("store-2")
>    );
> {code}
>  
> This code will generate the following topology:
>  
> {code:java}
> Topologies:
>  Sub-topology: 0
>  Source: KSTREAM-SOURCE-0000000000 (topics: [input-topic])
>  --> KSTREAM-KEY-SELECT-0000000001
>  Processor: KSTREAM-KEY-SELECT-0000000001 (stores: [])
>  --> KSTREAM-FILTER-0000000004, KSTREAM-FILTER-0000000008
>  <-- KSTREAM-SOURCE-0000000000
>  Processor: KSTREAM-FILTER-0000000004 (stores: [])
>  --> KSTREAM-SINK-0000000003
>  <-- KSTREAM-KEY-SELECT-0000000001
>  Processor: KSTREAM-FILTER-0000000008 (stores: [])
>  --> KSTREAM-SINK-0000000007
>  <-- KSTREAM-KEY-SELECT-0000000001
>  Sink: KSTREAM-SINK-0000000003 (topic: store-1-repartition)
>  <-- KSTREAM-FILTER-0000000004
>  Sink: KSTREAM-SINK-0000000007 (topic: store-2-repartition)
>  <-- KSTREAM-FILTER-0000000008
> Sub-topology: 1
>  Source: KSTREAM-SOURCE-0000000005 (topics: [store-1-repartition])
>  --> KSTREAM-AGGREGATE-0000000002
>  Processor: KSTREAM-AGGREGATE-0000000002 (stores: [store-1])
>  --> none
>  <-- KSTREAM-SOURCE-0000000005
> Sub-topology: 2
>  Source: KSTREAM-SOURCE-0000000009 (topics: [store-2-repartition])
>  --> KSTREAM-AGGREGATE-0000000006
>  Processor: KSTREAM-AGGREGATE-0000000006 (stores: [store-2])
>  --> none
>  <-- KSTREAM-SOURCE-0000000009
>  
> {code}
> Kafka Streams creates a separate repartition topic for each `groupByKey` 
> operation, two in total. In this example, two repartition topics are not 
> really necessary: both aggregations can read from a single repartition 
> topic and run in one sub-topology.
> In the DSL, a Kafka Streams user can specify the repartition topic manually 
> using the *KStream#through* method:
>  
> {code:java}
> final KStream<Object, Object> streamByProfileId = streamsBuilder
>    .stream("input-topic")
>    .selectKey((key, value) -> value)
>    .through("repartition-topic");
> streamByProfileId
>    .groupByKey()
>    .aggregate(
>       () -> 0d,
>       (key, value, aggregate) -> aggregate,
>       Materialized.as("store-1")
>    );
> streamByProfileId
>    .groupByKey()
>    .aggregate(
>       () -> 0d,
>       (key, value, aggregate) -> aggregate,
>       Materialized.as("store-2")
>    );
> {code}
>  
> This generates the following topology:
>  
> {code:java}
> Topologies:
> Sub-topology: 0
> Source: KSTREAM-SOURCE-0000000000 (topics: [input-topic])
> --> KSTREAM-KEY-SELECT-0000000001
> Processor: KSTREAM-KEY-SELECT-0000000001 (stores: [])
> --> KSTREAM-SINK-0000000002
> <-- KSTREAM-SOURCE-0000000000
> Sink: KSTREAM-SINK-0000000002 (topic: repartition-topic)
> <-- KSTREAM-KEY-SELECT-0000000001
> Sub-topology: 1
> Source: KSTREAM-SOURCE-0000000003 (topics: [repartition-topic])
> --> KSTREAM-AGGREGATE-0000000004, KSTREAM-AGGREGATE-0000000005
> Processor: KSTREAM-AGGREGATE-0000000004 (stores: [store-1])
> --> none
> <-- KSTREAM-SOURCE-0000000003
> Processor: KSTREAM-AGGREGATE-0000000005 (stores: [store-2])
> --> none
> <-- KSTREAM-SOURCE-0000000003
> {code}
>  
>  
> While this makes it possible to optimize a Kafka Streams application, the 
> user still has to manually create the repartition topic with the correct 
> number of partitions based on the input topic. It would be great if the DSL 
> had something like a *repartition()* operation on *KStream* that generates 
> the repartition topic on the user's request.
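
The difference between the two topologies quoted above can be checked mechanically. The following is a minimal sketch, assuming the description is available as plain text in the format shown in the issue; `TopologyStats` and its methods are hypothetical helpers written for illustration and are not part of Kafka Streams. It counts the `Sub-topology` headers and lists the topics that sink nodes write to.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper, not part of Kafka Streams: it only scans the text
// form of a topology description like the ones quoted in this issue.
class TopologyStats {
    private static final Pattern SUB_TOPOLOGY =
        Pattern.compile("Sub-topology: \\d+");
    private static final Pattern SINK_TOPIC =
        Pattern.compile("Sink: \\S+ \\(topic: ([^)]+)\\)");

    // Counts the "Sub-topology: N" headers in the description text.
    static int countSubTopologies(String description) {
        Matcher m = SUB_TOPOLOGY.matcher(description);
        int count = 0;
        while (m.find()) {
            count++;
        }
        return count;
    }

    // Collects the topic name of every sink node, in order of appearance.
    static List<String> sinkTopics(String description) {
        List<String> topics = new ArrayList<>();
        Matcher m = SINK_TOPIC.matcher(description);
        while (m.find()) {
            topics.add(m.group(1));
        }
        return topics;
    }

    public static void main(String[] args) {
        // Abbreviated copies of the two descriptions from the issue:
        // only the sub-topology headers and sink lines are kept.
        String twoGroupBys = String.join("\n",
            "Sub-topology: 0",
            "Sink: KSTREAM-SINK-0000000003 (topic: store-1-repartition)",
            "Sink: KSTREAM-SINK-0000000007 (topic: store-2-repartition)",
            "Sub-topology: 1",
            "Sub-topology: 2");
        String withThrough = String.join("\n",
            "Sub-topology: 0",
            "Sink: KSTREAM-SINK-0000000002 (topic: repartition-topic)",
            "Sub-topology: 1");

        for (String description : new String[] {twoGroupBys, withThrough}) {
            System.out.println(countSubTopologies(description)
                + " sub-topologies, sink topics " + sinkTopics(description));
        }
    }
}
```

Applied to the two descriptions in the issue, this reports three sub-topologies and two sink topics for the first variant, and two sub-topologies and one sink topic for the variant that uses *KStream#through*.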



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
