Thank you. Is there any other optimizing method such as modifying storm config? I set the TOPOLOGY_DISRUPTOR_BATCH_SIZE and TOPOLOGY_DISRUPTOR_BATCH_TIMEOUT_MILLIS to 1, it seems better only when I set the worker number to 1.
发件人: Kevin Conaway <[email protected]<mailto:[email protected]>> 答复: "[email protected]<mailto:[email protected]>" <[email protected]<mailto:[email protected]>> 日期: 2016年5月31日 星期二 下午8:53 至: "[email protected]<mailto:[email protected]>" <[email protected]<mailto:[email protected]>> 主题: Re: How to improve the intercommunication latency of spout/bolt Try using localOrShuffle grouping. Storm will attempt to pass messages directly to the next component within the same JVM, if possible On Tuesday, May 31, 2016, 林海涛 <[email protected]<mailto:[email protected]>> wrote: Hello. I do test with a simple topology to test the intercommunication latency of spout/bolt. It’s just emit the current nano timestamp from a spout and print the time difference when a bolt receive it. I deploy my storm cluster in my own machine with docker container (one nimbus, one supervisor), and run the topology in cluster mode. code as below: public classRandomSpout extends BaseRichSpout{ SpoutOutputCollector _collector; publicvoid open(Mapconf, TopologyContext context, SpoutOutputCollector collector) { _collector = collector; } publicvoid nextTuple() { Utils.sleep(1000); longcurrentTime = System.nanoTime(); _collector.emit(new Values(currentTime)); } publicvoid declareOutputFields(OutputFieldsDeclarer arg0) { // TODO Auto-generated method stub arg0.declare(new Fields("value")); } } public classPrintBolt extends BaseRichBolt{ private LogFileWriter _logFile; publicvoid execute(Tuple arg0) { // TODO Auto-generated method stub longprevTime = arg0.getLong(0); longcurrentTime = System.nanoTime(); _logFile.writeLog("cost: " + (currentTime - prevTime)); } publicvoid prepare(Maparg0, TopologyContext arg1, OutputCollector arg2) { // TODO Auto-generated method stub try { _logFile = new LogFileWriter("StormTest”, this.getClass().getSimpleName()); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } } publicvoid declareOutputFields(OutputFieldsDeclarer arg0) { // TODO Auto-generated method stub arg0.declare(new Fields("value")); } } public class Topology { public staticvoid main( String[] args ) { TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("spout", new RandomSpout(), 1); builder.setBolt("bolt", new PrintBolt(), 1).shuffleGrouping("spout"); Config conf = new Config(); conf.setDebug(false); if(args.length > 0){ // cluster submit. conf.setNumWorkers(2); conf.setNumAckers(0); try { StormSubmitter.submitTopology("stormTest", conf, builder.createTopology()); } catch (Exception e) { e.printStackTrace(); } }else{ new LocalCluster().submitTopology("stormTest", conf, builder.createTopology()); } } } Output is below: [2016-05-31 09:13:53]cost: 1960336 [2016-05-31 09:13:54]cost: 2600239 [2016-05-31 09:13:55]cost: 3103449 [2016-05-31 09:13:56]cost: 3206544 [2016-05-31 09:13:57]cost: 3783647 [2016-05-31 09:13:58]cost: 3635923 [2016-05-31 09:13:59]cost: 3887787 [2016-05-31 09:14:00]cost: 1623692 [2016-05-31 09:14:01]cost: 2524674 [2016-05-31 09:14:02]cost: 3383506 [2016-05-31 09:14:03]cost: 3898478 [2016-05-31 09:14:04]cost: 2120949 [2016-05-31 09:14:05]cost: 3756272 [2016-05-31 09:14:06]cost: 2877997 [2016-05-31 09:14:07]cost: 3432532 [2016-05-31 09:14:08]cost: 3638306 [2016-05-31 09:14:09]cost: 2958907 [2016-05-31 09:14:10]cost: 2742666 [2016-05-31 09:14:11]cost: 3024576 [2016-05-31 09:14:12]cost: 2822562 [2016-05-31 09:14:13]cost: 2623060 [2016-05-31 09:14:14]cost: 4045938 Obviously, there is a 2ms latency approximately. It seems not good for me. How can I reduce the latency? -- Kevin Conaway http://www.linkedin.com/pub/kevin-conaway/7/107/580/ https://github.com/kevinconaway
