Worth clarifying for anyone else in this thread that an LBQ separating production from consumption is not a default thing in Storm; it's something we cooked up to prefetch elements from slow/batching resources.
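
To make the pattern concrete, here's a minimal sketch of the spout side. Assumptions, not our production code: the AWS SDK v1 SQS client, a capacity of 100, and illustrative names throughout; ack/fail handling (i.e. deleting the message from SQS) is omitted for brevity.

import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import com.amazonaws.services.sqs.AmazonSQSClient;
import com.amazonaws.services.sqs.model.Message;
import com.amazonaws.services.sqs.model.ReceiveMessageRequest;

public class PrefetchingSqsSpout extends BaseRichSpout {
    private static final int CAPACITY = 100;                     // the LBQ bound discussed below
    private static final String QUEUE_URL = "https://sqs.../my-queue";  // illustrative

    private transient LinkedBlockingQueue<Message> buffer;
    private transient SpoutOutputCollector collector;

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
        this.buffer = new LinkedBlockingQueue<Message>(CAPACITY);

        // IO thread: fetches up to 10 messages per call and shoves them into
        // the LBQ. put() blocks only this thread when the queue is full, so
        // no new HTTP requests go out to SQS until nextTuple() drains room.
        Thread io = new Thread(new Runnable() {
            public void run() {
                AmazonSQSClient sqs = new AmazonSQSClient();
                ReceiveMessageRequest req = new ReceiveMessageRequest(QUEUE_URL)
                        .withMaxNumberOfMessages(10);
                while (!Thread.currentThread().isInterrupted()) {
                    try {
                        for (Message m : sqs.receiveMessage(req).getMessages()) {
                            buffer.put(m);  // blocks while the LBQ is full
                        }
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            }
        });
        io.setDaemon(true);
        io.start();
    }

    @Override
    public void nextTuple() {
        // Non-blocking poll: if the buffer is empty we simply don't emit and
        // let Storm's spout wait strategy handle the backoff.
        Message m = buffer.poll();
        if (m != null) {
            // Anchored by receipt handle; ack()/fail() (and the SQS delete)
            // are omitted here for brevity.
            collector.emit(new Values(m.getBody()), m.getReceiptHandle());
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("body"));
    }
}

The important properties are the ones discussed downthread: put() blocks only the IO thread when the buffer is full, and nextTuple() never blocks, so Storm's wait strategy handles the empty case. The wait-strategy settings this relies on are collected at the very end of the thread.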
Michael Rose (@Xorlev <https://twitter.com/xorlev>)
Senior Platform Engineer, FullContact <http://www.fullcontact.com/>
[email protected]


On Fri, Jul 18, 2014 at 3:16 PM, Itai Frenkel <[email protected]> wrote:

> Got it! thanks.
> ------------------------------
> *From:* Michael Rose <[email protected]>
> *Sent:* Saturday, July 19, 2014 12:10 AM
> *To:* [email protected]
> *Subject:* Re: Acking is delayed by 5 seconds (in disruptor queue ?)
>
> 1)
>
> Let's say we have it set at a limit of 100 items. The LBQ currently has 97
> items in it. The SQS client runs again, pulls 10 messages, and successfully
> inputs 3. The other 7 are blocked waiting for the queue to clear out. No
> new HTTP requests are made to SQS while this LBQ is full (essentially the
> fetch is just called in a loop). nextTuple() eventually comes around and
> drains an item, one blocked message is inserted (leaving 6 blocked), and so
> on until all of the blocked messages are inserted into the LBQ. At this
> point, we call out to the SQS client again to fetch another 10 (and
> hopefully the LBQ has not completely drained by the time the SQS client
> returns another 1-10 messages).
>
> This model isn't picky about which SQS client is grabbing messages. SQS
> doesn't guarantee order (or single delivery) anyway. In one of our
> topologies, we have 8 spout instances consuming the same SQS queue for
> throughput purposes. Maybe I've misunderstood your question, though. We
> normally don't have multiple topologies consuming the same queue, but
> depending on the data there's no reason we couldn't.
>
> Also, in this model we don't use a blocking poll method; if the LBQ is
> empty, we skip emission and let Storm handle backoff if it wants to (see
> below).
>
> 2) By backoff I mean: your spout hasn't emitted in a while, so Storm is
> going to slow down on calling nextTuple() so as not to busy-wait your
> entire CPU. If you're at the maxSpoutPending limit, nextTuple() is also
> not called until an ack or fail has been received (which is one of the
> reasons it's so important to avoid blocking in the spout as much as
> possible).
>
> When you don't emit, Storm will use
> https://github.com/apache/incubator-storm/blob/master/storm-core/src/jvm/backtype/storm/spout/SleepSpoutWaitStrategy.java
> By default, it waits only 1ms before calling again (not exponential, just
> fixed), which is enough to prevent 10-100k/s polling behavior without
> significantly increasing latency -- if you're ever not emitting, you can
> probably afford to sleep for 1ms. We actually have it set to 10ms, given
> that 99.9% of the time we'll have a message ready for processing.
>
> An alternative is
> https://github.com/apache/incubator-storm/blob/master/storm-core/src/jvm/backtype/storm/spout/NothingEmptyEmitStrategy.java
> if you do see an impact on your throughput -- but I've never needed it.
>
> Michael Rose (@Xorlev <https://twitter.com/xorlev>)
> Senior Platform Engineer, FullContact <http://www.fullcontact.com/>
> [email protected]
>
>
> On Fri, Jul 18, 2014 at 2:41 PM, Itai Frenkel <[email protected]> wrote:
>
>> Thanks for the thorough and quick answer.
>>
>> Two follow-up questions. The context is performing low-latency stream
>> manipulation (hopefully processing a message the moment it arrives, maybe
>> a few milliseconds later).
>>
>> 1. What happens if the LBQ contains 10 items while the Storm topology
>> does not call nextTuple because of backoffs? Wouldn't it be better for
>> another Amazon SQS client to handle these items? Or are you assuming a
>> single Storm topology is the sole handler of these items?
>>
>> 2. If by backoff you mean the Storm topology cannot handle any more
>> messages, or maxSpoutPending is reached, then ignore this question. If by
>> backoff you mean exponential backoff, then I am worried about a message
>> arriving at the queue while nextTuple is not called for a long time (more
>> than a few milliseconds).
>>
>> Regards,
>> Itai
>> ------------------------------
>> *From:* Michael Rose <[email protected]>
>> *Sent:* Friday, July 18, 2014 11:27 PM
>> *To:* [email protected]
>> *Subject:* Re: Acking is delayed by 5 seconds (in disruptor queue ?)
>>
>> I have no experience with multilang spouts; however, my impression from
>> the docs is that you should be handling your own multiplexing if you're
>> writing a ShellSpout. Otherwise, if you block for 5 seconds emitting a
>> tuple, you cannot process an ack until that's done. I'd experiment with
>> that: if you change the sleep.spout.wait time to 500ms and you don't block
>> in your spout (instead returning "sync"), it should back off just as it
>> does with a normal spout (see
>> https://github.com/apache/incubator-storm/blob/master/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java,
>> "sync" is a no-op).
>>
>> The post you linked to was mine, and for a long time it was true
>> (especially for 0.6 and 0.7). Since Storm 0.8, the spout wait strategy
>> does automatic backoffs when no tuples are emitted. The only time I've
>> intentionally blocked in a spout after 0.8.0 is to control throughput
>> (e.g. only allowing 10/s during development). I've never built a multilang
>> spout before.
>>
>> Spouts, like bolts, run in a single-threaded context, so blocking at all
>> prevents acks/fails/emits from being processed until the thread is
>> unblocked. That is why it's best to have another thread dealing with IO,
>> asynchronously feeding a concurrent data structure the spout can utilize.
>> For example, in our internal Amazon SQS client the IO thread continuously
>> fetches up to 10 messages per get and shoves them into a
>> LinkedBlockingQueue (until it is full; then only the IO thread blocks,
>> until emits from the spout clear up room).
>>
>> Michael Rose (@Xorlev <https://twitter.com/xorlev>)
>> Senior Platform Engineer, FullContact <http://www.fullcontact.com/>
>> [email protected]
>>
>>
>> On Fri, Jul 18, 2014 at 1:34 PM, Itai Frenkel <[email protected]> wrote:
>>
>>> So can you please explain this sentence from the multilang
>>> documentation?
>>>
>>> "Also like ISpout, if you have no tuples to emit for a next, you should
>>> sleep for a small amount of time before syncing. ShellSpout will not
>>> automatically sleep for you."
>>>
>>> https://storm.incubator.apache.org/documentation/Multilang-protocol.html
>>>
>>> I read it as: "Unless you sleep for a small amount of time before
>>> syncing, the ShellSpout will serialize one 'nextTuple' message per 1ms
>>> (see the configuration below), which would require many more CPU cycles."
>>>
>>> topology.spout.wait.strategy: "backtype.storm.spout.SleepSpoutWaitStrategy"
>>> topology.sleep.spout.wait.strategy.time.ms: 1
>>>
>>> You can also refer to the answer here, which says regular spouts do the
>>> sleep as well:
>>> https://groups.google.com/forum/#!topic/storm-user/OSjaVgTK5m0
>>>
>>> Regards,
>>> Itai
>>>
>>> ------------------------------
>>> *From:* Michael Rose <[email protected]>
>>> *Sent:* Friday, July 18, 2014 10:18 PM
>>> *To:* [email protected]
>>> *Subject:* Re: Acking is delayed by 5 seconds (in disruptor queue ?)
>>>
>>> Run your producer code in another thread to fill an LBQ, and poll that
>>> with nextTuple instead.
>>>
>>> You should never be blocking inside a spout.
>>>
>>> Michael Rose (@Xorlev <https://twitter.com/xorlev>)
>>> Senior Platform Engineer, FullContact <http://www.fullcontact.com/>
>>> [email protected]
>>>
>>>
>>> On Fri, Jul 18, 2014 at 1:03 PM, Itai Frenkel <[email protected]> wrote:
>>>
>>>> Hello again,
>>>>
>>>> Attached is a simplified reproduction (without the ShellSpout, but the
>>>> concepts are the same).
>>>>
>>>> It seems that ack() and nextTuple() are always called on the same
>>>> thread. That means there is an inherent tradeoff: either nextTuple
>>>> sleeps only a few ms (and then the ShellSpout serializes a lot of
>>>> nextTuple messages), or nextTuple sleeps longer but then the ack is
>>>> delayed.
>>>>
>>>> Is there a way around this limitation?
>>>>
>>>> Itai
>>>> ------------------------------
>>>> *From:* Itai Frenkel <[email protected]>
>>>> *Sent:* Thursday, July 17, 2014 9:42 PM
>>>> *To:* [email protected]
>>>> *Subject:* Acking is delayed by 5 seconds (in disruptor queue ?)
>>>>
>>>> Hello,
>>>>
>>>> I have noticed that an ack takes 5 seconds to pass from the bolt to the
>>>> spout (see the debug log below). It is a simple topology with 1 spout,
>>>> 1 bolt, and 1 acker, all running on the same worker. The spout and the
>>>> bolt are a ShellSpout and a ShellBolt, respectively.
>>>>
>>>> It looks like the message is delayed in the LMAX disruptor queue.
>>>> How can I reduce this delay to ~1ms?
>>>>
>>>> Regards,
>>>> Itai
>>>>
>>>> 2014-07-17 18:30:30 b.s.t.ShellBolt [INFO] Shell msg: Sent process to tuple 2759481868963667531
>>>> 2014-07-17 18:30:30 b.s.d.task [INFO] Emitting: bolt __ack_ack [-357211617823660063 -3928495599512172728]
>>>> 2014-07-17 18:30:30 b.s.t.ShellBolt [INFO] Shell msg: Bolt sent ack to tuple 2759481868963667531
>>>> 2014-07-17 18:30:30 b.s.d.executor [INFO] Processing received message source: bolt:2, stream: __ack_ack, id: {}, [-357211617823660063 -3928495599512172728]
>>>> 2014-07-17 18:30:30 b.s.d.task [INFO] Emitting direct: 3; __acker __ack_ack [-357211617823660063]
>>>> 2014-07-17 18:30:35 b.s.d.executor [INFO] Processing received message source: __acker:1, stream: __ack_ack, id: {}, [-357211617823660063]
>>>> 2014-07-17 18:30:35 b.s.d.executor [INFO] Acking message 1138
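
One more note for the archive: the wait-strategy knobs discussed above are plain topology configuration. A minimal sketch of the settings described in this thread (1ms is the default; 10ms is what we run with, so tune it to your own latency budget):

topology.spout.wait.strategy: "backtype.storm.spout.SleepSpoutWaitStrategy"
topology.sleep.spout.wait.strategy.time.ms: 10

The NothingEmptyEmitStrategy alternative linked above is selected via the same topology.spout.wait.strategy key, should sleeping at all measurably hurt your throughput.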
