[jira] [Updated] (KAFKA-8705) NullPointerException was thrown by topology optimization when two MergeNodes have common KeyChaingingNode

2019-12-16 Thread Bill Bejeck (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8705?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bill Bejeck updated KAFKA-8705:
---
Fix Version/s: 2.4.1

> NullPointerException was thrown by topology optimization when two MergeNodes 
> have common KeyChaingingNode
> -
>
> Key: KAFKA-8705
> URL: https://issues.apache.org/jira/browse/KAFKA-8705
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.3.0
>Reporter: Hiroshi Nakahara
>Assignee: Bill Bejeck
>Priority: Major
> Fix For: 2.2.3, 2.5.0, 2.3.2, 2.4.1
>
>
> NullPointerException was thrown by topology optimization when two MergeNodes 
> have common KeyChaingingNode.
> Kafka Stream version: 2.3.0
> h3. Code
> {code:java}
> import org.apache.kafka.common.serialization.Serdes;
> import org.apache.kafka.streams.StreamsBuilder;
> import org.apache.kafka.streams.StreamsConfig;
> import org.apache.kafka.streams.kstream.Consumed;
> import org.apache.kafka.streams.kstream.KStream;
> import java.util.Properties;
> public class Main {
> public static void main(String[] args) {
> final StreamsBuilder streamsBuilder = new StreamsBuilder();
> final KStream parentStream = 
> streamsBuilder.stream("parentTopic", Consumed.with(Serdes.Integer(), 
> Serdes.Integer()))
> .selectKey(Integer::sum);  // To make parentStream 
> KeyChaingingPoint
> final KStream childStream1 = 
> parentStream.mapValues(v -> v + 1);
> final KStream childStream2 = 
> parentStream.mapValues(v -> v + 2);
> final KStream childStream3 = 
> parentStream.mapValues(v -> v + 3);
> childStream1
> .merge(childStream2)
> .merge(childStream3)
> .to("outputTopic");
> final Properties properties = new Properties();
> properties.setProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION, 
> StreamsConfig.OPTIMIZE);
> streamsBuilder.build(properties);
> }
> }
> {code}
> h3. Expected result
> streamsBuilder.build should create Topology without throwing Exception.  The 
> expected topology is:
> {code:java}
> Topologies:
>Sub-topology: 0
> Source: KSTREAM-SOURCE-00 (topics: [parentTopic])
>   --> KSTREAM-KEY-SELECT-01
> Processor: KSTREAM-KEY-SELECT-01 (stores: [])
>   --> KSTREAM-MAPVALUES-02, KSTREAM-MAPVALUES-03, 
> KSTREAM-MAPVALUES-04
>   <-- KSTREAM-SOURCE-00
> Processor: KSTREAM-MAPVALUES-02 (stores: [])
>   --> KSTREAM-MERGE-05
>   <-- KSTREAM-KEY-SELECT-01
> Processor: KSTREAM-MAPVALUES-03 (stores: [])
>   --> KSTREAM-MERGE-05
>   <-- KSTREAM-KEY-SELECT-01
> Processor: KSTREAM-MAPVALUES-04 (stores: [])
>   --> KSTREAM-MERGE-06
>   <-- KSTREAM-KEY-SELECT-01
> Processor: KSTREAM-MERGE-05 (stores: [])
>   --> KSTREAM-MERGE-06
>   <-- KSTREAM-MAPVALUES-02, KSTREAM-MAPVALUES-03
> Processor: KSTREAM-MERGE-06 (stores: [])
>   --> KSTREAM-SINK-07
>   <-- KSTREAM-MERGE-05, KSTREAM-MAPVALUES-04
> Sink: KSTREAM-SINK-07 (topic: outputTopic)
>   <-- KSTREAM-MERGE-06
> {code}
> h3. Actual result
> NullPointerException was thrown with the following stacktrace.
> {code:java}
> Exception in thread "main" java.lang.NullPointerException
>   at java.util.AbstractCollection.addAll(AbstractCollection.java:343)
>   at 
> org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder.maybeUpdateKeyChangingRepartitionNodeMap(InternalStreamsBuilder.java:397)
>   at 
> org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder.maybeOptimizeRepartitionOperations(InternalStreamsBuilder.java:315)
>   at 
> org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder.maybePerformOptimizations(InternalStreamsBuilder.java:304)
>   at 
> org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder.buildAndOptimizeTopology(InternalStreamsBuilder.java:275)
>   at 
> org.apache.kafka.streams.StreamsBuilder.build(StreamsBuilder.java:558)
>   at Main.main(Main.java:24){code}
> h3. Cause
> This exception occurs in 
> InternalStreamsBuilder#maybeUpdateKeyChaingingRepartitionNodeMap.
> {code:java}
> private void maybeUpdateKeyChangingRepartitionNodeMap() {
> final Map> 
> mergeNodesToKeyChangers = new HashMap<>();
> for (final StreamsGraphNode mergeNode : mergeNodes) {
> mergeNodesToKeyChangers.put(mergeNode, new LinkedHashSet<>());
> final Collection keys = 
> keyChangingOperationsToOptimizableRepartitionNodes.keySet();
>  

[jira] [Updated] (KAFKA-8705) NullPointerException was thrown by topology optimization when two MergeNodes have common KeyChaingingNode

2019-12-12 Thread Bill Bejeck (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8705?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bill Bejeck updated KAFKA-8705:
---
Fix Version/s: 2.3.2
   2.5.0
   2.2.3

> NullPointerException was thrown by topology optimization when two MergeNodes 
> have common KeyChaingingNode
> -
>
> Key: KAFKA-8705
> URL: https://issues.apache.org/jira/browse/KAFKA-8705
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.3.0
>Reporter: Hiroshi Nakahara
>Assignee: Bill Bejeck
>Priority: Major
> Fix For: 2.2.3, 2.5.0, 2.3.2
>
>
> NullPointerException was thrown by topology optimization when two MergeNodes 
> have common KeyChaingingNode.
> Kafka Stream version: 2.3.0
> h3. Code
> {code:java}
> import org.apache.kafka.common.serialization.Serdes;
> import org.apache.kafka.streams.StreamsBuilder;
> import org.apache.kafka.streams.StreamsConfig;
> import org.apache.kafka.streams.kstream.Consumed;
> import org.apache.kafka.streams.kstream.KStream;
> import java.util.Properties;
> public class Main {
> public static void main(String[] args) {
> final StreamsBuilder streamsBuilder = new StreamsBuilder();
> final KStream parentStream = 
> streamsBuilder.stream("parentTopic", Consumed.with(Serdes.Integer(), 
> Serdes.Integer()))
> .selectKey(Integer::sum);  // To make parentStream 
> KeyChaingingPoint
> final KStream childStream1 = 
> parentStream.mapValues(v -> v + 1);
> final KStream childStream2 = 
> parentStream.mapValues(v -> v + 2);
> final KStream childStream3 = 
> parentStream.mapValues(v -> v + 3);
> childStream1
> .merge(childStream2)
> .merge(childStream3)
> .to("outputTopic");
> final Properties properties = new Properties();
> properties.setProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION, 
> StreamsConfig.OPTIMIZE);
> streamsBuilder.build(properties);
> }
> }
> {code}
> h3. Expected result
> streamsBuilder.build should create Topology without throwing Exception.  The 
> expected topology is:
> {code:java}
> Topologies:
>Sub-topology: 0
> Source: KSTREAM-SOURCE-00 (topics: [parentTopic])
>   --> KSTREAM-KEY-SELECT-01
> Processor: KSTREAM-KEY-SELECT-01 (stores: [])
>   --> KSTREAM-MAPVALUES-02, KSTREAM-MAPVALUES-03, 
> KSTREAM-MAPVALUES-04
>   <-- KSTREAM-SOURCE-00
> Processor: KSTREAM-MAPVALUES-02 (stores: [])
>   --> KSTREAM-MERGE-05
>   <-- KSTREAM-KEY-SELECT-01
> Processor: KSTREAM-MAPVALUES-03 (stores: [])
>   --> KSTREAM-MERGE-05
>   <-- KSTREAM-KEY-SELECT-01
> Processor: KSTREAM-MAPVALUES-04 (stores: [])
>   --> KSTREAM-MERGE-06
>   <-- KSTREAM-KEY-SELECT-01
> Processor: KSTREAM-MERGE-05 (stores: [])
>   --> KSTREAM-MERGE-06
>   <-- KSTREAM-MAPVALUES-02, KSTREAM-MAPVALUES-03
> Processor: KSTREAM-MERGE-06 (stores: [])
>   --> KSTREAM-SINK-07
>   <-- KSTREAM-MERGE-05, KSTREAM-MAPVALUES-04
> Sink: KSTREAM-SINK-07 (topic: outputTopic)
>   <-- KSTREAM-MERGE-06
> {code}
> h3. Actual result
> NullPointerException was thrown with the following stacktrace.
> {code:java}
> Exception in thread "main" java.lang.NullPointerException
>   at java.util.AbstractCollection.addAll(AbstractCollection.java:343)
>   at 
> org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder.maybeUpdateKeyChangingRepartitionNodeMap(InternalStreamsBuilder.java:397)
>   at 
> org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder.maybeOptimizeRepartitionOperations(InternalStreamsBuilder.java:315)
>   at 
> org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder.maybePerformOptimizations(InternalStreamsBuilder.java:304)
>   at 
> org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder.buildAndOptimizeTopology(InternalStreamsBuilder.java:275)
>   at 
> org.apache.kafka.streams.StreamsBuilder.build(StreamsBuilder.java:558)
>   at Main.main(Main.java:24){code}
> h3. Cause
> This exception occurs in 
> InternalStreamsBuilder#maybeUpdateKeyChaingingRepartitionNodeMap.
> {code:java}
> private void maybeUpdateKeyChangingRepartitionNodeMap() {
> final Map> 
> mergeNodesToKeyChangers = new HashMap<>();
> for (final StreamsGraphNode mergeNode : mergeNodes) {
> mergeNodesToKeyChangers.put(mergeNode, new LinkedHashSet<>());
> final Collection keys = 
>