You can configure how much to fetch from kafka incase you have not done the
below like config

spoutConf.fetchSizeBytes = 10*1024*1024; // these many bytes default is
1024*1024 i guess


Regards

Sai

On Tue, Oct 28, 2014 at 2:59 PM, Sa Li <[email protected]> wrote:

> Hi, all
>
> I have a question about TridentKafkaSpout, I am using TridentKafkaSpout to
> consume data from
> Kafka cluster, I found the latency of  kafkaSpout is too slow to
> acceptable, see this is the code
>
> BrokerHosts zk = new ZkHosts("10.100.70.128:2181");
> TridentKafkaConfig spoutConf = new TridentKafkaConfig(zk, "topictest");
> spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
> OpaqueTridentKafkaSpout kafkaSpout = new
> OpaqueTridentKafkaSpout(spoutConf);
> TridentTopology topology = new TridentTopology();
>
> I use two ways to create stream, one is KafkaSpout to read data from
> kafka, one is RandomTupleSpout to generate tuple randomly.
>
> topology.newStream("topictestspout", kafkaSpout)
> //topology.newStream("test", new RandomTupleSpout())
>
> .persistentAggregate(PostgresqlState.newFactory(config), new
> Fields("userid","event"), new EventUpdater(), new Fields( "eventword"))
>
>
> I am expecting to process the batch data in State, but I found list size
> of messages is up to 4 by using kafkaSpout no matter how large the batch
> size I set, but with RandomTupleSpout(), the actual batch size in memory
> can reach few hundreds (not very high), so I am concluding the KafkaSpout
> consume data too slow, it that because something wrong with my code, or
> something I need to change in Kafka/storm cluster configurations. Currently
> I am using 3-nodes (9 brokers) kafka cluster, single node storm cluster,
> 3-node zk ensemble. Above code is in the local mode.
>
> thanks
>
> Alec
>

Reply via email to