[jira] [Commented] (STORM-2228) KafkaSpout does not replay properly when a topic maps to multiple streams

2017-01-18 Thread Robert Joseph Evans (JIRA)

[ https://issues.apache.org/jira/browse/STORM-2228?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15828165#comment-15828165 ]

Robert Joseph Evans commented on STORM-2228:


Sorry, I got the review comments fixed, but STORM-2236 went into master and it 
is not going to be a simple merge by any means.

> KafkaSpout does not replay properly when a topic maps to multiple streams
> -
>
> Key: STORM-2228
> URL: https://issues.apache.org/jira/browse/STORM-2228
> Project: Apache Storm
>  Issue Type: Bug
>  Components: storm-kafka-client
>Affects Versions: 1.0.0, 2.0.0, 1.0.1, 1.0.2, 1.1.0, 1.0.3
>Reporter: Robert Joseph Evans
>Assignee: Robert Joseph Evans
>Priority: Blocker
>
> In the example {{KafkaSpoutTopologyMainNamedTopics.java}}, the code creates a 
> TuplesBuilder and a KafkaSpoutStreams:
> {code}
> protected KafkaSpoutTuplesBuilder getTuplesBuilder() {
>     return new KafkaSpoutTuplesBuilderNamedTopics.Builder<>(
>             new TopicsTest0Test1TupleBuilder(TOPICS[0], TOPICS[1]),
>             new TopicTest2TupleBuilder(TOPICS[2]))
>             .build();
> }
>
> protected KafkaSpoutStreams getKafkaSpoutStreams() {
>     final Fields outputFields = new Fields("topic", "partition", "offset", "key", "value");
>     final Fields outputFields1 = new Fields("topic", "partition", "offset");
>     return new KafkaSpoutStreamsNamedTopics.Builder(outputFields, STREAMS[0],
>             new String[]{TOPICS[0], TOPICS[1]})                             // contents of topics test, test1 sent to test_stream
>             .addStream(outputFields, STREAMS[0], new String[]{TOPICS[2]})   // contents of topic test2 sent to test_stream
>             .addStream(outputFields1, STREAMS[2], new String[]{TOPICS[2]})  // contents of topic test2 sent to test2_stream
>             .build();
> }
> {code}
> Essentially the code is trying to take {{TOPICS\[0]}}, {{TOPICS\[1]}}, and 
> {{TOPICS\[2]}}, translate them to {{Fields("topic", "partition", "offset", 
> "key", "value")}}, and output them on {{STREAMS\[0]}}. Then, just for 
> {{TOPICS\[2]}}, they want it to be output as {{Fields("topic", "partition", 
> "offset")}} to {{STREAMS\[2]}}.  (It is not clear what happened to {{STREAMS\[1]}}.)
> There are two issues here.  The first is with how the TupleBuilder and the 
> SpoutStreams are split up but coupled: {{STREAMS\[2]}} is actually getting 
> the full "topic", "partition", "offset", "key", "value" fields, but this is 
> minor.  The real issue is that the code uses the same KafkaSpoutMessageId for 
> all the tuples emitted to both {{STREAMS\[0]}} and {{STREAMS\[2]}}.
> https://git.corp.yahoo.com/storm/storm/blob/5bcbb8d6d700d0d238d23f8f6d3976667aaedab9/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java#L284-L304
> The code, however, is written to assume that it will only ever get one 
> ack/fail for a given KafkaSpoutMessageId.  This means that if one of the 
> emitted tuple trees succeeds and then the other fails, the failure will not 
> result in anything being replayed!  This violates how Storm is intended to 
> work.
> I discovered this as a part of STORM-2225, and I am fine with fixing it on 
> STORM-2225 (I would just remove support for that functionality because there 
> are other ways of doing this correctly).  But that would not maintain 
> backwards compatibility, and I am not sure it would be appropriate for 1.x 
> releases.  I really would like to have feedback from others on this.
> I can put something into 1.x where it will throw an exception if acking is 
> enabled and this situation is present, but I don't want to spend the time 
> trying to do reference counting on the number of tuples actually emitted.  If 
> someone else wants to do that, I would be happy to turn this JIRA over to them.
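The single-ack assumption described above can be sketched as a toy model. This is illustrative code only, not the actual KafkaSpout implementation; the class, method, and field names here are hypothetical. It shows how a per-record bookkeeping entry that is retired on the first ack silently swallows a later fail for the same message id:

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of tracking one Kafka record that backs tuples on two
// streams, with a single ack/fail slot per message id (keyed by offset).
public class SharedMessageIdSketch {
    // Pending offsets awaiting commit; an entry is removed on the FIRST ack.
    static final Map<Long, Boolean> pending = new HashMap<>();

    static void emit(long offset) {
        // One entry per Kafka record, even if it was emitted on two streams.
        pending.put(offset, Boolean.FALSE);
    }

    static void ack(long offset) {
        // The first ack retires the id entirely.
        pending.remove(offset);
    }

    static boolean fail(long offset) {
        // A fail arriving after the ack finds nothing to replay:
        // returns false, meaning the failed tuple tree is simply lost.
        return pending.containsKey(offset);
    }

    public static void main(String[] args) {
        emit(42L);                       // same record emitted to STREAMS[0] and STREAMS[2]
        ack(42L);                        // tuple tree on STREAMS[0] succeeds
        boolean willReplay = fail(42L);  // tuple tree on STREAMS[2] fails
        System.out.println(willReplay);  // false: nothing gets replayed
    }
}
```

Reference counting, as mentioned above, would mean incrementing a counter per emitted tuple and only retiring the id when acks drain it to zero (with any fail before that point forcing a replay).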



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (STORM-2228) KafkaSpout does not replay properly when a topic maps to multiple streams

2017-01-17 Thread Jungtaek Lim (JIRA)

[ https://issues.apache.org/jira/browse/STORM-2228?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15827171#comment-15827171 ]

Jungtaek Lim commented on STORM-2228:
-

[~revans2]
I just would like to validate this issue and make progress (keep it, or mark it 
as invalid), not to drop its priority.
(Btw, STORM-2176 is marked as 'Critical' but acts as a 'Blocker' for releasing 
1.1.0.)

Even if it turns out that STORM-2228 is not valid, I'm OK with raising the 
priority of STORM-2225 to 'Blocker' and adding STORM-2225 to the 1.1.0 epic if 
we need the change.



[jira] [Commented] (STORM-2228) KafkaSpout does not replay properly when a topic maps to multiple streams

2017-01-15 Thread Jungtaek Lim (JIRA)

[ https://issues.apache.org/jira/browse/STORM-2228?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15823573#comment-15823573 ]

Jungtaek Lim commented on STORM-2228:
-

[~revans2]
I just started reading the storm-kafka-client code, and I see that none of the 
KafkaSpoutStreams implementations allow one topic to be associated with 
multiple streams, since *topicToStream* is a HashMap.

- 1.x branch: 
https://github.com/apache/storm/blob/1.x-branch/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStreamsNamedTopics.java#L110
- 1.0.x branch: 
https://github.com/apache/storm/blob/1.0.x-branch/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStreamsNamedTopics.java#L99

So if my understanding is correct, TOPICS\[2] is only assigned to STREAMS\[2], 
not also to STREAMS\[0].
(A bit confusing though... even the example code and its comments are wrong.)

Given that a topic doesn't map to multiple streams, there's no chance for 
multiple tuples to have the same KafkaSpoutMessageId (unless there's a problem 
with fetching and replaying), and as a result, the current implementation is 
valid.
STORM-2225 seems to be needed, but this issue seems not to be valid and is no 
longer a blocker.

[~hmclouro] Could you confirm this, given that it is marked as a blocker issue?
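The HashMap behaviour described above can be seen in a minimal sketch. The field and method names here are simplified stand-ins, not the real KafkaSpoutStreamsNamedTopics API: because the map is keyed by topic, a second registration for the same topic silently replaces the first, so a topic can never end up on two streams.

```java
import java.util.HashMap;
import java.util.Map;

// Minimal illustration of why a HashMap keyed by topic cannot hold more
// than one stream per topic: the second put() for "test2" overwrites the
// first mapping instead of adding a second one.
public class TopicToStreamSketch {
    static final Map<String, String> topicToStream = new HashMap<>();

    static void addStream(String topic, String stream) {
        // HashMap.put replaces any earlier entry for the same key.
        topicToStream.put(topic, stream);
    }

    public static void main(String[] args) {
        addStream("test2", "test_stream");   // first registration
        addStream("test2", "test2_stream");  // second registration wins
        System.out.println(topicToStream.get("test2"));  // test2_stream
        System.out.println(topicToStream.size());        // 1
    }
}
```

This matches the reading that TOPICS\[2] ends up only on STREAMS\[2]: the earlier addStream call for the same topic is simply lost.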





[jira] [Commented] (STORM-2228) KafkaSpout does not replay properly when a topic maps to multiple streams

2016-12-20 Thread Jungtaek Lim (JIRA)

[ https://issues.apache.org/jira/browse/STORM-2228?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15763796#comment-15763796 ]

Jungtaek Lim commented on STORM-2228:
-

I'm OK with breaking backward compatibility even on 1.x, since the 
storm-kafka-client module is fairly new and it's still evolving. Btw, we 
should have marked storm-kafka-client as 'evolving' as opposed to 'stable' for 
several release periods.
