Praveen, we had the same problem at first. Make sure you do not flush to disk after every single message; this kills throughput. I'm not sure what the config option is called, but it was on by default.
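To show why the flush frequency matters so much, here is a minimal sketch in plain Java. It is not Kafka code: `FlushBatchingSketch`, `writeMessages`, and `flushEvery` are made-up names for illustration. It writes 1 KB messages to a temp file and forces them to disk either after every message or once per batch. For Kafka of this vintage I believe the broker-side knob is `log.flush.interval` (messages per flush) in server.properties, but treat that name as an assumption and check your own config.

```java
import java.io.BufferedOutputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical illustration only; this does not use or mirror any Kafka API.
class FlushBatchingSketch {

    /**
     * Writes `count` 1 KB messages to `file`, forcing data to disk once every
     * `flushEvery` messages, plus once at the end if a partial batch remains.
     * Returns the number of forced syncs performed.
     */
    static int writeMessages(Path file, int count, int flushEvery) throws IOException {
        if (flushEvery <= 0) {
            throw new IllegalArgumentException("flushEvery must be positive");
        }
        byte[] payload = new byte[1024]; // 1 KB, matching the message size in the thread
        int syncs = 0;
        try (FileOutputStream fos = new FileOutputStream(file.toFile());
             BufferedOutputStream out = new BufferedOutputStream(fos)) {
            for (int i = 1; i <= count; i++) {
                out.write(payload);
                if (i % flushEvery == 0) {
                    out.flush();          // push buffered bytes to the OS
                    fos.getFD().sync();   // ask the OS to push them to the device
                    syncs++;
                }
            }
            if (count % flushEvery != 0) {
                out.flush();
                fos.getFD().sync();
                syncs++;
            }
        }
        return syncs;
    }

    public static void main(String[] args) throws IOException {
        Path file = Files.createTempFile("flush-sketch", ".log");
        try {
            // Compare a sync per message against a sync per 1000 messages.
            for (int flushEvery : new int[] {1, 1000}) {
                long start = System.nanoTime();
                int syncs = writeMessages(file, 5000, flushEvery);
                long elapsedMs = (System.nanoTime() - start) / 1_000_000;
                System.out.println("flushEvery=" + flushEvery
                        + " syncs=" + syncs + " elapsedMs=" + elapsedMs);
            }
        } finally {
            Files.deleteIfExists(file);
        }
    }
}
```

The timings depend heavily on the disk and filesystem, so run it on the broker machine itself; the number of syncs is what to compare first.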
tim

Sent from my iPhone

On 10.02.2012, at 10:57, Praveen Ramachandra <pravee...@gmail.com> wrote:

> Hi All,
>
> I am getting ridiculously low producer and consumer throughput.
>
> I am using default config values for producer, consumer and broker
> which are very good starting points, as they should yield sufficient
> throughput.
>
> Only config that I changed on the server is "num-partitions". Changed
> it to 20 (instead of 1). With this change the throughput increased to
> 2k messages per second (size 1k), but still it is far lower than what
> I would have expected.
>
> Appreciate if you can point to settings/changes-in-code needs to be done
> to get higher throughput.
>
> ====Consumer Code=====
> long startTime = System.currentTimeMillis();
> long endTime = startTime + runDuration*1000l;
>
> Properties props = new Properties();
> props.put("zk.connect", "localhost:2181");
> props.put("groupid", subscriptionName); // to support multiple subscribers
> props.put("zk.sessiontimeout.ms", "400");
> props.put("zk.synctime.ms", "200");
> props.put("autocommit.interval.ms", "1000");
>
> consConfig = new ConsumerConfig(props);
> consumer = kafka.consumer.Consumer.createJavaConsumerConnector(consConfig);
>
> Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
> topicCountMap.put(topicName, new Integer(1)); // has the topic to which to subscribe to
> Map<String, List<KafkaMessageStream<Message>>> consumerMap =
>     consumer.createMessageStreams(topicCountMap);
> KafkaMessageStream<Message> stream = consumerMap.get(topicName).get(0);
> ConsumerIterator<Message> it = stream.iterator();
>
> while(System.currentTimeMillis() <= endTime )
> {
>     it.next(); // discard data
>     consumeMsgCount.incrementAndGet();
> }
> ====End consumer CODE============================
>
> =====Producer CODE========================
> props.put("serializer.class", "kafka.serializer.StringEncoder");
> props.put("zk.connect", "localhost:2181");
> // Use random partitioner. Don't need the key type. Just set it to Integer.
> // The message is of type String.
> producer = new kafka.javaapi.producer.Producer<Integer, String>(new ProducerConfig(props));
>
> long endTime = startTime + runDuration*1000l; // run duration is in seconds
> while(System.currentTimeMillis() <= endTime )
> {
>     String msg = org.apache.commons.lang.RandomStringUtils.random(msgSizes.get(0));
>     producer.send(new ProducerData<Integer, String>(topicName, msg));
>     pc.incrementAndGet();
> }
> java.util.Date date = new java.util.Date(System.currentTimeMillis());
> System.out.println(date+" :: stopped producer for topic"+topicName);
> =====END Producer CODE========================
>
> --
> Regards,
> Praveen Ramachandra