[jira] [Commented] (KAFKA-7669) Stream topology definition is not robust to the ordering changes

2020-02-07 Thread John Roesler (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7669?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17032512#comment-17032512
 ] 

John Roesler commented on KAFKA-7669:
-

This issue has been lurking for a while and has been reported in a number of 
different ways. It seems to take two forms:
1. changing the topology at all (in apparently compatible ways) can renumber 
operators and corrupt the application upon restart
2. changing the topology in combination with a rolling bounce results in 
members executing a different topology than the leader, which leads to extra 
problems (such as NPEs)

https://issues.apache.org/jira/browse/KAFKA-7669 is related, and seems to be 
more about just changing the topology at all
https://issues.apache.org/jira/browse/KAFKA-8307 proposes a fix
https://issues.apache.org/jira/browse/KAFKA-8810 seems to be a duplicate of 
KAFKA-8307
and https://issues.apache.org/jira/browse/KAFKA-9518 reports an exception that 
results from a rolling-bounce topology change.
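
To illustrate the first form, here is a minimal sketch of index-based naming. 
It is a toy model, not Kafka Streams' actual implementation: the class 
RenumberingSketch, the method buildNames, and the zero-padded suffix format 
are assumptions made for illustration. It shows how adding one operator 
upstream shifts the generated name of a state store that was not itself 
changed.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy model (hypothetical, not the Kafka Streams source): every operator
// added to the topology consumes the next value of a shared counter.
class RenumberingSketch {

    // Returns the generated name for each operator, in definition order.
    static List<String> buildNames(List<String> operatorTypes) {
        int index = 0;
        List<String> names = new ArrayList<>();
        for (String type : operatorTypes) {
            // Assumed suffix format: zero-padded running index.
            names.add(String.format("%s-%010d", type, index++));
        }
        return names;
    }

    public static void main(String[] args) {
        // Version 1 of the application: source followed by an aggregation.
        List<String> v1 = buildNames(Arrays.asList(
            "KSTREAM-SOURCE", "KSTREAM-AGGREGATE-STATE-STORE"));
        // Version 2 adds a filter in front of the same aggregation.
        List<String> v2 = buildNames(Arrays.asList(
            "KSTREAM-SOURCE", "KSTREAM-FILTER", "KSTREAM-AGGREGATE-STATE-STORE"));

        // The aggregation itself is unchanged, but its store name differs,
        // so version 2 would no longer find the state written by version 1.
        System.out.println("v1 store: " + v1.get(1));
        System.out.println("v2 store: " + v2.get(2));
    }
}
```

In this model the store is named with index 1 in the first version and index 
2 in the second, which is the renumbering described in form 1.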

> Stream topology definition is not robust to the ordering changes
> 
>
> Key: KAFKA-7669
> URL: https://issues.apache.org/jira/browse/KAFKA-7669
> Project: Kafka
> Issue Type: Wish
> Components: streams
> Affects Versions: 2.0.0
> Reporter: Mateusz Owczarek
> Priority: Major
>
> It seems that if the user does not guarantee the order of the stream topology 
> definition, they may end up with multiple stream branches having the same 
> internal changelog (and repartition, if created) topic. 
> Let's assume:
> {code:java}
> val initialStream = new StreamsBuilder().stream(sth);
> val someStrings = (1 to 10).map(_.toString)
> val notGuaranteedOrderOfStreams: Map[String, KStream[...]] = 
>   someStrings.map(s => s -> initialStream.filter(...)).toMap
> {code}
> When the user now defines common aggregation logic for the 
> notGuaranteedOrderOfStreams and runs multiple instances of the application, 
> the KSTREAM-AGGREGATE-STATE-STORE topic names will not be unique and will 
> contain results of the different streams from the notGuaranteedOrderOfStreams 
> map.
> All of this happens without a single warning that the topology (or just the 
> order of the topology definition) differs between instances of the Kafka 
> Streams application.
> Also, I am concerned that the ids in 
> "KSTREAM-AGGREGATE-STATE-STORE-id-changelog" match so well for the different 
> application instances (and different topologies).
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-7669) Stream topology definition is not robust to the ordering changes

2018-11-28 Thread Guozhang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7669?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16702291#comment-16702291
 ] 

Guozhang Wang commented on KAFKA-7669:
--

[~nijo] Also note that there is another KIP (KIP-307) for allowing users to 
name all of their operators in the DSL if they want to. It goes beyond 
stateful operations and can help resolve the ordering issue. Still, the DSL 
does require a consistent definition order in order to generate the same 
topology (same graph, same node names) independently on different instances.




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7669) Stream topology definition is not robust to the ordering changes

2018-11-23 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7669?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16697452#comment-16697452
 ] 

Matthias J. Sax commented on KAFKA-7669:


{quote}this order-consistency should be clearly indicated in docs
{quote}
I agree. In older versions there was no Scala API and Java 7 was still 
supported, so this was not an issue: the code ensured ordering automatically. 
With Java 8 and the Scala API, we should update the docs.

Feel free to open a PR :)



[jira] [Commented] (KAFKA-7669) Stream topology definition is not robust to the ordering changes

2018-11-23 Thread Mateusz Owczarek (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7669?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16696653#comment-16696653
 ] 

Mateusz Owczarek commented on KAFKA-7669:
-

Thank you for the answer. I figured out that the topology definition order 
needs to be consistent across all of the application instances and have 
already fixed my code accordingly. I was just pointing out that, from my point 
of view, this order-consistency should be clearly indicated in the docs.

I had not heard about the possibility of explicitly naming internal Kafka 
Streams topics in 2.1. Thanks for pointing it out! That solves many of my 
problems! :)



[jira] [Commented] (KAFKA-7669) Stream topology definition is not robust to the ordering changes

2018-11-22 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7669?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16696400#comment-16696400
 ] 

Matthias J. Sax commented on KAFKA-7669:


Thanks for reporting this. Note that KafkaStreams requires that all application 
instances execute the exact same topology. All operators are automatically 
assigned names, and those names are used to name repartition and changelog 
topics, too. The names are different if the operators are added in a 
different order.

Thus, you should rewrite your program in a way that guarantees the ordering.

As an alternative, you can provide names for all stateful operators explicitly 
(you need to upgrade to 2.1 for this). In this case, KafkaStreams uses the 
provided names to name repartition and changelog topics. Thus, order should 
not matter any longer, as long as the final topology graph is the same.
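
The difference between the two approaches can be sketched with a toy model. 
This is not the Kafka Streams implementation: the class TopicNamingSketch, 
the method changelogTopics, the application id "my-app", the "agg-" store 
name prefix, and the simplified counter are all assumptions made for 
illustration. The only detail taken from Kafka Streams is that a changelog 
topic name is derived from the application id and the store name. In the 
real DSL an explicit store name would be supplied through an API such as 
Materialized.as(...).

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Toy model (hypothetical): maps each stream branch to the changelog topic
// backing its aggregation, given the order in which branches are defined.
class TopicNamingSketch {

    static Map<String, String> changelogTopics(List<String> branchOrder,
                                               boolean explicitNames) {
        int index = 0; // simplified stand-in for the builder's running index
        Map<String, String> topics = new TreeMap<>();
        for (String branch : branchOrder) {
            String storeName = explicitNames
                ? "agg-" + branch // name chosen by the user, order-independent
                : String.format("KSTREAM-AGGREGATE-STATE-STORE-%010d", index++);
            topics.put(branch, "my-app-" + storeName + "-changelog");
        }
        return topics;
    }

    public static void main(String[] args) {
        // Two instances iterate over the same branches in a different order,
        // as can happen when the branches are held in an unordered Map.
        List<String> instanceA = Arrays.asList("a", "b");
        List<String> instanceB = Arrays.asList("b", "a");

        // Generated names: branch "a" on instance A and branch "b" on
        // instance B end up sharing one changelog topic.
        System.out.println(changelogTopics(instanceA, false));
        System.out.println(changelogTopics(instanceB, false));

        // Explicit names: both instances agree regardless of order.
        System.out.println(changelogTopics(instanceA, true));
        System.out.println(changelogTopics(instanceB, true));
    }
}
```

With generated names the two instances disagree about which branch owns 
which topic. With explicit names the mapping is identical on both instances.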
