Walker Carlson created KAFKA-9299:
-------------------------------------

             Summary: Over eager optimization
                 Key: KAFKA-9299
                 URL: https://issues.apache.org/jira/browse/KAFKA-9299
             Project: Kafka
          Issue Type: Task
          Components: streams
            Reporter: Walker Carlson


There are a few cases where the optimizer attempts an optimization that can 
cause a copartitioning failure. The known cases involve join and cogroup, but 
merge and other operations could also be affected. 

Take, for example, three input topics A, B and C with 2, 3 and 4 partitions 
respectively.

B' = B.map();

B'.join(A)

B'.join(C)

 

The optimizer will push the repartition upstream, which will cause the 
copartitioning to fail.
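
One way to read this failure is sketched below in plain Java. It is a toy model, not Kafka Streams code: the class `CopartitionSketch`, its methods and the repartition topic names are hypothetical. It assumes that every topic taking part in a join must have the same partition count, and that a repartition topic can be created with any partition count, but only one. Under those assumptions, two separate repartition topics for B' can match A (2 partitions) and C (4 partitions), while a single shared one cannot.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model, not Kafka Streams code: it only mimics the rule that
// all topics taking part in one join must have the same partition count.
final class CopartitionSketch {

    // Partition counts of the input topics joined against B' in the example.
    static final Map<String, Integer> SOURCE_PARTITIONS = Map.of("A", 2, "C", 4);

    /**
     * Maps each repartition topic to the single partition count it would need.
     * The input maps a repartition topic name to the source topics it is
     * joined with. Throws if one repartition topic would need two counts.
     */
    static Map<String, Integer> resolve(Map<String, List<String>> joinsPerRepartitionTopic) {
        final Map<String, Integer> resolved = new LinkedHashMap<>();
        for (final Map.Entry<String, List<String>> entry : joinsPerRepartitionTopic.entrySet()) {
            Integer required = null;
            for (final String source : entry.getValue()) {
                final int count = SOURCE_PARTITIONS.get(source);
                if (required != null && required != count) {
                    throw new IllegalStateException(
                        entry.getKey() + " would need both " + required
                            + " and " + count + " partitions");
                }
                required = count;
            }
            resolved.put(entry.getKey(), required);
        }
        return resolved;
    }

    // Convenience wrapper: true if every repartition topic gets one count.
    static boolean canCopartition(Map<String, List<String>> joinsPerRepartitionTopic) {
        try {
            resolve(joinsPerRepartitionTopic);
            return true;
        } catch (final IllegalStateException e) {
            return false;
        }
    }
}
```

In this model, `canCopartition` accepts one repartition topic per join and rejects a single repartition topic that feeds both the join with A and the join with C.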

This can be seen with the following test:

    @Test
    public void shouldInsertRepartitionsTopicForCogroupsUsedTwice() {
        final StreamsBuilder builder = new StreamsBuilder();

        final Properties properties = new Properties();
        properties.setProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE);

        final KStream<String, String> stream1 = builder.stream("one", stringConsumed);

        final KGroupedStream<String, String> groupedOne = stream1
            .map((k, v) -> new KeyValue<>(v, k))
            .groupByKey(Grouped.as("foo"));

        final CogroupedKStream<String, String> one = groupedOne.cogroup(STRING_AGGREGATOR);
        one.aggregate(STRING_INITIALIZER);
        one.aggregate(STRING_INITIALIZER);

        final String topologyDescription = builder.build(properties).describe().toString();

        System.err.println(topologyDescription);
    }

Topologies:
   Sub-topology: 0
    Source: KSTREAM-SOURCE-0000000000 (topics: [one])
      --> KSTREAM-MAP-0000000001
    Processor: KSTREAM-MAP-0000000001 (stores: [])
      --> foo-repartition-filter
      <-- KSTREAM-SOURCE-0000000000
    Processor: foo-repartition-filter (stores: [])
      --> foo-repartition-sink
      <-- KSTREAM-MAP-0000000001
    Sink: foo-repartition-sink (topic: foo-repartition)
      <-- foo-repartition-filter

  Sub-topology: 1
    Source: foo-repartition-source (topics: [foo-repartition])
      --> COGROUPKSTREAM-AGGREGATE-0000000006, COGROUPKSTREAM-AGGREGATE-0000000012
    Processor: COGROUPKSTREAM-AGGREGATE-0000000006 (stores: [COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000002])
      --> COGROUPKSTREAM-MERGE-0000000007
      <-- foo-repartition-source
    Processor: COGROUPKSTREAM-AGGREGATE-0000000012 (stores: [COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000008])
      --> COGROUPKSTREAM-MERGE-0000000013
      <-- foo-repartition-source
    Processor: COGROUPKSTREAM-MERGE-0000000007 (stores: [])
      --> none
      <-- COGROUPKSTREAM-AGGREGATE-0000000006
    Processor: COGROUPKSTREAM-MERGE-0000000013 (stores: [])
      --> none
      <-- COGROUPKSTREAM-AGGREGATE-0000000012
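
The test above only prints the description. A small plain-Java helper along the following lines could turn the printed output into something a test can assert on. This is a sketch: the class `TopologyDescriptionSketch` and its method are hypothetical, and it assumes the description format shown above, where a sink is printed as `Sink: <name> (topic: <topic>)` and repartition topics end in `-repartition`. How many repartition topics the topology should contain is not stated here, so the helper only reports what it finds.

```java
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: extracts repartition sink topics from the text
// produced by describe().toString(), assuming the format shown above.
final class TopologyDescriptionSketch {

    // Matches e.g. "Sink: foo-repartition-sink (topic: foo-repartition)".
    private static final Pattern SINK_TOPIC =
        Pattern.compile("Sink:\\s+\\S+\\s+\\(topic:\\s+([^)]+)\\)");

    // Returns the distinct sink topics whose name ends in "-repartition".
    static Set<String> repartitionSinkTopics(final String description) {
        final Set<String> topics = new LinkedHashSet<>();
        final Matcher matcher = SINK_TOPIC.matcher(description);
        while (matcher.find()) {
            final String topic = matcher.group(1).trim();
            if (topic.endsWith("-repartition")) {
                topics.add(topic);
            }
        }
        return topics;
    }
}
```

Applied to the description printed above, the helper would report the single topic `foo-repartition`, which is read by both aggregate processors in sub-topology 1.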




--
This message was sent by Atlassian Jira
(v8.3.4#803005)
