Hey Nick, WOW! Thanks a bunch for responding so quickly and with such a complete and awesome analysis of the issue! Very much appreciate your thoughts.
My definition of throughput is the number of tuples acked per minute through the entire topology, as reported in the Topology stats section of the Storm UI.

Just to ensure I've described the situation correctly: I am binning in Bolt A, building up a bin container consisting of (1) a list of tuples generated in Bolt A and (2) a list of the corresponding anchor tuples that were emitted by the KafkaSpout to Bolt A. Once a bin container reaches a configurable count limit of generated tuples, Bolt A emits it via the OutputCollector emit(Collection<Tuple> anchors, List<Object> tuple) method
<http://storm.apache.org/apidocs/backtype/storm/task/OutputCollector.html#emit(java.util.Collection,%20java.util.List)>
with the bin container as the single outgoing tuple (the second argument: a List consisting of one container cast as a List<Object>) and the list of anchor tuples as the first argument (anchors). Again, there is one outgoing tuple anchored by a list of incoming tuples.

The ~90% of time spent in the LMAX messaging layer is in Bolt B. One point that I left out is that the execute latency of Bolt A is 2-3 ms, whereas Bolt B's is 60-65 ms. That's why I am thinking that the acking of the anchor tuples in Bolt B could be slowing things down: the tuple does not exit Bolt B's execute method until ack is called, and the execute latency indicates that the time spent in Bolt B, and therefore in the LMAX messaging layer, is relatively high.
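To make the binning concrete, here is a minimal, self-contained sketch of the Bolt A logic. All names, the bin size, and the record generation are illustrative, not my actual code, and Tuple/OutputCollector here are tiny stand-ins for the real backtype.storm classes (so this compiles on its own); the only point is the shape of the multi-anchored emit(Collection<Tuple> anchors, List<Object> tuple) call:

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

// Sketch of Bolt A's binning: generated records accumulate in a bin along
// with the input tuples that produced them; when the bin hits the count
// limit, ONE outgoing container tuple is emitted, anchored on ALL inputs.
public class BinningSketch {
    // Stand-in for backtype.storm.tuple.Tuple (illustrative only).
    static class Tuple {
        final String value;
        Tuple(String v) { value = v; }
    }

    // Stand-in for backtype.storm.task.OutputCollector that just records
    // what was emitted and acked, so the logic can be inspected.
    static class OutputCollector {
        final List<List<Object>> emitted = new ArrayList<>();
        final List<Collection<Tuple>> anchorLists = new ArrayList<>();
        final List<Tuple> acked = new ArrayList<>();
        void emit(Collection<Tuple> anchors, List<Object> tuple) {
            anchorLists.add(anchors);
            emitted.add(tuple);
        }
        void ack(Tuple t) { acked.add(t); }
    }

    static final int BIN_SIZE = 40;          // configurable count limit (illustrative)
    final OutputCollector collector = new OutputCollector();
    final List<Object> bin = new ArrayList<>();     // generated records so far
    final List<Tuple> anchors = new ArrayList<>();  // inputs that produced them

    void execute(Tuple input) {
        // Each incoming tuple produces 15-20 records; say 20 for the sketch.
        for (int i = 0; i < 20; i++) {
            bin.add(input.value + "-" + i);
        }
        anchors.add(input);

        if (bin.size() >= BIN_SIZE) {
            // One outgoing tuple (the container) anchored on the whole list
            // of input tuples: emit(Collection<Tuple>, List<Object>).
            collector.emit(new ArrayList<>(anchors),
                           Collections.singletonList((Object) new ArrayList<>(bin)));
            // Ack the inputs only after the anchored emit.
            for (Tuple t : anchors) {
                collector.ack(t);
            }
            bin.clear();
            anchors.clear();
        }
    }

    public static void main(String[] args) {
        BinningSketch bolt = new BinningSketch();
        bolt.execute(new Tuple("a"));
        bolt.execute(new Tuple("b")); // bin reaches 40 -> one emit, two anchors
        System.out.println(bolt.collector.emitted.size());            // 1
        System.out.println(bolt.collector.anchorLists.get(0).size()); // 2
        System.out.println(bolt.collector.acked.size());              // 2
    }
}
```

For the no-anchor experiment, the emit call would simply drop the anchors argument, which is exactly the variable being isolated.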
I agree that it makes sense to try an experiment where I don't anchor each bin container tuple with the corresponding input tuples, to see whether that decreases the execute latency in Bolt B as well as the time spent in com.lmax.disruptor.BlockingWaitStrategy. If this makes a significant impact, then I may change things up so that I cache the bin containers until the number of anchor tuples reaches a threshold and then emit. The key thing is that I want to anchor each bin container tuple because I want to ensure at-least-once processing of each tuple entering the topology from the KafkaSpout.

Thanks again for your thoughts and for taking the time to provide a great analysis of what I am working with. I am going to read and respond to Kashyap's inputs.

--John

On Sat, Jan 30, 2016 at 12:50 PM, Nick R. Katsipoulakis <[email protected]> wrote:

> Hello John,
>
> First off, let us agree on your definition of throughput. Do you define
> throughput as the average number of tuples each of your last bolts (sinks)
> emits per second? If yes, then OK. Otherwise, please provide us with more
> details.
>
> Going back to the BlockingWaitStrategy observation you have, it (most
> probably) means that since you are producing a large number of tuples
> (15-20 tuples) the outgoing Disruptor queue gets full, and the emit()
> function blocks. Also, since you are anchoring tuples (which might mean
> exactly-once semantics), it basically takes more time to place something in
> the queue, in order to guarantee delivery of all tuples to a downstream
> bolt.
>
> Therefore, it makes sense to see so much time spent in the LMAX messaging
> layer. A good experiment to verify your hypothesis is to not anchor
> tuples, and profile your topology again. However, I am not sure that you
> will see a much different percentage, since for every tuple you are
> receiving, you have at least one call to the Disruptor layer. Maybe in your
> case (if I got it correctly from your description), you should have one
> call every N tuples, where N is the size of your bin in tuples. Right?
>
> I hope I helped with my comments.
>
> Cheers,
> Nick
>
> On Sat, Jan 30, 2016 at 12:16 PM, John Yost <[email protected]> wrote:
>
>> Hi Everyone,
>>
>> I have a large fan-out that I've posted questions about before, with the
>> following new, updated info:
>>
>> 1. Each incoming tuple to Bolt A produces 15-20 tuples.
>> 2. Bolt A emits to Bolt B via fieldsGrouping.
>> 3. I cache outgoing tuples in bins within Bolt A and then emit anchored
>> tuples to Bolt B with the OutputCollector emit(Collection<Tuple> anchors,
>> List<Object> tuple) method
>> <http://storm.apache.org/apidocs/backtype/storm/task/OutputCollector.html#emit(java.util.Collection,%20java.util.List)>.
>> 4. I have throughput where I need it to be if I just receive tuples in
>> Bolt B, ack, and drop them. If I do actual processing in Bolt B, throughput
>> degrades a bunch.
>> 5. I profiled the Bolt B worker yesterday and see that over 90% of the
>> time is spent in com.lmax.disruptor.BlockingWaitStrategy, irrespective of
>> whether I drop the tuples or process them in Bolt B.
>>
>> I am wondering if the acking of the anchor tuples is what's resulting in
>> so much time spent in the LMAX messaging layer. What do y'all think? Any
>> ideas appreciated as always.
>>
>> Thanks! :)
>>
>> --John
>
>
> --
> Nick R. Katsipoulakis,
> Department of Computer Science
> University of Pittsburgh
