[jira] [Updated] (KAFKA-8705) NullPointerException was thrown by topology optimization when two MergeNodes have common KeyChaingingNode
[ https://issues.apache.org/jira/browse/KAFKA-8705?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Bill Bejeck updated KAFKA-8705:
---
    Fix Version/s: 2.4.1

> NullPointerException was thrown by topology optimization when two MergeNodes have common KeyChaingingNode
> ---
>
>                 Key: KAFKA-8705
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8705
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.3.0
>            Reporter: Hiroshi Nakahara
>            Assignee: Bill Bejeck
>            Priority: Major
>             Fix For: 2.2.3, 2.5.0, 2.3.2, 2.4.1
>
>
> Topology optimization throws a NullPointerException when two merge nodes share a common key-changing node.
> Kafka Streams version: 2.3.0
> h3. Code
> {code:java}
> import org.apache.kafka.common.serialization.Serdes;
> import org.apache.kafka.streams.StreamsBuilder;
> import org.apache.kafka.streams.StreamsConfig;
> import org.apache.kafka.streams.kstream.Consumed;
> import org.apache.kafka.streams.kstream.KStream;
> import java.util.Properties;
>
> public class Main {
>     public static void main(String[] args) {
>         final StreamsBuilder streamsBuilder = new StreamsBuilder();
>         final KStream<Integer, Integer> parentStream = streamsBuilder
>             .stream("parentTopic", Consumed.with(Serdes.Integer(), Serdes.Integer()))
>             .selectKey(Integer::sum); // makes parentStream a key-changing point
>         final KStream<Integer, Integer> childStream1 = parentStream.mapValues(v -> v + 1);
>         final KStream<Integer, Integer> childStream2 = parentStream.mapValues(v -> v + 2);
>         final KStream<Integer, Integer> childStream3 = parentStream.mapValues(v -> v + 3);
>         childStream1
>             .merge(childStream2)
>             .merge(childStream3)
>             .to("outputTopic");
>         final Properties properties = new Properties();
>         properties.setProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE);
>         streamsBuilder.build(properties);
>     }
> }
> {code}
> h3. Expected result
> streamsBuilder.build should create the Topology without throwing an exception. The expected topology is:
> {code:java}
> Topologies:
>    Sub-topology: 0
>     Source: KSTREAM-SOURCE-00 (topics: [parentTopic])
>       --> KSTREAM-KEY-SELECT-01
>     Processor: KSTREAM-KEY-SELECT-01 (stores: [])
>       --> KSTREAM-MAPVALUES-02, KSTREAM-MAPVALUES-03, KSTREAM-MAPVALUES-04
>       <-- KSTREAM-SOURCE-00
>     Processor: KSTREAM-MAPVALUES-02 (stores: [])
>       --> KSTREAM-MERGE-05
>       <-- KSTREAM-KEY-SELECT-01
>     Processor: KSTREAM-MAPVALUES-03 (stores: [])
>       --> KSTREAM-MERGE-05
>       <-- KSTREAM-KEY-SELECT-01
>     Processor: KSTREAM-MAPVALUES-04 (stores: [])
>       --> KSTREAM-MERGE-06
>       <-- KSTREAM-KEY-SELECT-01
>     Processor: KSTREAM-MERGE-05 (stores: [])
>       --> KSTREAM-MERGE-06
>       <-- KSTREAM-MAPVALUES-02, KSTREAM-MAPVALUES-03
>     Processor: KSTREAM-MERGE-06 (stores: [])
>       --> KSTREAM-SINK-07
>       <-- KSTREAM-MERGE-05, KSTREAM-MAPVALUES-04
>     Sink: KSTREAM-SINK-07 (topic: outputTopic)
>       <-- KSTREAM-MERGE-06
> {code}
> h3. Actual result
> A NullPointerException was thrown with the following stack trace.
> {code:java}
> Exception in thread "main" java.lang.NullPointerException
>     at java.util.AbstractCollection.addAll(AbstractCollection.java:343)
>     at org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder.maybeUpdateKeyChangingRepartitionNodeMap(InternalStreamsBuilder.java:397)
>     at org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder.maybeOptimizeRepartitionOperations(InternalStreamsBuilder.java:315)
>     at org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder.maybePerformOptimizations(InternalStreamsBuilder.java:304)
>     at org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder.buildAndOptimizeTopology(InternalStreamsBuilder.java:275)
>     at org.apache.kafka.streams.StreamsBuilder.build(StreamsBuilder.java:558)
>     at Main.main(Main.java:24)
> {code}
> h3. Cause
> This exception occurs in InternalStreamsBuilder#maybeUpdateKeyChangingRepartitionNodeMap.
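The top frame of the stack trace is AbstractCollection.addAll, which throws when the collection passed to it is null. The sketch below reproduces only that failure mode with the standard library. It assumes, without confirmation from the report, that the null comes from a map entry that was already consumed for an earlier merge node; the class name, the helper collectThrowsNpe, and the string node names are hypothetical and are not Kafka code.

```java
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

public class NullAddAllSketch {

    // Hypothetical helper: moves the nodes registered for keyChanger into a fresh set.
    // Returns true if addAll threw because the lookup produced null.
    static boolean collectThrowsNpe(final Map<String, Set<String>> nodesByKeyChanger,
                                    final String keyChanger) {
        final Set<String> collected = new LinkedHashSet<>();
        try {
            // remove() returns null when the key is absent, and addAll(null) throws.
            collected.addAll(nodesByKeyChanger.remove(keyChanger));
            return false;
        } catch (final NullPointerException e) {
            return true;
        }
    }

    public static void main(final String[] args) {
        final Map<String, Set<String>> nodesByKeyChanger = new HashMap<>();
        final Set<String> repartitionNodes = new LinkedHashSet<>();
        repartitionNodes.add("repartition-node");
        nodesByKeyChanger.put("key-changer", repartitionNodes);

        // First merge node: the entry exists, so addAll succeeds.
        System.out.println("first lookup threw NPE: "
            + collectThrowsNpe(nodesByKeyChanger, "key-changer"));
        // Second merge node sharing the same key changer: the entry is already gone.
        System.out.println("second lookup threw NPE: "
            + collectThrowsNpe(nodesByKeyChanger, "key-changer"));
    }
}
```

Under that assumption, a null check on the looked-up value before calling addAll would be enough to avoid the exception in the sketch; whether the real fix takes that form is not stated here.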
> {code:java}
> private void maybeUpdateKeyChangingRepartitionNodeMap() {
>     final Map<StreamsGraphNode, Set<StreamsGraphNode>> mergeNodesToKeyChangers = new HashMap<>();
>     for (final StreamsGraphNode mergeNode : mergeNodes) {
>         mergeNodesToKeyChangers.put(mergeNode, new LinkedHashSet<>());
>         final Collection<StreamsGraphNode> keys = keyChangingOperationsToOptimizableRepartitionNodes.keySet();
>
[jira] [Updated] (KAFKA-8705) NullPointerException was thrown by topology optimization when two MergeNodes have common KeyChaingingNode
[ https://issues.apache.org/jira/browse/KAFKA-8705?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Bill Bejeck updated KAFKA-8705:
---
    Fix Version/s: 2.3.2
                   2.5.0
                   2.2.3

> NullPointerException was thrown by topology optimization when two MergeNodes have common KeyChaingingNode
> ---
>
>                 Key: KAFKA-8705
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8705
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.3.0
>            Reporter: Hiroshi Nakahara
>            Assignee: Bill Bejeck
>            Priority: Major
>             Fix For: 2.2.3, 2.5.0, 2.3.2