[
https://issues.apache.org/jira/browse/KAFKA-8413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16846838#comment-16846838
]
Levani Kokhreidze commented on KAFKA-8413:
------------------------------------------
Hi Matthias,
I've enabled topology optimization, but in this particular example, two
repartition topics are still created.
{code:java}
import java.util.Properties;
import java.util.UUID;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, UUID.randomUUID().toString());
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE);
props.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp/kafka-streams");{code}
Kafka Streams version 2.2.0
Topics created:
{code:java}
867bef21-1e56-4ef4-918d-f701fe6000bc-store-1-changelog
867bef21-1e56-4ef4-918d-f701fe6000bc-store-1-repartition
867bef21-1e56-4ef4-918d-f701fe6000bc-store-2-changelog
867bef21-1e56-4ef4-918d-f701fe6000bc-store-2-repartition{code}
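A minimal sketch for checking how many repartition topics a topology actually contains, without a running broker, by inspecting `Topology#describe()`. It assumes Kafka Streams 2.1 or newer, where `StreamsConfig.TOPOLOGY_OPTIMIZATION` only takes effect if the same `Properties` are also passed to `StreamsBuilder#build(Properties)`; the no-argument `build()` never optimizes. The class and method names (`RepartitionTopicCheck`, `buildTopology`, `aggregateInto`, `repartitionSinks`) are hypothetical, and the aggregation mirrors the placeholder from the issue description.

```java
import java.util.Properties;
import java.util.Set;
import java.util.TreeSet;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyDescription;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;

public class RepartitionTopicCheck {

    // Same shape as the issue example: re-key, then two independent aggregations.
    static Topology buildTopology(boolean optimize) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "repartition-check");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        if (optimize) {
            props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE);
        }

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> streamByProfileId = builder
            .stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()))
            .selectKey((key, value) -> value);
        aggregateInto(streamByProfileId, "store-1");
        aggregateInto(streamByProfileId, "store-2");

        // The optimization setting is only read when props reach build(props).
        return builder.build(props);
    }

    // Placeholder aggregation (keeps the initial value), as in the issue.
    static void aggregateInto(KStream<String, String> stream, String storeName) {
        stream.groupByKey().aggregate(
            () -> 0d,
            (key, value, aggregate) -> aggregate,
            Materialized.<String, Double, KeyValueStore<Bytes, byte[]>>as(storeName)
                .withValueSerde(Serdes.Double()));
    }

    // Collects every sink topic whose name ends in "-repartition".
    static Set<String> repartitionSinks(Topology topology) {
        Set<String> topics = new TreeSet<>();
        for (TopologyDescription.Subtopology sub : topology.describe().subtopologies()) {
            for (TopologyDescription.Node node : sub.nodes()) {
                if (node instanceof TopologyDescription.Sink) {
                    String topic = ((TopologyDescription.Sink) node).topic();
                    if (topic != null && topic.endsWith("-repartition")) {
                        topics.add(topic);
                    }
                }
            }
        }
        return topics;
    }

    public static void main(String[] args) {
        System.out.println("default:   " + repartitionSinks(buildTopology(false)));
        System.out.println("optimized: " + repartitionSinks(buildTopology(true)));
    }
}
```

Comparing the two printed sets shows whether the optimizer merged the repartition topics in a given Kafka Streams version.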
Actually, there is one other reason why introducing an additional manual
repartition operation may be valuable. Correct me if I'm wrong, but Kafka
Streams will try to optimize when a key-changing operation is followed by a
stateful operation, like *groupByKey().aggregate(...)*, but there may be cases
where a DSL user performs the aggregation with a stateful *transform(...)*
operation instead. Consider the following example:
{code:java}
final KStream<String, String> streamByProfileId = streamsBuilder
.stream("input.topic", Consumed.with(Serdes.String(), Serdes.String()))
.selectKey((key, value) -> value);
streamByProfileId.transform(...); // stateful transformer with aggregation
streamByProfileId.transform(...); // stateful transformer with aggregation
{code}
In this example, no repartition topic is created. On the other hand, if we had
something like a `repartition()` operation on KStream, we could write
something like this, which would be pretty cool imho:
{code:java}
final KStream<String, String> streamByProfileId = streamsBuilder
.stream("input.topic", Consumed.with(Serdes.String(), Serdes.String()))
.repartitionBy((key, value) -> new_key);
streamByProfileId.transform(...); // stateful transformer with aggregation
streamByProfileId.transform(...); // stateful transformer with aggregation{code}
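Without such an operation, the transform case can be handled with the same manual *KStream#through* workaround used for *groupByKey*. The sketch below assumes Kafka Streams 2.2-era APIs (`KStream#through`, `KStream#transform`, `Transformer`); the store name `profile-counts`, the topic `profile-id-repartition`, the class names, and the counting logic are hypothetical. Without `through(...)`, the transformer runs in the same sub-topology as `selectKey`, so records for one profile ID can be spread across tasks and each task aggregates only its own share; with it, the re-keyed records are written to a topic that the user must create beforehand with a suitable partition count.

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;

public class TransformRepartitionSketch {
    static final String STORE = "profile-counts"; // hypothetical store name

    // Hypothetical stateful transformer: keeps a running count per key.
    static class CountingTransformer
            implements Transformer<String, String, KeyValue<String, Long>> {
        private KeyValueStore<String, Long> store;

        @Override
        @SuppressWarnings("unchecked")
        public void init(ProcessorContext context) {
            store = (KeyValueStore<String, Long>) context.getStateStore(STORE);
        }

        @Override
        public KeyValue<String, Long> transform(String key, String value) {
            Long previous = store.get(key);
            long updated = previous == null ? 1L : previous + 1L;
            store.put(key, updated);
            return KeyValue.pair(key, updated);
        }

        @Override
        public void close() {}
    }

    static Topology build(boolean explicitRepartition) {
        StreamsBuilder builder = new StreamsBuilder();
        builder.addStateStore(Stores.keyValueStoreBuilder(
            Stores.persistentKeyValueStore(STORE), Serdes.String(), Serdes.Long()));

        KStream<String, String> streamByProfileId = builder
            .stream("input.topic", Consumed.with(Serdes.String(), Serdes.String()))
            .selectKey((key, value) -> value);

        if (explicitRepartition) {
            // Manual workaround: route re-keyed records through a user-created
            // topic so that all records for one key land on one partition.
            streamByProfileId = streamByProfileId.through("profile-id-repartition",
                Produced.with(Serdes.String(), Serdes.String()));
        }
        streamByProfileId.transform(CountingTransformer::new, STORE);
        return builder.build();
    }

    public static void main(String[] args) {
        System.out.println(build(false).describe()); // one sub-topology
        System.out.println(build(true).describe());  // split at the through() topic
    }
}
```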
> Add possibility to do repartitioning on KStream
> -----------------------------------------------
>
> Key: KAFKA-8413
> URL: https://issues.apache.org/jira/browse/KAFKA-8413
> Project: Kafka
> Issue Type: New Feature
> Components: streams
> Reporter: Levani Kokhreidze
> Priority: Minor
> Attachments: topology-1.png, topology-2.png
>
>
> Consider the following code:
> {code:java}
> final KStream<String, String> streamByProfileId = streamsBuilder
> .stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()))
> .selectKey((key, value) -> value);
> streamByProfileId
> .groupByKey()
> .aggregate(
> () -> 0d,
> (key, value, aggregate) -> aggregate,
> Materialized.as("store-1")
> );
> streamByProfileId
> .groupByKey()
> .aggregate(
> () -> 0d,
> (key, value, aggregate) -> aggregate,
> Materialized.as("store-2")
> );
> {code}
>
> This code generates the following topology:
> {code:java}
> Topologies:
> Sub-topology: 0
> Source: KSTREAM-SOURCE-0000000000 (topics: [input-topic])
> --> KSTREAM-KEY-SELECT-0000000001
> Processor: KSTREAM-KEY-SELECT-0000000001 (stores: [])
> --> KSTREAM-FILTER-0000000004, KSTREAM-FILTER-0000000008
> <-- KSTREAM-SOURCE-0000000000
> Processor: KSTREAM-FILTER-0000000004 (stores: [])
> --> KSTREAM-SINK-0000000003
> <-- KSTREAM-KEY-SELECT-0000000001
> Processor: KSTREAM-FILTER-0000000008 (stores: [])
> --> KSTREAM-SINK-0000000007
> <-- KSTREAM-KEY-SELECT-0000000001
> Sink: KSTREAM-SINK-0000000003 (topic: store-1-repartition)
> <-- KSTREAM-FILTER-0000000004
> Sink: KSTREAM-SINK-0000000007 (topic: store-2-repartition)
> <-- KSTREAM-FILTER-0000000008
> Sub-topology: 1
> Source: KSTREAM-SOURCE-0000000005 (topics: [store-1-repartition])
> --> KSTREAM-AGGREGATE-0000000002
> Processor: KSTREAM-AGGREGATE-0000000002 (stores: [store-1])
> --> none
> <-- KSTREAM-SOURCE-0000000005
> Sub-topology: 2
> Source: KSTREAM-SOURCE-0000000009 (topics: [store-2-repartition])
> --> KSTREAM-AGGREGATE-0000000006
> Processor: KSTREAM-AGGREGATE-0000000006 (stores: [store-2])
> --> none
> <-- KSTREAM-SOURCE-0000000009
>
> {code}
> Kafka Streams creates a separate repartition topic for each `groupByKey`
> operation, so two in total. In this example, two repartition topics are not
> really necessary: a single repartition topic would suffice, and both
> aggregations could be processed in one downstream sub-topology.
>
> In the DSL, a Kafka Streams user can specify a repartition topic manually
> using the *KStream#through* method:
> {code:java}
> final KStream<Object, Object> streamByProfileId = streamsBuilder
> .stream("input-topic")
> .selectKey((key, value) -> value)
> .through("repartition-topic");
> streamByProfileId
> .groupByKey()
> .aggregate(
> () -> 0d,
> (key, value, aggregate) -> aggregate,
> Materialized.as("store-1")
> );
> streamByProfileId
> .groupByKey()
> .aggregate(
> () -> 0d,
> (key, value, aggregate) -> aggregate,
> Materialized.as("store-2")
> );
> {code}
>
> This generates the following topology:
> {code:java}
> Topologies:
> Sub-topology: 0
> Source: KSTREAM-SOURCE-0000000000 (topics: [input-topic])
> --> KSTREAM-KEY-SELECT-0000000001
> Processor: KSTREAM-KEY-SELECT-0000000001 (stores: [])
> --> KSTREAM-SINK-0000000002
> <-- KSTREAM-SOURCE-0000000000
> Sink: KSTREAM-SINK-0000000002 (topic: repartition-topic)
> <-- KSTREAM-KEY-SELECT-0000000001
> Sub-topology: 1
> Source: KSTREAM-SOURCE-0000000003 (topics: [repartition-topic])
> --> KSTREAM-AGGREGATE-0000000004, KSTREAM-AGGREGATE-0000000005
> Processor: KSTREAM-AGGREGATE-0000000004 (stores: [store-1])
> --> none
> <-- KSTREAM-SOURCE-0000000003
> Processor: KSTREAM-AGGREGATE-0000000005 (stores: [store-2])
> --> none
> <-- KSTREAM-SOURCE-0000000003
> {code}
>
> While this makes it possible to optimize a Kafka Streams application, the
> user still has to create the repartition topic manually, with the correct
> number of partitions based on the input topic. It would be great if the DSL
> had something like a *repartition()* operation on *KStream* that generates
> the repartition topic on the user's command.
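For comparison, a sketch of what the requested operation looks like in later Kafka Streams releases, which added `KStream#repartition(Repartitioned)` via KIP-221. It assumes a release that ships this API (2.6.0 or newer) and does not compile against 2.2.0. The topic name `by-profile-id`, the partition count of 4, and the class and helper names are hypothetical; `count()` stands in for the placeholder aggregation.

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyDescription;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Repartitioned;

public class RepartitionOperatorSketch {

    static Topology build() {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> streamByProfileId = builder
            .stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()))
            .selectKey((key, value) -> value)
            // One explicit repartition topic, created and managed by Kafka
            // Streams; the name and partition count are illustrative choices.
            .repartition(Repartitioned.with(Serdes.String(), Serdes.String())
                .withName("by-profile-id")
                .withNumberOfPartitions(4));

        // Both aggregations read the already repartitioned stream,
        // so no additional repartition topics are created.
        streamByProfileId.groupByKey().count(Materialized.as("store-1"));
        streamByProfileId.groupByKey().count(Materialized.as("store-2"));
        return builder.build();
    }

    // Counts sink nodes that write to a "-repartition" topic.
    static int repartitionSinkCount(Topology topology) {
        int count = 0;
        for (TopologyDescription.Subtopology sub : topology.describe().subtopologies()) {
            for (TopologyDescription.Node node : sub.nodes()) {
                if (node instanceof TopologyDescription.Sink) {
                    String topic = ((TopologyDescription.Sink) node).topic();
                    if (topic != null && topic.endsWith("-repartition")) {
                        count++;
                    }
                }
            }
        }
        return count;
    }

    public static void main(String[] args) {
        Topology topology = build();
        System.out.println(topology.describe());
        System.out.println("repartition sinks: " + repartitionSinkCount(topology));
    }
}
```

The resulting topology has the same shape as the *through()* variant above (one repartition topic feeding a single sub-topology with both aggregations), but the partition count is declared in code rather than on a manually created topic.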