You can configure how much the spout fetches from Kafka, in case you have not already set something like the config below:

    spoutConf.fetchSizeBytes = 10 * 1024 * 1024; // fetch up to this many bytes per request; the default is 1024 * 1024

Regards
Sai

On Tue, Oct 28, 2014 at 2:59 PM, Sa Li <[email protected]> wrote:

> Hi, all
>
> I have a question about TridentKafkaSpout. I am using TridentKafkaSpout to
> consume data from a Kafka cluster, and I found the latency of the KafkaSpout
> is too slow to be acceptable. Here is the code:
>
>     BrokerHosts zk = new ZkHosts("10.100.70.128:2181");
>     TridentKafkaConfig spoutConf = new TridentKafkaConfig(zk, "topictest");
>     spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
>     OpaqueTridentKafkaSpout kafkaSpout = new OpaqueTridentKafkaSpout(spoutConf);
>     TridentTopology topology = new TridentTopology();
>
> I use two ways to create the stream: one is the KafkaSpout reading data from
> Kafka, the other is a RandomTupleSpout generating tuples randomly.
>
>     topology.newStream("topictestspout", kafkaSpout)
>     //topology.newStream("test", new RandomTupleSpout())
>         .persistentAggregate(PostgresqlState.newFactory(config),
>             new Fields("userid", "event"), new EventUpdater(), new Fields("eventword"));
>
> I expect to process the batch data in the State, but I found the list size
> of messages is at most 4 when using the KafkaSpout, no matter how large a
> batch size I set, while with RandomTupleSpout() the actual batch size in
> memory can reach a few hundred (not very high). So I conclude the KafkaSpout
> consumes data too slowly. Is that because something is wrong with my code, or
> is there something I need to change in the Kafka/Storm cluster configuration?
> Currently I am using a 3-node (9 brokers) Kafka cluster, a single-node Storm
> cluster, and a 3-node ZK ensemble. The above code runs in local mode.
>
> thanks
>
> Alec
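
For context, a minimal sketch of how the fetch-related fields on TridentKafkaConfig fit together (assuming the storm-kafka 0.9.x API; the 10 MB values are illustrative examples, not tuned recommendations):

```java
// Sketch of fetch tuning for storm-kafka's Trident spout (0.9.x-era API).
// The broker address and topic name are taken from the thread above.
BrokerHosts zk = new ZkHosts("10.100.70.128:2181");
TridentKafkaConfig spoutConf = new TridentKafkaConfig(zk, "topictest");
spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());

// Bytes requested from Kafka per fetch, per partition (default 1024 * 1024).
// A Trident batch is roughly one fetch per partition, so a small fetch size
// caps how many messages each batch can contain.
spoutConf.fetchSizeBytes = 10 * 1024 * 1024;

// Socket receive buffer used by the consumer (default 1024 * 1024);
// usually raised alongside fetchSizeBytes.
spoutConf.bufferSizeBytes = 10 * 1024 * 1024;

OpaqueTridentKafkaSpout kafkaSpout = new OpaqueTridentKafkaSpout(spoutConf);
```

This is a config fragment rather than a complete topology; the point is that with the defaults, a batch of only a few messages is consistent with a small per-partition fetch rather than with anything being wrong in the topology code itself.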
