It could be that flush.interval in the broker is set to 1, which makes the broker flush to disk after every message. Try increasing it to a larger number, such as 1000.
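
As a rough illustration of why a flush interval of 1 can hurt throughput, the sketch below appends 1 KB messages to a temporary file and forces them to disk every N messages. It is not Kafka code: the class and method names (`FlushIntervalDemo`, `appendMessages`) are made up for this example, and it only assumes that the broker setting counts messages between disk flushes.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical demo class, not part of Kafka.
public class FlushIntervalDemo {

    // Appends `messages` payloads of `size` bytes to a temp file and forces
    // them to disk after every `flushInterval` messages.
    // Returns the number of forced flushes that were issued.
    static int appendMessages(int messages, int size, int flushInterval) throws IOException {
        Path log = Files.createTempFile("flush-demo", ".log");
        int flushes = 0;
        try (FileChannel ch = FileChannel.open(log, StandardOpenOption.WRITE)) {
            ByteBuffer payload = ByteBuffer.allocate(size);
            int unflushed = 0;
            for (int i = 0; i < messages; i++) {
                payload.clear();                 // rewind so the whole buffer is written again
                while (payload.hasRemaining()) {
                    ch.write(payload);
                }
                if (++unflushed >= flushInterval) {
                    ch.force(false);             // the expensive step: sync data to disk
                    flushes++;
                    unflushed = 0;
                }
            }
            if (unflushed > 0) {                 // flush whatever is left at the end
                ch.force(false);
                flushes++;
            }
        } finally {
            Files.deleteIfExists(log);
        }
        return flushes;
    }

    public static void main(String[] args) throws IOException {
        for (int interval : new int[] {1, 1000}) {
            long start = System.nanoTime();
            int flushes = appendMessages(2000, 1024, interval);
            long ms = (System.nanoTime() - start) / 1_000_000;
            System.out.println("interval=" + interval + " flushes=" + flushes
                    + " elapsed=" + ms + " ms");
        }
    }
}
```

With an interval of 1, every message pays for its own disk sync; with 1000, the same 2000 messages need only two. Actual timings depend on the disk and file system, so the elapsed numbers are only indicative.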

Thanks,
Jun

On Fri, Feb 10, 2012 at 1:57 AM, Praveen Ramachandra <pravee...@gmail.com> wrote:

> Hi All,
>
> I am getting ridiculously low producer and consumer throughput.
>
> I am using default config values for producer, consumer and broker
> which are very good starting points, as they should yield sufficient
> throughput.
>
> Only config that I changed on the server is "num-partitions". Changed
> it to 20 (instead of 1). With this change the throughput increased to
> 2k messages per second (size 1k), but still it is far lower than what
> I would have expected.
>
> Appreciate if you can point to settings/changes-in-code needs to be done
> to get higher throughput.
>
> ====Consumer Code=====
> long startTime = System.currentTimeMillis();
> long endTime = startTime + runDuration*1000l;
>
> Properties props = new Properties();
> props.put("zk.connect", "localhost:2181");
> props.put("groupid", subscriptionName); // to support multiple subscribers
> props.put("zk.sessiontimeout.ms", "400");
> props.put("zk.synctime.ms", "200");
> props.put("autocommit.interval.ms", "1000");
>
> consConfig = new ConsumerConfig(props);
> consumer = kafka.consumer.Consumer.createJavaConsumerConnector(consConfig);
>
> Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
> topicCountMap.put(topicName, new Integer(1)); // has the topic to which to subscribe to
> Map<String, List<KafkaMessageStream<Message>>> consumerMap = consumer.createMessageStreams(topicCountMap);
> KafkaMessageStream<Message> stream = consumerMap.get(topicName).get(0);
> ConsumerIterator<Message> it = stream.iterator();
>
> while(System.currentTimeMillis() <= endTime )
> {
>     it.next(); // discard data
>     consumeMsgCount.incrementAndGet();
> }
>
> ====End consumer CODE============================
>
> =====Producer CODE========================
> props.put("serializer.class", "kafka.serializer.StringEncoder");
> props.put("zk.connect", "localhost:2181");
> // Use random partitioner. Don't need the key type. Just set it to Integer.
> // The message is of type String.
> producer = new kafka.javaapi.producer.Producer<Integer, String>(new ProducerConfig(props));
>
> long endTime = startTime + runDuration*1000l; // run duration is in seconds
> while(System.currentTimeMillis() <= endTime )
> {
>     String msg = org.apache.commons.lang.RandomStringUtils.random(msgSizes.get(0));
>     producer.send(new ProducerData<Integer, String>(topicName, msg));
>     pc.incrementAndGet();
> }
> java.util.Date date = new java.util.Date(System.currentTimeMillis());
> System.out.println(date+" :: stopped producer for topic"+topicName);
>
> =====END Producer CODE========================
>
> --
> Regards,
> Praveen Ramachandra