Worth clarifying for anyone else in this thread that a LBQ separating
production from consumption is not a default thing in Storm, it's something
we cooked up to prefetch elements from slow/batching resources.

Michael Rose (@Xorlev <https://twitter.com/xorlev>)
Senior Platform Engineer, FullContact <http://www.fullcontact.com/>
[email protected]


On Fri, Jul 18, 2014 at 3:16 PM, Itai Frenkel <[email protected]> wrote:

>  Got it! thanks.
>  ------------------------------
> *From:* Michael Rose <[email protected]>
> *Sent:* Saturday, July 19, 2014 12:10 AM
>
> *To:* [email protected]
> *Subject:* Re: Acking is delayed by 5 seconds (in disruptor queue ?)
>
>  1)
>
>  Lets say we have it set at a limit of 100 items. The LBQ currently has
> 97 items in it. The SQS client runs again, pulls 10 messages, and
> successfully inputs 3. The other 7 are blocked waiting for the queue to
> clear out. No new HTTP requests are made to SQS while this LBQ is full
> (essentially this is just called in a loop). nextTuple() eventually comes
> around, 6 blocked for insertion. etc. etc. until all 6 blocked messages are
> inserted into the LBQ. At this point, we call out to the SQS client again
> to fetch another 10 (and hopefully the LBQ has not completely drained by
> the time the SQS client returns another 1-10 messages).
>
>  This model isn't picky about which SQS client is grabbing messages. SQS
> doesn't guarantee order (or single delivery) anyways. In one of our
> topologies, we have 8 spout instances consuming the same SQS queue for
> throughput purposes. Maybe I've misunderstood your question though. We
> normally don't have multiple topologies consuming the same queue, but
> depending on the data there's no reason we couldn't.
>
>  Also in this model, we don't use a blocking poll method, if the LBQ is
> empty, we skip emission and let Storm handle backoff if it wants to (see
> below).
>
>  2) By backoff I mean, your spout hasn't emitted in a while, it's going
> to slow down on calling nextTuple() to not busy-wait your entire CPU. If
> you're at maxSpoutPending limit, nextTuple is also not called until an ack
> or fail has been received (which is one of the reasons its so important to
> not block in the spout as much as possible).
>
>  Storm will use the
> https://github.com/apache/incubator-storm/blob/master/storm-core/src/jvm/backtype/storm/spout/SleepSpoutWaitStrategy.java
> when you don't emit. By default, it waits only 1ms before calling again
> (not exponential, just fixed), which is enough to prevent 10-100k/s polling
> behavior but not significantly increase latency -- if you're ever not
> emitting, you can probably afford to sleep for 1ms. We actually have it set
> to 10ms, given that 99.9% of the time we'll have a message ready for
> processing.
>
>  An alternative to this is the
> https://github.com/apache/incubator-storm/blob/master/storm-core/src/jvm/backtype/storm/spout/NothingEmptyEmitStrategy.java
> if you do see an impact on your throughput--but I've never needed this.
>
>  Michael Rose (@Xorlev <https://twitter.com/xorlev>)
> Senior Platform Engineer, FullContact <http://www.fullcontact.com/>
> [email protected]
>
>
> On Fri, Jul 18, 2014 at 2:41 PM, Itai Frenkel <[email protected]> wrote:
>
>>  Thanks for the thorough and quick answer.
>>
>>
>>  Two follow up questions. The context is performing low latency stream
>> manipulation (hopefully process a message the moment it arrives, maybe a
>> few milliseconds later).
>>
>>
>>  1. What happens if the LBQ contains 10 items, while the Storm topology
>> does not call nextTuple because of backoffs ? Wouldn't it be better of for
>> another Amazon SQS client to handle these items? Or are you assuming a
>> single Storm topology is the sole handler of these items ?
>>
>>
>>  2. If by backoff, you mean storm topology cannot handle any more
>> messages, or maxSpoutPending is reached, then ignore this question. If by
>> backoff you mean exponential backoff then I am worried about a message
>> arriving to the queue and nextTuple is not called for a long time (more
>> than a few milliseconds).
>>
>>
>>  Regards,
>>
>> Itai
>>  ------------------------------
>> *From:* Michael Rose <[email protected]>
>> *Sent:* Friday, July 18, 2014 11:27 PM
>>
>> *To:* [email protected]
>> *Subject:* Re: Acking is delayed by 5 seconds (in disruptor queue ?)
>>
>>   I have no experience with multilang spouts, however my impression from
>> the docs is that you should be handling your own multiplexing if you're
>> writing a shellspout. Otherwise if you block for 5 seconds emitting a
>> tuple, you cannot process an ack until that's done. I'd experiment with
>> that, if you change the sleep.spout.wait time to be 500ms and you don't
>> block in your spout (instead returning "sync") it should back off just as
>> it does with a normal spout (see
>> https://github.com/apache/incubator-storm/blob/master/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java,
>> "sync" is a no-op).
>>
>> The post you linked to was mine, and for a long time that was true
>> (especially 0.6 and 0.7). Since Storm 0.8, the spout wait strategy will do
>> automatic backoffs when no tuples are emitted. The only time I've
>> intentionally blocked in a spout after 0.8.0 is to control throughout (e.g.
>> only allow 10/s during development). I've never built a multilang spout
>> before.
>>
>>  Spouts, like bolts, run in a single-threaded context so blocking at all
>> prevents acks/fails/emits from being done until the thread is unblocked.
>> That is why it's best to have another thread dealing with IO and
>> asynchronously feeding a concurrent data structure the spout can utilize.
>> For example, in our internal Amazon SQS client our IO thread continuously
>> fetches up to 10 messages per get and shoves them into a
>> LinkedBlockingQueue (until full, then it blocks the IO thread only until
>> the spout emits clear up room).
>>
>>  Michael Rose (@Xorlev <https://twitter.com/xorlev>)
>> Senior Platform Engineer, FullContact <http://www.fullcontact.com/>
>> [email protected]
>>
>>
>> On Fri, Jul 18, 2014 at 1:34 PM, Itai Frenkel <[email protected]> wrote:
>>
>>>  So can you please explain this sentence from the multilang
>>> documentation?
>>>
>>>
>>>  "Also like ISpout, if you have no tuples to emit for a next, you
>>> should sleep for a small amount of time before syncing. ShellSpout will not
>>> automatically sleep for you"
>>>
>>> https://storm.incubator.apache.org/documentation/Multilang-protocol.html
>>>
>>>
>>>  I read it as: "Unless you sleep a small amount of time before syncing,
>>> the ShellSpout would serialize one "nextTuple" message per 1ms (see
>>> configuration below) which would require much more CPU cycles"
>>>
>>> topology.spout.wait.strategy: "backtype.storm.spout.SleepSpoutWaitStrategy"
>>> topology.sleep.spout.wait.strategy.time.ms: 1
>>>
>>>  You can also refer to the answer here, which refers to regular Spouts
>>> doing sleep as well:
>>>
>>> https://groups.google.com/forum/#!topic/storm-user/OSjaVgTK5m0
>>>
>>>
>>>  Regards,
>>>
>>> Itai
>>>
>>>
>>>
>>>  ------------------------------
>>> *From:* Michael Rose <[email protected]>
>>> *Sent:* Friday, July 18, 2014 10:18 PM
>>> *To:* [email protected]
>>> *Subject:* Re: Acking is delayed by 5 seconds (in disruptor queue ?)
>>>
>>>   Run your producer code in another thread to fill a LBQ, poll that
>>> with nextTuple instead.
>>>
>>>  You should never be blocking yourself inside a spout.
>>>
>>>  Michael Rose (@Xorlev <https://twitter.com/xorlev>)
>>> Senior Platform Engineer, FullContact <http://www.fullcontact.com/>
>>> [email protected]
>>>
>>>
>>> On Fri, Jul 18, 2014 at 1:03 PM, Itai Frenkel <[email protected]> wrote:
>>>
>>>>  Hello again,
>>>>
>>>>
>>>>  Attached is a simplified reproduction (without the ShellSpout, but
>>>> the concepts are the same).
>>>>
>>>>
>>>>  It seems that ack() and nextTuple() are always called on the same
>>>> thread. That means that there is an inherent tradeoff.
>>>>
>>>> Either nextTuple sleeps a few ms  (and then the ShellSpout would
>>>> serialize alot of nextTuple messages)
>>>>
>>>> or nextTuple can sleep but then the ack is delayed.
>>>>
>>>>
>>>>  Is there a way around this limitation?
>>>>
>>>>
>>>>  Itai
>>>>  ------------------------------
>>>> *From:* Itai Frenkel <[email protected]>
>>>> *Sent:* Thursday, July 17, 2014 9:42 PM
>>>> *To:* [email protected]
>>>> *Subject:* Acking is delayed by 5 seconds (in disruptor queue ?)
>>>>
>>>>    Hello,
>>>>
>>>>  I have noticed that an ack takes 5 seconds to pass from the bolt to
>>>> the spout (see debug log below). It is a simple topology with 1 spout, 1
>>>> bolt and 1 acker all running on the same worker. The spout and the bolt are
>>>> ShellSpout and ShellBolt respectively.
>>>>
>>>>  It looks like the message is delayed in the LMAX disruptor​ queue.
>>>>  How can I reduce this delay to ~1ms ?
>>>>
>>>>  Regards,
>>>>  Itai
>>>>
>>>>
>>>>  2014-07-17 18:30:30 b.s.t.ShellBolt [INFO] Shell msg: Sent process to
>>>> tuple 2759481868963667531
>>>> 2014-07-17 18:30:30 b.s.d.task [INFO] Emitting: bolt __ack_ack
>>>> [-357211617823660063 -3928495599512172728]
>>>> 2014-07-17 18:30:30 b.s.t.ShellBolt [INFO] Shell msg: Bolt sent ack to
>>>> tuple 2759481868963667531
>>>> 2014-07-17 18:30:30 b.s.d.executor [INFO] Processing received message
>>>> source: bolt:2, stream: __ack_ack, id: {}, [-357211617823660063
>>>> -3928495599512172728]
>>>> 2014-07-17 18:30:30 b.s.d.task [INFO] Emitting direct: 3; __acker
>>>> __ack_ack [-357211617823660063]
>>>> 2014-07-17 18:30:35 b.s.d.executor [INFO] Processing received message
>>>> source: __acker:1, stream: __ack_ack, id: {}, [-357211617823660063]
>>>> 2014-07-17 18:30:35 b.s.d.executor [INFO] Acking message 1138
>>>>
>>>>
>>>>
>>>>
>>>
>>
>

Reply via email to