Guozhang Wang created KAFKA-4601:
------------------------------------

             Summary: Avoid duplicated repartitioning in KStream DSL
                 Key: KAFKA-4601
                 URL: https://issues.apache.org/jira/browse/KAFKA-4601
             Project: Kafka
          Issue Type: Bug
          Components: streams
            Reporter: Guozhang Wang


Consider the following DSL:

{code}
KStream<String, String> source = builder.stream(Serdes.String(), Serdes.String(), "topic1").map(..);

KTable<String, Long> counts = source
        .groupByKey()
        .count("Counts");

KStream<String, String> sink = source.leftJoin(counts, ..);
{code}

The resulting topology looks like this:

{code}
ProcessorTopology:
    KSTREAM-SOURCE-0000000000:
        topics:         [topic1]
        children:       [KSTREAM-MAP-0000000001]
    KSTREAM-MAP-0000000001:
        children:       [KSTREAM-FILTER-0000000004, KSTREAM-FILTER-0000000007]
    KSTREAM-FILTER-0000000004:
        children:       [KSTREAM-SINK-0000000003]
    KSTREAM-SINK-0000000003:
        topic:          X-Counts-repartition
    KSTREAM-FILTER-0000000007:
        children:       [KSTREAM-SINK-0000000006]
    KSTREAM-SINK-0000000006:
        topic:          X-KSTREAM-MAP-0000000001-repartition

ProcessorTopology:
    KSTREAM-SOURCE-0000000008:
        topics:         [X-KSTREAM-MAP-0000000001-repartition]
        children:       [KSTREAM-LEFTJOIN-0000000009]
    KSTREAM-LEFTJOIN-0000000009:
        states:         [Counts]
    KSTREAM-SOURCE-0000000005:
        topics:         [X-Counts-repartition]
        children:       [KSTREAM-AGGREGATE-0000000002]
    KSTREAM-AGGREGATE-0000000002:
        states:         [Counts]
{code}

I.e. there are two repartition topics, one for the aggregate and one for the 
join. This not only introduces unnecessary overhead but also breaks the expected 
processing order (users expect each record to go through the aggregation first 
and then the join operator). To get the following simpler topology, users today 
need to manually add a {{through}} operator after {{map}} to enforce 
repartitioning:

{code}
ProcessorTopology:
    KSTREAM-SOURCE-0000000000:
        topics:         [topic1]
        children:       [KSTREAM-MAP-0000000001]
    KSTREAM-MAP-0000000001:
        children:       [KSTREAM-SINK-0000000002]
    KSTREAM-SINK-0000000002:
        topic:          topic 2

ProcessorTopology:
    KSTREAM-SOURCE-0000000003:
        topics:         [topic 2]
        children:       [KSTREAM-AGGREGATE-0000000004, KSTREAM-LEFTJOIN-0000000005]
    KSTREAM-AGGREGATE-0000000004:
        states:         [Counts]
    KSTREAM-LEFTJOIN-0000000005:
        states:         [Counts]
{code} 

This kind of optimization should be automatic in Streams; we can consider 
doing it when we move beyond the current one-operator-at-a-time translation.
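
To illustrate the difference between the two translation strategies, here is a toy model in plain Java. It is not Kafka Streams code and does not reflect how the DSL is actually implemented; the class {{RepartitionSketch}}, the {{Node}} type, and the flags {{changesKey}} and {{needsKeyedInput}} are all hypothetical names made up for this sketch. It only counts which repartition topics would be allocated if each key-dependent operator creates its own topic, versus sharing one topic per key-changing upstream node.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Toy model (NOT Kafka Streams code) of repartition topic allocation. */
final class RepartitionSketch {

    /** A DSL operator in a simplified operator tree. */
    static final class Node {
        final String name;
        final boolean changesKey;      // assumption: operators like map may change the key
        final boolean needsKeyedInput; // assumption: operators like aggregate/join need key-partitioned input
        final List<Node> children = new ArrayList<>();

        Node(String name, boolean changesKey, boolean needsKeyedInput) {
            this.name = name;
            this.changesKey = changesKey;
            this.needsKeyedInput = needsKeyedInput;
        }

        Node add(Node child) {
            children.add(child);
            return this;
        }
    }

    /**
     * Collects the repartition topics a translation would create.
     * share == false: every key-dependent operator gets its own topic.
     * share == true:  operators below the same key-changing node reuse one topic.
     */
    static Set<String> repartitionTopics(Node root, boolean share) {
        Set<String> topics = new LinkedHashSet<>();
        visit(root, null, share, topics);
        return topics;
    }

    // dirtyAncestor is the nearest upstream key-changing node whose output
    // has not been repartitioned yet on this path, or null if there is none.
    private static void visit(Node node, Node dirtyAncestor, boolean share, Set<String> topics) {
        if (node.needsKeyedInput && dirtyAncestor != null) {
            String owner = share ? dirtyAncestor.name : node.name;
            topics.add(owner + "-repartition");
            dirtyAncestor = null; // data below a repartition topic is partitioned by key again
        }
        if (node.changesKey) {
            dirtyAncestor = node;
        }
        for (Node child : node.children) {
            visit(child, dirtyAncestor, share, topics);
        }
    }

    /** The shape of the example in this issue: source -> map -> {aggregate, leftJoin}. */
    static Node exampleTopology() {
        Node map = new Node("MAP", true, false)
                .add(new Node("AGGREGATE", false, true))
                .add(new Node("LEFTJOIN", false, true));
        return new Node("SOURCE", false, false).add(map);
    }

    public static void main(String[] args) {
        Node root = exampleTopology();
        System.out.println(repartitionTopics(root, false)); // one topic per operator
        System.out.println(repartitionTopics(root, true));  // one shared topic
    }
}
```

In this model the per-operator strategy yields two topics for the example tree, while the shared strategy yields a single topic owned by the map node, which corresponds to what the manual {{through}} call achieves today.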



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
