Hello all,

I have been working with Storm on low-latency applications. Specifically, I am currently working on an application in which I join 4 different streams (a relational join, for those familiar with relational algebra). Before I jump into the discussion, I would like to present the technical details of my cluster:
- Apache Storm 0.9.4
- OpenJDK v1.7
- ZooKeeper v3.4.6
- 3 x ZooKeeper nodes (Amazon EC2 m4.xlarge instances)
- 1 x Nimbus node (Amazon EC2 m4.xlarge instance)
- 4 x Supervisor nodes (Amazon EC2 m4.2xlarge instances), each with 1 worker (JVM process)
- each worker is spawned with the following arguments: "-Xmx4096m -XX:+UseConcMarkSweepGC -XX:+UseParNewGC -XX:NewSize=128m -XX:CMSInitiatingOccupancyFraction=70 -XX:-CMSConcurrentMTEnabled -Djava.net.preferIPv4Stack=true"

Each supervisor node has 8 vCPUs, so I expect performance comparable to 8 physical cores. The reason for the above setup is that (after interacting with the Storm user community) I understood that it is better, latency-wise, to have as many threads as cores on a machine, all sharing the same worker process. The reasoning is that intra-worker communication goes through the LMAX Disruptor, which claims to achieve orders of magnitude lower latency than communication between workers. (A rough sketch of how I wire and configure the topology is at the end of this message.)

Turning to my application: I am joining 4 streams in a 14-executor topology, in which I have 4 spouts (one for each data source), 1 "sink" bolt where the data end up, and the rest are bolts for intermediate processing.

Now for how I measure latency. I know that I could use the metrics exposed by Storm, but I wanted finer-grained measurements, so I accumulate my own stats. Consider a topology as a directed acyclic graph (DAG), in which each vertex is a bolt/spout and each edge is a communication channel. To measure latency per edge in my DAG (topology), I use the following mechanism: every x tuples, I initiate a sequence of three marker tuples sent at 1-second intervals. So, if the sequence starts at time *t*, I send the first marker, continue processing normal tuples, send the second marker at time *t + 1*, and so on. Once all 3 markers have been received by the downstream bolt, I compute the average of their arrival-time differences (after subtracting the 1-second send interval, of course), which gives me the average waiting time of tuples in the input queue. This is similar in spirit to Lamport's logical clocks, in that no clocks need to be compared across machines. A simplified sketch of this mechanism is also at the end of this message.

The interesting observation I make is the following: I would expect to see lower latencies between threads executing on the same machine (communication through the LMAX Disruptor) than between threads executing on different machines (communication through Netty). However, the results are not consistent with this expectation; in fact, I observe the opposite (lower latencies between remote threads than between co-located threads).

This observation strikes me as particularly strange, and I want to see if someone else has come across a similar issue. If you think I am doing something wrong in my setup and that this is what causes the anomaly, please point out what I am doing wrong.

Thank you very much for your time, and I apologize for the long post.

Regards,
Nick
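P.S. In case it helps, here is a rough sketch of how the topology is wired and configured. The spout/bolt class names (SourceASpout, PairJoinBolt, SinkBolt) and the exact parallelism split are placeholders standing in for my real components, so this will not compile as-is; the parts that matter are the single worker per supervisor (conf.setNumWorkers(4)) and the worker JVM options.

    import backtype.storm.Config;
    import backtype.storm.StormSubmitter;
    import backtype.storm.topology.TopologyBuilder;
    import backtype.storm.tuple.Fields;

    public class StreamJoinTopology {
        public static void main(String[] args) throws Exception {
            TopologyBuilder builder = new TopologyBuilder();

            // 4 spouts, one per data source (placeholder class names).
            builder.setSpout("source-a", new SourceASpout(), 1);
            builder.setSpout("source-b", new SourceBSpout(), 1);
            builder.setSpout("source-c", new SourceCSpout(), 1);
            builder.setSpout("source-d", new SourceDSpout(), 1);

            // Intermediate join bolts, partitioned on the join key so that
            // matching tuples from different streams reach the same executor.
            // 3 + 3 + 3 executors here, plus 4 spouts and 1 sink = 14 executors.
            builder.setBolt("join-ab", new PairJoinBolt(), 3)
                   .fieldsGrouping("source-a", new Fields("key"))
                   .fieldsGrouping("source-b", new Fields("key"));
            builder.setBolt("join-cd", new PairJoinBolt(), 3)
                   .fieldsGrouping("source-c", new Fields("key"))
                   .fieldsGrouping("source-d", new Fields("key"));
            builder.setBolt("join-abcd", new PairJoinBolt(), 3)
                   .fieldsGrouping("join-ab", new Fields("key"))
                   .fieldsGrouping("join-cd", new Fields("key"));

            // Single "sink" bolt where the joined data end up.
            builder.setBolt("sink", new SinkBolt(), 1)
                   .shuffleGrouping("join-abcd");

            Config conf = new Config();
            conf.setNumWorkers(4); // one worker (JVM) per supervisor node

            // Same JVM options as listed above, applied to the workers.
            conf.put(Config.TOPOLOGY_WORKER_CHILDOPTS,
                    "-Xmx4096m -XX:+UseConcMarkSweepGC -XX:+UseParNewGC"
                    + " -XX:NewSize=128m -XX:CMSInitiatingOccupancyFraction=70"
                    + " -XX:-CMSConcurrentMTEnabled -Djava.net.preferIPv4Stack=true");

            StormSubmitter.submitTopology("stream-join", conf, builder.createTopology());
        }
    }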

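And here is a simplified sketch of the marker mechanism described above, as seen from a single bolt. Again, the names, the field layout ("type"/"payload"), and the probe period (100000 tuples) are illustrative rather than my exact code, and for brevity it lumps all incoming edges into one set of marker timestamps instead of keeping per-upstream-task state.

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;

    import backtype.storm.task.OutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.base.BaseRichBolt;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Tuple;
    import backtype.storm.tuple.Values;

    public class LatencyProbeBolt extends BaseRichBolt {

        private static final int MARKERS_PER_PROBE = 3;
        private static final long MARKER_INTERVAL_MS = 1000L;
        private static final long PROBE_EVERY_N_TUPLES = 100000L; // the "x" in my description

        private OutputCollector collector;
        private final List<Long> markerArrivals = new ArrayList<Long>();
        private long normalTuplesSeen = 0L;
        private int markersSentInProbe = MARKERS_PER_PROBE; // no probe in progress
        private long lastMarkerSentAt = 0L;

        @Override
        public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
        }

        @Override
        public void execute(Tuple input) {
            if ("marker".equals(input.getStringByField("type"))) {
                recordMarkerArrival();
            } else {
                // Normal tuple: the actual processing/join work goes here,
                // forwarding the result downstream (anchored for acking).
                collector.emit(input, new Values("data", input.getValueByField("payload")));
                normalTuplesSeen++;
            }
            maybeEmitMarker();
            collector.ack(input);
        }

        // Downstream side: record local arrival times; once all 3 markers of a
        // probe have arrived, average the inter-arrival gaps minus the known
        // 1-second send interval to get the average waiting time in the input
        // queue, without comparing clocks across machines.
        private void recordMarkerArrival() {
            markerArrivals.add(System.currentTimeMillis());
            if (markerArrivals.size() == MARKERS_PER_PROBE) {
                long sum = 0L;
                for (int i = 1; i < markerArrivals.size(); i++) {
                    sum += markerArrivals.get(i) - markerArrivals.get(i - 1) - MARKER_INTERVAL_MS;
                }
                double avgQueueMs = (double) sum / (MARKERS_PER_PROBE - 1);
                System.out.println("avg queueing latency on this incoming edge: " + avgQueueMs + " ms");
                markerArrivals.clear();
            }
        }

        // Upstream side: every x normal tuples, start a probe of 3 markers
        // spaced (roughly) 1 second apart, interleaved with normal processing.
        private void maybeEmitMarker() {
            long now = System.currentTimeMillis();
            if (markersSentInProbe == MARKERS_PER_PROBE
                    && normalTuplesSeen > 0
                    && normalTuplesSeen % PROBE_EVERY_N_TUPLES == 0) {
                markersSentInProbe = 0; // start a new probe
            }
            if (markersSentInProbe < MARKERS_PER_PROBE
                    && now - lastMarkerSentAt >= MARKER_INTERVAL_MS) {
                collector.emit(new Values("marker", null));
                lastMarkerSentAt = now;
                markersSentInProbe++;
            }
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("type", "payload"));
        }
    }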