You might be affected by this issue:
https://issues.apache.org/jira/browse/STORM-3102

2018-06-26 8:33 GMT+02:00 zz <[email protected]>:

> Hi,
> I am a new Storm user and I have run into a performance problem. Please give
> me some suggestions.
>
> dev environment:
> server os:centos 7.5
> zookeeper:3.4.10(one node)
> kafka:2.11-1.1.0(one node)
> storm:1.1.1 or 1.1.2
> client os:win10
>
> Repro steps:
> 1. Send 1000000 messages to Kafka (topic name is "test"); each message is
> 500 bytes.
> 2. Run the program in local mode with storm-core 1.1.1 and
> storm-kafka-client 1.1.1: it takes only about 20s to consume the 1000000
> messages.
> 3. Run the same program, with the same settings (server and client), in local
> mode with storm-core 1.1.2 and storm-kafka-client 1.1.2: it takes about 15
> minutes to consume the 1000000 messages.
>
> I read some of the code of storm-kafka-client 1.1.1 and storm-kafka-client
> 1.1.2, and there were a lot of modifications between the two versions. I want
> to know how to configure storm-kafka-client 1.1.2, or which settings to
> change, to resolve the performance problem in 1.1.2.
>
> Thank you.
> PS:
> 1. The test code is below.
> 2. I wanted to upload the attachments (logs) that the Storm topology
> generated, but the upload failed. If the logs are important for the
> investigation, you can download them from this URL:
> https://pan.baidu.com/s/1MEyY6HAVnMdXrPT3n_S2zg
>
>
> package hjzh;
> import java.util.Date;
> import java.util.Map;
> import java.util.Properties;
> import org.apache.storm.Config;
> import org.apache.storm.LocalCluster;
> import org.apache.storm.StormSubmitter;
> import org.apache.storm.generated.AlreadyAliveException;
> import org.apache.storm.generated.AuthorizationException;
> import org.apache.storm.generated.InvalidTopologyException;
> import org.apache.storm.kafka.spout.KafkaSpout;
> import org.apache.storm.kafka.spout.KafkaSpoutConfig;
> import org.apache.storm.kafka.spout.KafkaSpoutConfig.
> FirstPollOffsetStrategy;
> // import org.apache.storm.kafka.spout.KafkaSpoutConfig.
> ProcessingGuarantee;
> import org.apache.storm.task.OutputCollector;
> import org.apache.storm.task.TopologyContext;
> import org.apache.storm.topology.OutputFieldsDeclarer;
> import org.apache.storm.topology.TopologyBuilder;
> import org.apache.storm.topology.base.BaseRichBolt;
> import org.apache.storm.tuple.Tuple;
> /**
> * Hello world!
> */
> public final class TestTopology {
> private TestTopology() {
> }
> public static class PerfBolt extends BaseRichBolt {
>
> private OutputCollector collector;
> private static final long seriesVersionUID = 886149197481637894L;
> private static long counter = 1;
> @Override
> public void prepare(Map stormConf, TopologyContext context,
> OutputCollector collector) {
> this.collector = collector;
> }
> @Override
> public void execute(Tuple input) {
> String tmp = input.getString(0);
> if (counter++ % 100000 == 0) {
> System.out.println(counter + ":" + new Date());
> }
> collector.ack(input);
> }
> @Override
> public void declareOutputFields(OutputFieldsDeclarer declare) {
> }
> }
> public static void main(String[] args) {
> Properties props = new Properties();
> props.put("group.id", "group7");
> KafkaSpoutConfig<String, String> sc = KafkaSpoutConfig.builder("192.
> 168.128.128:9092", "test")
> .setFirstPollOffsetStrategy(FirstPollOffsetStrategy.EARLIEST)
> .setProp(props)
> .build();
> TopologyBuilder tp = new TopologyBuilder();
> tp.setSpout("kafka_spout", new KafkaSpout<String, String>(sc), 1);
> tp.setBolt("kafka_bolt", new PerfBolt(), 1).shuffleGrouping("kafka_spout"
> );
> Config conf = new Config();
> conf.setDebug(false);
> if (args == null || args.length == 0) {
> LocalCluster lc = new LocalCluster();
> lc.submitTopology("topology", conf, tp.createTopology());
> try {
> Thread.sleep(600000);
> } catch (InterruptedException e) {
> e.printStackTrace();
> }
> lc.shutdown();
> } else {
> try {
> StormSubmitter.submitTopology("topology", conf, tp.createTopology());
> } catch (AlreadyAliveException e) {
> e.printStackTrace();
> } catch (InvalidTopologyException e) {
> e.printStackTrace();
> } catch (AuthorizationException e) {
> e.printStackTrace();
> }
> }
> }
> }
>
>
>
>
>
>

Reply via email to