hi I am a new storm user, i have met a performance problem,please give me some suggestion.
dev environment: server os:centos 7.5 zookeeper:3.4.10(one node) kafka:2.11-1.1.0(one node) storm:1.1.1 or 1.1.2 client os:win10 repro step: 1、send 1000000 messages to kafka(topic name is "test"),every message size is 500 byte. 2、run program under local mode with storm-core1.1.1 and storm-kafka-client1.1.1, it only spent about 20s to consume 1000000 messages. 3、same program, same settings(server and client) run program under local mode with storm-core1.1.2 and storm-kafka-client1.1.2, it spent about 15 minutes to consume 1000000 messages. I read some code of storm-kafka-client1.1.1 and storm-kafka-client1.1.2 ,there were lots of modification between two version, i want to know how to config storm-kafka-client for 1.1.2 or make some settings for resoving performance problem of 1.1.2? Thank you. PS: 1. below is test code. 2. I want to upload attachments that storm topology generated,but failed,if log is important for investigating, you can download them from this url. https://pan.baidu.com/s/1MEyY6HAVnMdXrPT3n_S2zg packagehjzh; importjava.util.Date; importjava.util.Map; importjava.util.Properties; importorg.apache.storm.Config; importorg.apache.storm.LocalCluster; importorg.apache.storm.StormSubmitter; importorg.apache.storm.generated.AlreadyAliveException; importorg.apache.storm.generated.AuthorizationException; importorg.apache.storm.generated.InvalidTopologyException; importorg.apache.storm.kafka.spout.KafkaSpout; importorg.apache.storm.kafka.spout.KafkaSpoutConfig; importorg.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy; // import org.apache.storm.kafka.spout.KafkaSpoutConfig.ProcessingGuarantee; importorg.apache.storm.task.OutputCollector; importorg.apache.storm.task.TopologyContext; importorg.apache.storm.topology.OutputFieldsDeclarer; importorg.apache.storm.topology.TopologyBuilder; importorg.apache.storm.topology.base.BaseRichBolt; importorg.apache.storm.tuple.Tuple; /** * Hello world! */ publicfinalclassTestTopology { privateTestTopology() { } publicstaticclassPerfBoltextendsBaseRichBolt { privateOutputCollector collector; privatestaticfinallong seriesVersionUID =886149197481637894L; privatestaticlong counter =1; @Override publicvoidprepare(MapstormConf, TopologyContextcontext, OutputCollectorcollector) { this.collector = collector; } @Override publicvoidexecute(Tupleinput) { String tmp = input.getString(0); if (counter++%100000==0) { System.out.println(counter +":"+newDate()); } collector.ack(input); } @Override publicvoiddeclareOutputFields(OutputFieldsDeclarerdeclare) { } } publicstaticvoidmain(String[] args) { Properties props =newProperties(); props.put("group.id", "group7"); KafkaSpoutConfig<String, String> sc = KafkaSpoutConfig.builder("192.168.128.128:9092", "test") .setFirstPollOffsetStrategy(FirstPollOffsetStrategy.EARLIEST) .setProp(props) .build(); TopologyBuilder tp =newTopologyBuilder(); tp.setSpout("kafka_spout", newKafkaSpout<String, String>(sc), 1); tp.setBolt("kafka_bolt", newPerfBolt(), 1).shuffleGrouping("kafka_spout"); Config conf =newConfig(); conf.setDebug(false); if (args ==null|| args.length ==0) { LocalCluster lc =newLocalCluster(); lc.submitTopology("topology", conf, tp.createTopology()); try { Thread.sleep(600000); } catch (InterruptedExceptione) { e.printStackTrace(); } lc.shutdown(); } else { try { StormSubmitter.submitTopology("topology", conf, tp.createTopology()); } catch (AlreadyAliveExceptione) { e.printStackTrace(); } catch (InvalidTopologyExceptione) { e.printStackTrace(); } catch (AuthorizationExceptione) { e.printStackTrace(); } } } }
