Nick, I had the same experience. I measured the latencies between spout emit and bolt consume, and between bolt emit and next-bolt consume, and they vary from 2 to 25 ms. I have still not figured out what the problem is.
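Roughly, what I measure per hop is along these lines (a simplified sketch, not my actual code; the "emitTs" field name is made up, and cross-machine numbers of course assume the hosts' clocks are closely NTP-synchronized, while in-JVM numbers need no such assumption):

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;
import java.util.Map;

// Probe bolt: the upstream spout/bolt puts an "emitTs" field set to
// System.currentTimeMillis() into each tuple just before emitting, and
// this bolt computes consume-time minus emit-time per tuple.
public class LatencyProbeBolt extends BaseRichBolt {
    private OutputCollector collector;

    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple tuple) {
        long hopLatencyMs = System.currentTimeMillis() - tuple.getLongByField("emitTs");
        // ... accumulate hopLatencyMs into whatever stats you keep ...
        collector.ack(tuple);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // probe-only sketch: no output streams declared
    }
}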
The thread dumps point at the LMAX Disruptor, but I guess that is expected.

Thanks,
Kashyap

On Jul 3, 2015 02:56, "Nick R. Katsipoulakis" <[email protected]> wrote:

> Hello all,
>
> I have been working in Storm on low-latency applications. Specifically, I
> currently work on an application in which I join 4 different streams (a
> relational join, for those familiar with relational algebra). Before I
> jump into the discussion, I would like to present the technical details
> of my cluster:
>
> - Apache Storm 0.9.4
> - OpenJDK v1.7
> - ZooKeeper v3.4.6
> - 3 x ZooKeeper nodes (Amazon EC2 m4.xlarge instances)
> - 1 x Nimbus node (Amazon EC2 m4.xlarge instance)
> - 4 x Supervisor nodes (Amazon EC2 m4.2xlarge instances), each one with
>   1 worker (JVM process)
> - each worker is spawned with the following arguments: "-Xmx4096m
>   -XX:+UseConcMarkSweepGC -XX:+UseParNewGC -XX:+UseConcMarkSweepGC
>   -XX:NewSize=128m -XX:CMSInitiatingOccupancyFraction=70
>   -XX:-CMSConcurrentMTEnabled -Djava.net.preferIPv4Stack=true"
>
> Each supervisor node has 8 vCPUs, so I expect performance comparable to
> 8 physical cores. The reason for this setup is that (after interacting
> with the Storm user community) I understood that, for latency, it is
> better to have as many threads as you have cores on one machine, all
> sharing the same worker process. The reason behind this is that
> intra-worker communication goes through the LMAX Disruptor, which claims
> to achieve orders-of-magnitude lower latency than communication between
> workers.
>
> Turning to my application: I am joining 4 streams in a 14-executor
> topology, in which I have 4 spouts (one for each data source), 1 "sink"
> bolt where the data end up, and the rest are bolts for intermediate
> processing.
>
> Now it is time to explain how I measure latency. I know that I could use
> the metrics exposed by Storm, but I wanted finer-grained metrics, so I
> accumulate my own stats.
>
> Let us consider a topology as a directed acyclic graph (DAG), in which
> each vertex is a bolt/spout and each edge is a communication channel. In
> order to measure latency per edge in my DAG (topology), I have the
> following mechanism: every x tuples, I initiate a sequence of three
> marker tuples sent at 1-second intervals. So, if the tuple sequence
> starts at time *t*, I send the first marker; I continue executing on
> normal tuples, send the second marker at time *t + 1*, and so on and so
> forth. Then, by the time all 3 markers have been received at the
> downstream bolt, I calculate the average of the arrival-time differences
> (after removing the 1-second sending interval, of course) and get the
> average waiting time of tuples in the input queue (pretty similar to
> Lamport's vector clocks).
>
> The interesting observation I make is the following: I expect to see
> lower latencies between threads that execute on the same machine
> (communication through the LMAX Disruptor) than between threads
> executing on different machines (communication through Netty). However,
> the results are not consistent with that expectation; in fact, I observe
> the opposite (lower latencies between remote threads than between
> co-located threads).
>
> This observation is particularly strange, and I want to see whether
> someone else has come across a similar issue. If you think something in
> my setup is wrong and is causing the anomaly above, please point out
> what I am doing wrong.
>
> Thank you very much for your time, and I apologize for the long post.
>
> Regards,
> Nick
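For concreteness, one plausible wiring of the topology Nick describes (all component classes below are hypothetical placeholders, and this particular layout merely happens to total 14 executors; conf.setNumWorkers(4) gives one worker JVM per supervisor node, so co-located executors communicate through the Disruptor rather than Netty):

import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;

public class FourWayJoinTopology {
    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();

        // 4 spouts, one per input stream (SourceSpout is a placeholder)
        builder.setSpout("a", new SourceSpout("a"), 1);
        builder.setSpout("b", new SourceSpout("b"), 1);
        builder.setSpout("c", new SourceSpout("c"), 1);
        builder.setSpout("d", new SourceSpout("d"), 1);

        // intermediate join bolts (JoinBolt is a placeholder); key-based
        // grouping so matching tuples meet at the same executor
        builder.setBolt("ab", new JoinBolt(), 3)
               .fieldsGrouping("a", new Fields("key"))
               .fieldsGrouping("b", new Fields("key"));
        builder.setBolt("cd", new JoinBolt(), 3)
               .fieldsGrouping("c", new Fields("key"))
               .fieldsGrouping("d", new Fields("key"));
        builder.setBolt("abcd", new JoinBolt(), 3)
               .fieldsGrouping("ab", new Fields("key"))
               .fieldsGrouping("cd", new Fields("key"));

        // single sink bolt where the joined data end up
        builder.setBolt("sink", new SinkBolt(), 1).shuffleGrouping("abcd");

        Config conf = new Config();
        conf.setNumWorkers(4); // one worker JVM per supervisor node
        StormSubmitter.submitTopology("four-way-join", conf, builder.createTopology());
    }
}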
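And a minimal sketch of the receiving side of the three-marker timing scheme Nick describes (the "marker" stream id is hypothetical, and the sender that spaces the markers one second apart is omitted):

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Collects the arrival times of a marker triple and averages the
// inter-arrival gaps after subtracting the 1 s sending interval, which
// yields the extra time markers spent waiting in the input queue.
public class MarkerTimingBolt extends BaseRichBolt {
    private static final long SEND_INTERVAL_MS = 1000L;
    private final List<Long> arrivals = new ArrayList<Long>();
    private OutputCollector collector;

    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple tuple) {
        if ("marker".equals(tuple.getSourceStreamId())) { // hypothetical marker stream
            arrivals.add(System.currentTimeMillis());
            if (arrivals.size() == 3) {
                double sum = 0.0;
                for (int i = 1; i < arrivals.size(); i++) {
                    sum += (arrivals.get(i) - arrivals.get(i - 1)) - SEND_INTERVAL_MS;
                }
                double avgExtraWaitMs = sum / (arrivals.size() - 1);
                System.out.println("avg extra queue wait (ms): " + avgExtraWaitMs);
                arrivals.clear();
            }
        } else {
            // ... normal tuple processing ...
        }
        collector.ack(tuple);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // downstream declarations omitted in this sketch
    }
}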
