[ 
https://issues.apache.org/jira/browse/KAFKA-8413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16846838#comment-16846838
 ] 

Levani Kokhreidze commented on KAFKA-8413:
------------------------------------------

Hi Matthias,

I've enabled topology optimization, but in this particular example, there're 
still 2 repartition topics created.

 
{code:java}
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, UUID.randomUUID().toString());
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE);
props.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp/kafka-streams");{code}
Kafka Streams version 2.2.0

 

Topics created:

 
{code:java}
867bef21-1e56-4ef4-918d-f701fe6000bc-store-1-changelog
867bef21-1e56-4ef4-918d-f701fe6000bc-store-1-repartition
867bef21-1e56-4ef4-918d-f701fe6000bc-store-2-changelog
867bef21-1e56-4ef4-918d-f701fe6000bc-store-2-repartition{code}
 

 

 

Actually, one other thing why introducing additional manual repartitoin may be 
valuable - correct me if I'm wrong, but Kafka Streams will try to optimize when 
key operation is followed by stateful operation, like 
*groupByKey().aggregate(...)* but there's may be the case, that in DSL user may 
be using stateful *transform(...)* operation for aggregation. Consider 
following example:

 

 
{code:java}
final KStream<String, String> streamByProfileId = streamsBuilder
 .stream("input.topic", Consumed.with(Serdes.String(), Serdes.String()))
 .selectKey((key, value) -> value);

streamByProfileId.transform(...) // stateful tranformer with aggregation
streamByProfileId.transform(...) // stateful transformer with aggregation
{code}
 

In this example there's no repartition topic created, one the other hand if had 
something like `repartition()` operation on KStream we could write something 
like this, which would be pretty cool imho:
{code:java}
final KStream<String, String> streamByProfileId = streamsBuilder
 .stream("input.topic", Consumed.with(Serdes.String(), Serdes.String()))
 .repartitionBy((key, value) -> new_key);

streamByProfileId.transform(...) // stateful transformer with aggregation
streamByProfileId.transform(...) // stateful transformer with aggregation{code}
 

> Add possibility to do repartitioning on KStream
> -----------------------------------------------
>
>                 Key: KAFKA-8413
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8413
>             Project: Kafka
>          Issue Type: New Feature
>          Components: streams
>            Reporter: Levani Kokhreidze
>            Priority: Minor
>         Attachments: topology-1.png, topology-2.png
>
>
> Consider following code:
> {code:java}
> final KStream<String, String> streamByProfileId = streamsBuilder
>    .stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()))
>    .selectKey((key, value) -> value);
> streamByProfileId
>    .groupByKey()
>    .aggregate(
>       () -> 0d,
>       (key, value, aggregate) -> aggregate,
>       Materialized.as("store-1")
>    );
> streamByProfileId
>    .groupByKey()
>    .aggregate(
>       () -> 0d,
>       (key, value, aggregate) -> aggregate,
>       Materialized.as("store-2")
>    );
> {code}
>  
> This code will generate following topology:
> {code:java}
> Topologies:
>  Sub-topology: 0
>  Source: KSTREAM-SOURCE-0000000000 (topics: [input-topic])
>  --> KSTREAM-KEY-SELECT-0000000001
>  Processor: KSTREAM-KEY-SELECT-0000000001 (stores: [])
>  --> KSTREAM-FILTER-0000000004, KSTREAM-FILTER-0000000008
>  <-- KSTREAM-SOURCE-0000000000
>  Processor: KSTREAM-FILTER-0000000004 (stores: [])
>  --> KSTREAM-SINK-0000000003
>  <-- KSTREAM-KEY-SELECT-0000000001
>  Processor: KSTREAM-FILTER-0000000008 (stores: [])
>  --> KSTREAM-SINK-0000000007
>  <-- KSTREAM-KEY-SELECT-0000000001
>  Sink: KSTREAM-SINK-0000000003 (topic: store-1-repartition)
>  <-- KSTREAM-FILTER-0000000004
>  Sink: KSTREAM-SINK-0000000007 (topic: store-2-repartition)
>  <-- KSTREAM-FILTER-0000000008
> Sub-topology: 1
>  Source: KSTREAM-SOURCE-0000000005 (topics: [store-1-repartition])
>  --> KSTREAM-AGGREGATE-0000000002
>  Processor: KSTREAM-AGGREGATE-0000000002 (stores: [store-1])
>  --> none
>  <-- KSTREAM-SOURCE-0000000005
> Sub-topology: 2
>  Source: KSTREAM-SOURCE-0000000009 (topics: [store-2-repartition])
>  --> KSTREAM-AGGREGATE-0000000006
>  Processor: KSTREAM-AGGREGATE-0000000006 (stores: [store-2])
>  --> none
>  <-- KSTREAM-SOURCE-0000000009
>  
> {code}
> Kafka Streams creates two repartition topics for each `groupByKey` operation. 
> In this example, two repartition topics are not really necessary and 
> processing can be done with one sub-topology.
>  
> Kafka Streams user, in DSL, may specify repartition topic manually using 
> *KStream#through* method:
> {code:java}
> final KStream<Object, Object> streamByProfileId = streamsBuilder
>    .stream("input-topic")
>    .selectKey((key, value) -> value)
>    .through("repartition-topic");
> streamByProfileId
>    .groupByKey()
>    .aggregate(
>       () -> 0d,
>       (key, value, aggregate) -> aggregate,
>       Materialized.as("store-1")
>    );
> streamByProfileId
>    .groupByKey()
>    .aggregate(
>       () -> 0d,
>       (key, value, aggregate) -> aggregate,
>       Materialized.as("store-2")
>    );
> {code}
>  
>  
> {code:java}
> Topologies:
> Sub-topology: 0
> Source: KSTREAM-SOURCE-0000000000 (topics: [input-topic])
> --> KSTREAM-KEY-SELECT-0000000001
> Processor: KSTREAM-KEY-SELECT-0000000001 (stores: [])
> --> KSTREAM-SINK-0000000002
> <-- KSTREAM-SOURCE-0000000000
> Sink: KSTREAM-SINK-0000000002 (topic: repartition-topic)
> <-- KSTREAM-KEY-SELECT-0000000001
> Sub-topology: 1
> Source: KSTREAM-SOURCE-0000000003 (topics: [repartition-topic])
> --> KSTREAM-AGGREGATE-0000000004, KSTREAM-AGGREGATE-0000000005
> Processor: KSTREAM-AGGREGATE-0000000004 (stores: [store-1])
> --> none
> <-- KSTREAM-SOURCE-0000000003
> Processor: KSTREAM-AGGREGATE-0000000005 (stores: [store-2])
> --> none
> <-- KSTREAM-SOURCE-0000000003
> {code}
>   
> While this gives possibility to optimizes Kafka Streams application, user 
> still has to manually create repartition topic with correct number of 
> partitions based on input topic. It would be great if in DSL we could have 
> something like *repartition()* operation on *KStream* which can generate 
> repartition topic based on user command.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to