[ https://issues.apache.org/jira/browse/KAFKA-8413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16846838#comment-16846838 ]
Levani Kokhreidze edited comment on KAFKA-8413 at 5/23/19 4:28 PM:
-------------------------------------------------------------------

Hi Matthias,

I've enabled topology optimization, but in this particular example two repartition topics are still created (Kafka Streams version 2.2.0).

{code:java}
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, UUID.randomUUID().toString());
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE);
props.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp/kafka-streams");
{code}

Topics created:

{code:java}
867bef21-1e56-4ef4-918d-f701fe6000bc-store-1-changelog
867bef21-1e56-4ef4-918d-f701fe6000bc-store-1-repartition
867bef21-1e56-4ef4-918d-f701fe6000bc-store-2-changelog
867bef21-1e56-4ef4-918d-f701fe6000bc-store-2-repartition
{code}

Actually, there is another reason why introducing a manual repartition operation may be valuable. Correct me if I'm wrong, but Kafka Streams tries to optimize when a key-changing operation is followed by a stateful operation such as *groupByKey().aggregate(...)*. There may be cases, however, where a DSL user relies on a stateful *transform(...)* operation for aggregation instead. Consider the following example:

{code:java}
final KStream<String, String> streamByProfileId = streamsBuilder
    .stream("input.topic", Consumed.with(Serdes.String(), Serdes.String()))
    .selectKey((key, value) -> value);

streamByProfileId.transform(...); // stateful transformer with aggregation
streamByProfileId.transform(...); // stateful transformer with aggregation
{code}

In this example no repartition topic is created. On the other hand, if we had something like a `repartition()` operation on KStream, we could write something like this, which would be pretty cool imho:

{code:java}
final KStream<String, String> streamByProfileId = streamsBuilder
    .stream("input.topic", Consumed.with(Serdes.String(), Serdes.String()))
    .repartitionBy((key, value) -> new_key);

streamByProfileId.transform(...); // stateful transformer with aggregation
streamByProfileId.transform(...); // stateful transformer with aggregation
{code}

> Add possibility to do repartitioning on KStream
> -----------------------------------------------
>
>                 Key: KAFKA-8413
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8413
>             Project: Kafka
>          Issue Type: New Feature
>          Components: streams
>            Reporter: Levani Kokhreidze
>            Priority: Minor
>         Attachments: topology-1.png, topology-2.png
>
>
> Consider the following code:
> {code:java}
> final KStream<String, String> streamByProfileId = streamsBuilder
>     .stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()))
>     .selectKey((key, value) -> value);
> streamByProfileId
>     .groupByKey()
>     .aggregate(
>         () -> 0d,
>         (key, value, aggregate) -> aggregate,
>         Materialized.as("store-1")
>     );
> streamByProfileId
>     .groupByKey()
>     .aggregate(
>         () -> 0d,
>         (key, value, aggregate) -> aggregate,
>         Materialized.as("store-2")
>     );
> {code}
>
> This code will generate the following topology:
> {code:java}
> Topologies:
>   Sub-topology: 0
>     Source: KSTREAM-SOURCE-0000000000 (topics: [input-topic])
>       --> KSTREAM-KEY-SELECT-0000000001
>     Processor: KSTREAM-KEY-SELECT-0000000001 (stores: [])
>       --> KSTREAM-FILTER-0000000004, KSTREAM-FILTER-0000000008
>       <-- KSTREAM-SOURCE-0000000000
>     Processor: KSTREAM-FILTER-0000000004 (stores: [])
>       --> KSTREAM-SINK-0000000003
>       <-- KSTREAM-KEY-SELECT-0000000001
>     Processor: KSTREAM-FILTER-0000000008 (stores: [])
>       --> KSTREAM-SINK-0000000007
>       <-- KSTREAM-KEY-SELECT-0000000001
>     Sink: KSTREAM-SINK-0000000003 (topic: store-1-repartition)
>       <-- KSTREAM-FILTER-0000000004
>     Sink: KSTREAM-SINK-0000000007 (topic: store-2-repartition)
>       <-- KSTREAM-FILTER-0000000008
>   Sub-topology: 1
>     Source: KSTREAM-SOURCE-0000000005 (topics: [store-1-repartition])
>       --> KSTREAM-AGGREGATE-0000000002
>     Processor: KSTREAM-AGGREGATE-0000000002 (stores: [store-1])
>       --> none
>       <-- KSTREAM-SOURCE-0000000005
>   Sub-topology: 2
>     Source: KSTREAM-SOURCE-0000000009 (topics: [store-2-repartition])
>       --> KSTREAM-AGGREGATE-0000000006
>     Processor: KSTREAM-AGGREGATE-0000000006 (stores: [store-2])
>       --> none
>       <-- KSTREAM-SOURCE-0000000009
> {code}
> Kafka Streams creates a separate repartition topic for each `groupByKey`
> operation, two in total. In this example, two repartition topics are not
> really necessary and processing can be done with one sub-topology.
>
> In the DSL, a Kafka Streams user may specify the repartition topic manually
> using the *KStream#through* method:
> {code:java}
> final KStream<Object, Object> streamByProfileId = streamsBuilder
>     .stream("input-topic")
>     .selectKey((key, value) -> value)
>     .through("repartition-topic");
> streamByProfileId
>     .groupByKey()
>     .aggregate(
>         () -> 0d,
>         (key, value, aggregate) -> aggregate,
>         Materialized.as("store-1")
>     );
> streamByProfileId
>     .groupByKey()
>     .aggregate(
>         () -> 0d,
>         (key, value, aggregate) -> aggregate,
>         Materialized.as("store-2")
>     );
> {code}
>
> {code:java}
> Topologies:
>   Sub-topology: 0
>     Source: KSTREAM-SOURCE-0000000000 (topics: [input-topic])
>       --> KSTREAM-KEY-SELECT-0000000001
>     Processor: KSTREAM-KEY-SELECT-0000000001 (stores: [])
>       --> KSTREAM-SINK-0000000002
>       <-- KSTREAM-SOURCE-0000000000
>     Sink: KSTREAM-SINK-0000000002 (topic: repartition-topic)
>       <-- KSTREAM-KEY-SELECT-0000000001
>   Sub-topology: 1
>     Source: KSTREAM-SOURCE-0000000003 (topics: [repartition-topic])
>       --> KSTREAM-AGGREGATE-0000000004, KSTREAM-AGGREGATE-0000000005
>     Processor: KSTREAM-AGGREGATE-0000000004 (stores: [store-1])
>       --> none
>       <-- KSTREAM-SOURCE-0000000003
>     Processor: KSTREAM-AGGREGATE-0000000005 (stores: [store-2])
>       --> none
>       <-- KSTREAM-SOURCE-0000000003
> {code}
>
> While this makes it possible to optimize the Kafka Streams application, the
> user still has to manually create the repartition topic with the correct
> number of partitions based on the input topic. It would be great if the DSL
> had something like a *repartition()* operation on *KStream* that generates
> the repartition topic on user request.
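To illustrate why the stateful *transform(...)* case discussed in this thread needs a repartition step after `selectKey`, here is a minimal, self-contained sketch in plain Java with no Kafka dependency. It is only an illustration under stated assumptions: the partitioner is a stand-in based on `String.hashCode()` (Kafka's default partitioner hashes the serialized key with murmur2, so real partition numbers will differ), and the class `RepartitionSketch`, its methods, and the sample records are all hypothetical. The sketch shows that records sharing the same new key can still sit in different partitions when the data stays partitioned by the old key, whereas partitioning by the new key places each key in exactly one partition.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical helper class, not part of Kafka Streams.
final class RepartitionSketch {

    // Simplified stand-in for a key-hash partitioner (assumption: hashCode, not murmur2).
    static int partitionFor(String key, int numPartitions) {
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    // For each NEW key (record[1]), collect the partitions that hold its records.
    // repartitioned == false: data is still partitioned by the OLD key (record[0]),
    //                         as after selectKey(...) followed directly by transform(...).
    // repartitioned == true:  data has been rewritten to a topic partitioned by the new key.
    static Map<String, Set<Integer>> partitionsPerNewKey(
            String[][] records, int numPartitions, boolean repartitioned) {
        Map<String, Set<Integer>> result = new TreeMap<>();
        for (String[] record : records) {
            String oldKey = record[0];
            String newKey = record[1];
            int partition = partitionFor(repartitioned ? newKey : oldKey, numPartitions);
            result.computeIfAbsent(newKey, k -> new TreeSet<>()).add(partition);
        }
        return result;
    }

    public static void main(String[] args) {
        // {oldKey, value}; the value becomes the new key, as in selectKey((key, value) -> value).
        String[][] records = {
            {"a", "profile-1"}, {"b", "profile-1"},
            {"c", "profile-2"}, {"d", "profile-2"}
        };
        // Without repartitioning, each profile is spread over two partitions:
        // prints {profile-1=[1, 2], profile-2=[0, 3]}
        System.out.println(partitionsPerNewKey(records, 4, false));
        // With repartitioning, each profile maps to exactly one partition.
        System.out.println(partitionsPerNewKey(records, 4, true));
    }
}
```

In the non-repartitioned case, a stateful transformer would keep separate state for the same profile in different tasks, which is the situation a `repartition()`-style operation would avoid.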