[jira] [Commented] (STORM-3314) Acker redesign

2019-01-20 Thread Roshan Naik (JIRA)


[ 
https://issues.apache.org/jira/browse/STORM-3314?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16747458#comment-16747458
 ] 

Roshan Naik commented on STORM-3314:


 

*Summary of the above two approaches:*

+First approach+ tries to eliminate the ACKer bolt by routing ACKs through the 
existing spouts and bolts. It doesn't try to mitigate interworker ACK msgs. 
Also, ACK msgs can get stuck behind regular msgs under BP.

+Second approach+ is centered around mitigating interworker ACK msging. It 
retains the need to tweak the ACKer count separately to handle more/fewer 
spouts and bolts... most likely a fixed number of ACKers in each worker.

Both eliminate the need for fields grouping of ACKs. Both need more thought on 
timeout resets.


*Some data points:*
 * *Multiworker Speed:* In Storm 2, two components on +different workers+ can 
sustain a max throughput of ~*4 mill/sec* over the interworker msg-ing (w/o 
ACKing).
 * *ACKer bolt* speed (single worker mode):
 ** An ACKer bolt can sustain a max topo throughput of only ~*1.7 mill/sec* in 
*single worker* mode... when handling an ultra-bare-minimum topo of 1 spout and 
1 bolt.
 ** The ACKer slows down to ~*950k/sec* when one more bolt is added to the mix 
(i.e. ConstSpoutIdBoltNullBoltTopo).
 * *ACKing + Multiworker:* For 2 workers (1 spout & 1 bolt), ACK-mode 
throughput is ~*900k/sec*.
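
For concreteness, the bare-minimum topologies behind these numbers are wired up 
along the following lines (an illustrative sketch, not the actual storm-perf 
code; ConstSpout and DevNullBolt here are stand-ins for the storm-perf 
equivalents):

{code:java}
import java.util.Map;
import org.apache.storm.Config;
import org.apache.storm.StormSubmitter;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

public class AckBenchTopo {
    // Constant-emitting spout; passing a msgId makes each tuple ACK-tracked.
    public static class ConstSpout extends BaseRichSpout {
        private SpoutOutputCollector collector;
        private long msgId = 0;

        public void open(Map<String, Object> conf, TopologyContext ctx,
                         SpoutOutputCollector collector) {
            this.collector = collector;
        }

        public void nextTuple() {
            collector.emit(new Values("constant payload"), ++msgId);
        }

        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("str"));
        }
    }

    // No-op bolt that just ACKs, so the run isolates the msging + ACK path.
    public static class DevNullBolt extends BaseRichBolt {
        private OutputCollector collector;

        public void prepare(Map<String, Object> conf, TopologyContext ctx,
                            OutputCollector collector) {
            this.collector = collector;
        }

        public void execute(Tuple tuple) {
            collector.ack(tuple);
        }

        public void declareOutputFields(OutputFieldsDeclarer declarer) {
        }
    }

    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("spout", new ConstSpout(), 1);
        builder.setBolt("bolt", new DevNullBolt(), 1).shuffleGrouping("spout");

        Config conf = new Config();
        conf.setNumWorkers(2);  // 1 vs 2 gives the single- vs multi-worker runs
        conf.setNumAckers(1);   // 0 gives the "w/o ACKing" baseline
        StormSubmitter.submitTopology("ack-bench", conf, builder.createTopology());
    }
}
{code}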

Ideally, the ACKing path within a single worker should be faster than 
interworker msging, given the absence of de/serialization and the network 
stack. Clearly both the ACKer bolt itself and the interworker msging are big 
factors... with the ACKer bolt being the more significant one. Mitigating only 
interworker msging will likely leave a lot of potential performance on the 
table.

Perhaps there is a hybrid approach that combines the benefits of both 
approaches... reduce interworker ACK msging and eliminate the ACKer bolt.


*Qs about [~kabhwan]'s approach:*
 # If I understand correctly, in single worker mode there will be no 
difference. And in multiworker mode, when the entire tuple tree falls within 
the same worker, will the perf be the same as in single worker mode?
 # What is the method to compute "fully processed" after the tuple tree is 
split into per-worker fractions? Sounds like ACKers will msg each other about 
the completion of their partial trees... and this will boil up to the root 
ACKer.
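
To make the second question concrete: one plausible reading (the names and 
structure below are my guess, not something from either proposal) is that each 
worker's ACKer keeps the usual XOR checksum for just its fraction of the tree, 
plus a pointer to the upstream acker/anchor it must notify:

{code:java}
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of per-worker partial tuple-tree tracking. The standard
// XOR trick is kept: the checksum is XORed with every edge id once at emit
// time and once at ack time, so it returns to 0 exactly when every emit has
// a matching ack.
public class PartialTreeAcker {
    static class PartialTree {
        long xorChecksum;      // 0 => this worker's fraction is fully processed
        int sourceAckerTask;   // upstream acker to notify on completion
        long sourceAnchorId;   // edge id in the upstream acker's checksum
    }

    private final Map<Long, PartialTree> pending = new HashMap<>();

    // Called when a tuple crosses into this worker (the per-worker ACK_INIT).
    public void initPartialTree(long rootId, long incomingEdgeId,
                                int sourceAckerTask, long sourceAnchorId) {
        PartialTree t = new PartialTree();
        t.xorChecksum = incomingEdgeId;  // XORed out when the local bolt acks it
        t.sourceAckerTask = sourceAckerTask;
        t.sourceAnchorId = sourceAnchorId;
        pending.put(rootId, t);
    }

    // Called for every emit and every ack by bolts in this worker.
    public void update(long rootId, long edgeId) {
        PartialTree t = pending.get(rootId);
        if (t == null) {
            return;  // unknown or already-expired root; races glossed over here
        }
        t.xorChecksum ^= edgeId;
        if (t.xorChecksum == 0) {
            // Fraction complete: "boil up" one level by acking the anchor we
            // were given. At the root acker, this completes the whole tree.
            sendAck(t.sourceAckerTask, rootId, t.sourceAnchorId);
            pending.remove(rootId);
        }
    }

    private void sendAck(int ackerTask, long rootId, long anchorId) {
        // transport elided in this sketch
    }
}
{code}

Under this reading, completion boils up exactly one level at a time: each 
fraction acks the anchor it was handed, and the tree is fully processed when 
the root ACKer's checksum hits zero.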

 

*Additional Thoughts:*

There appear to be two properties (which I believe hold) that remain 
unexploited in the two approaches and in a potential hybrid approach:

- Within the worker process, the msging subsystem guarantees *delivery*; there 
can be no msg loss.
- Msg loss happens only when a worker goes down (or gets disconnected from the 
rest of the topo).

 

 

> Acker redesign
> --
>
> Key: STORM-3314
> URL: https://issues.apache.org/jira/browse/STORM-3314
> Project: Apache Storm
>  Issue Type: Improvement
>  Components: storm-client
>Reporter: Roshan Naik
>Priority: Major
>
> *Context:* The ACKing mechanism has come into focus as one of the next major 
> bottlenecks to address. The strategy to timeout and replay tuples has issues 
> discussed in STORM-2359.
> *Basic idea:* Every bolt will send an ACK msg to its upstream spout/bolt once 
> the tuples it emitted have been *fully processed* by downstream bolts.
> *Determining "fully processed”* : For every incoming (parent) tuple, a bolt 
> can emit 0 or more “child” tuples. the Parent tuple is considered fully 
> processed once a bolt receives ACKs for all the child emits (if any). This 
> basic idea cascades all the way back up to the spout that emitted the root of 
> the tuple tree.
> This means that, when a bolt is finished with all the child emits and it 
> calls ack() no ACK message will be generated (unless there were 0 child 
> emits). The ack() marks the completion of all child emits for a parent tuple. 
> The bolt will emit an ACK to its upstream component once all the ACKs from 
> downstream components have been received.
> *Operational changes:* The existing spouts and bolts don’t need any change. 
> The bolt executor will need to process incoming acks from downstream bolts 
> and send an ACK to its upstream component as needed. In the case of 0 child 
> emits, ack() itself could immediately send the ACK to the upstream component. 
> Field grouping is not applied to ACK messages.
> *Total ACK messages:* The spout output collector will no longer send an 
> ACK-init message to the ACKer bolt. Other than this, the total number of 
> emitted ACK messages does not change. Instead of the ACKs going to an ACKer 
> bolt, they get spread out among the existing bolts. It appears that this mode 
> may reduce some of the inter-worker traffic of ACK messages.
> *Memory use:* If we use the existing XOR logic from ACKer bolt, we need about 
> 20 bytes per outstanding tuple-tree at each bolt. Assuming an 

[jira] [Commented] (STORM-3314) Acker redesign

2019-01-17 Thread JIRA


[ 
https://issues.apache.org/jira/browse/STORM-3314?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16745145#comment-16745145
 ] 

Stig Rohde Døssing commented on STORM-3314:
---

The core idea of avoiding crossing the worker boundary as much as possible 
sounds really good.

I think I like Jungtaek's iteration a little better.

I see the following benefits to the second idea over the first:
* Using a separate acker task in each worker means we don't have to worry about 
acks getting queued behind user tuples
* The same worry would apply to timeout resets, which would almost certainly 
mean that we can't do resetting reliably at all.
* As Jungtaek mentioned, backpressure would be a concern for acks when they're 
going through the same queues as user tuples
* We keep the ability to add more ackers in each worker if we want. 

My initial suggestion for how timeouts would work with this setup would be that 
we move timeout logic entirely to the acker. Since the spout and initial acker 
are now guaranteed to run in the same worker, we don't need the spout to be 
timing out tuples by itself, and we don't need to worry about the first 
ACK_INIT getting lost. 
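
A minimal sketch of "timeout logic entirely in the acker", assuming we reuse 
the RotatingMap the acker already keeps pending trees in (the callback wiring 
and the failure notification below are illustrative, not existing code):

{code:java}
import org.apache.storm.utils.RotatingMap;

// Sketch: the root acker owns the timeout, so neither the spout executor nor
// downstream ackers need their own timers. RotatingMap drops its oldest bucket
// on each rotate(); rotating every (timeoutSecs / numBuckets) seconds gives
// the usual coarse-grained timeout behavior.
public class RootAckerTimeouts {
    private final RotatingMap<Long, Object> pending;  // rootId -> tree state

    public RootAckerTimeouts(int numBuckets) {
        pending = new RotatingMap<>(numBuckets, (rootId, treeState) -> {
            // Tree expired without completing: ask the spout to fail/replay it.
            notifySpoutOfFailure(rootId);
        });
    }

    public void onAckInit(long rootId, Object treeState) {
        pending.put(rootId, treeState);
    }

    public void onTreeComplete(long rootId) {
        pending.remove(rootId);
    }

    // Driven by a timer/tick tuple, as the current acker's rotation already is.
    public void onRotationTick() {
        pending.rotate();
    }

    private void notifySpoutOfFailure(long rootId) {
        // transport elided in this sketch
    }
}
{code}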

When doing a timeout reset, if we have every acker keep its own timeout, we 
will need to walk up the tree from the worker that resets the timeout to the 
root acker. I think we might be able to reduce the number of reset messages 
here by having only the root acker time out tuples. We can use the sync tuple 
mechanism I suggested in STORM-2359 to make sure downstream ackers don't keep 
outdated anchors around. This way, we can jump straight to the root, instead of 
having to walk the tree.

I'll have to think a bit more about automatic timeout resetting. Currently it 
can be made to work nicely because every ack resets the timeout, which makes it 
easy to reason about how long we can safely delay resetting tuples. This lets 
us decide not to reset tuples that have only been present in a worker for a 
short time, because we know that the tree was likely acked recently, right 
before it was sent to the worker. If acks don't end up hitting the root acker, 
we might need to send some extra tuples to ensure timeouts are reset. Maybe we 
can get away with buffering up acked anchors for a percentage of the message 
timeout in the acker, and send them as a batch message to the root acker. I'm a 
little worried about flooding the root acker with reset messages.
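
As a concrete sketch of that batching idea (all names hypothetical; the flush 
fraction is exactly the knob being discussed):

{code:java}
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch: instead of one reset message per acked anchor, buffer
// the affected root ids and flush them to the root acker as one batch after a
// fraction of the message timeout, trading reset latency for fewer messages.
public class BatchedTimeoutResets {
    private final Set<Long> dirtyRoots = new LinkedHashSet<>();  // dedupes repeat acks
    private final long flushIntervalMs;  // e.g. 10% of topology.message.timeout.secs
    private long lastFlushMs = System.currentTimeMillis();

    public BatchedTimeoutResets(long messageTimeoutMs, double flushFraction) {
        this.flushIntervalMs = (long) (messageTimeoutMs * flushFraction);
    }

    // Called by a non-root acker whenever a tree it tracks makes progress.
    public void onLocalAck(long rootId) {
        dirtyRoots.add(rootId);
        long now = System.currentTimeMillis();
        if (now - lastFlushMs >= flushIntervalMs) {
            // One batched reset message to the root acker instead of one per ack.
            sendBatchResetToRootAcker(new ArrayList<>(dirtyRoots));
            dirtyRoots.clear();
            lastFlushMs = now;
        }
    }

    private void sendBatchResetToRootAcker(List<Long> rootIds) {
        // transport elided in this sketch
    }
}
{code}

The batching also bounds how hard the root acker can be flooded: it sees at 
most one reset message per non-root acker per flush interval.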

Regarding FAIL tuples, if we decide to use sync tuples to keep the root acker 
and other ackers in sync, we might be able to just send FAILs up the tree to 
the root acker, and let the sync tuple be responsible for removing it from any 
branches of the tuple path that didn't get the FAIL. I think trying to ensure 
all ackers are notified about the FAIL immediately would be hard.


[jira] [Commented] (STORM-3314) Acker redesign

2019-01-17 Thread Roshan Naik (JIRA)


[ 
https://issues.apache.org/jira/browse/STORM-3314?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16744813#comment-16744813
 ] 

Roshan Naik commented on STORM-3314:


_Quoting thoughts expressed by_ [~kabhwan] _in response to the above initial idea:_

 

*Essential idea:* Try to avoid sending ACK tuples to other workers, given that 
doing so incurs serde and network cost. If we could also get rid of field 
grouping, so much the better.

Roshan brought up a great idea that is in line with this essential idea, so 
there may be a trade-off between Roshan's approach and mine. My idea is 
inspired by Roshan's and modified a bit to adopt the nice parts of his idea.

 

*Summary:* Build a partial tuple tree in the ACKer of each worker, and let 
each worker's ACKer handle its own partial tuple tree. Note that we don't 
change the basic idea of the acking mechanism.

 

*Detail:*
 * When a tuple is emitted from the spout, the spout sends an ACK_INIT message 
to the ACKer within its worker.
 ** Same as current.
 * The logic for emitting tuples in a bolt doesn't change.
 ** We may want to include the acker ID in the tuple, so the receiver knows 
where to send acknowledgements.
 ** Otherwise, we could also compute (and cache) it on the bolt side (based on 
the source task ID) so as not to grow the size of each tuple (see the sketch 
after this list).
 * When a worker receives a tuple from outside the worker, it also sends an 
ACK_INIT message to the ACKer within the worker.
 ** The ACKer stores additional information (acker task ID for the source 
task's worker, tuple ID of the source tuple, anchors, etc.) so it can "send 
back an ACK to the source" when the partial tuple tree is completed.
 ** The information may differ a bit depending on whether it refers to a spout 
or to another acker.
 * Whenever a tuple is acknowledged in a bolt, an ACK message is sent to the 
acker within the worker.
 * When an acker finds that its partial tuple tree is completed, it sends an 
ACK back to the source acker based on the stored information.
 ** If the source is a spout, just do the same as we do today.
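
For the compute-and-cache option mentioned above, a hypothetical sketch (the 
assignment maps and helper names are mine, not existing Storm APIs):

{code:java}
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of deriving the acker from the source task ID so the
// tuple does not need to carry an acker ID: if every worker has a fixed set
// of acker tasks, the receiver can map source task -> worker -> acker task
// deterministically and memoize the result.
public class AckerResolver {
    private final Map<Integer, Integer> cache = new ConcurrentHashMap<>();
    private final Map<Integer, Integer> taskToWorker;          // from the assignment
    private final Map<Integer, List<Integer>> workerToAckers;  // acker tasks per worker

    public AckerResolver(Map<Integer, Integer> taskToWorker,
                         Map<Integer, List<Integer>> workerToAckers) {
        this.taskToWorker = taskToWorker;
        this.workerToAckers = workerToAckers;
    }

    // Resolve and memoize: source task -> its worker -> an acker in that worker.
    public int ackerFor(int sourceTaskId) {
        return cache.computeIfAbsent(sourceTaskId, task -> {
            int worker = taskToWorker.get(task);
            List<Integer> ackers = workerToAckers.get(worker);
            return ackers.get(task % ackers.size());  // stable pick if >1 acker
        });
    }
}
{code}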

 

*Expected benefits:*
 * ACK tuples for hops within a worker will not be sent to other workers, 
hence avoiding the cost of serde and network transfer.
 * No field grouping.
 * ACKs follow the path along which tuples flow, which could be optimized by 
the grouper implementation.

 

*Considerations:*
 * Each worker has to have its own acker task, which is a new requirement, 
even though it is the default behavior for now.
 ** We could (and should) still be able to turn ackers off when needed, but 
the acker configuration becomes an on/off switch rather than an acker task 
count.
 * The total count of ACK tuples is not changed.
 ** Maybe we could avoid sending ACK tuples within the worker and instead 
calculate and update what the acker is doing on the fly (in the executor 
thread), if that gives more benefit (a sketch follows this list).
 * A bit more memory is needed, given that we maintain a partial tuple tree 
per worker (linear in the number of workers the tuple tree flows through), and 
we need additional information for sending ACKs back to another acker.
 * Need to consider how the FAIL tuple will work, and how the RESET_TIMEOUT 
tuple will work.
 ** These are going to be headaches in both my idea and Roshan's.
 ** I'm still not sure how tuple timeout works with backpressure, even though 
we still need it to get rid of old entries in the rotating tree.
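
For the on-the-fly idea flagged in the list above, a hypothetical sketch of 
bolts updating a shared in-process XOR table instead of sending intra-worker 
ACK tuples (concurrency corner cases glossed over):

{code:java}
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch: executors in the same worker XOR directly into shared
// per-root checksums, so only tree completion produces an actual message.
public class InProcessAckTable {
    private final Map<Long, AtomicLong> checksums = new ConcurrentHashMap<>();

    // Register a root when its first tuple enters this worker.
    public void init(long rootId, long seedEdgeId) {
        checksums.put(rootId, new AtomicLong(seedEdgeId));
    }

    // Called from any executor thread for each emit and each ack; returns true
    // when this worker's fraction of the tree is complete.
    public boolean xorEdge(long rootId, long edgeId) {
        AtomicLong sum = checksums.get(rootId);
        boolean complete = sum.updateAndGet(v -> v ^ edgeId) == 0;
        if (complete) {
            checksums.remove(rootId);  // then notify the upstream acker/spout
        }
        return complete;
    }
}
{code}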

 

*Differences from Roshan's idea:*
 * We don't add ACK tuples to the source task's queue, hence keeping the 
bolt's task and the acker's task separate.
 ** Two perspectives: the queue, and the (executor) thread.
 ** It is less coupled with backpressure. IMHO, ACK tuples and metrics tuples 
should flow regardless of ongoing backpressure.
 ** I think keeping them separate leaves open the possibility of applying more 
optimizations to the acker, like having a separate channel for ACK tuples and 
metrics tuples.
 * It uses less memory when there are several tasks in a worker: that is 
exactly what the acker excels at.
 * Fewer hops when ACK tuples are flowing back, given that the return path 
only involves ackers. (Yes, more hops when acking within a worker, though.)
 * We only need to handle the rotating tree in one place (the acker) per 
worker, instead of in each task.
 * It is more complicated than Roshan's idea: Roshan's idea is clearly 
intuitive, and I can't imagine an easier solution.

 

 
