[jira] [Commented] (STORM-2228) KafkaSpout does not replay properly when a topic maps to multiple streams
[ https://issues.apache.org/jira/browse/STORM-2228?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15828165#comment-15828165 ] Robert Joseph Evans commented on STORM-2228: Sorry, I got the review comments fixed, but STORM-2236 went into master and it is not going to be a simple merge by any means.

> KafkaSpout does not replay properly when a topic maps to multiple streams
> -------------------------------------------------------------------------
>
>                 Key: STORM-2228
>                 URL: https://issues.apache.org/jira/browse/STORM-2228
>             Project: Apache Storm
>          Issue Type: Bug
>          Components: storm-kafka-client
>    Affects Versions: 1.0.0, 2.0.0, 1.0.1, 1.0.2, 1.1.0, 1.0.3
>            Reporter: Robert Joseph Evans
>            Assignee: Robert Joseph Evans
>            Priority: Blocker
>
> In the example KafkaSpoutTopologyMainNamedTopics.java, the code creates a TuplesBuilder and a KafkaSpoutStreams:
> {code}
> protected KafkaSpoutTuplesBuilder<String, String> getTuplesBuilder() {
>     return new KafkaSpoutTuplesBuilderNamedTopics.Builder<>(
>             new TopicsTest0Test1TupleBuilder<String, String>(TOPICS[0], TOPICS[1]),
>             new TopicTest2TupleBuilder<String, String>(TOPICS[2]))
>         .build();
> }
>
> protected KafkaSpoutStreams getKafkaSpoutStreams() {
>     final Fields outputFields = new Fields("topic", "partition", "offset", "key", "value");
>     final Fields outputFields1 = new Fields("topic", "partition", "offset");
>     return new KafkaSpoutStreamsNamedTopics.Builder(outputFields, STREAMS[0], new String[]{TOPICS[0], TOPICS[1]})  // contents of topics test, test1, sent to test_stream
>         .addStream(outputFields, STREAMS[0], new String[]{TOPICS[2]})   // contents of topic test2 sent to test_stream
>         .addStream(outputFields1, STREAMS[2], new String[]{TOPICS[2]})  // contents of topic test2 sent to test2_stream
>         .build();
> }
> {code}
> Essentially the code is trying to take {{TOPICS\[0]}}, {{TOPICS\[1]}}, and {{TOPICS\[2]}}, translate them to {{Fields("topic", "partition", "offset", "key", "value")}}, and output them on {{STREAMS\[0]}}. Then just for {{TOPICS\[2]}} they want it to be output as {{Fields("topic", "partition", "offset")}} to {{STREAMS\[2]}}. (Don't know what happened to {{STREAMS\[1]}}.)
> There are two issues here. The first is with how the TupleBuilder and the SpoutStreams are split up but coupled: {{STREAMS\[2]}} is actually getting the full "topic" "partition" "offset" "key" "value", but this is minor. The real issue is that the code uses the same KafkaSpoutMessageId for all the tuples emitted to both {{STREAMS\[0]}} and {{STREAMS\[2]}}.
> https://git.corp.yahoo.com/storm/storm/blob/5bcbb8d6d700d0d238d23f8f6d3976667aaedab9/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java#L284-L304
> The code, however, is written to assume that it will only ever get one ack/fail for a given KafkaSpoutMessageId. This means that if one of the emitted tuple trees succeeds and then the other fails, the failure will not result in anything being replayed! This violates how Storm is intended to work.
> I discovered this as a part of STORM-2225, and I am fine with fixing it on STORM-2225 (I would just remove support for that functionality, because there are other ways of doing this correctly). But that would not maintain backwards compatibility, and I am not sure it would be appropriate for 1.x releases. I really would like to have feedback from others on this.
> I can put something into 1.x where it will throw an exception if acking is enabled and this situation is present, but I don't want to spend the time trying to do reference counting on the number of tuples actually emitted. If someone else wants to do that, I would be happy to turn this JIRA over to them.

-- This message was sent by Atlassian JIRA (v6.3.4#6332)
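The reference-counting idea mentioned at the end of the description can be sketched in a few lines of Java. This is a hypothetical illustration only: {{RefCountedMessageId}} and its fields do not exist in storm-kafka-client, and the real KafkaSpoutMessageId tracks no emit count, which is exactly the bug being reported.

```java
// Hypothetical sketch of per-message-id reference counting.
// Nothing here is Storm code; it only illustrates the proposed fix.
import java.util.concurrent.atomic.AtomicInteger;

public class RefCountSketch {

    static class RefCountedMessageId {
        private final AtomicInteger pending;      // tuples emitted but not yet acked/failed
        private volatile boolean failed = false;  // any failed branch forces a replay

        RefCountedMessageId(int emittedTuples) {
            pending = new AtomicInteger(emittedTuples);
        }

        // Each returns true once every emitted tuple has been acked or failed,
        // i.e. when the spout can finally decide commit-vs-replay.
        boolean ack()  { return pending.decrementAndGet() == 0; }
        boolean fail() { failed = true; return pending.decrementAndGet() == 0; }

        boolean needsReplay() { return failed; }
    }

    public static void main(String[] args) {
        // One Kafka record emitted as two tuples (STREAMS[0] and STREAMS[2]),
        // sharing a single message id.
        RefCountedMessageId id = new RefCountedMessageId(2);
        System.out.println("complete after ack: " + id.ack());    // false: one tuple still pending
        System.out.println("complete after fail: " + id.fail());  // true: all tuples accounted for
        System.out.println("needs replay: " + id.needsReplay());  // true: the record must be re-emitted
    }
}
```

With the current spout code, by contrast, the first ack alone decides the record's fate, so the ack-then-fail sequence above would be treated as success and nothing would be replayed.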
[jira] [Commented] (STORM-2228) KafkaSpout does not replay properly when a topic maps to multiple streams
[ https://issues.apache.org/jira/browse/STORM-2228?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15827171#comment-15827171 ] Jungtaek Lim commented on STORM-2228: - [~revans2] I just wanted to validate this issue and make progress on it (keep it, or mark it as invalid), not to drop its priority. (Btw, STORM-2176 is marked as 'Critical' but acts as a 'Blocker' for releasing 1.1.0.) Even if it turns out that STORM-2228 is not valid, I'm OK with raising the priority of STORM-2225 to 'Blocker' and adding STORM-2225 to the 1.1.0 epic if we need the change.

-- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (STORM-2228) KafkaSpout does not replay properly when a topic maps to multiple streams
[ https://issues.apache.org/jira/browse/STORM-2228?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15823573#comment-15823573 ] Jungtaek Lim commented on STORM-2228: - [~revans2] I just started reading the storm-kafka-client code, and I see that none of the KafkaSpoutStreams implementations allow one topic to be associated with multiple streams, since *topicToStream* is a HashMap.
- 1.x branch: https://github.com/apache/storm/blob/1.x-branch/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStreamsNamedTopics.java#L110
- 1.0.x branch: https://github.com/apache/storm/blob/1.0.x-branch/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStreamsNamedTopics.java#L99
So if my understanding is correct, TOPICS\[2] is only assigned to STREAMS\[2], not also to STREAMS\[0]. (A bit confusing though... even the example code and its comments are wrong.) Given that a topic doesn't map to multiple streams, there's no chance for multiple tuples to have the same KafkaSpoutMessageId (unless there's a problem with fetching and replaying), and as a result the current implementation is valid. STORM-2225 seems to be needed, but this issue seems not to be valid and no longer a blocker. [~hmclouro] Could you confirm, given that this is marked as a blocker?
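The *topicToStream* observation above can be demonstrated with a plain map. The topic and stream names below mirror the example's but are purely illustrative; the point is only HashMap's replace-on-put semantics, which make the example's double registration of TOPICS\[2] keep just the last stream.

```java
import java.util.HashMap;
import java.util.Map;

public class TopicToStreamSketch {
    public static void main(String[] args) {
        // Stand-in for the internal topicToStream map (topic -> stream) in
        // KafkaSpoutStreamsNamedTopics. Because it is a plain HashMap, the
        // second registration of the same topic silently replaces the first.
        Map<String, String> topicToStream = new HashMap<>();
        topicToStream.put("test2", "test_stream");   // addStream(..., STREAMS[0], {TOPICS[2]})
        topicToStream.put("test2", "test2_stream");  // addStream(..., STREAMS[2], {TOPICS[2]})

        System.out.println(topicToStream.get("test2")); // prints "test2_stream"
        System.out.println(topicToStream.size());       // prints "1": one stream per topic survives
    }
}
```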
[jira] [Commented] (STORM-2228) KafkaSpout does not replay properly when a topic maps to multiple streams
[ https://issues.apache.org/jira/browse/STORM-2228?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15763796#comment-15763796 ] Jungtaek Lim commented on STORM-2228: - I'm OK with breaking backward compatibility even on 1.x, since the storm-kafka-client module is fairly new and still evolving. Btw, we should have marked storm-kafka-client as 'evolving', as opposed to 'stable', for several release periods.

-- This message was sent by Atlassian JIRA (v6.3.4#6332)