[jira] [Commented] (KAFKA-8705) NullPointerException was thrown by topology optimization when two MergeNodes have common KeyChaingingNode

2019-12-16 Thread Bill Bejeck (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8705?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16997490#comment-16997490
 ] 

Bill Bejeck commented on KAFKA-8705:


cherry-picked to 2.4

> NullPointerException was thrown by topology optimization when two MergeNodes 
> have common KeyChaingingNode
> -
>
>              Key: KAFKA-8705
>              URL: https://issues.apache.org/jira/browse/KAFKA-8705
>          Project: Kafka
>       Issue Type: Bug
>       Components: streams
> Affects Versions: 2.3.0
>         Reporter: Hiroshi Nakahara
>         Assignee: Bill Bejeck
>         Priority: Major
>          Fix For: 2.2.3, 2.5.0, 2.3.2, 2.4.1
>
>
> A NullPointerException is thrown by topology optimization when two MergeNodes 
> share a common key-changing node.
> Kafka Streams version: 2.3.0
> h3. Code
> {code:java}
> import org.apache.kafka.common.serialization.Serdes;
> import org.apache.kafka.streams.StreamsBuilder;
> import org.apache.kafka.streams.StreamsConfig;
> import org.apache.kafka.streams.kstream.Consumed;
> import org.apache.kafka.streams.kstream.KStream;
>
> import java.util.Properties;
>
> public class Main {
>     public static void main(String[] args) {
>         final StreamsBuilder streamsBuilder = new StreamsBuilder();
>
>         // selectKey makes parentStream a key-changing point.
>         final KStream<Integer, Integer> parentStream = streamsBuilder
>             .stream("parentTopic", Consumed.with(Serdes.Integer(), Serdes.Integer()))
>             .selectKey(Integer::sum);
>
>         // Three children of the same key-changing node.
>         final KStream<Integer, Integer> childStream1 = parentStream.mapValues(v -> v + 1);
>         final KStream<Integer, Integer> childStream2 = parentStream.mapValues(v -> v + 2);
>         final KStream<Integer, Integer> childStream3 = parentStream.mapValues(v -> v + 3);
>
>         // Two chained merges, so two MergeNodes share the key-changing parent.
>         childStream1
>             .merge(childStream2)
>             .merge(childStream3)
>             .to("outputTopic");
>
>         // Enabling optimization makes build() run the failing code path.
>         final Properties properties = new Properties();
>         properties.setProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE);
>         streamsBuilder.build(properties);
>     }
> }
> {code}
> h3. Expected result
> streamsBuilder.build should create a Topology without throwing an exception. The 
> expected topology is:
> {code:java}
> Topologies:
>    Sub-topology: 0
>     Source: KSTREAM-SOURCE-00 (topics: [parentTopic])
>       --> KSTREAM-KEY-SELECT-01
>     Processor: KSTREAM-KEY-SELECT-01 (stores: [])
>       --> KSTREAM-MAPVALUES-02, KSTREAM-MAPVALUES-03, KSTREAM-MAPVALUES-04
>       <-- KSTREAM-SOURCE-00
>     Processor: KSTREAM-MAPVALUES-02 (stores: [])
>       --> KSTREAM-MERGE-05
>       <-- KSTREAM-KEY-SELECT-01
>     Processor: KSTREAM-MAPVALUES-03 (stores: [])
>       --> KSTREAM-MERGE-05
>       <-- KSTREAM-KEY-SELECT-01
>     Processor: KSTREAM-MAPVALUES-04 (stores: [])
>       --> KSTREAM-MERGE-06
>       <-- KSTREAM-KEY-SELECT-01
>     Processor: KSTREAM-MERGE-05 (stores: [])
>       --> KSTREAM-MERGE-06
>       <-- KSTREAM-MAPVALUES-02, KSTREAM-MAPVALUES-03
>     Processor: KSTREAM-MERGE-06 (stores: [])
>       --> KSTREAM-SINK-07
>       <-- KSTREAM-MERGE-05, KSTREAM-MAPVALUES-04
>     Sink: KSTREAM-SINK-07 (topic: outputTopic)
>       <-- KSTREAM-MERGE-06
> {code}
> h3. Actual result
> NullPointerException was thrown with the following stacktrace.
> {code:java}
> Exception in thread "main" java.lang.NullPointerException
>   at java.util.AbstractCollection.addAll(AbstractCollection.java:343)
>   at org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder.maybeUpdateKeyChangingRepartitionNodeMap(InternalStreamsBuilder.java:397)
>   at org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder.maybeOptimizeRepartitionOperations(InternalStreamsBuilder.java:315)
>   at org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder.maybePerformOptimizations(InternalStreamsBuilder.java:304)
>   at org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder.buildAndOptimizeTopology(InternalStreamsBuilder.java:275)
>   at org.apache.kafka.streams.StreamsBuilder.build(StreamsBuilder.java:558)
>   at Main.main(Main.java:24)
> {code}
> h3. Cause
> This exception occurs in 
> InternalStreamsBuilder#maybeUpdateKeyChangingRepartitionNodeMap.
> {code:java}
> private void maybeUpdateKeyChangingRepartitionNodeMap() {
>     final Map<StreamsGraphNode, Set<StreamsGraphNode>> mergeNodesToKeyChangers = new HashMap<>();
>     for (final StreamsGraphNode mergeNode : mergeNodes) {
>         mergeNodesToKeyChangers.put(mergeNode, new LinkedHashSet<>());
>         final Collection<StreamsGraphNode> keys = 
> 

[jira] [Commented] (KAFKA-8705) NullPointerException was thrown by topology optimization when two MergeNodes have common KeyChaingingNode

2019-12-12 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8705?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16995127#comment-16995127
 ] 

ASF GitHub Bot commented on KAFKA-8705:
---

bbejeck commented on pull request #7117: KAFKA-8705: Remove parent node after 
leaving loop to prevent NPE
URL: https://github.com/apache/kafka/pull/7117
 
 
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-8705) NullPointerException was thrown by topology optimization when two MergeNodes have common KeyChaingingNode

2019-09-02 Thread Jira


[ 
https://issues.apache.org/jira/browse/KAFKA-8705?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16920779#comment-16920779
 ] 

Reynir Hübner commented on KAFKA-8705:
--

I get the same exception on version *2.2.1* when TOPOLOGY_OPTIMIZATION is set 
to OPTIMIZE.


[jira] [Commented] (KAFKA-8705) NullPointerException was thrown by topology optimization when two MergeNodes have common KeyChaingingNode

2019-07-30 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8705?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16896500#comment-16896500
 ] 

Matthias J. Sax commented on KAFKA-8705:


[~bbejeck] Is 2.3 the only affected version?


[jira] [Commented] (KAFKA-8705) NullPointerException was thrown by topology optimization when two MergeNodes have common KeyChaingingNode

2019-07-25 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8705?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16893100#comment-16893100
 ] 

ASF GitHub Bot commented on KAFKA-8705:
---

bbejeck commented on pull request #7117: KAFKA-8705: Remove parent node after 
leaving loop to prevent NPE
URL: https://github.com/apache/kafka/pull/7117
 
 
   Fixes the case where merging multiple children of a key-changing node causes an 
NPE.
   
   I've updated the tests.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
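
   The failure mode named in the PR title can be illustrated with plain collections. The following is a minimal sketch, not the actual Kafka patch: the class `SharedKeyChangerSketch`, its method names, and the node names (`KEY-SELECT`, `MERGE-1`, `MERGE-2`, `REPARTITION-A`) are all hypothetical stand-ins, and string maps replace the optimizer's internal graph-node maps. It assumes the NPE arises because the shared key-changing parent is removed from the map while the first merge node is processed, so the lookup for the second merge node yields `null`.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

public class SharedKeyChangerSketch {

    // Hypothetical bookkeeping: key-changing node -> repartition nodes below it.
    static Map<String, Set<String>> newRepartitionMap() {
        final Map<String, Set<String>> map = new LinkedHashMap<>();
        map.put("KEY-SELECT", new LinkedHashSet<>(Collections.singletonList("REPARTITION-A")));
        return map;
    }

    // Two merge nodes that both descend from the same key-changing node.
    static Map<String, Set<String>> newMergeMap() {
        final Map<String, Set<String>> map = new LinkedHashMap<>();
        map.put("MERGE-1", new LinkedHashSet<>(Collections.singletonList("KEY-SELECT")));
        map.put("MERGE-2", new LinkedHashSet<>(Collections.singletonList("KEY-SELECT")));
        return map;
    }

    // Failure mode: the shared parent is removed while the first merge node is
    // handled, so remove(...) returns null for the second and addAll(null) throws.
    static void remapRemovingInsideLoop(final Map<String, Set<String>> repartitionMap,
                                        final Map<String, Set<String>> mergeToKeyChangers) {
        for (final Map.Entry<String, Set<String>> entry : mergeToKeyChangers.entrySet()) {
            final Set<String> collected = new LinkedHashSet<>();
            for (final String keyChanger : entry.getValue()) {
                collected.addAll(repartitionMap.remove(keyChanger));
            }
            repartitionMap.put(entry.getKey(), collected);
        }
    }

    // Variant in the spirit of the PR title: only read inside the loop, and
    // remove the parents once every merge node has been processed.
    static void remapRemovingAfterLoop(final Map<String, Set<String>> repartitionMap,
                                       final Map<String, Set<String>> mergeToKeyChangers) {
        final Set<String> consumedParents = new LinkedHashSet<>();
        for (final Map.Entry<String, Set<String>> entry : mergeToKeyChangers.entrySet()) {
            final Set<String> collected = new LinkedHashSet<>();
            for (final String keyChanger : entry.getValue()) {
                final Set<String> nodes = repartitionMap.get(keyChanger);
                if (nodes != null) {
                    collected.addAll(nodes);
                }
                consumedParents.add(keyChanger);
            }
            repartitionMap.put(entry.getKey(), collected);
        }
        repartitionMap.keySet().removeAll(consumedParents);
    }

    public static void main(final String[] args) {
        try {
            remapRemovingInsideLoop(newRepartitionMap(), newMergeMap());
            System.out.println("remove inside loop: no exception");
        } catch (final NullPointerException e) {
            System.out.println("remove inside loop: NullPointerException");
        }

        final Map<String, Set<String>> fixed = newRepartitionMap();
        remapRemovingAfterLoop(fixed, newMergeMap());
        System.out.println("remove after loop: " + fixed);
    }
}
```

   In this sketch both merge nodes end up associated with the parent's repartition nodes, and the parent entry is dropped only after the outer loop has finished.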
   
 


[jira] [Commented] (KAFKA-8705) NullPointerException was thrown by topology optimization when two MergeNodes have common KeyChaingingNode

2019-07-24 Thread Hiroshi Nakahara (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8705?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16892339#comment-16892339
 ] 

Hiroshi Nakahara commented on KAFKA-8705:
-

[~bbejeck] Thank you for working on it quickly!  I'm happy to hear about that :)


[jira] [Commented] (KAFKA-8705) NullPointerException was thrown by topology optimization when two MergeNodes have common KeyChaingingNode

2019-07-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8705?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16892228#comment-16892228
 ] 

ASF GitHub Bot commented on KAFKA-8705:
---

bbejeck commented on pull request #7109: KAFKA-8705: Remove parent node after 
leaving loop to prevent NPE
URL: https://github.com/apache/kafka/pull/7109
 
 
   
 


[jira] [Commented] (KAFKA-8705) NullPointerException was thrown by topology optimization when two MergeNodes have common KeyChaingingNode

2019-07-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8705?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16892227#comment-16892227
 ] 

ASF GitHub Bot commented on KAFKA-8705:
---

bbejeck commented on pull request #7109: KAFKA-8705: Remove parent node after 
leaving loop to prevent NPE
URL: https://github.com/apache/kafka/pull/7109
 
 
   Fixes the case where merging multiple children of a key-changing node causes an 
NPE.
   
   I've updated the tests.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-8705) NullPointerException was thrown by topology optimization when two MergeNodes have common KeyChaingingNode

2019-07-24 Thread Bill Bejeck (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8705?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16891887#comment-16891887
 ] 

Bill Bejeck commented on KAFKA-8705:


[~hiro116s] Thanks for reporting this. We'll look into it.

> NullPointerException was thrown by topology optimization when two MergeNodes 
> have common KeyChaingingNode
> -
>
> Key: KAFKA-8705
> URL: https://issues.apache.org/jira/browse/KAFKA-8705
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.3.0
>Reporter: Hiroshi Nakahara
>Assignee: Bill Bejeck
>Priority: Major
>
> A NullPointerException is thrown by topology optimization when two MergeNodes 
> share a common key-changing node.
> Kafka Streams version: 2.3.0
> h3. Code
> {code:java}
> import org.apache.kafka.common.serialization.Serdes;
> import org.apache.kafka.streams.StreamsBuilder;
> import org.apache.kafka.streams.StreamsConfig;
> import org.apache.kafka.streams.kstream.Consumed;
> import org.apache.kafka.streams.kstream.KStream;
> import java.util.Properties;
> public class Main {
>     public static void main(String[] args) {
>         final StreamsBuilder streamsBuilder = new StreamsBuilder();
>         // selectKey makes parentStream a key-changing point
>         final KStream<Integer, Integer> parentStream = streamsBuilder
>                 .stream("parentTopic", Consumed.with(Serdes.Integer(), Serdes.Integer()))
>                 .selectKey(Integer::sum);
>         // Three children of the same key-changing parent
>         final KStream<Integer, Integer> childStream1 = parentStream.mapValues(v -> v + 1);
>         final KStream<Integer, Integer> childStream2 = parentStream.mapValues(v -> v + 2);
>         final KStream<Integer, Integer> childStream3 = parentStream.mapValues(v -> v + 3);
>         // Two chained merges, so two MergeNodes share the parent
>         childStream1
>                 .merge(childStream2)
>                 .merge(childStream3)
>                 .to("outputTopic");
>         final Properties properties = new Properties();
>         properties.setProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE);
>         // Building with optimization enabled triggers the exception
>         streamsBuilder.build(properties);
>     }
> }
> {code}
> h3. Expected result
> streamsBuilder.build should create the Topology without throwing an 
> exception. The expected topology is:
> {code:java}
> Topologies:
>    Sub-topology: 0
>     Source: KSTREAM-SOURCE-00 (topics: [parentTopic])
>       --> KSTREAM-KEY-SELECT-01
>     Processor: KSTREAM-KEY-SELECT-01 (stores: [])
>       --> KSTREAM-MAPVALUES-02, KSTREAM-MAPVALUES-03, KSTREAM-MAPVALUES-04
>       <-- KSTREAM-SOURCE-00
>     Processor: KSTREAM-MAPVALUES-02 (stores: [])
>       --> KSTREAM-MERGE-05
>       <-- KSTREAM-KEY-SELECT-01
>     Processor: KSTREAM-MAPVALUES-03 (stores: [])
>       --> KSTREAM-MERGE-05
>       <-- KSTREAM-KEY-SELECT-01
>     Processor: KSTREAM-MAPVALUES-04 (stores: [])
>       --> KSTREAM-MERGE-06
>       <-- KSTREAM-KEY-SELECT-01
>     Processor: KSTREAM-MERGE-05 (stores: [])
>       --> KSTREAM-MERGE-06
>       <-- KSTREAM-MAPVALUES-02, KSTREAM-MAPVALUES-03
>     Processor: KSTREAM-MERGE-06 (stores: [])
>       --> KSTREAM-SINK-07
>       <-- KSTREAM-MERGE-05, KSTREAM-MAPVALUES-04
>     Sink: KSTREAM-SINK-07 (topic: outputTopic)
>       <-- KSTREAM-MERGE-06
> {code}
> h3. Actual result
> A NullPointerException is thrown with the following stack trace:
> {code:java}
> Exception in thread "main" java.lang.NullPointerException
>   at java.util.AbstractCollection.addAll(AbstractCollection.java:343)
>   at org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder.maybeUpdateKeyChangingRepartitionNodeMap(InternalStreamsBuilder.java:397)
>   at org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder.maybeOptimizeRepartitionOperations(InternalStreamsBuilder.java:315)
>   at org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder.maybePerformOptimizations(InternalStreamsBuilder.java:304)
>   at org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder.buildAndOptimizeTopology(InternalStreamsBuilder.java:275)
>   at org.apache.kafka.streams.StreamsBuilder.build(StreamsBuilder.java:558)
>   at Main.main(Main.java:24)
> {code}
> h3. Cause
> This exception occurs in 
> InternalStreamsBuilder#maybeUpdateKeyChangingRepartitionNodeMap.
> {code:java}
> private void maybeUpdateKeyChangingRepartitionNodeMap() {
>     final Map<StreamsGraphNode, Set<StreamsGraphNode>> mergeNodesToKeyChangers = new HashMap<>();
>     for (final StreamsGraphNode mergeNode : mergeNodes) {
>         mergeNodesToKeyChangers.put(mergeNode, new LinkedHashSet<>());
> final Collection keys = 
>