Hi All,
I am getting ridiculously low producer and consumer throughput.

I am using the default config values for the producer, consumer and broker, which are described as very good starting points that should yield sufficient throughput. The only config I changed on the server is "num.partitions", which I set to 20 (instead of 1). With this change the throughput increased to 2k messages per second (1 KB each), but that is still far lower than I would have expected.

I would appreciate it if you could point me to the settings or code changes needed to get higher throughput.

====Consumer Code=====
long startTime = System.currentTimeMillis();
long endTime = startTime + runDuration * 1000L; // run duration is in seconds

Properties props = new Properties();
props.put("zk.connect", "localhost:2181");
props.put("groupid", subscriptionName); // to support multiple subscribers
props.put("zk.sessiontimeout.ms", "400");
props.put("zk.synctime.ms", "200");
props.put("autocommit.interval.ms", "1000");

consConfig = new ConsumerConfig(props);
consumer = kafka.consumer.Consumer.createJavaConsumerConnector(consConfig);

// Maps the topic to subscribe to onto the number of streams to create for it
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(topicName, Integer.valueOf(1));

Map<String, List<KafkaMessageStream<Message>>> consumerMap =
    consumer.createMessageStreams(topicCountMap);
KafkaMessageStream<Message> stream = consumerMap.get(topicName).get(0);
ConsumerIterator<Message> it = stream.iterator();

while (System.currentTimeMillis() <= endTime) {
    it.next(); // discard data
    consumeMsgCount.incrementAndGet();
}
====End Consumer Code=====

====Producer Code=====
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put("zk.connect", "localhost:2181");

// Use random partitioner. Don't need the key type. Just set it to Integer.
// The message is of type String.
producer = new kafka.javaapi.producer.Producer<Integer, String>(new ProducerConfig(props));

long endTime = startTime + runDuration * 1000L; // run duration is in seconds

while (System.currentTimeMillis() <= endTime) {
    String msg = org.apache.commons.lang.RandomStringUtils.random(msgSizes.get(0));
    producer.send(new ProducerData<Integer, String>(topicName, msg));
    pc.incrementAndGet();
}

java.util.Date date = new java.util.Date(System.currentTimeMillis());
System.out.println(date + " :: stopped producer for topic " + topicName);
====End Producer Code=====

--
Regards,
Praveen Ramachandra