[jira] [Commented] (STORM-2359) Revising Message Timeouts

2019-01-17, Roshan Naik (JIRA)

[ https://issues.apache.org/jira/browse/STORM-2359?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16745845#comment-16745845 ]

Roshan Naik commented on STORM-2359:


Yes indeed. It would be quite nice if the JCTools queues could support 
peek()ing at elements other than the head and still be lock-free.

> Revising Message Timeouts
> -------------------------
>
> Key: STORM-2359
> URL: https://issues.apache.org/jira/browse/STORM-2359
> Project: Apache Storm
>  Issue Type: Sub-task
>  Components: storm-core
>Affects Versions: 2.0.0
>Reporter: Roshan Naik
>Assignee: Stig Rohde Døssing
>Priority: Major
> Attachments: STORM-2359-with-auto-reset.ods, STORM-2359.ods
>
>
> A revised strategy for message timeouts is proposed here.
> Design Doc:
>  
> https://docs.google.com/document/d/1am1kO7Wmf17U_Vz5_uyBB2OuSsc4TZQWRvbRhX52n5w/edit?usp=sharing





[jira] [Commented] (STORM-2359) Revising Message Timeouts

2019-01-17, Stig Rohde Døssing (JIRA)

[ https://issues.apache.org/jira/browse/STORM-2359?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16745154#comment-16745154 ]

Stig Rohde Døssing commented on STORM-2359:
---

Thanks, responded there.

Just to give an update on where I am with this, I took another look at whether 
we can avoid all the extra tracking of anchors in the critical path. It turns 
out we can, if the JCTools queues are updated to allow another thread to peek 
at the queue contents. I have a local branch of JCTools that seems like it can 
do this, so I'll suggest adding this method to the JCTools maintainers.

If we can peek at the queue contents, we can do resets for all queued tuples 
without adding any extra logic to the critical path. The timeout resetter 
thread can just look at the queue contents, without having to involve the 
critical path code in maintaining a ConcurrentHashMap of anchors. 
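
To make the idea concrete, here is a rough sketch of what the resetter loop 
could look like. This is purely illustrative: the {{PeekingQueue}} interface 
stands in for the hypothetical JCTools addition discussed above, and 
{{TupleLike}}/{{sendReset}} are made-up placeholders, not real Storm or 
JCTools APIs.

{code:java}
import java.util.List;
import java.util.function.Consumer;

// Hypothetical JCTools extension: lets a thread other than the consumer
// observe queued elements without removing them. Does not exist today.
interface PeekingQueue<E> {
    void peekEach(Consumer<E> consumer); // weakly consistent, lock-free scan
}

// Placeholder for a queued tuple carrying its anchor ids.
interface TupleLike {
    List<Long> anchors();
}

class QueueTimeoutResetter implements Runnable {
    private final PeekingQueue<TupleLike> inputQueue;    // executor receive queue
    private final PeekingQueue<TupleLike> pendingEmits;  // executor outbound overflow

    QueueTimeoutResetter(PeekingQueue<TupleLike> in, PeekingQueue<TupleLike> out) {
        this.inputQueue = in;
        this.pendingEmits = out;
    }

    @Override
    public void run() {
        // Read-only scan: the critical path never participates.
        inputQueue.peekEach(t -> t.anchors().forEach(this::sendReset));
        pendingEmits.peekEach(t -> t.anchors().forEach(this::sendReset));
    }

    private void sendReset(long anchor) {
        // Would route a RESET_TIMEOUT message to the acker owning this anchor.
    }
}
{code}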

Resetting timeouts for queued tuples will let users reset timeouts for tuples 
that have been delivered to a bolt's execute() without risking queued tuples 
timing out (the method is effectively useless right now). 

As a convenience, we can add a component-level configuration that will do 
automatic timeout reset for tuples that have been delivered to execute() but 
not acked or failed, using the same technique (a ConcurrentHashMap) as I 
proposed above for the worker.

This solution would be much better than what I suggested before, because it 
means users can choose to enable the expensive tracking only for specific 
problem bolts, and only if they don't want to or can't manually call 
collector.resetTimeout(). Resetting for queued tuples will be pretty cheap, 
because it adds no slowdown to the critical path and only produces reset 
tuples when processing is actually slow.



[jira] [Commented] (STORM-2359) Revising Message Timeouts

2019-01-17, Roshan Naik (JIRA)

[ https://issues.apache.org/jira/browse/STORM-2359?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16744817#comment-16744817 ]

Roshan Naik commented on STORM-2359:


[~Srdo] FYI: just posted initial thoughts around the Acker redesign in 
STORM-3314.



[jira] [Commented] (STORM-2359) Revising Message Timeouts

2019-01-12, Stig Rohde Døssing (JIRA)

[ https://issues.apache.org/jira/browse/STORM-2359?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16741281#comment-16741281 ]

Stig Rohde Døssing commented on STORM-2359:
---

{quote}
if a msg is in the inputQ or pendingEmitsQ, then it is in progress and it 
cannot also be ACKed
{quote}
Yes. However, since the JCTools queues don't support peeking at elements other 
than the head of the queue, we need to track in-progress messages separately 
somehow.

{quote}
Overall it sounds like you want to track inflight tuples at each executor... 
which if true... is a big memory overhead
{quote}
Not exactly. I want to track the anchors (i.e. some longs) of the tuples that 
are in flight in the worker. We don't need to keep track of the tuples 
themselves; I basically just want to keep a count for each anchor. When a tuple 
becomes pending, we increment the count for each of its anchors. When the tuple 
is acked/failed, or removed from pendingEmits (depending on whether the tuple 
is inbound or outbound), we decrement the count for each anchor. The reset 
timeout thread can then periodically send resets for all anchors with a 
nonzero count.

Example: 
The worker receives tuple t1 with anchors [1, 3, 6] from Netty. It increments 
the counts for keys 1, 3 and 6 in the ConcurrentHashMap, and adds the tuple to 
the inbound queue for the receiving executor. When the receiving task processes 
the tuple, it acks or fails it. During ack/fail, it decrements the counts for 
anchors 1, 3 and 6 (the map implementation removes keys that reach 0). The 
reset thread periodically sends reset messages for each anchor in the 
ConcurrentHashMap. 
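
A minimal sketch of the counting described above, assuming a plain 
{{ConcurrentHashMap<Long, Integer>}}; the class and method names are 
illustrative, not the actual patch:

{code:java}
import java.util.concurrent.ConcurrentHashMap;

// Multiset of in-flight anchors, keyed by anchor id.
class AnchorCounts {
    private final ConcurrentHashMap<Long, Integer> counts = new ConcurrentHashMap<>();

    // Tuple with this anchor became pending (entered an in/out queue).
    void increment(long anchor) {
        counts.merge(anchor, 1, Integer::sum);
    }

    // Tuple was acked/failed (inbound) or flushed from pendingEmits (outbound).
    // The mapping is removed once the count reaches zero.
    void decrement(long anchor) {
        counts.computeIfPresent(anchor, (k, v) -> v == 1 ? null : v - 1);
    }

    // The reset thread sends resets for every anchor still in here.
    Iterable<Long> inProgressAnchors() {
        return counts.keySet();
    }
}
{code}

For tuple t1 above, the worker would call {{increment}} for 1, 3 and 6 on 
receive, and the matching {{decrement}} calls during ack/fail.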

Regarding the implementation, I agree. The performance with ConcurrentHashMap 
doesn't look too awful to me, but maybe we can do better. Since we're only 
trying to keep a running count for the anchors, maybe we can push the Map code 
into another thread, and have the critical path just insert increment/decrement 
commands into a queue. I'll take a look at whether this is doable and how it 
performs.
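
As a sketch of that offloading idea (hedged: the command encoding, queue 
choice and names are all guesses, and a real version would need to handle a 
full queue), the critical path would only enqueue a command, and a single 
dedicated thread would own a plain, unsynchronized map:

{code:java}
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class OffloadedAnchorCounts {
    // Command = {anchor, delta}. An MPSC queue (e.g. from JCTools) would be
    // the natural fit; a BlockingQueue keeps the sketch self-contained.
    private final BlockingQueue<long[]> commands = new ArrayBlockingQueue<>(1024);
    private final Map<Long, Integer> counts = new HashMap<>(); // counter thread only

    // Critical path: just a queue insert, no map or lock.
    void onPending(long anchor)  { commands.offer(new long[]{anchor, +1}); }
    void onComplete(long anchor) { commands.offer(new long[]{anchor, -1}); }

    // Counter/reset thread: apply all queued commands, then read counts.keySet().
    void drainAndApply() {
        long[] cmd;
        while ((cmd = commands.poll()) != null) {
            int updated = counts.merge(cmd[0], (int) cmd[1], Integer::sum);
            if (updated == 0) {
                counts.remove(cmd[0]);
            }
        }
    }
}
{code}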

Regarding an alternative acking design, I'd be happy if we could avoid this 
issue entirely with a new design. Please do describe your thoughts.



[jira] [Commented] (STORM-2359) Revising Message Timeouts

2019-01-11, Roshan Naik (JIRA)

[ https://issues.apache.org/jira/browse/STORM-2359?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16740143#comment-16740143 ]

Roshan Naik commented on STORM-2359:


Got a chance to read through your posts more carefully again. Your scheme to 
keep the timeout computation in the ACKer by having the acker send back (A - B) 
to the spout seems sound.

I am unable to grasp the timeout reset strategy though. These two statements in 
particular:

"For the inbound queue, the anchor is no longer in progress when the associated 
tuple is acked or failed. For the outbound queue (pendingEmits in the 
Executor), the anchor is no longer in progress when the associated tuple gets 
flushed from pendingEmits."

I am thinking ... if a msg is in the inputQ or pendingEmitsQ, then it is *in 
progress* and it cannot also be ACKed. Overall it sounds like you want to track 
inflight tuples at each executor... which if true... is a big memory overhead. 

On the implementation side, one key concern is that in the 2.0 perf 
re-architecture (STORM-2306) a good amount of effort was spent tuning the 
critical path by cutting back on hashmaps and eliminating locks... both of 
which are perf killers. On a very quick scan of the code, I noticed the timeout 
strategy is introducing a ConcurrentHashMap into the critical path... which 
introduces both locks and hashmaps in one go. 

 

Btw... since you have reactivated this topic on ACKing... it's probably worth 
bringing up that [~kabhwan] and I were rethinking the ACKing design some time 
back. Since the ACKer bolt has a severe perf impact in Storm core... we were 
looking at a way to do acking without the ACKer bolt. As you can imagine, it 
would impact the timeouts as well. I can post the general idea in another JIRA 
and see what you think... either continue down this path for now and later 
replace it with a newer model, or investigate a newer model directly.



[jira] [Commented] (STORM-2359) Revising Message Timeouts

2019-01-10, Stig Rohde Døssing (JIRA)

[ https://issues.apache.org/jira/browse/STORM-2359?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16739641#comment-16739641 ]

Stig Rohde Døssing commented on STORM-2359:
---

I think it makes sense to open a WIP PR for further discussion.



[jira] [Commented] (STORM-2359) Revising Message Timeouts

2019-01-10, Stig Rohde Døssing (JIRA)

[ https://issues.apache.org/jira/browse/STORM-2359?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16739639#comment-16739639 ]

Stig Rohde Døssing commented on STORM-2359:
---

I've been taking a look at the feasibility of automatically resetting timeouts 
for tuples that are still being processed, and I think we can do it without 
much overhead.

The idea is to track the anchor ids of each non-system message that enters an 
executor in/out queue. For the inbound queue, the anchor is no longer in 
progress when the associated tuple is acked or failed. For the outbound queue 
(pendingEmits in the Executor), the anchor is no longer in progress when the 
associated tuple gets flushed from pendingEmits.

Occasionally a thread will check the set of in-progress anchors for the worker 
and send reset messages for all of them to the relevant ackers. In order to 
avoid sending too many messages, this thread snapshots the anchor set when it 
runs, and only sends reset messages for anchors that have been in progress 
sufficiently long in that worker.

Since there may be more than one tuple per anchor, anchors are tracked as a 
count in a multiset, rather than just presence in a set.
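
One way to implement the snapshot-and-grace-period logic above (a sketch under 
the assumption that the multiset exposes its current key set; {{sendReset}} is 
a placeholder):

{code:java}
import java.util.HashSet;
import java.util.Set;

class GracePeriodResetter {
    private Set<Long> previousSnapshot = new HashSet<>();

    // Called once per reset interval with the multiset's current key set.
    void runOnce(Set<Long> inProgressNow) {
        Set<Long> snapshot = new HashSet<>(inProgressNow);
        for (Long anchor : snapshot) {
            // Seen in two consecutive snapshots => in progress for at least
            // one full interval, so it has earned a reset.
            if (previousSnapshot.contains(anchor)) {
                sendReset(anchor);
            }
        }
        previousSnapshot = snapshot;
    }

    private void sendReset(long anchor) {
        // Would send a RESET_TIMEOUT for this anchor to the relevant acker.
    }
}
{code}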

I've updated the spreadsheet with benchmark numbers for TVL 
(ThroughputVsLatency) with this functionality enabled. For the 90k example I 
also did a run where the grace period is disabled, to show the penalty for 
sending resets in the worst case, i.e. all in-progress tuples have their 
timeouts reset every time the resetter thread runs.

The code is available at https://github.com/srdo/storm/tree/auto-reset-timeout. 
Only the latest commit is new.



[jira] [Commented] (STORM-2359) Revising Message Timeouts

2018-12-30, Stig Rohde Døssing (JIRA)

[ https://issues.apache.org/jira/browse/STORM-2359?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16730920#comment-16730920 ]

Stig Rohde Døssing commented on STORM-2359:
---

The contents of the message are a list of longs (anchor ids). The spout sends 
the acker the list of anchors it thinks are pending. The acker removes from the 
list any anchors that are actually pending, and returns the list of anchors to 
fail. 

 

Say the spout has emitted messages with anchor ids 1, 3, 4 to acker 1 and 
message 2 to acker 2. Say acker 1 has timed out message 1, and acker 2 has 
crashed and restarted since receiving message 2. The spout will send [1,3,4] to 
acker 1, and [2] to acker 2. The ackers remove from the received lists any 
anchor ids they recognize and return the modified list to the spout. So acker 1 
returns [1] and acker 2 returns [2]. When the spout receives the response 
tuple, it will fail the tuples with the received anchor ids, so 1 and 2 will 
fail, while 3 and 4 remain pending.
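
The acker-side computation is just a set difference. A sketch (names 
illustrative, not the real acker code):

{code:java}
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

class AckerSyncSketch {
    // A = anchors the spout believes are pending at this acker.
    // B = anchors this acker actually has pending.
    // Returns A - B: the anchors the spout should fail.
    static List<Long> anchorsToFail(List<Long> spoutPending, Set<Long> ackerPending) {
        List<Long> toFail = new ArrayList<>();
        for (Long anchor : spoutPending) {
            if (!ackerPending.contains(anchor)) {
                toFail.add(anchor); // never seen here, or already timed out
            }
        }
        return toFail;
    }
}
{code}

With the example above, anchorsToFail([1, 3, 4], {3, 4}) is [1] for acker 1, 
and anchorsToFail([2], {}) is [2] for acker 2.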



[jira] [Commented] (STORM-2359) Revising Message Timeouts

2018-12-29, Roshan Naik (JIRA)

[ https://issues.apache.org/jira/browse/STORM-2359?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16730881#comment-16730881 ]

Roshan Naik commented on STORM-2359:


I see your point about having the timeout logic in the acker. It's a good one!

Sorry for the delay in my response. I am traveling, so I'm unable to look into 
the code.

Could you elaborate on the contents of the message being exchanged for 
computing A - B?



[jira] [Commented] (STORM-2359) Revising Message Timeouts

2018-12-29, Stig Rohde Døssing (JIRA)

[ https://issues.apache.org/jira/browse/STORM-2359?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16730657#comment-16730657 ]

Stig Rohde Døssing commented on STORM-2359:
---

I made a few more changes, mainly to remove the pending map from the spout. It 
looks like there is no real difference in performance between timeouts in the 
acker and timeouts in the spout now. I've updated the document with benchmarks; 
please take a look.



[jira] [Commented] (STORM-2359) Revising Message Timeouts

2018-12-21, Stig Rohde Døssing (JIRA)

[ https://issues.apache.org/jira/browse/STORM-2359?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16726614#comment-16726614 ]

Stig Rohde Døssing commented on STORM-2359:
---

{quote}Acker will be totally clueless if all acks from downstream bolts are 
also lost for the same tuple tree.{quote}

Yes. This is why the sync tuple is needed. If the spout executor doesn't time 
out the tree on its own, then we need a mechanism to deal with trees where the 
acker isn't aware of the tree due to message loss of the init and ack/fail 
messages. If we don't do a sync, the spout will never know that the acker isn't 
aware of the tree, and the tree will never fail.

{quote} 1) Handle timeouts in SpoutExecutor. It sends a timeout msg to ACKer. 
*+Benefit+*: May not be any better than doing in ACKer. Do you have any 
thoughts about this ?{quote}

I don't think there's any benefit to doing this over what we're doing now. 
Currently the spout executor times out the tuple, and the acker doesn't try to 
fail tuples on timeout. Instead it just quietly discards whatever information 
it has about a tree once the pending map rotates a few times. I'm not sure what 
we'd gain from the acker not rotating pending and relying on a timeout tuple 
instead. The benefit as I see it of moving the timeouts to the acker would be 
the ability to reset timeouts more frequently (e.g. on every ack) without 
increasing load on the spout executor, which we can't do if the spout is still 
handling the timeout.

{quote} 2) Eliminate timeout communication between Spout & ACKer. Let each do 
its own timeout.{quote}

This is how it works now. As you note, there are some benefits to doing it this 
way. An additional benefit is that we can easily reason about the max time to 
fail a tuple on timeout, since the tuple will fail as soon as the spout rotates 
it out of pending. The drawbacks are:
 * Any time the timeout needs to be reset, a message needs to go to both the 
acker and the spout (current path is bolt -> acker -> spout)
 * Since resetting timeouts is reasonably expensive, we don't do it as part of 
regular acks; the user has to manually call collector.resetTimeout().

The benefit of moving timeouts entirely to the acker is that we can reset 
timeouts automatically on ack. This means that the tuple timeout becomes 
somewhat easier to work with, since tuples won't time out as long as an edge in 
the tree is getting acked occasionally.

This is currently more of a loose idea, but in the long run I'd like to try 
adding a feature toggle for more aggressive automatic timeout resets. I'm not 
sure what the performance penalty would be, but it would be nice if the bolts 
could periodically reset the timeout for all queued/in progress tuples (tuples 
received by the worker, but not yet acked/failed by a collector). Doing this 
would eliminate the degenerate case you mention in the design doc, where some 
bolt is taking slightly too long to process a tuple, and the queued tuples time 
out, which causes the spout to reemit, which puts the reemits in queue behind 
the already timed out tuples. In some cases this can cause the topology to 
spend all its time processing timed out tuples, preventing it from making 
progress, even though each individual tuple could have been processed within 
the timeout.

If this idea turns out to be possible to implement without adding a large 
overhead, it would add an extra drawback to timeouts in the spout:
 * We can't do automatic timeout resetting, since flooding the spout with reset 
messages is a bad idea. In particular, we can't reset timeouts for messages that 
are sitting in bolt queues. The user can't reset these timeouts manually either.



[jira] [Commented] (STORM-2359) Revising Message Timeouts

2018-12-20, Roshan Naik (JIRA)

[ https://issues.apache.org/jira/browse/STORM-2359?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16726414#comment-16726414 ]

Roshan Naik commented on STORM-2359:


{quote}If we were to move timeouts entirely to the acker, there would be a 
potential for "lost" tuples (ones that end up not being reemitted) if messages 
between the acker and spout get lost.
 * The spout may emit a new tuple, and try to notify the acker about it. If 
this message is lost, the acker doesn't know about the tuple, and thus can't 
time it out.
{quote}
Acker will be totally clueless if all acks from downstream bolts are also lost 
for the same tuple tree.

 
{quote}We can move the timeout logic to the acker,
{quote}
Alternatives:
1) Handle timeouts in the SpoutExecutor. It sends a timeout msg to the ACKer. 
*+Benefit+*: may not be any better than doing it in the ACKer. Do you have any 
thoughts about this?

2) Eliminate timeout communication between Spout & ACKer. Let each do its own 
timeout. *+Benefit+*: eliminates the need for:
 * periodic sync-up msgs (also avoids the possibility of latency spikes in 
multi-worker mode when these msgs get large),
 * the A - B calculation, as well as
 * timeout msg exchanges.

TimeoutReset msgs can still be supported.

 -roshan

 

 



[jira] [Commented] (STORM-2359) Revising Message Timeouts

2018-12-17, Stig Rohde Døssing (JIRA)

[ https://issues.apache.org/jira/browse/STORM-2359?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16723067#comment-16723067 ]

Stig Rohde Døssing commented on STORM-2359:
---

There's a branch implementing this at 
https://github.com/srdo/storm/tree/STORM-2359-experimentation. I did a couple 
of runs of the ConstSpoutNullBoltTopology from storm-perf, as well as the 
ThroughputVsLatency topology.

I'm seeing an overhead of about 10% for the ConstSpoutNullBoltTopology, since 
the spout and ackers are the only components involved. For the 
ThroughputVsLatency topology, the overhead looks to be negligible. I've 
attached the raw numbers and some charts.



[jira] [Commented] (STORM-2359) Revising Message Timeouts

2018-12-17, Stig Rohde Døssing (JIRA)

[ https://issues.apache.org/jira/browse/STORM-2359?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16723070#comment-16723070 ]

Stig Rohde Døssing commented on STORM-2359:
---

I will check what impact it has to reinsert tuples in the acker's pending when 
acks are received.



[jira] [Commented] (STORM-2359) Revising Message Timeouts

2018-12-17, Stig Rohde Døssing (JIRA)

[ https://issues.apache.org/jira/browse/STORM-2359?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16723055#comment-16723055 ]

Stig Rohde Døssing commented on STORM-2359:
---

I've been taking a look at this, and have a proposal for how we could move the 
timeout entirely to the acker.

I'm going to assume in the following that tuples sent from one task to another 
are received in the order they were sent, ignoring message loss. I think this 
is the case, but please correct me if it isn't.
h6. The problem

Both the acker and spout executor currently have rotating pending maps, which 
time out tuples based on received ticks. The acker pending map just discards 
expired tuples, while the spout pending map fails them if they get rotated out. 
The reason this currently happens in both the acker and the spout is that we 
need to ensure that the spout fails tuples if they time out, even in the 
presence of message loss.

If we were to move timeouts entirely to the acker, there would be a potential 
for "lost" tuples (ones that end up not being reemitted) if messages between 
the acker and spout get lost.
 * The spout may emit a new tuple, and try to notify the acker about it. If 
this message is lost, the acker doesn't know about the tuple, and thus can't 
time it out.
 * The acker may send an ack or fail message back to the spout, which may get 
lost. Since the acker currently deletes acked/failed messages as soon as they 
are acked/failed, this would prevent the message from being replayed.

h6. Suggested solution

We can move the timeout logic to the acker, and it will work out of the box as 
long as there are no lost messages. I think we can account for lost messages by 
having the spout executor periodically update its view of pending tuples based 
on the state in the acker.

Say the spout pending root ids are A, and the acker pending root ids are B. The 
spout can periodically (e.g. once per second) send to each acker the root ids 
in A. The acker should respond to this tuple by sending back A - B (it can 
optionally also delete anything in B that is not in A). The spout can safely 
fail any tuple in A - B which is also in the spout pending when the sync tuple 
response is received (a sketch of the spout side follows the list below).
 * If a tuple is in A and B, it is still pending, and shouldn't be removed. 
Returning only A - B ensures pending tuples remain.
 * If a tuple is in A but not B, then either the ack_init message was lost, the 
acker crashed and restarted, or the tuple has simply been acked or failed.
 ** If the ack_init was lost, or the acker crashed, then the spout should 
replay the message. Since A - B contains the tuple, the spout will fail it when 
it receives the response.
 ** If the tuple was acked, then the acker is guaranteed to have sent the ack 
message on to the spout before handling the sync tuple. Due to message 
ordering, the ack will be processed before the sync tuple response, making the 
presence of the tuple in A - B irrelevant.
 * If a tuple is not in A but in B, then the spout may have crashed. The acker 
can optionally just discard the pending tuple without notifying the spout, 
since notifying the spout about the state of a tuple emitted by a different 
instance is a no-op.
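
A sketch of the spout-executor side of this exchange (all names are 
placeholders; the transport and the spout's fail path are elided):

{code:java}
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

class SpoutSyncSketch {
    // A, partitioned by which acker task owns each root id.
    private final Map<Integer, Set<Long>> pendingByAcker = new HashMap<>();

    // Called when the spout emits a new tree and sends ack_init to an acker.
    void onEmit(int ackerTask, long rootId) {
        pendingByAcker.computeIfAbsent(ackerTask, t -> new HashSet<>()).add(rootId);
    }

    // Called once per sync interval (e.g. once per second).
    void sendSyncRequests() {
        pendingByAcker.forEach((ackerTask, rootIds) ->
            send(ackerTask, new HashSet<>(rootIds))); // snapshot, don't share
    }

    // The acker returned A - B: the ids it does not consider pending.
    // Anything in that list still pending here was lost or timed out; fail it.
    // An ack processed before this response already removed the id from
    // pendingByAcker, so the ordering argument above makes this safe.
    void onSyncResponse(int ackerTask, List<Long> notPendingAtAcker) {
        Set<Long> rootIds = pendingByAcker.get(ackerTask);
        for (Long rootId : notPendingAtAcker) {
            if (rootIds != null && rootIds.remove(rootId)) {
                failTuple(rootId); // triggers replay via the normal fail path
            }
        }
    }

    private void send(int ackerTask, Set<Long> rootIds) { /* transport elided */ }
    private void failTuple(long rootId) { /* spout fail/replay elided */ }
}
{code}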

h6. Benefit

Moving the timeout logic to the acker makes it much cheaper to reset tuple 
timeouts, since the spout no longer needs to be notified directly. We could 
likely make the acker reset the timeout automatically any time it receives an 
ack.

 

Depending on overhead, we might be able to add an option to let Storm reset 
timeouts for tuples that are still being actively processed (i.e. received by a 
bolt but not yet acked/failed). This would be beneficial to avoid the event 
tsunami problem described in the linked design doc. It could help prevent the 
type of degenerate case described at 
https://issues.apache.org/jira/browse/STORM-2359?focusedCommentId=16043409&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-16043409.
h6. Cost

There is a bit of extra overhead to doing this. The spout needs to keep track 
of which acker tasks are responsible for which root ids. There is also the 
(fairly minor) overhead of sending the sync tuples back and forth. In terms of 
processing reliability, a tuple the acker considers failed/timed out will be 
failed in the spout once one of the sync tuples makes it through.


[jira] [Commented] (STORM-2359) Revising Message Timeouts

2017-06-10, Stig Rohde Døssing (JIRA)

[ https://issues.apache.org/jira/browse/STORM-2359?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16045442#comment-16045442 ]

Stig Rohde Døssing commented on STORM-2359:
---

I think the reason it would be tough to move timeouts entirely to the ackers is 
that we'd need to figure out how to deal with message loss between the spout 
and acker when the acker sends a timeout message to the spout. The current 
implementation errs on the side of caution by always reemitting if it can't 
positively say that a tuple has been acked. I'm not sure how we could do the 
same when the acker has to notify the spout to reemit after the timeout, 
because that message could be lost.

It might be a good idea, as you mention, to instead have two timeouts: a short 
one for the acker and a much longer one for the spout. It would probably mean 
that messages where the acker init message is lost will take much longer to 
retry than messages that are lost elsewhere, but it might allow us to keep 
timeout resets out of the spout.

Tuples can be lost if a worker dies, but what if there's a network issue? Can't 
messages also be lost then? 



[jira] [Commented] (STORM-2359) Revising Message Timeouts

2017-06-09, Roshan Naik (JIRA)

[ https://issues.apache.org/jira/browse/STORM-2359?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16045053#comment-16045053 ]

Roshan Naik commented on STORM-2359:


{quote} The current implementation has a pending map in both the acker and the 
spout, which rotate every topology.message.timeout.secs.{quote}
Need to see if we can eliminate the timeout logic from the spout and have it 
only in the ACKer (I can think of some issues). If we must retain that logic in 
the spouts, the timeout value that it operates on (full tuple tree processing) 
would have to be separated from the timeout value that the ACKer uses to track 
progress between stages. 

{quote}The spout then reemitted the expired tuples, and they got into the queue 
behind their own expired instances. {quote}

Perfect example indeed. The motivation of this JIRA is to try to 
eliminate/mitigate triggering of timeouts for queued/in-flight tuples that are 
not lost. The only time we need timeouts/re-emits to be triggered is when one 
or more tuples in the tuple tree are truly lost. *I think* that can only happen 
if a worker/bolt/spout died. So the case you're describing should not happen if 
we solve this problem correctly.
 
IMO, the ideal solution would have the spouts re-emit only the specific tuples 
whose tuple trees had some loss due to a worker going down. I am not yet 
certain whether or not the initial idea described in the doc is the optimal 
solution. Perhaps a better way is to trigger such re-emits only if a 
worker/bolt/spout went down.



[jira] [Commented] (STORM-2359) Revising Message Timeouts

2017-06-08, Stig Rohde Døssing (JIRA)

[ https://issues.apache.org/jira/browse/STORM-2359?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16043409#comment-16043409 ]

Stig Rohde Døssing commented on STORM-2359:
---

{quote}
The resets are managed internally by the ACKer bolt. The spout only gets 
notified if the timeout expires or if tuple-tree is fully processed. 
{quote}
How will this work? The current implementation has a pending map in both the 
acker and the spout, which rotate every topology.message.timeout.secs. If the 
acker doesn't forward reset requests to the spout, the spout will just expire 
the tuple tree on its own when the message timeout has passed.
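
For reference, the rotation mechanism works roughly like the sketch below 
(Storm's actual implementation is {{org.apache.storm.utils.RotatingMap}}; this 
simplified stand-in is only meant to illustrate the semantics being discussed):

{code:java}
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiConsumer;

class RotatingPendingSketch<K, V> {
    private final Deque<Map<K, V>> buckets = new ArrayDeque<>();
    private final BiConsumer<K, V> onExpire;

    RotatingPendingSketch(int numBuckets, BiConsumer<K, V> onExpire) {
        for (int i = 0; i < numBuckets; i++) {
            buckets.addFirst(new HashMap<>());
        }
        this.onExpire = onExpire;
    }

    // New entries go into the freshest bucket.
    void put(K key, V value) {
        buckets.peekFirst().put(key, value);
    }

    // Driven by timer ticks: the oldest bucket expires wholesale.
    // The spout fails expired trees; the acker just discards its state.
    void rotate() {
        Map<K, V> expired = buckets.removeLast();
        buckets.addFirst(new HashMap<>());
        expired.forEach(onExpire);
    }

    // Resetting a timeout amounts to moving the entry back into the
    // freshest bucket, giving it a whole timeout period again.
    void refresh(K key) {
        for (Map<K, V> bucket : buckets) {
            V value = bucket.remove(key);
            if (value != null) {
                put(key, value);
                return;
            }
        }
    }
}
{code}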

{quote}
That case would be more accurately classified as "progress is being made" ... 
but slower than expected.
The case of 'progress is not being made' is when a worker that is processing 
part of the tuple tree dies.
{quote}
Yes, you are right. But it is currently possible for the topology to degrade 
to no progress being made even if each individual tuple could be processed 
under the message timeout, because tuples can expire while queued and get 
reemitted, where they can then be delayed by their own duplicates which are 
ahead in the queue. For IRichBolts, this can be mitigated by writing the bolt 
to accept and queue tuples internally, so it can reset their timeouts manually 
if necessary, but for IBasicBolt this is not possible.

Just to give a concrete example, we had an IBasicBolt enrich tuples with some 
database data. Most tuples were processed very quickly, but a few were slow. 
Even the slow tuples never took longer than our message timeout individually. 
We then had an instance where a bunch of slow tuples happened to come in on the 
stream close to each other. The first few were processed before they expired, 
but the rest expired while queued. The spout then reemitted the expired tuples, 
and they got into the queue behind their own expired instances. Since the bolt 
won't skip expired tuples, the freshly emitted tuples also expired, which 
caused another reemit. This repeated until the topology was restarted so the 
queues could be cleared.



[jira] [Commented] (STORM-2359) Revising Message Timeouts

2017-06-07, Roshan Naik (JIRA)

[ https://issues.apache.org/jira/browse/STORM-2359?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16041851#comment-16041851 ]

Roshan Naik commented on STORM-2359:


{quote} In order to reduce load on the spout, we should try to "bundle up" 
these resets in the acker bolts before sending them to the spout {quote}
The resets are managed internally by the ACKer bolt. The spout only gets 
notified if the timeout expires or if tuple-tree is fully processed. 

{quote}When tuples are stuck in queues behind other tuples{quote}
That case would be more accurately classified as "progress is being made" ... 
but slower than expected.
The case of 'progress is not being made' is when a worker that is processing 
part of the tuple tree dies.

{quote}If a bolt is taking longer to process a tuple than expected, it can be 
solved in the concrete bolt implementation by using 
OutputCollector.resetTimeout at an appropriate interval (e.g. the tuple timeout 
minus a few seconds).{quote}
I think you are trying to mitigate the number of resets being sent to the 
spout... but as I mentioned before, resets are never sent to the spout.





[jira] [Commented] (STORM-2359) Revising Message Timeouts

2017-05-28, Stig Rohde Døssing (JIRA)

[ https://issues.apache.org/jira/browse/STORM-2359?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16027906#comment-16027906 ]

Stig Rohde Døssing commented on STORM-2359:
---

I thought a bit more about this.

Here are the situations I could think of where tuples are currently being 
expired without being lost:
* The tuple tree is still making progress, and tuples are getting acked, but 
the time to process the entire tree is larger than the tuple timeout. This is 
very likely to happen if there's congestion somewhere in the topology.
* The tuple tree is not making progress, because a bolt is currently processing 
a tuple in the tree, and processing is taking longer than expected.
* The tuple tree is not making progress, because the tuple(s) are stuck in 
queues behind other slow tuples. This is also very likely to happen if there's 
congestion in the topology.

The situation where there's still progress being made can be solved by 
resetting the tuple timeout whenever an ack is received. In order to reduce 
load on the spout, we should try to "bundle up" these resets in the acker bolts 
before sending them to the spout. I think a decent way to do this bundling is 
to make the acker bolt keep track of which tuples it has received acks for 
since the last time timeouts were reset. When a configured interval expires, 
the bolt empties out the list of live tuples and sends timeout resets for all 
of them to the spout. The interval should probably be specified as a percentage 
of the tuple timeout.
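
A sketch of that bundling (hedged: the names and the tick-driven flush are 
assumptions, not the actual acker code):

{code:java}
import java.util.HashSet;
import java.util.Set;

class AckerResetBundler {
    private final Set<Long> ackedSinceLastFlush = new HashSet<>();

    // Called for every ack the acker processes.
    void onAck(long rootId) {
        ackedSinceLastFlush.add(rootId);
    }

    // Called when the reset interval elapses (e.g. driven by a tick tuple,
    // with the interval set to a percentage of the tuple timeout).
    void flush() {
        for (Long rootId : ackedSinceLastFlush) {
            sendTimeoutReset(rootId); // one batched reset per live tree
        }
        ackedSinceLastFlush.clear();
    }

    private void sendTimeoutReset(long rootId) { /* send to spout, elided */ }
}
{code}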

If a bolt is taking longer to process a tuple than expected, it can be solved 
in the concrete bolt implementation by using OutputCollector.resetTimeout at an 
appropriate interval (e.g. the tuple timeout minus a few seconds).

When tuples are stuck in queues behind other tuples, the topology can have a 
hard time recovering. This is because the expiration timer starts ticking for a 
tuple as soon as it's emitted, so if the bolt queues are congested, the bolts 
may be spending all their time processing tuples that belong to expired tuple 
trees. In order to solve this, we need to reset timeouts for queued tuples from 
time to time. It should be possible to add a thread that peeks at the available 
messages in the DisruptorQueue with some interval, and resets the timeout for 
any messages that were also queued last time the thread was run. Only sending 
the resets once a tuple has been queued for the entire interval should help 
decrease the number of unnecessary resets sent to the spout. We should be able 
to reuse the interval configuration also added to the acker bolt. 

I'd welcome any feedback on these ideas :)



[jira] [Commented] (STORM-2359) Revising Message Timeouts

2017-03-17, Stig Rohde Døssing (JIRA)

[ https://issues.apache.org/jira/browse/STORM-2359?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15930644#comment-15930644 ]

Stig Rohde Døssing commented on STORM-2359:
---

This is a great idea, we've also seen topologies exhibit bad performance when 
under pressure because a lot of the queued tuples are already considered timed 
out by the spout.

It seems like part of the reason the tuple timeout is currently a set value 
that is not automatically reset is to avoid flooding the spout instances with 
ack messages. The ackers can be scaled out to handle a large number of acks, 
and can then notify the spout of acks only once per tuple tree. I'm assuming we 
want to keep the management of tuple tree ack/fail inside the spout (?), so 
have you considered a way to deduplicate or "bundle up" the ack stream in the 
acker before notifying the spout that the timeout should be reset? Maybe only 
notify once a percentage of the timeout has elapsed?

The bolt heartbeating may be able to reuse some code from 
https://issues.apache.org/jira/browse/STORM-1549
