I need to show that a Kafka producer can produce 1 million events/second to a Kafka cluster.
So far, the best I have achieved is 200k events/sec on a topic with 2 partitions. Latency increases as I add more partitions, so I want to test with 2 partitions for now. The details are below, along with the producer code (Java). How can I produce 1 million events/sec? I have also gone through the Kafka benchmarking blog post:
https://engineering.linkedin.com/kafka/benchmarking-apache-kafka-2-million-writes-second-three-cheap-machines

*Kafka cluster:* 3 brokers on 3 servers. Each server has 16 TB of storage (16 JBODs) and 64 GB RAM.
*Broker:* Allocated 6 GB, 16 io.threads, 8 network threads.
*Topic:* 2 partitions, replication factor of 1 (I get high latency)
*ZooKeeper:* 3 ZK instances running individually on master nodes (not co-located with the Kafka brokers/servers)

*Producer code:*

    import java.util.Date;
    import java.util.Properties;

    import kafka.javaapi.producer.Producer;
    import kafka.producer.KeyedMessage;
    import kafka.producer.ProducerConfig;
    import org.apache.log4j.Logger;

    public class TestProducer {

        private static String msg = "TEST KAFKA PERFORMANCE";
        private static Logger LOG = Logger.getLogger(TestProducer.class);

        public static void main(String... args) {
            System.out.println("START - Test Producer");

            // Arguments: <messageCount> <topic> <brokerList> <batchCount> <topicPartitions>
            long messageCount = Long.parseLong(args[0]);
            long messageCountForStat = messageCount;
            String topic = args[1];
            String brokerList = args[2];
            int batchCount = Integer.parseInt(args[3]);
            int topicPartitions = Integer.parseInt(args[4]);

            Producer<String, String> producer = getProducer(brokerList, batchCount);
            Date startTime = new Date(System.currentTimeMillis());

            // Produce messages, spreading them over the partitions by key.
            while (messageCount > 0) {
                // Take the modulo on the long first; the original cast to int
                // before the modulo, which can overflow for large counts.
                String partition = Long.toString(messageCount % topicPartitions);
                KeyedMessage<String, String> message =
                        new KeyedMessage<String, String>(topic, partition, msg);
                producer.send(message);
                messageCount--;
            }

            // In async mode send() only queues the message, so close the producer
            // before taking the end time to include the queued sends.
            producer.close();
            Date endTime = new Date(System.currentTimeMillis());

            System.out.println("#########################################");
            System.out.println("MESSAGES SENT: " + messageCountForStat);
            System.out.println("START TIME: " + startTime);
            System.out.println("END TIME: " + endTime);
            System.out.println("#########################################");
            System.out.println("END - Test Producer");
        }

        public static Producer<String, String> getProducer(String brokerList, int batchSize) {
            Properties props = new Properties(); // was missing in the original snippet
            props.put("metadata.broker.list", brokerList);
            props.put("serializer.class", "kafka.serializer.StringEncoder");
            props.put("partitioner.class", "com.my.SimplePartitioner");
            props.put("request.required.acks", "0");
            props.put("producer.type", "async");
            props.put("compression.codec", "snappy");
            props.put("batch.num.messages", Integer.toString(batchSize));

            ProducerConfig config = new ProducerConfig(props);
            return new Producer<String, String>(config);
        }
    }
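
The test program prints only the start and end timestamps, so the events/sec figure has to be worked out by hand. A minimal sketch of how the rate could be computed directly is below. It is not part of the test above: the class name `ThroughputTimer` and its two methods are hypothetical, the `Runnable` is only a stand-in for the real send loop, and no Kafka calls are made.

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical helper: times a send loop and converts the result to events/sec. */
final class ThroughputTimer {

    /** Events per second for `events` messages sent in `elapsedNanos` nanoseconds. */
    static double eventsPerSecond(long events, long elapsedNanos) {
        if (elapsedNanos <= 0) {
            throw new IllegalArgumentException("elapsedNanos must be positive");
        }
        return events * (double) TimeUnit.SECONDS.toNanos(1) / elapsedNanos;
    }

    /** Runs the loop once and returns the elapsed wall-clock time in nanoseconds. */
    static long timeNanos(Runnable sendLoop) {
        long start = System.nanoTime();
        sendLoop.run();
        return System.nanoTime() - start;
    }

    public static void main(String[] args) {
        final long messageCount = 1_000_000L;

        // Stand-in for the real loop (assumption): in the actual test this body
        // would hold the producer.send(...) calls.
        long elapsed = timeNanos(() -> {
            long sink = 0;
            for (long i = 0; i < messageCount; i++) {
                sink += i;
            }
            if (sink < 0) {
                throw new IllegalStateException("unreachable");
            }
        });

        System.out.printf("MESSAGES SENT: %d%n", messageCount);
        System.out.printf("ELAPSED: %.3f s%n", elapsed / 1e9);
        System.out.printf("THROUGHPUT: %.0f events/sec%n",
                eventsPerSecond(messageCount, Math.max(elapsed, 1)));
    }
}
```

As a sanity check on the numbers in this post: at 200k events/sec, 1,000,000 messages take about 5 seconds, and `eventsPerSecond(1_000_000L, 5_000_000_000L)` returns 200000.0. To reach 1 million events/sec the same batch would have to finish in about 1 second.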