Is there a way to test the throughput of a Trident KafkaSpout, say, how many messages it consumes per second?
thanks
Alec

On Tue, Oct 28, 2014 at 5:15 PM, Sa Li <[email protected]> wrote:

> Thanks a lot Sai, I tried topology.newStream("topictestspout",
> kafkaSpout).parallelismHint(4~36), and it didn't help much. I did see that
> the actual fetched batch size is 5 messages, which is far below what I
> expected; the messages from Kafka are JSON, around 3.5 KB each. Below is
> my code:
>
>     BrokerHosts zk = new ZkHosts("10.100.70.128:2181");
>     TridentKafkaConfig spoutConf = new TridentKafkaConfig(zk, "topictest");
>     spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
>     spoutConf.fetchSizeBytes = 50*1024*1024;
>     spoutConf.bufferSizeBytes = 50*1024*1024;
>     OpaqueTridentKafkaSpout kafkaSpout = new OpaqueTridentKafkaSpout(spoutConf);
>     TridentTopology topology = new TridentTopology();
>
>     final PostgresqlStateConfig config = new PostgresqlStateConfig();
>     {
>         config.setUrl(dburl);
>         config.setTable("test.state");
>         config.setKeyColumns(new String[]{"userid"});
>         config.setValueColumns(new String[]{"event"});
>         config.setType(StateType.NON_TRANSACTIONAL);
>         config.setCacheSize(5000);
>     }
>
>     topology.newStream("topictestspout", kafkaSpout)
>         // topology.newStream("test", new RandomTupleSpout())
>         // this test shows the kafkaSpout overhead is causing the latency
>         .parallelismHint(36)
>         // .shuffle()
>         // .each(new Fields("batchid","word"),
>         .each(new Fields("str"),
>               new JsonObjectParse(),
>               new Fields("userid","event"))
>         .groupBy(new Fields("userid"))
>         .persistentAggregate(PostgresqlState.newFactory(config),
>               new Fields("userid","event"), new EventUpdater(),
>               new Fields("eventword"));
>         // .parallelismHint(32);
>
> However, the link you sent in the last email looks good for tuning the
> Trident parameters; I will go through that page and see if it helps.
> In general, the throughput should be way larger than this. I ran the
> consumer performance test on the 3-node (9-broker) Kafka cluster, and the
> throughput averaged 350K msg/sec, with each message around 3~4 KB. So I am
> not sure whether such long latency is overhead caused by Storm or by
> something else.
>
> thanks
> Alec
>
> On Tue, Oct 28, 2014 at 3:59 PM, saiprasad mishra <
> [email protected]> wrote:
>
>> Something like below
>> my bad for the last quick email
>>
>>     topology.newStream("topictestspout", kafkaSpout).parallelismHint(4)
>>
>> Regards
>> Sai
>>
>> On Tue, Oct 28, 2014 at 3:48 PM, saiprasad mishra <
>> [email protected]> wrote:
>>
>>> you can increase the parallelismHint while creating the stream
>>>
>>> You can also take a look at this for some more explanation:
>>>
>>> https://gist.github.com/mrflip/5958028
>>>
>>> Regards
>>> Sai
>>>
>>> On Tue, Oct 28, 2014 at 3:35 PM, Sa Li <[email protected]> wrote:
>>>
>>>> Thanks Sai, I set spoutConf.fetchSizeBytes = 500*1024*1024; it didn't
>>>> help, so I assume there are some other unresolved issues.
>>>>
>>>> Thanks
>>>> Alec
>>>>
>>>> On Tue, Oct 28, 2014 at 3:18 PM, saiprasad mishra <
>>>> [email protected]> wrote:
>>>>
>>>>> You can configure how much to fetch from Kafka, in case you have not
>>>>> already done so, with a config like:
>>>>>
>>>>>     spoutConf.fetchSizeBytes = 10*1024*1024; // the default is
>>>>>                                               // 1024*1024, I guess
>>>>>
>>>>> Regards
>>>>> Sai
>>>>>
>>>>> On Tue, Oct 28, 2014 at 2:59 PM, Sa Li <[email protected]> wrote:
>>>>>
>>>>>> Hi, all
>>>>>>
>>>>>> I have a question about TridentKafkaSpout. I am using
>>>>>> TridentKafkaSpout to consume data from a Kafka cluster, and I found
>>>>>> that the latency of the kafkaSpout is too high to be acceptable.
>>>>>> This is the code:
>>>>>>
>>>>>>     BrokerHosts zk = new ZkHosts("10.100.70.128:2181");
>>>>>>     TridentKafkaConfig spoutConf = new TridentKafkaConfig(zk, "topictest");
>>>>>>     spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
>>>>>>     OpaqueTridentKafkaSpout kafkaSpout = new OpaqueTridentKafkaSpout(spoutConf);
>>>>>>     TridentTopology topology = new TridentTopology();
>>>>>>
>>>>>> I create the stream in two ways: one uses the KafkaSpout to read
>>>>>> data from Kafka, the other uses a RandomTupleSpout to generate
>>>>>> tuples randomly.
>>>>>>
>>>>>>     topology.newStream("topictestspout", kafkaSpout)
>>>>>>     // topology.newStream("test", new RandomTupleSpout())
>>>>>>         .persistentAggregate(PostgresqlState.newFactory(config),
>>>>>>             new Fields("userid","event"), new EventUpdater(),
>>>>>>             new Fields("eventword"))
>>>>>>
>>>>>> I expect to process the batch data in the State, but I found that
>>>>>> the list size of messages only reaches 4 when using the kafkaSpout,
>>>>>> no matter how large a batch size I set, while with RandomTupleSpout()
>>>>>> the actual batch size in memory can reach a few hundred (not very
>>>>>> high). So I conclude the KafkaSpout consumes data too slowly. Is that
>>>>>> because something is wrong with my code, or is it something I need to
>>>>>> change in the Kafka/Storm cluster configurations? Currently I am
>>>>>> using a 3-node (9-broker) Kafka cluster, a single-node Storm cluster,
>>>>>> and a 3-node ZK ensemble. The above code runs in local mode.
>>>>>>
>>>>>> thanks
>>>>>> Alec
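To answer the opening question (how many messages per second the spout consumes), one low-tech option besides the Storm UI is to drop a counter into the topology, e.g. called once per tuple from a Trident Filter's isKeep(). A minimal, framework-free sketch; the ThroughputMeter class and its reporting interval are illustrative, not part of Storm's API:

```java
import java.util.concurrent.atomic.AtomicLong;

// Call mark() once per message; it prints messages/sec each time the
// reporting window elapses. The counter is atomic; the window reset is
// approximate, which is fine for monitoring purposes.
public class ThroughputMeter {
    private final AtomicLong count = new AtomicLong();
    private final long intervalMillis;
    private volatile long windowStart = System.currentTimeMillis();

    public ThroughputMeter(long intervalMillis) {
        this.intervalMillis = intervalMillis;
    }

    /** Count one message; print the rate when the window has elapsed. */
    public void mark() {
        long n = count.incrementAndGet();
        long elapsed = System.currentTimeMillis() - windowStart;
        if (elapsed >= intervalMillis) {
            System.out.printf("%.1f msg/sec%n", n * 1000.0 / elapsed);
            count.set(0);
            windowStart = System.currentTimeMillis();
        }
    }

    /** Rate over the current window, for programmatic checks. */
    public double currentRate() {
        long elapsed = Math.max(1, System.currentTimeMillis() - windowStart);
        return count.get() * 1000.0 / elapsed;
    }
}
```

Since a filter is serialized to workers, the meter would need to live in a static or transient field initialized in the filter's prepare() method.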

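Beyond fetchSizeBytes and parallelismHint discussed in the thread, the batch-size symptom (only 4~5 messages per batch) is often governed by topology-level Trident settings. A hedged sketch against the Storm 0.9.x storm-kafka API used above; the concrete values are illustrative, not recommendations:

```java
Config conf = new Config();
// Upper bound on in-flight batches; too low a value throttles the spout.
conf.setMaxSpoutPending(8);
// How often Trident emits a new batch (ms); together with maxSpoutPending
// this bounds how much the spout can pull per unit time.
conf.put(Config.TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS, 500);

TridentKafkaConfig spoutConf =
        new TridentKafkaConfig(new ZkHosts("10.100.70.128:2181"), "topictest");
spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
// Bytes fetched per request per partition; raise for large JSON messages.
spoutConf.fetchSizeBytes = 10 * 1024 * 1024;
spoutConf.bufferSizeBytes = 10 * 1024 * 1024;
// Note: the Trident Kafka spout assigns one task per Kafka partition, so a
// parallelismHint above the topic's partition count buys nothing extra.
```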