[jira] [Commented] (KAFKA-7669) Stream topology definition is not robust to the ordering changes
[ https://issues.apache.org/jira/browse/KAFKA-7669?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17032512#comment-17032512 ]

John Roesler commented on KAFKA-7669:
-------------------------------------

This issue has been lurking for a while, and has been reported a number of different ways. It seems to take two forms:

1. changing the topology at all (in apparently compatible ways) can renumber operators and corrupt the application upon restart
2. changing the topology in combination with a rolling bounce results in members executing a different topology than the leader, which leads to extra problems (such as NPEs)

https://issues.apache.org/jira/browse/KAFKA-7669 is related, and seems to be more about just changing the topology at all
https://issues.apache.org/jira/browse/KAFKA-8307 proposes a fix
https://issues.apache.org/jira/browse/KAFKA-8810 seems to be a duplicate of KAFKA-8307
and https://issues.apache.org/jira/browse/KAFKA-9518 reports an exception that results from a rolling-bounce topology change.

> Stream topology definition is not robust to the ordering changes
> ----------------------------------------------------------------
>
>                 Key: KAFKA-7669
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7669
>             Project: Kafka
>          Issue Type: Wish
>          Components: streams
>    Affects Versions: 2.0.0
>            Reporter: Mateusz Owczarek
>            Priority: Major
>
> It seems that if the user does not guarantee the order of the stream topology
> definition, he may end up with multiple stream branches having the same
> internal changelog (and repartition, if created) topic.
> Let's assume:
> {code:java}
> val initialStream = new StreamsBuilder().stream(sth);
> val someStrings = (1 to 10).map(_.toString)
> val notGuaranteedOrderOfStreams: Map[String, KStream[...]] =
>   someStrings.map(s => s -> initialStream.filter(...)).toMap
> {code}
> When the user defines now common aggregation logic for the
> notGuaranteedOrderOfStreams, and runs multiple instances of the application
> the KSTREAM-AGGREGATE-STATE-STORE topics names will not be unique and will
> contain results of the different streams from notGuaranteedOrderOfStreams map.
> All of this without a single warning that the topology (or just the order of
> the topology definition) differs in different instances of the Kafka Streams
> application.
> Also, I am concerned that ids in "KSTREAM-AGGREGATE-STATE-STORE-id-changelog"
> match so well for the different application instances (and different
> topologies).

--
This message was sent by Atlassian Jira (v8.3.4#803005)
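
The renumbering effect described in form 1 can be illustrated with a toy model. This is only a sketch under stated assumptions: the class `AutoNamingSketch` and the method `assignNames` are hypothetical and are not part of Kafka Streams; they merely imitate the idea of appending a running, zero-padded index to each operator prefix in definition order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;

/** Toy model of counter-based operator naming; NOT the Kafka Streams implementation. */
final class AutoNamingSketch {

    /** Gives each operator prefix a running, zero-padded index in definition order. */
    static List<String> assignNames(List<String> operatorPrefixes) {
        List<String> names = new ArrayList<>();
        int index = 0;
        for (String prefix : operatorPrefixes) {
            names.add(String.format(Locale.ROOT, "%s%010d", prefix, index++));
        }
        return names;
    }

    public static void main(String[] args) {
        // Version 1 of a hypothetical topology: source, then aggregation.
        List<String> v1 = assignNames(Arrays.asList(
                "KSTREAM-SOURCE-", "KSTREAM-AGGREGATE-STATE-STORE-"));
        // Version 2 inserts a filter before the aggregation.
        List<String> v2 = assignNames(Arrays.asList(
                "KSTREAM-SOURCE-", "KSTREAM-FILTER-", "KSTREAM-AGGREGATE-STATE-STORE-"));
        // The aggregation store gets a different index, hence a different name.
        System.out.println("store name in v1: " + v1.get(1));
        System.out.println("store name in v2: " + v2.get(2));
    }
}
```

In this model the aggregation itself is unchanged between the two versions, yet its generated store name shifts because an unrelated operator was added ahead of it. A restarted application would then look for state under a name that no longer matches the existing one.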
[jira] [Commented] (KAFKA-7669) Stream topology definition is not robust to the ordering changes
[ https://issues.apache.org/jira/browse/KAFKA-7669?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16702291#comment-16702291 ]

Guozhang Wang commented on KAFKA-7669:
--------------------------------------

[~nijo] Also note that there is another KIP (KIP-307) that would allow users to name all of their operators in the DSL if they want to. It goes beyond stateful operations and can help resolve the ordering issue. But after all, the DSL does require a consistent definition order in order to generate the same topology (same graph, same node names) independently on different instances.
[jira] [Commented] (KAFKA-7669) Stream topology definition is not robust to the ordering changes
[ https://issues.apache.org/jira/browse/KAFKA-7669?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16697452#comment-16697452 ]

Matthias J. Sax commented on KAFKA-7669:
----------------------------------------

{quote}this order-consistency should be clearly indicated in docs
{quote}
I agree. In older versions there was no Scala API and Java 7 was still supported, so this was not an issue, as the code ensured ordering automatically... With Java 8 and the Scala API, we should update the docs. Feel free to open a PR :)
[jira] [Commented] (KAFKA-7669) Stream topology definition is not robust to the ordering changes
[ https://issues.apache.org/jira/browse/KAFKA-7669?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16696653#comment-16696653 ]

Mateusz Owczarek commented on KAFKA-7669:
-----------------------------------------

Thank you for the answer. I figured out that the topology definition order needs to be consistent across all of the application instances, and I have already fixed my code accordingly. I was just pointing out that, from my point of view, this order-consistency should be clearly indicated in the docs.

I had not heard about the possibility of explicitly naming internal Kafka Streams topics in 2.1. Thanks for pointing it out! That solves many of my problems! :)
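
A minimal sketch of the order-consistency requirement discussed in this thread, assuming index-based naming as in the reported store names. The class `StableOrderSketch` and the method `assignStores` are hypothetical and only model the idea: if every instance visits its branches in sorted key order before defining them, each branch ends up with the same generated name everywhere, no matter how the collection of branches was built.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.TreeMap;

/** Toy model: define branches in sorted key order so generated names are stable. */
final class StableOrderSketch {

    /** Maps each branch key to an index-based store name, visiting keys in sorted order. */
    static Map<String, String> assignStores(Collection<String> branchKeys) {
        List<String> sortedKeys = new ArrayList<>(branchKeys);
        Collections.sort(sortedKeys); // same order on every instance
        Map<String, String> storeByBranch = new TreeMap<>();
        int index = 0;
        for (String key : sortedKeys) {
            storeByBranch.put(key, String.format(
                    Locale.ROOT, "KSTREAM-AGGREGATE-STATE-STORE-%010d", index++));
        }
        return storeByBranch;
    }

    public static void main(String[] args) {
        // Two instances that happen to enumerate the same branches differently.
        Map<String, String> instanceA = assignStores(Arrays.asList("3", "1", "2"));
        Map<String, String> instanceB = assignStores(Arrays.asList("2", "3", "1"));
        System.out.println("same assignment on both instances: " + instanceA.equals(instanceB));
    }
}
```

Sorting is only one way to pin the order; any deterministic iteration that is identical on all instances would serve the same purpose in this model.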
[jira] [Commented] (KAFKA-7669) Stream topology definition is not robust to the ordering changes
[ https://issues.apache.org/jira/browse/KAFKA-7669?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16696400#comment-16696400 ]

Matthias J. Sax commented on KAFKA-7669:
----------------------------------------

Thanks for reporting this. Note that KafkaStreams requires that all application instances execute the exact same topology. All operators are automatically assigned names, and those names are also used for repartition and changelog topics. The names are different if the operators are added in a different order. Thus, you should rewrite your program in a way that guarantees the ordering.

As an alternative, you can provide names for all stateful operators explicitly (you need to upgrade to 2.1 for this). In this case, KafkaStreams uses the provided names to name repartition and changelog topics. Thus, order should no longer matter, as long as the final topology graph is the same.
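
The explicit-naming alternative can be sketched with a small stand-alone model. Everything here is an illustrative assumption: `ExplicitNamingSketch`, `topicsFor`, the `agg-store-` prefix and the application id `my-app` are hypothetical, and the code does not call Kafka Streams. It only relies on the changelog topic name being built from the application id, the store name and a `-changelog` suffix. In the real DSL, a store name would be supplied through an API such as `Materialized.as(...)`.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Map;
import java.util.TreeMap;

/** Toy model: names derived from the branch key do not depend on definition order. */
final class ExplicitNamingSketch {

    /** Builds a changelog topic name from an application id and a store name. */
    static String changelogTopic(String applicationId, String storeName) {
        return applicationId + "-" + storeName + "-changelog";
    }

    /** Maps each branch key to the changelog topic of its explicitly named store. */
    static Map<String, String> topicsFor(String applicationId, Collection<String> branchKeys) {
        Map<String, String> topicByBranch = new TreeMap<>();
        for (String key : branchKeys) {
            String storeName = "agg-store-" + key; // hypothetical explicit store name
            topicByBranch.put(key, changelogTopic(applicationId, storeName));
        }
        return topicByBranch;
    }

    public static void main(String[] args) {
        // The branches are visited in different orders, yet the mapping is identical.
        Map<String, String> instanceA = topicsFor("my-app", Arrays.asList("3", "1", "2"));
        Map<String, String> instanceB = topicsFor("my-app", Arrays.asList("2", "3", "1"));
        System.out.println("same topics on both instances: " + instanceA.equals(instanceB));
    }
}
```

Because each name is a function of the branch key alone, no counter is involved and the order in which branches are defined drops out of the result.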