Hey Nick,

WOW!  Thanks a bunch for responding so quickly and with such a complete
and awesome analysis of the issue!  Very much appreciate your thoughts.

My definition of throughput is number of tuples acked/minute through the
entire topology as reported in the Topology stats section of the Storm UI.

Just to ensure I've described the situation correctly, I am binning in Bolt
A, building up a bin container consisting of (1) a List of tuples generated
in Bolt A and (2) a list of corresponding anchor tuples that were emitted
by the KafkaSpout to Bolt A.  Once each bin container builds up to a
configurable count limit of generated tuples, Bolt A emits the bin container
via the OutputCollector emit(Collection<Tuple> anchors, List<Object> tuple)
method
<http://storm.apache.org/apidocs/backtype/storm/task/OutputCollector.html#emit(java.util.Collection,%20java.util.List)>
as the single outgoing tuple (the second argument (tuple) is a List
consisting of the one container cast as a List<Object>) together with a
List of anchor tuples (the first argument (anchors)). Again, there is one
outgoing tuple anchored by a List of incoming tuples.
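To make the binning concrete, here is a minimal, self-contained sketch of the bin-container logic I described (the class and type parameters are hypothetical stand-ins I made up for illustration; in the real bolt the anchor type is backtype.storm.tuple.Tuple, and the flush is where the collector.emit(anchors, tuple) call and the acks of the input tuples happen):

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the bin container: it pairs the values
// generated in Bolt A with the input (anchor) tuples that produced them.
class BinContainer<A, V> {
    final List<A> anchors = new ArrayList<>();
    final List<V> values = new ArrayList<>();
    final int limit;

    BinContainer(int limit) { this.limit = limit; }

    // Returns the full (anchors, values) pair once the configurable count
    // limit of generated values is reached, or null while still filling.
    // In the real bolt, a non-null return is the point where
    // collector.emit(anchors, values) would be called.
    List<Object> add(A anchor, V value) {
        anchors.add(anchor);
        values.add(value);
        if (values.size() < limit) {
            return null;
        }
        List<Object> out = new ArrayList<>();
        out.add(new ArrayList<>(anchors));
        out.add(new ArrayList<>(values));
        anchors.clear();  // reset the bin for the next batch
        values.clear();
        return out;
    }
}
```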

The ~90% of time spent in the LMAX messaging layer is in Bolt B. One point
that I left out is that the execute latency of Bolt A is 2-3 ms, whereas
Bolt B's is 60-65 ms. That's why I am thinking that the acking of the
anchor tuples in Bolt B could be slowing things down: the tuple does not
exit the Bolt B execute method until ack is called, and the execute latency
indicates the time spent in Bolt B--and therefore in the LMAX messaging
layer--is relatively high.

I agree that it makes sense to try an experiment where I don't anchor each
bin container tuple with the corresponding input tuples to see if that
decreases execute latency in Bolt B as well as the time spent in
com.lmax.disruptor.BlockingWaitStrategy. If this makes a significant impact,
then I may change things up so that I cache the bin containers until the
number of anchor tuples reaches a threshold and then emit.
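A minimal sketch of that alternative, assuming a simple anchor-count threshold (the class and field names here are made up for illustration; the flush branch is where the real emit and the acks of the buffered input tuples would go):

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical buffer that holds completed bin containers and flushes only
// once the number of buffered anchor tuples crosses a threshold, so the
// emit()/ack() calls happen once per N inputs rather than once per bin.
class AnchorThresholdBuffer<A, B> {
    final List<A> pendingAnchors = new ArrayList<>();
    final List<B> pendingBins = new ArrayList<>();
    final int anchorThreshold;
    int flushes = 0;  // counts flushes, standing in for emit() calls

    AnchorThresholdBuffer(int anchorThreshold) {
        this.anchorThreshold = anchorThreshold;
    }

    void addBin(B bin, List<A> anchorsForBin) {
        pendingBins.add(bin);
        pendingAnchors.addAll(anchorsForBin);
        if (pendingAnchors.size() >= anchorThreshold) {
            // In the real bolt this is where collector.emit(pendingAnchors, ...)
            // and the acks of the corresponding input tuples would happen.
            flushes++;
            pendingAnchors.clear();
            pendingBins.clear();
        }
    }
}
```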

The key thing is that I want to anchor each bin container tuple because I
want to ensure at-least-once processing of each tuple entering the topology
from the KafkaSpout.

Thanks again for your thoughts and taking time to provide a great analysis
of what I am working with. I am going to read and respond to Kashyap's
inputs.

--John


On Sat, Jan 30, 2016 at 12:50 PM, Nick R. Katsipoulakis <
[email protected]> wrote:

> Hello John,
>
> First off, let us agree on your definition of throughput. Do you define
> throughput as the average number of tuples each of your last bolts (sinks)
> emit per second? If yes, then OK. Otherwise, please provide us with more
> details.
>
> Going back to the BlockingWaitStrategy observation you have, it (most
> probably) means that since you are producing a large number of tuples
> (15-20 tuples) the outgoing Disruptor queue gets full, and the emit()
> function blocks. Also, since you are anchoring tuples (that might mean
> exactly-once semantics), it basically takes more time to place something in
> the queue, in order to guarantee delivery of all tuples to a downstream
> bolt.
>
> Therefore, it makes sense to see so much time spent in the LMAX messaging
> layer. A good experiment to verify your hypothesis is to not anchor
> tuples, and profile your topology again. However, I am not sure that you
> will see a much different percentage, since for every tuple you are
> receiving, you have at least one call to the Disruptor layer. Maybe in your
> case (if I got it correctly from your description), you should have one
> call every N tuples, where N is the size of your bin in tuples. Right?
>
> I hope I helped with my comments.
>
> Cheers,
> Nick
>
> On Sat, Jan 30, 2016 at 12:16 PM, John Yost <[email protected]> wrote:
>
>> Hi Everyone,
>>
>> I have a large fan-out that I've posted questions about before with the
>> following new, updated info:
>>
>> 1. Incoming tuple to Bolt A produces 15-20 tuples
>> 2. Bolt A emits to Bolt B via fieldsGrouping
>> 3. I cache outgoing tuples in bins within Bolt A and then emit anchored
>> tuples to Bolt B with the OutputCollector emit(Collection<Tuple> anchors,
>> List<Object> tuple) method
>> <http://storm.apache.org/apidocs/backtype/storm/task/OutputCollector.html#emit(java.util.Collection,%20java.util.List)>
>> 4. I have throughput where I need it to be if I just receive tuples in
>> Bolt B, ack, and drop. If I do actual processing in Bolt B, throughput
>> degrades a bunch.
>> 5. I profiled the Bolt B worker yesterday and see that over 90% of the
>> time is spent in com.lmax.disruptor.BlockingWaitStrategy--irrespective of
>> whether I drop the tuples or process them in Bolt B
>>
>> I am wondering if the acking of the anchor tuples is what's resulting in
>> so much time spent in the LMAX messaging layer.  What do y'all think?  Any
>> ideas appreciated as always.
>>
>> Thanks! :)
>>
>> --John
>>
>
>
>
> --
> Nick R. Katsipoulakis,
> Department of Computer Science
> University of Pittsburgh
>
