You might be affected by this issue: https://issues.apache.org/jira/browse/STORM-3102
2018-06-26 8:33 GMT+02:00 zz <[email protected]>: > hi > I am a new Storm user and I have run into a performance problem; please give me > some suggestions. > > dev environment: > server os:centos 7.5 > zookeeper:3.4.10(one node) > kafka:2.11-1.1.0(one node) > storm:1.1.1 or 1.1.2 > client os:win10 > > repro steps: > 1、send 1000000 messages to kafka (topic name is "test"); every message > is 500 bytes in size. > 2、run the program in local mode with storm-core 1.1.1 and > storm-kafka-client 1.1.1: it only took about 20s to consume 1000000 > messages. > 3、same program, same settings (server and client): run the program in local > mode with storm-core 1.1.2 and storm-kafka-client 1.1.2: it took about 15 > minutes to consume 1000000 messages. > > I read some of the code of storm-kafka-client 1.1.1 and storm-kafka-client 1.1.2 > ; there were lots of modifications between the two versions. I want to know how to > configure storm-kafka-client for 1.1.2, or which settings to change, to resolve the > performance problem in 1.1.2. > > Thank you. > PS: > 1. Below is the test code. > 2. I wanted to upload the attachments that the storm topology generated, but > it failed; if the logs are important for investigating, you can download them from > this url. > https://pan.baidu.com/s/1MEyY6HAVnMdXrPT3n_S2zg > > > package hjzh; > import java.util.Date; > import java.util.Map; > import java.util.Properties; > import org.apache.storm.Config; > import org.apache.storm.LocalCluster; > import org.apache.storm.StormSubmitter; > import org.apache.storm.generated.AlreadyAliveException; > import org.apache.storm.generated.AuthorizationException; > import org.apache.storm.generated.InvalidTopologyException; > import org.apache.storm.kafka.spout.KafkaSpout; > import org.apache.storm.kafka.spout.KafkaSpoutConfig; > import org.apache.storm.kafka.spout.KafkaSpoutConfig. > FirstPollOffsetStrategy; > // import org.apache.storm.kafka.spout.KafkaSpoutConfig. 
> ProcessingGuarantee; > import org.apache.storm.task.OutputCollector; > import org.apache.storm.task.TopologyContext; > import org.apache.storm.topology.OutputFieldsDeclarer; > import org.apache.storm.topology.TopologyBuilder; > import org.apache.storm.topology.base.BaseRichBolt; > import org.apache.storm.tuple.Tuple; > /** > * Hello world! > */ > public final class TestTopology { > private TestTopology() { > } > public static class PerfBolt extends BaseRichBolt { > > private OutputCollector collector; > private static final long seriesVersionUID = 886149197481637894L; > private static long counter = 1; > @Override > public void prepare(Map stormConf, TopologyContext context, > OutputCollector collector) { > this.collector = collector; > } > @Override > public void execute(Tuple input) { > String tmp = input.getString(0); > if (counter++ % 100000 == 0) { > System.out.println(counter + ":" + new Date()); > } > collector.ack(input); > } > @Override > public void declareOutputFields(OutputFieldsDeclarer declare) { > } > } > public static void main(String[] args) { > Properties props = new Properties(); > props.put("group.id", "group7"); > KafkaSpoutConfig<String, String> sc = KafkaSpoutConfig.builder("192. 
> 168.128.128:9092", "test") > .setFirstPollOffsetStrategy(FirstPollOffsetStrategy.EARLIEST) > .setProp(props) > .build(); > TopologyBuilder tp = new TopologyBuilder(); > tp.setSpout("kafka_spout", new KafkaSpout<String, String>(sc), 1); > tp.setBolt("kafka_bolt", new PerfBolt(), 1).shuffleGrouping("kafka_spout" > ); > Config conf = new Config(); > conf.setDebug(false); > if (args == null || args.length == 0) { > LocalCluster lc = new LocalCluster(); > lc.submitTopology("topology", conf, tp.createTopology()); > try { > Thread.sleep(600000); > } catch (InterruptedException e) { > e.printStackTrace(); > } > lc.shutdown(); > } else { > try { > StormSubmitter.submitTopology("topology", conf, tp.createTopology()); > } catch (AlreadyAliveException e) { > e.printStackTrace(); > } catch (InvalidTopologyException e) { > e.printStackTrace(); > } catch (AuthorizationException e) { > e.printStackTrace(); > } > } > } > } > > > > > >
